From 9b272f59d285a220e8d6b57c39fdf46a9e8971a3 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Tue, 4 Feb 2025 09:37:01 +0000 Subject: [PATCH] fix(DEC): DEC now sends multiple acks with the correct number of msgs acked to facilitate ControlFlow using same node communication bypass. (#3345) --- dbcon/joblist/distributedenginecomm.cpp | 77 ++++++++++--------------- 1 file changed, 32 insertions(+), 45 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 1bb79bfc0..10f791918 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - * Copyright (C) 2016-2022 MariaDB Corporation. + * Copyright (C) 2016-2024 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -659,7 +659,6 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector& msgs, boost::shared_ptr mqe, size_t queueSize) { - ISMPacketHeader* ism; uint32_t l_msgCount = msgs.size(); /* If the current queue size > target, do nothing. @@ -712,44 +711,42 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (l_msgCount > 0) { - SBS msg(new ByteStream(sizeof(ISMPacketHeader))); - uint16_t* toAck; vector pmAcked(pmCount, false); - ism = (ISMPacketHeader*)msg->getInputPtr(); - // The only var checked by ReadThread is the Command var. The others - // are wasted space. We hijack the Size, & Flags fields for the - // params to the ACK msg. - - ism->Interleave = uniqueID; - ism->Command = BATCH_PRIMITIVE_ACK; - toAck = &ism->Size; - - msg->advanceInputPtr(sizeof(ISMPacketHeader)); - // There must be only one local connection here. - bool sendToLocal = false; - while (l_msgCount > 0) { - /* could have to send up to pmCount ACKs */ - uint32_t sockIndex = 0; + SBS ackCommand(new ByteStream(sizeof(ISMPacketHeader))); - /* This will reset the ACK field in the Bytestream directly, and nothing - * else needs to change if multiple msgs are sent. */ - nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); - idbassert(*toAck <= l_msgCount); - l_msgCount -= *toAck; - if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex)) + ISMPacketHeader* ism = reinterpret_cast(ackCommand->getInputPtr()); + // The only var checked by ReadThread is the Command var. The others + // are wasted space. We hijack the Size, & Flags fields for the + // params to the ACK msg. + + ism->Interleave = uniqueID; + ism->Command = BATCH_PRIMITIVE_ACK; + uint16_t* toAck = &ism->Size; + + ackCommand->advanceInputPtr(sizeof(ISMPacketHeader)); + // There must be only one local connection here. + while (l_msgCount > 0) { - sendToLocal = true; - continue; + /* could have to send up to pmCount ACKs */ + uint32_t sockIndex = 0; + + /* This will reset the ACK field in the Bytestream directly, and nothing + * else needs to change if multiple msgs are sent. */ + nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); + idbassert(*toAck <= l_msgCount); + l_msgCount -= *toAck; + pmAcked[sockIndex] = true; + + if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex)) + { + SBS sameNodeAckCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, *toAck); + writeToClient(sockIndex, sameNodeAckCommand); + continue; + } + writeToClient(sockIndex, ackCommand); } - pmAcked[sockIndex] = true; - writeToClient(sockIndex, msg); - } - if (sendToLocal) - { - pmAcked[localConnectionId_] = true; - writeToClient(localConnectionId_, msg); } // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked. @@ -764,25 +761,15 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (totalUnackedWork == 0) { - *toAck = 1; - + // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. for (uint32_t i = 0; i < pmCount; ++i) { if (!pmAcked[i]) { - if (i == localConnectionId_ && fIsExeMgr) - { - continue; - } - // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1); writeToClient(i, ackCommand); } } - if (needToSendToLocalPM(fIsExeMgr, localConnectionId_) && !pmAcked[localConnectionId_]) - { - writeToClient(localConnectionId_, msg); - } } } }