1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

MCOL-5499 Enable ControlFlow for same node communication processing path to avoid DEC queue overloading (#2847)

This commit is contained in:
Roman Nozdrin
2023-06-07 13:42:47 +01:00
committed by GitHub
parent 23a969dbe2
commit 9e1eac448a
9 changed files with 193 additions and 202 deletions

View File

@@ -241,10 +241,11 @@ int32_t DistributedEngineComm::Setup()
newLocks.clear();
uint32_t newPmCount = fRm->getPsCount();
throttleThreshold = fRm->getDECThrottleThreshold();
tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
fDECConnectionsPerQuery = fRm->getDECConnectionsPerQuery();
unsigned numConnections = getNumConnections();
flowControlEnableBytesThresh = fRm->getDECEnableBytesThresh();
flowControlDisableBytesThresh = fRm->getDECDisableBytesThresh();
oam::Oam oam;
ModuleTypeConfig moduletypeconfig;
@@ -282,6 +283,8 @@ int32_t DistributedEngineComm::Setup()
if (clientAtTheSameHost(cl))
{
cl->atTheSameHost(true);
assert(connectionId <= std::numeric_limits<uint32_t>::max());
localConnectionId_ = connectionId;
}
std::shared_ptr<std::mutex> nl(new std::mutex());
@@ -433,33 +436,6 @@ Error:
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
}
/*
// reset the pmconnection vector
ClientList tempConns;
boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" <<
fPmConnections[i]->moduleName() << endl;
}
if (tempConns.size() == fPmConnections.size()) return;
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;
// log it
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
*/
}
return;
}
@@ -472,7 +448,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
condition* cond = new condition();
uint32_t firstPMInterleavedConnectionId =
key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size();
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId));
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh));
mqe->queue = StepMsgQueue(lock, cond);
mqe->sendACKs = sendACKs;
@@ -540,7 +516,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
{
std::unique_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
setFlowControl(false, key, mqe);
vector<SBS> v;
@@ -578,7 +554,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
{
std::unique_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
setFlowControl(false, key, mqe);
vector<SBS> v;
@@ -645,7 +621,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
{
std::unique_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
setFlowControl(false, key, mqe);
sendAcks(key, v, mqe, queueSize.size);
@@ -726,12 +702,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
msg->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
localConnectionId = i;
}
bool sendToLocal = false;
while (l_msgCount > 0)
{
@@ -743,7 +713,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId)
if (sockIndex == localConnectionId_ && fIsExeMgr)
{
sendToLocal = true;
continue;
@@ -751,15 +721,15 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
if (sendToLocal && localConnectionId < fPmConnections.size())
if (sendToLocal)
{
pmAcked[localConnectionId] = true;
writeToClient(localConnectionId, msg);
pmAcked[localConnectionId_] = true;
writeToClient(localConnectionId_, msg);
}
// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
// This is apply to the big message case only. For small messages, the flow control is
// disabled when the queue size is below the disableThreshold.
// disabled when the queue size is below the flowControlDisableBytesThresh.
if (mqe->hasBigMsgs)
{
uint64_t totalUnackedWork = 0;
@@ -775,16 +745,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{
if (!pmAcked[i])
{
if (i == localConnectionId)
if (i == localConnectionId_ && fIsExeMgr)
{
continue;
}
writeToClient(i, msg);
}
}
if (!pmAcked[localConnectionId])
if (!pmAcked[localConnectionId_] && fIsExeMgr)
{
writeToClient(localConnectionId, msg);
writeToClient(localConnectionId_, msg);
}
}
}
@@ -863,28 +833,18 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
ism->Command = BATCH_PRIMITIVE_ACK;
ism->Size = (enabled ? 0 : -1);
#ifdef VALGRIND
/* XXXPAT: For testing in valgrind, init the vars that don't get used */
ism->Flags = 0;
ism->Type = 0;
ism->MsgCount = 0;
ism->Status = 0;
#endif
msg->advanceInputPtr(sizeof(ISMPacketHeader));
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < mqe->pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
if (i == localConnectionId_ && fIsExeMgr)
{
localConnectionId = i;
continue;
}
writeToClient(i, msg);
}
if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg);
if (fIsExeMgr)
writeToClient(localConnectionId_, msg);
}
int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
@@ -911,23 +871,23 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
entries in the config file point to unique PMs */
{
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
int32_t rc = 0;
for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
if (i == localConnectionId_ && fIsExeMgr)
{
localConnectionId = i;
continue;
}
rc =writeToClient(i, msg, senderID);
if (rc)
if ((rc = writeToClient(i, msg, senderID)))
{
return rc;
}
}
if (fIsExeMgr)
{
return writeToClient(localConnectionId_, msg);
}
if (localConnectionId < fPmConnections.size())
rc = writeToClient(localConnectionId, msg);
return rc;
}
@@ -984,12 +944,16 @@ void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueCl
}
void DistributedEngineComm::addDataToOutput(SBS sbs)
{
assert(localConnectionId_ < pmCount);
return addDataToOutput(sbs, localConnectionId_, nullptr);
}
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
@@ -1001,40 +965,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs)
return;
}
mqe = map_tok->second;
lk.unlock();
if (pmCount > 0)
{
// I hardcoded the unacked Worker id here. ACK isn't important
// for the local exchange b/c there is no need to
// enable flowcontrol localy on PM.
(void)atomicops::atomicInc(&mqe->unackedWork[0]);
}
[[maybe_unused]] TSQSize_t queueSize = mqe->queue.push(sbs);
// There will be no statistics about data transfered
// over the memory.
}
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
if (map_tok == fSessionMessages.end())
{
// For debugging...
// cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " <<
// uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
return;
}
mqe = map_tok->second;
auto mqe = map_tok->second;
lk.unlock();
if (pmCount > 0)
@@ -1049,9 +980,9 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
std::lock_guard lk(ackLock);
uint64_t msgSize = sbs->lengthWithHdrOverhead();
if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2))
doHasBigMsgs(mqe, (300 * 1024 * 1024 > 3 * msgSize ? 300 * 1024 * 1024
: 3 * msgSize)); // buffer at least 3 big msgs
if (!mqe->throttled && msgSize > (flowControlEnableBytesThresh / 2))
doHasBigMsgs(
mqe, (bigMessageSize > 3 * msgSize ? bigMessageSize : 3 * msgSize)); // buffer at least 3 big msgs
if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
setFlowControl(true, uniqueId, mqe);
@@ -1271,8 +1202,9 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
return empty;
}
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue)
: ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(targetRecvQueueSize)
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue,
const uint64_t flowControlEnableBytesThresh)
: ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh)
{
unackedWork.reset(new volatile uint32_t[pmCount]);
interleaver.reset(new uint32_t[pmCount]);