diff --git a/VERSION b/VERSION
index ea8ff71fc..1bb40e0e7 100644
--- a/VERSION
+++ b/VERSION
@@ -1,4 +1,4 @@
 COLUMNSTORE_VERSION_MAJOR=22
 COLUMNSTORE_VERSION_MINOR=08
-COLUMNSTORE_VERSION_PATCH=3
+COLUMNSTORE_VERSION_PATCH=4
 COLUMNSTORE_VERSION_RELEASE=1
diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp
index 3051f4d0e..1ddfb2842 100644
--- a/dbcon/joblist/distributedenginecomm.cpp
+++ b/dbcon/joblist/distributedenginecomm.cpp
@@ -727,7 +727,14 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
     toAck = &ism->Size;
 
     msg->advanceInputPtr(sizeof(ISMPacketHeader));
-
+    // There must be only one local connection here.
+    uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
+    for (uint32_t i = 0; i < pmCount; ++i)
+    {
+      if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
+        localConnectionId = i;
+    }
+    bool sendToLocal = false;
     while (l_msgCount > 0)
     {
       /* could have to send up to pmCount ACKs */
@@ -738,9 +745,19 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
       nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
       idbassert(*toAck <= l_msgCount);
       l_msgCount -= *toAck;
+      if (sockIndex == localConnectionId)
+      {
+        sendToLocal = true;
+        continue;
+      }
       pmAcked[sockIndex] = true;
       writeToClient(sockIndex, msg);
     }
+    if (sendToLocal && localConnectionId < fPmConnections.size())
+    {
+      pmAcked[localConnectionId] = true;
+      writeToClient(localConnectionId, msg);
+    }
 
     // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
     // This is apply to the big message case only. For small messages, the flow control is
@@ -749,17 +766,27 @@
     {
       uint64_t totalUnackedWork = 0;
 
-      for (int i = pmCount - 1; i >= 0; --i)
+      for (uint32_t i = 0; i < pmCount; ++i)
         totalUnackedWork += mqe->unackedWork[i];
 
       if (totalUnackedWork == 0)
       {
         *toAck = 1;
 
-        for (int i = pmCount - 1; i >= 0; --i)
+        for (uint32_t i = 0; i < pmCount; ++i)
         {
           if (!pmAcked[i])
+          {
+            if (i == localConnectionId)
+            {
+              continue;
+            }
             writeToClient(i, msg);
+          }
+        }
+        if (!pmAcked[localConnectionId])
+        {
+          writeToClient(localConnectionId, msg);
         }
       }
     }
@@ -847,9 +874,19 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
 #endif
 
   msg->advanceInputPtr(sizeof(ISMPacketHeader));
+  uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
 
-  for (int i = mqe->pmCount - 1; i >= 0; --i)
+  for (uint32_t i = 0; i < mqe->pmCount; ++i)
+  {
+    if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
+    {
+      localConnectionId = i;
+      continue;
+    }
     writeToClient(i, msg);
+  }
+  if (localConnectionId < fPmConnections.size())
+    writeToClient(localConnectionId, msg);
 }
 
 void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
@@ -875,9 +912,21 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
 
     case DICT_DESTROY_EQUALITY_FILTER:
       /* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
         entries in the config file point to unique PMs */
-      for (int i = pmCount - 1; i >= 0; --i)
-        writeToClient(i, msg, senderID);
+    {
+      uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
+      for (uint32_t i = 0; i < pmCount; ++i)
+      {
+        if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
+        {
+          localConnectionId = i;
+          continue;
+        }
+        writeToClient(i, msg, senderID);
+      }
+      if (localConnectionId < fPmConnections.size())
+        writeToClient(localConnectionId, msg);
+    }
       return;
 
     case BATCH_PRIMITIVE_RUN:
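
Note on the pattern: every hunk above applies the same idea, namely iterate the PM connections in ascending order, skip the one connection co-located with the ExeMgr, and write to that local connection last so it cannot be serviced ahead of the remote PMs. The snippet below is a minimal standalone sketch of that ordering, not part of the patch; Connection, atTheSameHost(), writeToClient(), and broadcastLocalLast are simplified stand-ins assumed for illustration rather than the real DistributedEngineComm API.

#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct Connection
{
  bool local;  // stand-in for the real atTheSameHost() state
  bool atTheSameHost() const { return local; }
};

// Stand-in for DistributedEngineComm::writeToClient().
static void writeToClient(uint32_t i)
{
  std::cout << "write to connection " << i << '\n';
}

// Broadcast to every connection, deferring the single local one to the end.
void broadcastLocalLast(const std::vector<Connection>& conns, bool isExeMgr)
{
  uint32_t localId = std::numeric_limits<uint32_t>::max();

  for (uint32_t i = 0; i < conns.size(); ++i)
  {
    if (conns[i].atTheSameHost() && isExeMgr)
    {
      localId = i;  // remember it; there must be only one local connection
      continue;     // do not write to it yet
    }
    writeToClient(i);
  }

  // The bounds check doubles as "was a local connection found at all":
  // localId stays at max() when no connection is co-located.
  if (localId < conns.size())
    writeToClient(localId);
}

int main()
{
  std::vector<Connection> conns{{false}, {true}, {false}};
  broadcastLocalLast(conns, /*isExeMgr=*/true);  // prints 0, 2, then 1
}

The sentinel-plus-bounds-check shape (localId initialized to numeric_limits<uint32_t>::max(), later compared against the container size) mirrors how the diff guards its final writeToClient(localConnectionId, msg) calls.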