1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

fix(DEC):MCOL-5805,5808 to resolve UM-only node crash inside DEC when there is no local PP to send the local requests to. (#3349)

This commit is contained in:
drrtuy 2024-11-11 18:29:53 +00:00 committed by GitHub
parent 6f4274760e
commit c6b747b7c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 5 deletions

View File

@ -181,6 +181,17 @@ struct QueueShutdown
namespace joblist
{
bool isDefaultLocalConnectionId(const uint32_t connectionId)
{
return defaultLocalConnectionId() == connectionId;
}
bool needToSendToLocalPM(const bool isExeMgr, const uint32_t connectionId)
{
return isExeMgr && !isDefaultLocalConnectionId(connectionId);
}
DistributedEngineComm* DistributedEngineComm::fInstance = 0;
/*static*/
@ -725,7 +736,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId_ && fIsExeMgr)
if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex))
{
sendToLocal = true;
continue;
@ -766,7 +777,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
writeToClient(i, ackCommand);
}
}
if (!pmAcked[localConnectionId_] && fIsExeMgr)
if (needToSendToLocalPM(fIsExeMgr, localConnectionId_) && !pmAcked[localConnectionId_])
{
writeToClient(localConnectionId_, msg);
}
@ -857,8 +868,10 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
}
writeToClient(i, msg);
}
if (fIsExeMgr)
if (needToSendToLocalPM(fIsExeMgr, localConnectionId_))
{
writeToClient(localConnectionId_, msg);
}
}
int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
@ -898,7 +911,7 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
return rc;
}
}
if (fIsExeMgr)
if (needToSendToLocalPM(fIsExeMgr, localConnectionId_))
{
return writeToClient(localConnectionId_, msg);
}

View File

@ -69,6 +69,11 @@ class Config;
*/
namespace joblist
{
constexpr uint32_t defaultLocalConnectionId()
{
return std::numeric_limits<uint32_t>::max();
}
class DECEventListener
{
public:
@ -315,7 +320,7 @@ class DistributedEngineComm
boost::mutex ackLock;
// localConnectionId_ is set running Setup() method
uint32_t localConnectionId_ = std::numeric_limits<uint32_t>::max();
uint32_t localConnectionId_ = defaultLocalConnectionId();
std::vector<struct in_addr> localNetIfaceSins_;
std::mutex inMemoryEM2PPExchMutex_;
std::condition_variable inMemoryEM2PPExchCV_;