You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
fix(DEC): DEC now sends multiple acks with the correct number of msgs acked to facilitate ControlFlow using same node communication bypass. (#3345)
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||||
* Copyright (C) 2016-2022 MariaDB Corporation.
|
* Copyright (C) 2016-2024 MariaDB Corporation.
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
This program is free software; you can redistribute it and/or
|
||||||
modify it under the terms of the GNU General Public License
|
modify it under the terms of the GNU General Public License
|
||||||
@@ -659,7 +659,6 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
|
|||||||
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs, boost::shared_ptr<MQE> mqe,
|
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs, boost::shared_ptr<MQE> mqe,
|
||||||
size_t queueSize)
|
size_t queueSize)
|
||||||
{
|
{
|
||||||
ISMPacketHeader* ism;
|
|
||||||
uint32_t l_msgCount = msgs.size();
|
uint32_t l_msgCount = msgs.size();
|
||||||
|
|
||||||
/* If the current queue size > target, do nothing.
|
/* If the current queue size > target, do nothing.
|
||||||
@@ -712,22 +711,22 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
|
|||||||
|
|
||||||
if (l_msgCount > 0)
|
if (l_msgCount > 0)
|
||||||
{
|
{
|
||||||
SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
|
|
||||||
uint16_t* toAck;
|
|
||||||
vector<bool> pmAcked(pmCount, false);
|
vector<bool> pmAcked(pmCount, false);
|
||||||
|
|
||||||
ism = (ISMPacketHeader*)msg->getInputPtr();
|
{
|
||||||
|
SBS ackCommand(new ByteStream(sizeof(ISMPacketHeader)));
|
||||||
|
|
||||||
|
ISMPacketHeader* ism = reinterpret_cast<ISMPacketHeader*>(ackCommand->getInputPtr());
|
||||||
// The only var checked by ReadThread is the Command var. The others
|
// The only var checked by ReadThread is the Command var. The others
|
||||||
// are wasted space. We hijack the Size, & Flags fields for the
|
// are wasted space. We hijack the Size, & Flags fields for the
|
||||||
// params to the ACK msg.
|
// params to the ACK msg.
|
||||||
|
|
||||||
ism->Interleave = uniqueID;
|
ism->Interleave = uniqueID;
|
||||||
ism->Command = BATCH_PRIMITIVE_ACK;
|
ism->Command = BATCH_PRIMITIVE_ACK;
|
||||||
toAck = &ism->Size;
|
uint16_t* toAck = &ism->Size;
|
||||||
|
|
||||||
msg->advanceInputPtr(sizeof(ISMPacketHeader));
|
ackCommand->advanceInputPtr(sizeof(ISMPacketHeader));
|
||||||
// There must be only one local connection here.
|
// There must be only one local connection here.
|
||||||
bool sendToLocal = false;
|
|
||||||
while (l_msgCount > 0)
|
while (l_msgCount > 0)
|
||||||
{
|
{
|
||||||
/* could have to send up to pmCount ACKs */
|
/* could have to send up to pmCount ACKs */
|
||||||
@@ -738,18 +737,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
|
|||||||
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
|
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
|
||||||
idbassert(*toAck <= l_msgCount);
|
idbassert(*toAck <= l_msgCount);
|
||||||
l_msgCount -= *toAck;
|
l_msgCount -= *toAck;
|
||||||
|
pmAcked[sockIndex] = true;
|
||||||
|
|
||||||
if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex))
|
if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex))
|
||||||
{
|
{
|
||||||
sendToLocal = true;
|
SBS sameNodeAckCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, *toAck);
|
||||||
|
writeToClient(sockIndex, sameNodeAckCommand);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
pmAcked[sockIndex] = true;
|
writeToClient(sockIndex, ackCommand);
|
||||||
writeToClient(sockIndex, msg);
|
|
||||||
}
|
}
|
||||||
if (sendToLocal)
|
|
||||||
{
|
|
||||||
pmAcked[localConnectionId_] = true;
|
|
||||||
writeToClient(localConnectionId_, msg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
|
// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
|
||||||
@@ -764,25 +761,15 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
|
|||||||
|
|
||||||
if (totalUnackedWork == 0)
|
if (totalUnackedWork == 0)
|
||||||
{
|
{
|
||||||
*toAck = 1;
|
// MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server.
|
||||||
|
|
||||||
for (uint32_t i = 0; i < pmCount; ++i)
|
for (uint32_t i = 0; i < pmCount; ++i)
|
||||||
{
|
{
|
||||||
if (!pmAcked[i])
|
if (!pmAcked[i])
|
||||||
{
|
{
|
||||||
if (i == localConnectionId_ && fIsExeMgr)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server.
|
|
||||||
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
|
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
|
||||||
writeToClient(i, ackCommand);
|
writeToClient(i, ackCommand);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (needToSendToLocalPM(fIsExeMgr, localConnectionId_) && !pmAcked[localConnectionId_])
|
|
||||||
{
|
|
||||||
writeToClient(localConnectionId_, msg);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user