From 1d416bc6ed85893f7315a31e0c716d73f64dbdaa Mon Sep 17 00:00:00 2001 From: drrtuy Date: Tue, 5 Dec 2023 17:39:24 +0200 Subject: [PATCH] fix(DEC): MCOL-5602 fixing potentially endless loop in DEC (#3049) --- dbcon/joblist/distributedenginecomm.cpp | 133 ++++++++++-------------- dbcon/joblist/distributedenginecomm.h | 7 +- utils/common/atomicops.h | 12 +++ 3 files changed, 73 insertions(+), 79 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 8cc70fe73..a374e1cb9 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -356,8 +356,6 @@ int32_t DistributedEngineComm::Setup() fWlock.swap(newLocks); fPmConnections.swap(newClients); - // memory barrier to prevent the pmCount assignment migrating upward - atomicops::atomicMb(); pmCount = newPmCount; newLocks.clear(); @@ -419,14 +417,14 @@ Error: for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + ++map_tok->second->unackedWork[0]; map_tok->second->queue.push(sbs); } lk.unlock(); if (fIsExeMgr) { - decltype(pmCount) originalPMCount = pmCount; + const uint32_t originalPMCount = pmCount; // Re-establish if a remote PM restarted. std::this_thread::sleep_for(std::chrono::seconds(3)); auto rc = Setup(); @@ -455,7 +453,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) mqe->throttled = false; std::lock_guard lk(fMlock); - b = fSessionMessages.insert(pair >(key, mqe)).second; + b = fSessionMessages.insert(pair>(key, mqe)).second; if (!b) { @@ -635,7 +633,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, size_t queueSize) { ISMPacketHeader* ism; - uint32_t l_msgCount = msgs.size(); + size_t l_msgCount = msgs.size(); /* If the current queue size > target, do nothing. * If the original queue size > target, ACK the msgs below the target. 
@@ -643,14 +641,13 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (!mqe->throttled || queueSize >= mqe->targetQueueSize) { /* no acks will be sent, but update unackedwork to keep the #s accurate */ - uint16_t numack = 0; uint32_t sockidx = 0; while (l_msgCount > 0) { - nextPMToACK(mqe, l_msgCount, &sockidx, &numack); - idbassert(numack <= l_msgCount); - l_msgCount -= numack; + size_t numAcked = subsMsgCounterAndRotatePM(mqe, l_msgCount, &sockidx); + idbassert(numAcked <= l_msgCount); + l_msgCount -= numAcked; } return; @@ -665,9 +662,8 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { /* update unackedwork for the overage that will never be acked */ int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize; - uint16_t numack = 0; uint32_t sockidx = 0; - uint32_t msgsToIgnore; + size_t msgsToIgnore; for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++) overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead(); @@ -679,16 +675,15 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, while (msgsToIgnore > 0) { - nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack); - idbassert(numack <= msgsToIgnore); - msgsToIgnore -= numack; + size_t numAcked = subsMsgCounterAndRotatePM(mqe, msgsToIgnore, &sockidx); + idbassert(numAcked <= msgsToIgnore); + msgsToIgnore -= numAcked; } } if (l_msgCount > 0) { SBS msg(new ByteStream(sizeof(ISMPacketHeader))); - uint16_t* toAck; vector pmAcked(pmCount, false); ism = (ISMPacketHeader*)msg->getInputPtr(); @@ -698,7 +693,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, ism->Interleave = uniqueID; ism->Command = BATCH_PRIMITIVE_ACK; - toAck = &ism->Size; msg->advanceInputPtr(sizeof(ISMPacketHeader)); // There must be only one local connection here. 
@@ -710,9 +704,20 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, /* This will reset the ACK field in the Bytestream directly, and nothing * else needs to change if multiple msgs are sent. */ - nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); - idbassert(*toAck <= l_msgCount); - l_msgCount -= *toAck; + size_t numAcked = subsMsgCounterAndRotatePM(mqe, l_msgCount, &sockIndex); + // Potential truncation here from size_t to 16 bits. + ism->Size = static_cast( + std::min(static_cast(std::numeric_limits::max()), numAcked)); + idbassert(numAcked <= l_msgCount); + l_msgCount -= numAcked; + + // Log a warning if the above-mentioned truncation happens. + if (numAcked > std::numeric_limits::max()) + { + cerr << "DEC::sendAcks(): msgs acked number [" << numAcked + << "] overflows PrimMsg protocol header field capacity 2^16." << std::endl; + } + if (sockIndex == localConnectionId_ && fIsExeMgr) { sendToLocal = true; @@ -739,7 +744,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (totalUnackedWork == 0) { - *toAck = 1; + ism->Size = 1; // size of the batch acked for (uint32_t i = 0; i < pmCount; ++i) { @@ -761,8 +766,8 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, } } -void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t maxAck, uint32_t* sockIndex, - uint16_t* numToAck) +size_t DistributedEngineComm::subsMsgCounterAndRotatePM(boost::shared_ptr mqe, const size_t maxAck, + uint32_t* sockIndex) { uint32_t& nextIndex = mqe->ackSocketIndex; @@ -770,56 +775,57 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn * is a safe minimum at the point of use. 
 */ - if (mqe->unackedWork[nextIndex] >= maxAck) + if (mqe->unackedWork[nextIndex].load() >= maxAck) { - (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck); + atomicops::atomicSubstitute(mqe->unackedWork[nextIndex], maxAck); *sockIndex = nextIndex; - // FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit... - *numToAck = maxAck; if (pmCount > 0) nextIndex = (nextIndex + 1) % pmCount; - return; + return maxAck; } else { for (int i = pmCount - 1; i >= 0; --i) { - uint32_t curVal = mqe->unackedWork[nextIndex]; - uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal); - + const size_t curVal = mqe->unackedWork[nextIndex].load(); + const size_t unackedWork = std::min(curVal, maxAck); if (unackedWork > 0) { - (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork); + atomicops::atomicSubstitute(mqe->unackedWork[nextIndex], unackedWork); + *sockIndex = nextIndex; - *numToAck = unackedWork; if (pmCount > 0) nextIndex = (nextIndex + 1) % pmCount; - return; + return unackedWork; } if (pmCount > 0) nextIndex = (nextIndex + 1) % pmCount; } - cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! "; + cerr << "DEC::subsMsgCounterAndRotatePM(): Couldn't find a PM to ACK! "; for (int i = pmCount - 1; i >= 0; --i) cerr << mqe->unackedWork[i] << " "; - cerr << " max: " << maxAck; - cerr << endl; + cerr << " max: " << maxAck << endl; // make sure the returned vars are legitimate *sockIndex = nextIndex; - *numToAck = maxAck / pmCount; - if (pmCount > 0) - nextIndex = (nextIndex + 1) % pmCount; + // This may result in an invalid pmCount, but it's better than crashing on a division by zero. 
+ const uint32_t localPmCount = pmCount.load(); + if (localPmCount > 0) + { + nextIndex = (nextIndex + 1) % localPmCount; + return maxAck / localPmCount; + } - return; + cerr << "DEC::subsMsgCounterAndRotatePM(): The number of PMs is 0."; + return maxAck; } } @@ -970,7 +976,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* if (pmCount > 0) { - (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]); + ++mqe->unackedWork[connIndex % pmCount]; } TSQSize_t queueSize = mqe->queue.push(sbs); @@ -1092,7 +1098,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + ++map_tok->second->unackedWork[0]; map_tok->second->queue.push(sbs); } @@ -1120,37 +1126,6 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ // Connection was established. return 1; - /* - // reconfig the connection array - ClientList tempConns; - { - //cout << "WARNING: DEC WRITE BROKEN PIPE " << - fPmConnections[index]->otherEnd()<< endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string - moduleName = fPmConnections[index]->moduleName(); - //cout << "module name = " << moduleName << endl; - if (index >= fPmConnections.size()) return 0; - - for (uint32_t i = 0; i < fPmConnections.size(); i++) - { - if (moduleName != fPmConnections[i]->moduleName()) - tempConns.push_back(fPmConnections[i]); - } - if (tempConns.size() == fPmConnections.size()) return 0; - fPmConnections.swap(tempConns); - pmCount = (pmCount == 0 ? 
0 : pmCount - 1); - } - // send alarm - ALARMManager alarmMgr; - string alarmItem("UNKNOWN"); - - if (index < fPmConnections.size()) - { - alarmItem = fPmConnections[index]->addr2String(); - } - - alarmItem.append(" PrimProc"); - alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); - */ } return 0; } @@ -1206,9 +1181,15 @@ DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInt const uint64_t flowControlEnableBytesThresh) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh) { - unackedWork.reset(new volatile uint32_t[pmCount]); interleaver.reset(new uint32_t[pmCount]); - memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t)); + const uint32_t localPmCount = pmCount; + AtomicSizeVec s(localPmCount); + for (size_t i = 0; i < localPmCount; ++i) + { + s[i].store(0); + } + unackedWork.swap(s); + uint32_t interleaverValue = initialInterleaverValue; initialConnectionId = initialInterleaverValue; for (size_t pmId = 0; pmId < pmCount; ++pmId) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 2c34d7077..a49a27f6d 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -225,6 +225,7 @@ class DistributedEngineComm // A queue of ByteStreams coming in from PrimProc heading for a JobStep typedef ThreadSafeQueue StepMsgQueue; + using AtomicSizeVec = std::vector>; /* To keep some state associated with the connection. These aren't copyable. 
 */ struct MQE : public boost::noncopyable @@ -235,7 +236,7 @@ class DistributedEngineComm messageqcpp::Stats stats; StepMsgQueue queue; uint32_t ackSocketIndex; - boost::scoped_array unackedWork; + AtomicSizeVec unackedWork; boost::scoped_array interleaver; uint32_t initialConnectionId; uint32_t pmCount; @@ -283,7 +284,7 @@ class DistributedEngineComm std::mutex fMlock; // sessionMessages mutex std::vector> fWlock; // PrimProc socket write mutexes bool fBusy; - volatile uint32_t pmCount; + std::atomic pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition boost::mutex fSetupMutex; @@ -305,7 +306,7 @@ class DistributedEngineComm void sendAcks(uint32_t uniqueID, const std::vector& msgs, boost::shared_ptr mqe, size_t qSize); - void nextPMToACK(boost::shared_ptr mqe, uint32_t maxAck, uint32_t* sockIndex, uint16_t* numToAck); + size_t subsMsgCounterAndRotatePM(boost::shared_ptr mqe, const size_t maxAck, uint32_t* sockIndex); void setFlowControl(bool enable, uint32_t uniqueID, boost::shared_ptr mqe); void doHasBigMsgs(boost::shared_ptr mqe, uint64_t targetSize); boost::mutex ackLock; diff --git a/utils/common/atomicops.h b/utils/common/atomicops.h index 94e3e3201..2f9d84f81 100644 --- a/utils/common/atomicops.h +++ b/utils/common/atomicops.h @@ -22,6 +22,7 @@ #include #include #include +#include /* This is an attempt to wrap the differneces between Windows and Linux around atomic ops. @@ -92,4 +93,15 @@ inline void atomicYield() sched_yield(); } +// This function assumes the decrement does not exceed the minuend. +template +inline void atomicSubstitute(typename std::atomic& minuend, T decrement) +{ + T expected = minuend.load(); + do + { + expected = minuend.load(); + } while (!minuend.compare_exchange_weak(expected, expected - decrement)); +} + } // namespace atomicops