1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

fix(DEC): MCOL-5637 Initialize a new bytestream before write to PS (#3118)

This commit is contained in:
Denis Khalikov
2024-02-09 22:27:54 +03:00
committed by GitHub
parent 21c77840cb
commit 2f29e22ba0
2 changed files with 19 additions and 1 deletions

View File

@@ -566,6 +566,18 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
return *sbs; return *sbs;
} }
SBS DistributedEngineComm::createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID,
uint16_t size)
{
SBS bpCommand(new ByteStream(sizeof(ISMPacketHeader)));
auto* ism = (ISMPacketHeader*)bpCommand->getInputPtr();
ism->Interleave = uniqueID;
ism->Command = command;
ism->Size = size;
bpCommand->advanceInputPtr(sizeof(ISMPacketHeader));
return bpCommand;
}
void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v) void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
{ {
boost::shared_ptr<MQE> mqe; boost::shared_ptr<MQE> mqe;
@@ -754,7 +766,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{ {
continue; continue;
} }
writeToClient(i, msg); // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server.
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
writeToClient(i, ackCommand);
} }
} }
if (!pmAcked[localConnectionId_] && fIsExeMgr) if (!pmAcked[localConnectionId_] && fIsExeMgr)

View File

@@ -227,6 +227,10 @@ class DistributedEngineComm
typedef ThreadSafeQueue<messageqcpp::SBS> StepMsgQueue; typedef ThreadSafeQueue<messageqcpp::SBS> StepMsgQueue;
using AtomicSizeVec = std::vector<std::atomic<size_t>>; using AtomicSizeVec = std::vector<std::atomic<size_t>>;
// Creates a ByteStream as a command for Primitive Server and initializes it with a given `command`,
// `uniqueID` and `size`.
messageqcpp::SBS createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID, uint16_t size);
/* To keep some state associated with the connection. These aren't copyable. */ /* To keep some state associated with the connection. These aren't copyable. */
struct MQE : public boost::noncopyable struct MQE : public boost::noncopyable
{ {