1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #2624 from mariadb-corporation/columnstore-22.08.4-1

Columnstore 22.08.4 1
This commit is contained in:
Roman Nozdrin
2022-11-18 19:01:33 +03:00
committed by GitHub
2 changed files with 56 additions and 7 deletions

View File

@ -727,7 +727,14 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
toAck = &ism->Size;
msg->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
localConnectionId = i;
}
bool sendToLocal = false;
while (l_msgCount > 0)
{
/* could have to send up to pmCount ACKs */
@ -738,9 +745,19 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId)
{
sendToLocal = true;
continue;
}
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
if (sendToLocal && localConnectionId < fPmConnections.size())
{
pmAcked[localConnectionId] = true;
writeToClient(localConnectionId, msg);
}
// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
// This is apply to the big message case only. For small messages, the flow control is
@ -749,17 +766,27 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{
uint64_t totalUnackedWork = 0;
for (int i = pmCount - 1; i >= 0; --i)
for (uint32_t i = 0; i < pmCount; ++i)
totalUnackedWork += mqe->unackedWork[i];
if (totalUnackedWork == 0)
{
*toAck = 1;
for (int i = pmCount - 1; i >= 0; --i)
for (uint32_t i = 0; i < pmCount; ++i)
{
if (!pmAcked[i])
{
if (i == localConnectionId)
{
continue;
}
writeToClient(i, msg);
}
}
if (!pmAcked[localConnectionId])
{
writeToClient(localConnectionId, msg);
}
}
}
@ -847,9 +874,19 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
#endif
msg->advanceInputPtr(sizeof(ISMPacketHeader));
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (int i = mqe->pmCount - 1; i >= 0; --i)
for (uint32_t i = 0; i < mqe->pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
{
localConnectionId = i;
continue;
}
writeToClient(i, msg);
}
if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg);
}
void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
@ -875,9 +912,21 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
case DICT_DESTROY_EQUALITY_FILTER:
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
entries in the config file point to unique PMs */
for (int i = pmCount - 1; i >= 0; --i)
writeToClient(i, msg, senderID);
{
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
{
localConnectionId = i;
continue;
}
writeToClient(i, msg, senderID);
}
if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg);
}
return;
case BATCH_PRIMITIVE_RUN: