From 4ccab757209925d2c46053364d1ad0db700f7fc3 Mon Sep 17 00:00:00 2001 From: "David.Hall" Date: Thu, 3 Nov 2022 09:42:12 -0500 Subject: [PATCH] Mcol 5279 2 (#2606) * MCOL-5279 Send ByteStream with Primitive message to remote PPs first and to the local PP last MCOL-5166 forces EM to interact with PP over a messaging queue when they are in the same process. When ByteStream is taken from the queue it is drained, so it is impossible to re-use the same ByteStream to send to the other nodes. * MCOL-5279 Fix a corner case when sending PP messages Multi-node can break when flow control is used to backpressure PPs that overflow EM with Primitive messages. --- dbcon/joblist/distributedenginecomm.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index b250c1c66..59ac011aa 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - * Copyright (C) 2016-2020 MariaDB Corporation. + * Copyright (C) 2016-2022 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -749,7 +749,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { uint64_t totalUnackedWork = 0; - for (uint32_t i = 0; i < pmCount; i++) + for (int i = pmCount - 1; i >= 0; --i) totalUnackedWork += mqe->unackedWork[i]; if (totalUnackedWork == 0) @@ -790,7 +790,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max } else { - for (i = 0; i < pmCount; i++) + for (int i = pmCount - 1; i >= 0; --i) { uint32_t curVal = mqe->unackedWork[nextIndex]; uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal); @@ -813,7 +813,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! 
"; - for (i = 0; i < pmCount; i++) + for (int i = pmCount - 1; i >= 0; --i) cerr << mqe->unackedWork[i] << " "; cerr << " max: " << maxAck; @@ -1051,6 +1051,9 @@ void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp:: inMemoryEM2PPExchCV_.notify_one(); } +// This routine has an unexpected side effect on its SBS argument: the +// SBS is cleared when it is sent to a PP on the same host. This forces any +// code that uses ::writeToClient to send to the local node last. int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID, bool doInterleaving) {