mariadb-columnstore-engine
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
Mcol 5279 2 (#2606)
* MCOL-5279 Send the ByteStream carrying a Primitive message to the remote PPs first and to the local PP last.

  MCOL-5166 forces the EM to interact with the PP over a messaging queue when they run in the same process. When a ByteStream is taken from that queue it is drained, so the same ByteStream can no longer be re-used to send to the other nodes.

* MCOL-5279 Fix a corner case when sending PP messages.

  A multi-node setup can break when flow control is used to backpressure PPs that overflow the EM with Primitive messages.
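To make the ordering constraint concrete, here is a minimal, self-contained sketch of the "remote PPs first, local PP last" broadcast described above. The names broadcastPrimitive, sendToRemotePM and pushToLocalQueue, and the simplified SBS alias, are illustrative stand-ins only; they are not the real DistributedEngineComm interfaces, and the real send path (connections, interleaving, flow control) is not shown.

#include <cstddef>
#include <iostream>
#include <memory>
#include <string>

// Stand-in for a shared ByteStream (messageqcpp::SBS in the real code).
using SBS = std::shared_ptr<std::string>;

// Hypothetical remote send: the bytes are copied onto the wire, the
// ByteStream itself is left intact and can be reused for the next PM.
void sendToRemotePM(std::size_t pm, const SBS& bs)
{
  std::cout << "remote PM " << pm << " <- " << bs->size() << " bytes\n";
}

// Hypothetical local delivery: the in-process PP queue drains the
// ByteStream, so after this call it is empty and cannot be sent again.
void pushToLocalQueue(std::size_t pm, const SBS& bs)
{
  std::cout << "local PM " << pm << " <- " << bs->size() << " bytes (drained)\n";
  bs->clear();
}

// Remote PPs first, local PP last, so the draining consumer runs only
// after every copy-based consumer has seen the full message.
void broadcastPrimitive(const SBS& bs, std::size_t localPM, std::size_t pmCount)
{
  for (std::size_t i = 0; i < pmCount; ++i)
    if (i != localPM)
      sendToRemotePM(i, bs);

  pushToLocalQueue(localPM, bs);
}

int main()
{
  auto bs = std::make_shared<std::string>("primitive message payload");
  broadcastPrimitive(bs, /*localPM=*/0, /*pmCount=*/3);
}

The point the commit relies on is visible in pushToLocalQueue: once the draining consumer has run, the ByteStream is empty, so it has to run last.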
@@ -1,5 +1,5 @@
 /* Copyright (C) 2014 InfiniDB, Inc.
- * Copyright (C) 2016-2020 MariaDB Corporation.
+ * Copyright (C) 2016-2022 MariaDB Corporation.

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
@@ -749,7 +749,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
 {
   uint64_t totalUnackedWork = 0;

-  for (uint32_t i = 0; i < pmCount; i++)
+  for (int i = pmCount - 1; i >= 0; --i)
     totalUnackedWork += mqe->unackedWork[i];

   if (totalUnackedWork == 0)
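A brief aside on the index type in the reversed loop above (and in the two matching loops further down): the descending form declares the counter as int because an unsigned counter tested with i >= 0 can never fail the condition; past zero it wraps around to UINT32_MAX and the loop never terminates. A tiny standalone illustration, independent of the ColumnStore code:

#include <cstdint>
#include <iostream>

int main()
{
  const uint32_t pmCount = 3;

  // Signed index: prints 2, 1, 0 and stops, matching the new loops above.
  for (int i = pmCount - 1; i >= 0; --i)
    std::cout << "i = " << i << '\n';

  // An unsigned index would never make "i >= 0" false:
  // for (uint32_t i = pmCount - 1; i >= 0; --i)  // condition is always true;
  //   ...                                        // i wraps to UINT32_MAX past 0
  return 0;
}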
@@ -790,7 +790,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
   }
   else
   {
-    for (i = 0; i < pmCount; i++)
+    for (int i = pmCount - 1; i >= 0; --i)
     {
       uint32_t curVal = mqe->unackedWork[nextIndex];
       uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
@@ -813,7 +813,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max

   cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";

-  for (i = 0; i < pmCount; i++)
+  for (int i = pmCount - 1; i >= 0; --i)
     cerr << mqe->unackedWork[i] << " ";

   cerr << " max: " << maxAck;
@@ -1051,6 +1051,9 @@ void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp::
   inMemoryEM2PPExchCV_.notify_one();
 }

+// This routine has an unexpected side-effect on its argument's SBS, namely
+// SBS is cleared when it is sent to PP at the same host. This fact forces any
+// code uses ::writeToClient to send to a local node in the last turn.
 int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID,
                                          bool doInterleaving)
 {
|