diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index a374e1cb9..aa4aef26d 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -566,6 +566,18 @@ const ByteStream DistributedEngineComm::read(uint32_t key) return *sbs; } +SBS DistributedEngineComm::createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, + uint16_t size) +{ + SBS bpCommand(new ByteStream(sizeof(ISMPacketHeader))); + auto* ism = (ISMPacketHeader*)bpCommand->getInputPtr(); + ism->Interleave = uniqueID; + ism->Command = command; + ism->Size = size; + bpCommand->advanceInputPtr(sizeof(ISMPacketHeader)); + return bpCommand; +} + void DistributedEngineComm::read_all(uint32_t key, vector& v) { boost::shared_ptr mqe; @@ -754,7 +766,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { continue; } - writeToClient(i, msg); + // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. + SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1); + writeToClient(i, ackCommand); } } if (!pmAcked[localConnectionId_] && fIsExeMgr) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index a49a27f6d..65eb002e3 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -227,6 +227,10 @@ class DistributedEngineComm typedef ThreadSafeQueue StepMsgQueue; using AtomicSizeVec = std::vector>; + // Creates a ByteStream as a command for Primitive Server and initializes it with a given `command`, + // `uniqueID` and `size`. + messageqcpp::SBS createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, uint16_t size); + /* To keep some state associated with the connection. These aren't copyable. */ struct MQE : public boost::noncopyable {