From 2f29e22ba0bf964cb1e3d5d9479216ccc900f496 Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Fri, 9 Feb 2024 22:27:54 +0300 Subject: [PATCH] fix(DEC): MCOL-5637 Initialize a new bytestream before write to PS (#3118) --- dbcon/joblist/distributedenginecomm.cpp | 16 +++++++++++++++- dbcon/joblist/distributedenginecomm.h | 4 ++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index a374e1cb9..aa4aef26d 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -566,6 +566,18 @@ const ByteStream DistributedEngineComm::read(uint32_t key) return *sbs; } +SBS DistributedEngineComm::createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, + uint16_t size) +{ + SBS bpCommand(new ByteStream(sizeof(ISMPacketHeader))); + auto* ism = (ISMPacketHeader*)bpCommand->getInputPtr(); + ism->Interleave = uniqueID; + ism->Command = command; + ism->Size = size; + bpCommand->advanceInputPtr(sizeof(ISMPacketHeader)); + return bpCommand; +} + void DistributedEngineComm::read_all(uint32_t key, vector& v) { boost::shared_ptr mqe; @@ -754,7 +766,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { continue; } - writeToClient(i, msg); + // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. + SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1); + writeToClient(i, ackCommand); } } if (!pmAcked[localConnectionId_] && fIsExeMgr) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index a49a27f6d..65eb002e3 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -227,6 +227,10 @@ class DistributedEngineComm typedef ThreadSafeQueue StepMsgQueue; using AtomicSizeVec = std::vector>; + // Creates a ByteStream as a command for Primitive Server and initializes it with a given `command`, + // `uniqueID` and `size`. + messageqcpp::SBS createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, uint16_t size); + /* To keep some state associated with the connection. These aren't copyable. */ struct MQE : public boost::noncopyable {