diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 8cc70fe73..b9a157934 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -568,6 +568,18 @@ const ByteStream DistributedEngineComm::read(uint32_t key) return *sbs; } +SBS DistributedEngineComm::createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, + uint16_t size) +{ + SBS bpCommand(new ByteStream(sizeof(ISMPacketHeader))); + auto* ism = (ISMPacketHeader*)bpCommand->getInputPtr(); + ism->Interleave = uniqueID; + ism->Command = command; + ism->Size = size; + bpCommand->advanceInputPtr(sizeof(ISMPacketHeader)); + return bpCommand; +} + void DistributedEngineComm::read_all(uint32_t key, vector& v) { boost::shared_ptr mqe; @@ -749,7 +761,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { continue; } - writeToClient(i, msg); + // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. + SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1); + writeToClient(i, ackCommand); } } if (!pmAcked[localConnectionId_] && fIsExeMgr) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 2c34d7077..f48295bea 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -226,6 +226,10 @@ class DistributedEngineComm // A queue of ByteStreams coming in from PrimProc heading for a JobStep typedef ThreadSafeQueue StepMsgQueue; + // Creates a ByteStream as a command for Primitive Server and initializes it with a given `command`, + // `uniqueID` and `size`. + messageqcpp::SBS createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, uint16_t size); + /* To keep some state associated with the connection. These aren't copyable. */ struct MQE : public boost::noncopyable {