Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git

fix(DEC):MCOL-5805,5808 to resolve UM-only node crash inside DEC when there is no local PP to send the local requests to. (#3350)

* Revert "fix(DEC): MCOL-5602 fixing potentially endless loop in DEC (#3049)"

This reverts commit 1d416bc6ed.

* fix(DEC):MCOL-5805,5808 to resolve UM-only node crash inside DEC when there is no local PP to send the local requests to.
Author: drrtuy
Date: 2024-11-11 18:31:15 +00:00 (committed by GitHub)
Parent: 4cba8301d5
Commit: eaba4d33b4
3 changed files with 102 additions and 78 deletions
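The crash behind MCOL-5805/5808: on a UM-only node there is no local PrimProc, so DEC's localConnectionId_ never leaves its sentinel value, yet several code paths wrote to it unconditionally whenever fIsExeMgr was set. The patch routes every local write through a guard. The following self-contained sketch restates that guard as it appears in the diff below, for illustration only:

    #include <cstdint>
    #include <limits>

    // Sentinel meaning "no local PrimProc connection was ever set up".
    constexpr uint32_t defaultLocalConnectionId()
    {
      return std::numeric_limits<uint32_t>::max();
    }

    bool isDefaultLocalConnectionId(const uint32_t connectionId)
    {
      return defaultLocalConnectionId() == connectionId;
    }

    // True only for an ExeMgr whose local connection id is real, so a
    // UM-only node (id still the sentinel) skips the local write instead
    // of touching a connection that does not exist.
    bool needToSendToLocalPM(const bool isExeMgr, const uint32_t connectionId)
    {
      return isExeMgr && !isDefaultLocalConnectionId(connectionId);
    }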

dbcon/joblist/distributedenginecomm.cpp

@@ -181,6 +181,17 @@ struct QueueShutdown
 namespace joblist
 {
+
+bool isDefaultLocalConnectionId(const uint32_t connectionId)
+{
+  return defaultLocalConnectionId() == connectionId;
+}
+
+bool needToSendToLocalPM(const bool isExeMgr, const uint32_t connectionId)
+{
+  return isExeMgr && !isDefaultLocalConnectionId(connectionId);
+}
+
 DistributedEngineComm* DistributedEngineComm::fInstance = 0;

 /*static*/
@@ -356,6 +367,8 @@ int32_t DistributedEngineComm::Setup()
   fWlock.swap(newLocks);
   fPmConnections.swap(newClients);
+  // memory barrier to prevent the pmCount assignment migrating upward
+  atomicops::atomicMb();
   pmCount = newPmCount;

   newLocks.clear();
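The barrier added above pairs the connection-vector swap with the pmCount publish: a writer thread that observes the new pmCount must also observe the swapped-in fPmConnections. A minimal sketch of the same ordering, expressed with a standard release fence instead of atomicops::atomicMb() and with hypothetical stand-ins for the DEC members:

    #include <atomic>
    #include <cstdint>
    #include <vector>

    static std::vector<int> connections;  // stand-in for fPmConnections
    static volatile uint32_t count = 0;   // stand-in for pmCount

    void publishConnections(std::vector<int>& newConns, uint32_t newCount)
    {
      connections.swap(newConns);
      // Release fence: keeps the swap above from reordering past the
      // count store below, so readers of the new count see the new
      // connection vector as well.
      std::atomic_thread_fence(std::memory_order_release);
      count = newCount;
    }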
@@ -417,14 +430,14 @@ Error:
   for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
   {
     map_tok->second->queue.clear();
-    ++map_tok->second->unackedWork[0];
+    (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
     map_tok->second->queue.push(sbs);
   }
   lk.unlock();

   if (fIsExeMgr)
   {
-    const uint32_t originalPMCount = pmCount;
+    decltype(pmCount) originalPMCount = pmCount;
     // Re-establish if a remote PM restarted.
     std::this_thread::sleep_for(std::chrono::seconds(3));
     auto rc = Setup();
@@ -453,7 +466,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
   mqe->throttled = false;

   std::lock_guard lk(fMlock);
-  b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE>>(key, mqe)).second;
+  b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;

   if (!b)
   {
@@ -645,7 +658,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
                                     size_t queueSize)
 {
   ISMPacketHeader* ism;
-  size_t l_msgCount = msgs.size();
+  uint32_t l_msgCount = msgs.size();

   /* If the current queue size > target, do nothing.
    * If the original queue size > target, ACK the msgs below the target.
@@ -653,13 +666,14 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
   if (!mqe->throttled || queueSize >= mqe->targetQueueSize)
   {
     /* no acks will be sent, but update unackedwork to keep the #s accurate */
+    uint16_t numack = 0;
     uint32_t sockidx = 0;

     while (l_msgCount > 0)
     {
-      size_t numAcked = subsMsgCounterAndRotatePM(mqe, l_msgCount, &sockidx);
-      idbassert(numAcked <= l_msgCount);
-      l_msgCount -= numAcked;
+      nextPMToACK(mqe, l_msgCount, &sockidx, &numack);
+      idbassert(numack <= l_msgCount);
+      l_msgCount -= numack;
     }

     return;
@@ -674,8 +688,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
   {
     /* update unackedwork for the overage that will never be acked */
     int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize;
+    uint16_t numack = 0;
     uint32_t sockidx = 0;
-    size_t msgsToIgnore;
+    uint32_t msgsToIgnore;

     for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++)
       overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead();
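The overage loop above decides how many queued messages will never be ACKed once flow control is throttling. A small self-contained model with hypothetical sizes, where lengthWithHdrOverhead() is replaced by a plain size list:

    #include <cstdint>
    #include <vector>

    // Count how many leading messages must stay un-ACKed so the queue
    // drops back to the target. Assumes, as the caller does, that the
    // sizes eventually cover the overage.
    uint32_t msgsToIgnore(const std::vector<int64_t>& sizes, int64_t queueSize,
                          int64_t totalMsgSize, int64_t target)
    {
      int64_t overage = queueSize + totalMsgSize - target;
      uint32_t n = 0;
      for (; overage >= 0; ++n)
        overage -= sizes[n];
      return n;
    }

    // e.g. queueSize = 100, totalMsgSize = 60, target = 120 -> overage = 40;
    // sizes {30, 30, 30}: 40 - 30 = 10, then 10 - 30 < 0 -> ignore 2 messages.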
@@ -687,15 +702,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
     while (msgsToIgnore > 0)
     {
-      size_t numAcked = subsMsgCounterAndRotatePM(mqe, msgsToIgnore, &sockidx);
-      idbassert(numAcked <= msgsToIgnore);
-      msgsToIgnore -= numAcked;
+      nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack);
+      idbassert(numack <= msgsToIgnore);
+      msgsToIgnore -= numack;
     }
   }

   if (l_msgCount > 0)
   {
     SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
+    uint16_t* toAck;
     vector<bool> pmAcked(pmCount, false);

     ism = (ISMPacketHeader*)msg->getInputPtr();
@@ -705,6 +721,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
     ism->Interleave = uniqueID;
     ism->Command = BATCH_PRIMITIVE_ACK;
+    toAck = &ism->Size;

     msg->advanceInputPtr(sizeof(ISMPacketHeader));
     // There must be only one local connection here.
@@ -716,21 +733,10 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
       /* This will reset the ACK field in the Bytestream directly, and nothing
        * else needs to change if multiple msgs are sent. */
-      size_t numAcked = subsMsgCounterAndRotatePM(mqe, l_msgCount, &sockIndex);
-      // Potential truncation here from size_t to 16 bits.
-      ism->Size = static_cast<uint16_t>(
-          std::min(static_cast<size_t>(std::numeric_limits<uint16_t>::max()), numAcked));
-      idbassert(numAcked <= l_msgCount);
-      l_msgCount -= numAcked;
-      // We get some logs if the above mentioned truncation happens.
-      if (numAcked > std::numeric_limits<uint16_t>::max())
-      {
-        cerr << "DEC::sendAcks(): msgs acked number [" << numAcked
-             << "] overflows PrimMsg protocol header field capacity 2^16." << std::endl;
-      }
-      if (sockIndex == localConnectionId_ && fIsExeMgr)
+      nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
+      idbassert(*toAck <= l_msgCount);
+      l_msgCount -= *toAck;
+      if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex))
       {
         sendToLocal = true;
         continue;
@@ -756,7 +762,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
     if (totalUnackedWork == 0)
     {
-      ism->Size = 1;  // size of the batch acked
+      *toAck = 1;

       for (uint32_t i = 0; i < pmCount; ++i)
       {
@@ -771,7 +777,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
           writeToClient(i, ackCommand);
         }
       }
-      if (!pmAcked[localConnectionId_] && fIsExeMgr)
+      if (needToSendToLocalPM(fIsExeMgr, localConnectionId_) && !pmAcked[localConnectionId_])
       {
         writeToClient(localConnectionId_, msg);
       }
@@ -780,8 +786,8 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
   }
 }

-size_t DistributedEngineComm::subsMsgCounterAndRotatePM(boost::shared_ptr<MQE> mqe, const size_t maxAck,
-                                                        uint32_t* sockIndex)
+void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck, uint32_t* sockIndex,
+                                        uint16_t* numToAck)
 {
   uint32_t& nextIndex = mqe->ackSocketIndex;
@@ -789,57 +795,56 @@ size_t DistributedEngineComm::subsMsgCounterAndRotatePM(boost::shared_ptr<MQE> m
    * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
    * is a safe minimum at the point of use. */
-  if (mqe->unackedWork[nextIndex].load() >= maxAck)
+  if (mqe->unackedWork[nextIndex] >= maxAck)
   {
-    atomicops::atomicSubstitute(mqe->unackedWork[nextIndex], maxAck);
+    (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
     *sockIndex = nextIndex;
+    // FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
+    *numToAck = maxAck;

     if (pmCount > 0)
       nextIndex = (nextIndex + 1) % pmCount;

-    return maxAck;
+    return;
   }
   else
   {
     for (int i = pmCount - 1; i >= 0; --i)
     {
-      const size_t curVal = mqe->unackedWork[nextIndex].load();
-      const size_t unackedWork = std::min(curVal, maxAck);
+      uint32_t curVal = mqe->unackedWork[nextIndex];
+      uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);

       if (unackedWork > 0)
       {
-        atomicops::atomicSubstitute(mqe->unackedWork[nextIndex], unackedWork);
+        (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
         *sockIndex = nextIndex;
+        *numToAck = unackedWork;

         if (pmCount > 0)
           nextIndex = (nextIndex + 1) % pmCount;

-        return unackedWork;
+        return;
       }

       if (pmCount > 0)
         nextIndex = (nextIndex + 1) % pmCount;
     }

-    cerr << "DEC::subsMsgCounterAndRotatePM(): Couldn't find a PM to ACK! ";
+    cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
     for (int i = pmCount - 1; i >= 0; --i)
       cerr << mqe->unackedWork[i] << " ";
-    cerr << " max: " << maxAck << endl;
+    cerr << " max: " << maxAck;
+    cerr << endl;
     // make sure the returned vars are legitimate
     *sockIndex = nextIndex;
-    // This will potentially results in invalid pmCount but it's better than crashing on 0 div.
-    const uint32_t localPmCount = pmCount.load();
-    if (localPmCount > 0)
-    {
-      nextIndex = (nextIndex + 1) % localPmCount;
-      return maxAck / localPmCount;
-    }
-    cerr << "DEC::subsMsgCounterAndRotatePM(): The number of PMs is 0.";
-    return maxAck;
+    *numToAck = maxAck / pmCount;
+    if (pmCount > 0)
+      nextIndex = (nextIndex + 1) % pmCount;
+    return;
   }
 }
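The restored nextPMToACK() is a round-robin scan over the per-PM unacked-work counters: take up to maxAck from the first PM with outstanding work, report its socket index and the taken count, and advance the rotor. A simplified single-threaded model of that selection (no volatile/atomics; assumes pmCount > 0, which the restored fallback division also requires):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    void nextPmToAckModel(std::vector<uint32_t>& unackedWork, uint32_t pmCount,
                          uint32_t& nextIndex, uint32_t maxAck,
                          uint32_t& sockIndex, uint16_t& numToAck)
    {
      for (uint32_t i = 0; i < pmCount; ++i)
      {
        uint32_t take = std::min(unackedWork[nextIndex], maxAck);
        if (take > 0)
        {
          unackedWork[nextIndex] -= take;
          sockIndex = nextIndex;
          // 32 -> 16 bit narrowing, as the FIXME in the patch notes.
          numToAck = static_cast<uint16_t>(take);
          nextIndex = (nextIndex + 1) % pmCount;
          return;
        }
        nextIndex = (nextIndex + 1) % pmCount;
      }
      // Nothing outstanding anywhere: mirror the patch's fallback values.
      sockIndex = nextIndex;
      numToAck = static_cast<uint16_t>(maxAck / pmCount);
      nextIndex = (nextIndex + 1) % pmCount;
    }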
@@ -863,8 +868,10 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
     }
     writeToClient(i, msg);
   }
-  if (fIsExeMgr)
+  if (needToSendToLocalPM(fIsExeMgr, localConnectionId_))
+  {
     writeToClient(localConnectionId_, msg);
+  }
 }

 int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
@@ -904,7 +911,7 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
       return rc;
     }
   }
-  if (fIsExeMgr)
+  if (needToSendToLocalPM(fIsExeMgr, localConnectionId_))
   {
     return writeToClient(localConnectionId_, msg);
   }
@@ -990,7 +997,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
   if (pmCount > 0)
   {
-    ++mqe->unackedWork[connIndex % pmCount];
+    (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
   }

   TSQSize_t queueSize = mqe->queue.push(sbs);
@@ -1112,7 +1119,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
     for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
     {
       map_tok->second->queue.clear();
-      ++map_tok->second->unackedWork[0];
+      (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
       map_tok->second->queue.push(sbs);
     }
@@ -1140,6 +1147,37 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
       // Connection was established.
       return 1;
+      /*
+      // reconfig the connection array
+      ClientList tempConns;
+      {
+        //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd() << endl;
+        boost::mutex::scoped_lock onErrLock(fOnErrMutex);
+        string moduleName = fPmConnections[index]->moduleName();
+        //cout << "module name = " << moduleName << endl;
+        if (index >= fPmConnections.size()) return 0;
+
+        for (uint32_t i = 0; i < fPmConnections.size(); i++)
+        {
+          if (moduleName != fPmConnections[i]->moduleName())
+            tempConns.push_back(fPmConnections[i]);
+        }
+
+        if (tempConns.size() == fPmConnections.size()) return 0;
+
+        fPmConnections.swap(tempConns);
+        pmCount = (pmCount == 0 ? 0 : pmCount - 1);
+      }
+
+      // send alarm
+      ALARMManager alarmMgr;
+      string alarmItem("UNKNOWN");
+      if (index < fPmConnections.size())
+      {
+        alarmItem = fPmConnections[index]->addr2String();
+      }
+      alarmItem.append(" PrimProc");
+      alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
+      */
     }
     return 0;
   }
@@ -1195,15 +1233,9 @@ DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInt
                                 const uint64_t flowControlEnableBytesThresh)
   : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh)
 {
+  unackedWork.reset(new volatile uint32_t[pmCount]);
   interleaver.reset(new uint32_t[pmCount]);
-  const uint32_t localPmCount = pmCount;
-  AtomicSizeVec s(localPmCount);
-  for (size_t i = 0; i < localPmCount; ++i)
-  {
-    s[i].store(0);
-  }
-  unackedWork.swap(s);
+  memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
   uint32_t interleaverValue = initialInterleaverValue;
   initialConnectionId = initialInterleaverValue;

   for (size_t pmId = 0; pmId < pmCount; ++pmId)

dbcon/joblist/distributedenginecomm.h

@@ -69,6 +69,11 @@ class Config;
  */
 namespace joblist
 {
+constexpr uint32_t defaultLocalConnectionId()
+{
+  return std::numeric_limits<uint32_t>::max();
+}
+
 class DECEventListener
 {
  public:
@@ -225,7 +230,6 @@ class DistributedEngineComm
   // A queue of ByteStreams coming in from PrimProc heading for a JobStep
   typedef ThreadSafeQueue<messageqcpp::SBS> StepMsgQueue;
-  using AtomicSizeVec = std::vector<std::atomic<size_t>>;

   // Creates a ByteStream as a command for Primitive Server and initializes it with a given `command`,
   // `uniqueID` and `size`.
@@ -240,7 +244,7 @@ class DistributedEngineComm
     messageqcpp::Stats stats;
     StepMsgQueue queue;
     uint32_t ackSocketIndex;
-    AtomicSizeVec unackedWork;
+    boost::scoped_array<volatile uint32_t> unackedWork;
     boost::scoped_array<uint32_t> interleaver;
     uint32_t initialConnectionId;
     uint32_t pmCount;
@@ -288,7 +292,7 @@ class DistributedEngineComm
   std::mutex fMlock;                                // sessionMessages mutex
   std::vector<std::shared_ptr<std::mutex>> fWlock;  // PrimProc socket write mutexes
   bool fBusy;
-  std::atomic<uint32_t> pmCount;
+  volatile uint32_t pmCount;
   boost::mutex fOnErrMutex;  // to lock function scope to reset pmconnections under error condition
   boost::mutex fSetupMutex;
@@ -310,13 +314,13 @@ class DistributedEngineComm
   void sendAcks(uint32_t uniqueID, const std::vector<messageqcpp::SBS>& msgs, boost::shared_ptr<MQE> mqe,
                 size_t qSize);
-  size_t subsMsgCounterAndRotatePM(boost::shared_ptr<MQE> mqe, const size_t maxAck, uint32_t* sockIndex);
+  void nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck, uint32_t* sockIndex, uint16_t* numToAck);
   void setFlowControl(bool enable, uint32_t uniqueID, boost::shared_ptr<MQE> mqe);
   void doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize);
   boost::mutex ackLock;
   // localConnectionId_ is set running Setup() method
-  uint32_t localConnectionId_ = std::numeric_limits<uint32_t>::max();
+  uint32_t localConnectionId_ = defaultLocalConnectionId();
   std::vector<struct in_addr> localNetIfaceSins_;
   std::mutex inMemoryEM2PPExchMutex_;
   std::condition_variable inMemoryEM2PPExchCV_;

utils/common/atomicops.h

@@ -22,7 +22,6 @@
 #include <unistd.h>
 #include <stdint.h>
 #include <sched.h>
-#include <atomic>

 /*
   This is an attempt to wrap the differneces between Windows and Linux around atomic ops.
@@ -93,15 +92,4 @@ inline void atomicYield()
   sched_yield();
 }

-// This f assumes decrement is smaller than minuend.
-template <typename T>
-inline void atomicSubstitute(typename std::atomic<T>& minuend, T decrement)
-{
-  T expected = minuend.load();
-  do
-  {
-    expected = minuend.load();
-  } while (!minuend.compare_exchange_weak(expected, expected - decrement));
-}
-
 }  // namespace atomicops
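The deleted atomicSubstitute() was a compare-and-swap loop that subtracted from a std::atomic; after the revert the same job is done by atomicops::atomicSub() on a volatile counter. For reference only: on std::atomic the entire loop collapses to the standard fetch_sub, equivalent in effect given the original's stated assumption that the decrement never exceeds the minuend.

    #include <atomic>

    // What the removed CAS loop computed, in one standard call.
    template <typename T>
    inline void atomicSubtract(std::atomic<T>& minuend, T decrement)
    {
      minuend.fetch_sub(decrement);
    }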