From eaba4d33b4af1e8910341aa061e5fa70e221b858 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Mon, 11 Nov 2024 18:31:15 +0000 Subject: [PATCH] fix(DEC):MCOL-5805,5808 to resolve UM-only node crash inside DEC when there is no local PP to send the local requests to. (#3350) * Revert "fix(DEC): MCOL-5602 fixing potentially endless loop in DEC (#3049)" This reverts commit 1d416bc6ed85893f7315a31e0c716d73f64dbdaa. * fix(DEC):MCOL-5805,5808 to resolve UM-only node crash inside DEC when there is no local PP to send the local requests to. --- dbcon/joblist/distributedenginecomm.cpp | 154 ++++++++++++++---------- dbcon/joblist/distributedenginecomm.h | 14 ++- utils/common/atomicops.h | 12 -- 3 files changed, 102 insertions(+), 78 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index aa4aef26d..c718deaa0 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -181,6 +181,17 @@ struct QueueShutdown namespace joblist { + +bool isDefaultLocalConnectionId(const uint32_t connectionId) +{ + return defaultLocalConnectionId() == connectionId; +} + +bool needToSendToLocalPM(const bool isExeMgr, const uint32_t connectionId) +{ + return isExeMgr && !isDefaultLocalConnectionId(connectionId); +} + DistributedEngineComm* DistributedEngineComm::fInstance = 0; /*static*/ @@ -356,6 +367,8 @@ int32_t DistributedEngineComm::Setup() fWlock.swap(newLocks); fPmConnections.swap(newClients); + // memory barrier to prevent the pmCount assignment migrating upward + atomicops::atomicMb(); pmCount = newPmCount; newLocks.clear(); @@ -417,14 +430,14 @@ Error: for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - ++map_tok->second->unackedWork[0]; + (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); map_tok->second->queue.push(sbs); } lk.unlock(); if (fIsExeMgr) { - const uint32_t originalPMCount = pmCount; + decltype(pmCount) originalPMCount = pmCount; // Re-establish if a remote PM restarted. std::this_thread::sleep_for(std::chrono::seconds(3)); auto rc = Setup(); @@ -453,7 +466,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) mqe->throttled = false; std::lock_guard lk(fMlock); - b = fSessionMessages.insert(pair>(key, mqe)).second; + b = fSessionMessages.insert(pair >(key, mqe)).second; if (!b) { @@ -645,7 +658,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, size_t queueSize) { ISMPacketHeader* ism; - size_t l_msgCount = msgs.size(); + uint32_t l_msgCount = msgs.size(); /* If the current queue size > target, do nothing. * If the original queue size > target, ACK the msgs below the target. @@ -653,13 +666,14 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (!mqe->throttled || queueSize >= mqe->targetQueueSize) { /* no acks will be sent, but update unackedwork to keep the #s accurate */ + uint16_t numack = 0; uint32_t sockidx = 0; while (l_msgCount > 0) { - size_t numAcked = subsMsgCounterAndRotatePM(mqe, l_msgCount, &sockidx); - idbassert(numAcked <= l_msgCount); - l_msgCount -= numAcked; + nextPMToACK(mqe, l_msgCount, &sockidx, &numack); + idbassert(numack <= l_msgCount); + l_msgCount -= numack; } return; @@ -674,8 +688,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { /* update unackedwork for the overage that will never be acked */ int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize; + uint16_t numack = 0; uint32_t sockidx = 0; - size_t msgsToIgnore; + uint32_t msgsToIgnore; for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++) overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead(); @@ -687,15 +702,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, while (msgsToIgnore > 0) { - size_t numAcked = subsMsgCounterAndRotatePM(mqe, msgsToIgnore, &sockidx); - idbassert(numAcked <= msgsToIgnore); - msgsToIgnore -= numAcked; + nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack); + idbassert(numack <= msgsToIgnore); + msgsToIgnore -= numack; } } if (l_msgCount > 0) { SBS msg(new ByteStream(sizeof(ISMPacketHeader))); + uint16_t* toAck; vector pmAcked(pmCount, false); ism = (ISMPacketHeader*)msg->getInputPtr(); @@ -705,6 +721,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, ism->Interleave = uniqueID; ism->Command = BATCH_PRIMITIVE_ACK; + toAck = &ism->Size; msg->advanceInputPtr(sizeof(ISMPacketHeader)); // There must be only one local connection here. @@ -716,21 +733,10 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, /* This will reset the ACK field in the Bytestream directly, and nothing * else needs to change if multiple msgs are sent. */ - size_t numAcked = subsMsgCounterAndRotatePM(mqe, l_msgCount, &sockIndex); - // Potential truncation here from size_t to 16 bits. - ism->Size = static_cast( - std::min(static_cast(std::numeric_limits::max()), numAcked)); - idbassert(numAcked <= l_msgCount); - l_msgCount -= numAcked; - - // We get some logs if the above mentioned truncation happens. - if (numAcked > std::numeric_limits::max()) - { - cerr << "DEC::sendAcks(): msgs acked number [" << numAcked - << "] overflows PrimMsg protocol header field capacity 2^16." << std::endl; - } - - if (sockIndex == localConnectionId_ && fIsExeMgr) + nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); + idbassert(*toAck <= l_msgCount); + l_msgCount -= *toAck; + if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex)) { sendToLocal = true; continue; @@ -756,7 +762,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (totalUnackedWork == 0) { - ism->Size = 1; // size of the batch acked + *toAck = 1; for (uint32_t i = 0; i < pmCount; ++i) { @@ -771,7 +777,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, writeToClient(i, ackCommand); } } - if (!pmAcked[localConnectionId_] && fIsExeMgr) + if (needToSendToLocalPM(fIsExeMgr, localConnectionId_) && !pmAcked[localConnectionId_]) { writeToClient(localConnectionId_, msg); } @@ -780,8 +786,8 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, } } -size_t DistributedEngineComm::subsMsgCounterAndRotatePM(boost::shared_ptr mqe, const size_t maxAck, - uint32_t* sockIndex) +void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t maxAck, uint32_t* sockIndex, + uint16_t* numToAck) { uint32_t& nextIndex = mqe->ackSocketIndex; @@ -789,57 +795,56 @@ size_t DistributedEngineComm::subsMsgCounterAndRotatePM(boost::shared_ptr m * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn * is a safe minimum at the point of use. */ - if (mqe->unackedWork[nextIndex].load() >= maxAck) + if (mqe->unackedWork[nextIndex] >= maxAck) { - atomicops::atomicSubstitute(mqe->unackedWork[nextIndex], maxAck); + (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck); *sockIndex = nextIndex; + // FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit... + *numToAck = maxAck; if (pmCount > 0) nextIndex = (nextIndex + 1) % pmCount; - return maxAck; + return; } else { for (int i = pmCount - 1; i >= 0; --i) { - const size_t curVal = mqe->unackedWork[nextIndex].load(); - const size_t unackedWork = std::min(curVal, maxAck); + uint32_t curVal = mqe->unackedWork[nextIndex]; + uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal); + if (unackedWork > 0) { - atomicops::atomicSubstitute(mqe->unackedWork[nextIndex], unackedWork); - + (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork); *sockIndex = nextIndex; + *numToAck = unackedWork; if (pmCount > 0) nextIndex = (nextIndex + 1) % pmCount; - return unackedWork; + return; } if (pmCount > 0) nextIndex = (nextIndex + 1) % pmCount; } - cerr << "DEC::subsMsgCounterAndRotatePM(): Couldn't find a PM to ACK! "; + cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! "; for (int i = pmCount - 1; i >= 0; --i) cerr << mqe->unackedWork[i] << " "; - cerr << " max: " << maxAck << endl; + cerr << " max: " << maxAck; + cerr << endl; // make sure the returned vars are legitimate *sockIndex = nextIndex; + *numToAck = maxAck / pmCount; - // This will potentially results in invalid pmCount but it's better than crashing on 0 div. - const uint32_t localPmCount = pmCount.load(); - if (localPmCount > 0) - { - nextIndex = (nextIndex + 1) % localPmCount; - return maxAck / localPmCount; - } + if (pmCount > 0) + nextIndex = (nextIndex + 1) % pmCount; - cerr << "DEC::subsMsgCounterAndRotatePM(): The number of PMs is 0."; - return maxAck; + return; } } @@ -863,8 +868,10 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos } writeToClient(i, msg); } - if (fIsExeMgr) + if (needToSendToLocalPM(fIsExeMgr, localConnectionId_)) + { writeToClient(localConnectionId_, msg); + } } int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg) @@ -904,7 +911,7 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg) return rc; } } - if (fIsExeMgr) + if (needToSendToLocalPM(fIsExeMgr, localConnectionId_)) { return writeToClient(localConnectionId_, msg); } @@ -990,7 +997,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* if (pmCount > 0) { - ++mqe->unackedWork[connIndex % pmCount]; + (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]); } TSQSize_t queueSize = mqe->queue.push(sbs); @@ -1112,7 +1119,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) { map_tok->second->queue.clear(); - ++map_tok->second->unackedWork[0]; + (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); map_tok->second->queue.push(sbs); } @@ -1140,6 +1147,37 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ // Connection was established. return 1; + /* + // reconfig the connection array + ClientList tempConns; + { + //cout << "WARNING: DEC WRITE BROKEN PIPE " << + fPmConnections[index]->otherEnd()<< endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string + moduleName = fPmConnections[index]->moduleName(); + //cout << "module name = " << moduleName << endl; + if (index >= fPmConnections.size()) return 0; + + for (uint32_t i = 0; i < fPmConnections.size(); i++) + { + if (moduleName != fPmConnections[i]->moduleName()) + tempConns.push_back(fPmConnections[i]); + } + if (tempConns.size() == fPmConnections.size()) return 0; + fPmConnections.swap(tempConns); + pmCount = (pmCount == 0 ? 0 : pmCount - 1); + } + // send alarm + ALARMManager alarmMgr; + string alarmItem("UNKNOWN"); + + if (index < fPmConnections.size()) + { + alarmItem = fPmConnections[index]->addr2String(); + } + + alarmItem.append(" PrimProc"); + alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); + */ } return 0; } @@ -1195,15 +1233,9 @@ DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInt const uint64_t flowControlEnableBytesThresh) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh) { + unackedWork.reset(new volatile uint32_t[pmCount]); interleaver.reset(new uint32_t[pmCount]); - const uint32_t localPmCount = pmCount; - AtomicSizeVec s(localPmCount); - for (size_t i = 0; i < localPmCount; ++i) - { - s[i].store(0); - } - unackedWork.swap(s); - + memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t)); uint32_t interleaverValue = initialInterleaverValue; initialConnectionId = initialInterleaverValue; for (size_t pmId = 0; pmId < pmCount; ++pmId) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 65eb002e3..d5c687b87 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -69,6 +69,11 @@ class Config; */ namespace joblist { +constexpr uint32_t defaultLocalConnectionId() +{ + return std::numeric_limits::max(); +} + class DECEventListener { public: @@ -225,7 +230,6 @@ class DistributedEngineComm // A queue of ByteStreams coming in from PrimProc heading for a JobStep typedef ThreadSafeQueue StepMsgQueue; - using AtomicSizeVec = std::vector>; // Creates a ByteStream as a command for Primitive Server and initializes it with a given `command`, // `uniqueID` and `size`. @@ -240,7 +244,7 @@ class DistributedEngineComm messageqcpp::Stats stats; StepMsgQueue queue; uint32_t ackSocketIndex; - AtomicSizeVec unackedWork; + boost::scoped_array unackedWork; boost::scoped_array interleaver; uint32_t initialConnectionId; uint32_t pmCount; @@ -288,7 +292,7 @@ class DistributedEngineComm std::mutex fMlock; // sessionMessages mutex std::vector> fWlock; // PrimProc socket write mutexes bool fBusy; - std::atomic pmCount; + volatile uint32_t pmCount; boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition boost::mutex fSetupMutex; @@ -310,13 +314,13 @@ class DistributedEngineComm void sendAcks(uint32_t uniqueID, const std::vector& msgs, boost::shared_ptr mqe, size_t qSize); - size_t subsMsgCounterAndRotatePM(boost::shared_ptr mqe, const size_t maxAck, uint32_t* sockIndex); + void nextPMToACK(boost::shared_ptr mqe, uint32_t maxAck, uint32_t* sockIndex, uint16_t* numToAck); void setFlowControl(bool enable, uint32_t uniqueID, boost::shared_ptr mqe); void doHasBigMsgs(boost::shared_ptr mqe, uint64_t targetSize); boost::mutex ackLock; // localConnectionId_ is set running Setup() method - uint32_t localConnectionId_ = std::numeric_limits::max(); + uint32_t localConnectionId_ = defaultLocalConnectionId(); std::vector localNetIfaceSins_; std::mutex inMemoryEM2PPExchMutex_; std::condition_variable inMemoryEM2PPExchCV_; diff --git a/utils/common/atomicops.h b/utils/common/atomicops.h index 2f9d84f81..94e3e3201 100644 --- a/utils/common/atomicops.h +++ b/utils/common/atomicops.h @@ -22,7 +22,6 @@ #include #include #include -#include /* This is an attempt to wrap the differneces between Windows and Linux around atomic ops. @@ -93,15 +92,4 @@ inline void atomicYield() sched_yield(); } -// This f assumes decrement is smaller than minuend. -template -inline void atomicSubstitute(typename std::atomic& minuend, T decrement) -{ - T expected = minuend.load(); - do - { - expected = minuend.load(); - } while (!minuend.compare_exchange_weak(expected, expected - decrement)); -} - } // namespace atomicops