diff --git a/dbcon/joblist/bpp-jl.h b/dbcon/joblist/bpp-jl.h index 4da0d5e1c..ae729cea1 100644 --- a/dbcon/joblist/bpp-jl.h +++ b/dbcon/joblist/bpp-jl.h @@ -33,7 +33,6 @@ #include "serializeable.h" #include "brm.h" #include "jobstep.h" -// #include "primitiveprocessor.h" #include "batchprimitiveprocessor-jl.h" #include "command-jl.h" #include "columncommand-jl.h" diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 1820c6d84..8cc70fe73 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -241,10 +241,11 @@ int32_t DistributedEngineComm::Setup() newLocks.clear(); uint32_t newPmCount = fRm->getPsCount(); - throttleThreshold = fRm->getDECThrottleThreshold(); tbpsThreadCount = fRm->getJlNumScanReceiveThreads(); fDECConnectionsPerQuery = fRm->getDECConnectionsPerQuery(); unsigned numConnections = getNumConnections(); + flowControlEnableBytesThresh = fRm->getDECEnableBytesThresh(); + flowControlDisableBytesThresh = fRm->getDECDisableBytesThresh(); oam::Oam oam; ModuleTypeConfig moduletypeconfig; @@ -282,6 +283,8 @@ int32_t DistributedEngineComm::Setup() if (clientAtTheSameHost(cl)) { cl->atTheSameHost(true); + assert(connectionId <= std::numeric_limits::max()); + localConnectionId_ = connectionId; } std::shared_ptr nl(new std::mutex()); @@ -433,33 +436,6 @@ Error: os << "DEC: lost connection to " << client->addr2String(); writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR); } - - /* - // reset the pmconnection vector - ClientList tempConns; - boost::mutex::scoped_lock onErrLock(fOnErrMutex); - string moduleName = client->moduleName(); - //cout << "moduleName=" << moduleName << endl; - for ( uint32_t i = 0; i < fPmConnections.size(); i++) - { - if (moduleName != fPmConnections[i]->moduleName()) - tempConns.push_back(fPmConnections[i]); - //else - //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << - fPmConnections[i]->moduleName() << endl; - } - - if (tempConns.size() == fPmConnections.size()) return; - - fPmConnections.swap(tempConns); - pmCount = (pmCount == 0 ? 0 : pmCount - 1); - //cout << "PMCOUNT=" << pmCount << endl; - - // log it - ostringstream os; - os << "DEC: lost connection to " << client->addr2String(); - writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL); - */ } return; } @@ -472,7 +448,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) condition* cond = new condition(); uint32_t firstPMInterleavedConnectionId = key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size(); - boost::shared_ptr mqe(new MQE(pmCount, firstPMInterleavedConnectionId)); + boost::shared_ptr mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh)); mqe->queue = StepMsgQueue(lock, cond); mqe->sendACKs = sendACKs; @@ -540,7 +516,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs) { std::unique_lock lk(ackLock); - if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) + if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh) setFlowControl(false, key, mqe); vector v; @@ -578,7 +554,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key) { std::unique_lock lk(ackLock); - if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) + if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh) setFlowControl(false, key, mqe); vector v; @@ -645,7 +621,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vectorthrottled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold) + if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh) setFlowControl(false, key, mqe); sendAcks(key, v, mqe, queueSize.size); @@ -726,12 +702,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, msg->advanceInputPtr(sizeof(ISMPacketHeader)); // There must be only one local connection here. - uint32_t localConnectionId = std::numeric_limits::max(); - for (uint32_t i = 0; i < pmCount; ++i) - { - if (fPmConnections[i]->atTheSameHost() && fIsExeMgr) - localConnectionId = i; - } bool sendToLocal = false; while (l_msgCount > 0) { @@ -743,7 +713,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); idbassert(*toAck <= l_msgCount); l_msgCount -= *toAck; - if (sockIndex == localConnectionId) + if (sockIndex == localConnectionId_ && fIsExeMgr) { sendToLocal = true; continue; @@ -751,15 +721,15 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, pmAcked[sockIndex] = true; writeToClient(sockIndex, msg); } - if (sendToLocal && localConnectionId < fPmConnections.size()) + if (sendToLocal) { - pmAcked[localConnectionId] = true; - writeToClient(localConnectionId, msg); + pmAcked[localConnectionId_] = true; + writeToClient(localConnectionId_, msg); } // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked. // This is apply to the big message case only. For small messages, the flow control is - // disabled when the queue size is below the disableThreshold. + // disabled when the queue size is below the flowControlDisableBytesThresh. if (mqe->hasBigMsgs) { uint64_t totalUnackedWork = 0; @@ -775,16 +745,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { if (!pmAcked[i]) { - if (i == localConnectionId) + if (i == localConnectionId_ && fIsExeMgr) { continue; } writeToClient(i, msg); } } - if (!pmAcked[localConnectionId]) + if (!pmAcked[localConnectionId_] && fIsExeMgr) { - writeToClient(localConnectionId, msg); + writeToClient(localConnectionId_, msg); } } } @@ -863,28 +833,18 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos ism->Command = BATCH_PRIMITIVE_ACK; ism->Size = (enabled ? 0 : -1); -#ifdef VALGRIND - /* XXXPAT: For testing in valgrind, init the vars that don't get used */ - ism->Flags = 0; - ism->Type = 0; - ism->MsgCount = 0; - ism->Status = 0; -#endif - msg->advanceInputPtr(sizeof(ISMPacketHeader)); - uint32_t localConnectionId = std::numeric_limits::max(); for (uint32_t i = 0; i < mqe->pmCount; ++i) { - if (fPmConnections[i]->atTheSameHost() && fIsExeMgr) + if (i == localConnectionId_ && fIsExeMgr) { - localConnectionId = i; continue; } writeToClient(i, msg); } - if (localConnectionId < fPmConnections.size()) - writeToClient(localConnectionId, msg); + if (fIsExeMgr) + writeToClient(localConnectionId_, msg); } int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg) @@ -911,23 +871,23 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg) /* XXXPAT: This relies on the assumption that the first pmCount "PMS*" entries in the config file point to unique PMs */ { - uint32_t localConnectionId = std::numeric_limits::max(); int32_t rc = 0; - for (uint32_t i = 0; i < pmCount; ++i) { - if (fPmConnections[i]->atTheSameHost() && fIsExeMgr) + if (i == localConnectionId_ && fIsExeMgr) { - localConnectionId = i; continue; } - rc =writeToClient(i, msg, senderID); - if (rc) + if ((rc = writeToClient(i, msg, senderID))) + { return rc; + } + } + if (fIsExeMgr) + { + return writeToClient(localConnectionId_, msg); } - if (localConnectionId < fPmConnections.size()) - rc = writeToClient(localConnectionId, msg); return rc; } @@ -984,12 +944,16 @@ void DistributedEngineComm::StartClientListener(boost::shared_ptrbuf()); PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1); uint32_t uniqueId = p->UniqueID; - boost::shared_ptr mqe; - std::unique_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); @@ -1001,40 +965,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs) return; } - mqe = map_tok->second; - lk.unlock(); - - if (pmCount > 0) - { - // I hardcoded the unacked Worker id here. ACK isn't important - // for the local exchange b/c there is no need to - // enable flowcontrol localy on PM. - (void)atomicops::atomicInc(&mqe->unackedWork[0]); - } - - [[maybe_unused]] TSQSize_t queueSize = mqe->queue.push(sbs); - // There will be no statistics about data transfered - // over the memory. -} - -void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats) -{ - ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf()); - PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1); - uint32_t uniqueId = p->UniqueID; - boost::shared_ptr mqe; - std::unique_lock lk(fMlock); - MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); - - if (map_tok == fSessionMessages.end()) - { - // For debugging... - // cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " << - // uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl; - return; - } - - mqe = map_tok->second; + auto mqe = map_tok->second; lk.unlock(); if (pmCount > 0) @@ -1049,9 +980,9 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* std::lock_guard lk(ackLock); uint64_t msgSize = sbs->lengthWithHdrOverhead(); - if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2)) - doHasBigMsgs(mqe, (300 * 1024 * 1024 > 3 * msgSize ? 300 * 1024 * 1024 - : 3 * msgSize)); // buffer at least 3 big msgs + if (!mqe->throttled && msgSize > (flowControlEnableBytesThresh / 2)) + doHasBigMsgs( + mqe, (bigMessageSize > 3 * msgSize ? bigMessageSize : 3 * msgSize)); // buffer at least 3 big msgs if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize) setFlowControl(true, uniqueId, mqe); @@ -1271,8 +1202,9 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID) return empty; } -DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue) - : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(targetRecvQueueSize) +DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue, + const uint64_t flowControlEnableBytesThresh) + : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh) { unackedWork.reset(new volatile uint32_t[pmCount]); interleaver.reset(new uint32_t[pmCount]); diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 6c3d45393..2c34d7077 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -229,7 +229,7 @@ class DistributedEngineComm /* To keep some state associated with the connection. These aren't copyable. */ struct MQE : public boost::noncopyable { - MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue); + MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue, const uint64_t recvQueueSize); uint32_t getNextConnectionId(const size_t pmIndex, const size_t pmConnectionsNumber, const uint32_t DECConnectionsPerQuery); messageqcpp::Stats stats; @@ -297,9 +297,9 @@ class DistributedEngineComm bool fIsExeMgr; // send-side throttling vars - uint64_t throttleThreshold; - static const uint32_t targetRecvQueueSize = 50000000; - static const uint32_t disableThreshold = 10000000; + uint64_t flowControlEnableBytesThresh = 50000000; + uint64_t flowControlDisableBytesThresh = 10000000; + uint64_t bigMessageSize = 300 * 1024 * 1024; uint32_t tbpsThreadCount; uint32_t fDECConnectionsPerQuery; @@ -310,6 +310,8 @@ class DistributedEngineComm void doHasBigMsgs(boost::shared_ptr mqe, uint64_t targetSize); boost::mutex ackLock; + // localConnectionId_ is set running Setup() method + uint32_t localConnectionId_ = std::numeric_limits::max(); std::vector localNetIfaceSins_; std::mutex inMemoryEM2PPExchMutex_; std::condition_variable inMemoryEM2PPExchCV_; diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index 8edaf9c35..e162f1a1f 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -239,8 +239,6 @@ ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig) fAllowedDiskAggregation = getBoolVal(fRowAggregationStr, "AllowDiskBasedAggregation", defaultAllowDiskAggregation); - fMaxBPPSendQueue = getUintVal(fPrimitiveServersStr, "MaxBPPSendQueue", defaultMaxBPPSendQueue); - if (!load_encryption_keys()) { Logger log; diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 959201ae7..733539d3a 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -39,8 +39,6 @@ #include "atomicops.h" -#define EXPORT - namespace joblist { // aggfilterstep @@ -102,7 +100,10 @@ const uint64_t defaultRowsPerBatch = 10000; /* HJ CP feedback, see bug #1465 */ const uint32_t defaultHjCPUniqueLimit = 100; -const uint64_t defaultDECThrottleThreshold = 200000000; // ~200 MB +const constexpr uint64_t defaultFlowControlEnableBytesThresh = 50000000; // ~50Mb +const constexpr uint64_t defaultFlowControlDisableBytesThresh = 10000000; // ~10 MB +const constexpr uint64_t defaultBPPSendThreadBytesThresh = 250000000; // ~250 MB +const constexpr uint64_t BPPSendThreadMsgThresh = 100; const bool defaultAllowDiskAggregation = false; @@ -116,8 +117,8 @@ class ResourceManager /** @brief ctor * */ - EXPORT ResourceManager(bool runningInExeMgr = false, config::Config *aConfig = nullptr); - static ResourceManager* instance(bool runningInExeMgr = false, config::Config *aConfig = nullptr); + ResourceManager(bool runningInExeMgr = false, config::Config* aConfig = nullptr); + static ResourceManager* instance(bool runningInExeMgr = false, config::Config* aConfig = nullptr); config::Config* getConfig() { return fConfig; @@ -149,7 +150,7 @@ class ResourceManager { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); } - EXPORT int getEmPriority() const; // FOr Windows only + int getEmPriority() const; // FOr Windows only int getEmExecQueueSize() const { return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); @@ -236,7 +237,6 @@ class ResourceManager uint32_t getJlMaxOutstandingRequests() const { return fJlMaxOutstandingRequests; - // getUintVal(fJobListStr, "MaxOutstandingRequests", defaultMaxOutstandingRequests); } uint32_t getJlJoinerChunkSize() const { @@ -284,35 +284,45 @@ class ResourceManager return getUintVal(fBatchInsertStr, "RowsPerBatch", defaultRowsPerBatch); } - uint64_t getDECThrottleThreshold() const + uint64_t getDECEnableBytesThresh() const { - return getUintVal(fJobListStr, "DECThrottleThreshold", defaultDECThrottleThreshold); + return getUintVal(FlowControlStr, "DECFlowControlEnableBytesThresh(", defaultFlowControlEnableBytesThresh); } - uint64_t getMaxBPPSendQueue() const + uint32_t getDECDisableBytesThresh() const { - return fMaxBPPSendQueue; + return getUintVal(FlowControlStr, "DECFlowControlDisableBytesThresh", defaultFlowControlDisableBytesThresh); } - EXPORT void emServerThreads(); - EXPORT void emServerQueueSize(); - EXPORT void emSecondsBetweenMemChecks(); - EXPORT void emMaxPct(); - EXPORT void emPriority(); - EXPORT void emExecQueueSize(); + uint32_t getBPPSendThreadBytesThresh() const + { + return getUintVal(FlowControlStr, "BPPSendThreadBytesThresh", defaultBPPSendThreadBytesThresh); + } - EXPORT void hjNumThreads(); - EXPORT void hjMaxBuckets(); - EXPORT void hjMaxElems(); - EXPORT void hjFifoSizeLargeSide(); - EXPORT void hjPmMaxMemorySmallSide(); + uint32_t getBPPSendThreadMsgThresh() const + { + return getUintVal(FlowControlStr, "BPPSendThreadMsgThresh", BPPSendThreadMsgThresh); + } + + void emServerThreads(); + void emServerQueueSize(); + void emSecondsBetweenMemChecks(); + void emMaxPct(); + void emPriority(); + void emExecQueueSize(); + + void hjNumThreads(); + void hjMaxBuckets(); + void hjMaxElems(); + void hjFifoSizeLargeSide(); + void hjPmMaxMemorySmallSide(); /* new HJ/Union/Aggregation mem interface, used by TupleBPS */ /* sessionLimit is a pointer to the var holding the session-scope limit, should be JobInfo.umMemLimit for the query. */ /* Temporary parameter 'patience', will wait for up to 10s to get the memory. */ - EXPORT bool getMemory(int64_t amount, boost::shared_ptr& sessionLimit, bool patience = true); - EXPORT bool getMemory(int64_t amount, bool patience = true); + bool getMemory(int64_t amount, boost::shared_ptr& sessionLimit, bool patience = true); + bool getMemory(int64_t amount, bool patience = true); inline void returnMemory(int64_t amount) { atomicops::atomicAdd(&totalUmMemLimit, amount); @@ -341,14 +351,14 @@ class ResourceManager return fHJUmMaxMemorySmallSideDistributor.getTotalResource(); } - EXPORT void addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem); + void addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem); void removeHJUmMaxSmallSideMap(uint32_t sessionID) { fHJUmMaxMemorySmallSideDistributor.removeSession(sessionID); } - EXPORT void addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem); + void addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem); void removeHJPmMaxSmallSideMap(uint32_t sessionID) { fHJPmMaxMemorySmallSideSessionMap.removeSession(sessionID); @@ -435,9 +445,9 @@ class ResourceManager return fUseHdfs; } - EXPORT bool getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const; - EXPORT bool queryStatsEnabled() const; - EXPORT bool userPriorityEnabled() const; + bool getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const; + bool queryStatsEnabled() const; + bool userPriorityEnabled() const; uint64_t getConfiguredUMMemLimit() const { @@ -471,12 +481,13 @@ class ResourceManager std::string fExeMgrStr; inline static const std::string fHashJoinStr = "HashJoin"; inline static const std::string fJobListStr = "JobList"; + inline static const std::string FlowControlStr = "FlowControl"; + inline static const std::string fPrimitiveServersStr = "PrimitiveServers"; /*static const*/ std::string fSystemConfigStr; inline static const std::string fExtentMapStr = "ExtentMap"; /*static const*/ std::string fDMLProcStr; /*static const*/ std::string fBatchInsertStr; - inline static const std::string fOrderByLimitStr = "OrderByLimit"; inline static const std::string fRowAggregationStr = "RowAggregation"; config::Config* fConfig; static ResourceManager* fInstance; @@ -509,7 +520,6 @@ class ResourceManager bool fUseHdfs; bool fAllowedDiskAggregation{false}; uint64_t fDECConnectionsPerQuery; - uint64_t fMaxBPPSendQueue = 250000000; }; inline std::string ResourceManager::getStringVal(const std::string& section, const std::string& name, diff --git a/docs/Control Flow.md b/docs/Control Flow.md new file mode 100644 index 000000000..183e0548f --- /dev/null +++ b/docs/Control Flow.md @@ -0,0 +1,52 @@ + +Let's begin from 1/3 of a journey, name when TupleBPS starts sending PrimitiveTasks towards PrimProc process - worker. The task usually causes BPP to process a series of records called RowGroup. BPP::execute() has a method called sendResponse() that serializes RowGroup buffer class - RGData into transmitable ByteStream. This ByteStream is to be sent back to ExeMgr facility(coordinator) via either network or a function call(if PP and EM belongs to the same process). The ByteStream goes into DistributedEngineCommunication queue(via either TCP socket reading thread or function call from the prev stage puts it into the DEC queue) that belongs to that particular uniqueID(subquery, small side of a JOIN, a query). TBPS at this point has a reading thread that runs the main loop of TBPS::receiveMultiPrimitiveMessage. TBPS reads a bunch of ByteStreams from the queue with the uniqueID, turns them into RGData buffers and then either puts it into DataList(another queue) that is used b/w JobSteps or preprocesses RGDatas and puts it into DL. After EM query execution pipeline is done the coordinator sends the results to the plugin, or in case of a syscat query whatever facility asks for the results. +The bottlenecks and limitations becomes obvious when there is a load or computation resources are scarce, e.g. in containers(see [MCOL-5489](https://jira.mariadb.org/browse/MCOL-5489)) +- DEC queues are unlimited that hides the fact that TBPS can't handle the workload. This fact also increases the latency and consumes RAM to store ByteStreams that TBPSn JobSteps or MDB can't handle in time b/c there are multiple producer threads in PP and single consumer; +- This behavior was uncovered with the patch that forces PP worker to put a Bytestream directly into DEC via method call. Previously PP sends ByteStream over a local loopback. +- There is a flow control mechanism that forces PP to collect ByteStreams locally, this flow control also asks for a confirmation for every PrimitiveMsg. This mechanism proves itself ineffective b/c it is basically the same, namely it has an unlimited queue; +- DataList queues are limited so DLs back propagate a processing congestion + +```mermaid +stateDiagram-v2 + PP --> DEC : ByteStream + PP --> DEC : ByteStream + PP --> DEC : ByteStream + DEC --> TBPS + TBPS --> PP : PrimitiveMsg + TBPS --> PP : PrimitiveMsg + TBPS --> PP : PrimitiveMsg + TBPS --> TNS : DataList +``` +There is a mechanism called Control Flow that does it best to prevent ExeMgr facility overload. There are number of parameters that modify Control Flow behavior. Here are all of them with their defaults. +```xml + + 250000000 + 100 + 50000000 + 10000000 + BPPSendThreadBytesThresh and queue size > 3) or (SendingThread queue size > BPPSendThreadMsgThresh) PP begins to reschedule(effectively fail before PP starts doing them) PrimitiveMsgs it gets from TBPS[1]. +```mermaid +stateDiagram-v2 + PP --> DEC : ByteStream + PP --> DEC : ByteStream + PP --> DEC : ByteStream + PP --> SendingThread : if DEC queue is overloaded + SendingThread --> DEC + DEC --> TBPS + DEC --> PP : Signals that specific DEC queue is overloaded + TBPS --> PP : PrimitiveMsg + TBPS --> PP : PrimitiveMsg + TBPS --> PP : PrimitiveMsg + TBPS --> JobStep : DataList +``` + + +There is an unlimited queue in DEC and IMHO it is hard to put a limit there so the suggested approach is: +- Move control flow into TBPS +- There will be a cummulative (bytes size + queue depth) metric. +- The metric affects the number of maximum concurrent outstanding PrimitiveMsges, namely the bigger the metric value is the smaller the number of outstanding PrimitiveMsges + +1. Not ideal at all b/c there might be multiple ExeMgrs in a cluster, some of them might be overloaded but SendThread is shared for all of them. \ No newline at end of file diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 1badf2c4a..11849a647 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -33,7 +33,7 @@ #include #include #include -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -629,7 +629,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs) { uint64_t key; uint32_t value; - } * arr; + }* arr; #pragma pack(pop) /* skip the header */ @@ -931,7 +931,7 @@ void BatchPrimitiveProcessor::initProcessor() strValues.reset(new string[LOGICAL_BLOCK_RIDS]); outMsgSize = defaultBufferSize; - outputMsg.reset(new(std::align_val_t(MAXCOLUMNWIDTH)) uint8_t[outMsgSize]); + outputMsg.reset(new (std::align_val_t(MAXCOLUMNWIDTH)) uint8_t[outMsgSize]); if (ot == ROW_GROUP) { @@ -1479,8 +1479,8 @@ void BatchPrimitiveProcessor::execute() if (!asyncLoaded[p + 1]) { - loadBlockAsync(col->getLBIDAux(), versionInfo, txnID, 2, &cachedIO, &physIO, - LBIDTrace, sessionID, &counterLock, &busyLoaderCount, sendThread, &vssCache); + loadBlockAsync(col->getLBIDAux(), versionInfo, txnID, 2, &cachedIO, &physIO, LBIDTrace, sessionID, + &counterLock, &busyLoaderCount, sendThread, &vssCache); asyncLoaded[p + 1] = true; } } @@ -2169,8 +2169,18 @@ void BatchPrimitiveProcessor::sendResponse() // !sock has a 'same host connection' semantics here. if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock))) { - exeMgrDecPtr->addDataToOutput(serialized); - serialized.reset(); + // Flow Control now handles same node connections so the recieving DEC queue + // is limited. + if (sendThread->flowControlEnabled()) + { + sendThread->sendResult({serialized, nullptr, nullptr, 0}, false); + } + else + { + exeMgrDecPtr->addDataToOutput(serialized); + serialized.reset(); + } + return; } diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index 356342d3b..d444e48b5 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -26,6 +26,7 @@ #include #include "bppsendthread.h" #include "resourcemanager.h" +#include "serviceexemgr.h" namespace primitiveprocessor { @@ -33,32 +34,9 @@ extern uint32_t connectionsPerUM; extern uint32_t BPPCount; BPPSendThread::BPPSendThread() - : die(false) - , gotException(false) - , mainThreadWaiting(false) - , sizeThreshold(100) - , msgsLeft(-1) - , waiting(false) - , sawAllConnections(false) - , fcEnabled(false) - , currentByteSize(0) { - maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue(); - runner = boost::thread(Runner_t(this)); -} - -BPPSendThread::BPPSendThread(uint32_t initMsgsLeft) - : die(false) - , gotException(false) - , mainThreadWaiting(false) - , sizeThreshold(100) - , msgsLeft(initMsgsLeft) - , waiting(false) - , sawAllConnections(false) - , fcEnabled(false) - , currentByteSize(0) -{ - maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue(); + queueBytesThresh = joblist::ResourceManager::instance()->getBPPSendThreadBytesThresh(); + queueMsgThresh = joblist::ResourceManager::instance()->getBPPSendThreadMsgThresh(); runner = boost::thread(Runner_t(this)); } @@ -74,7 +52,7 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) if (sizeTooBig()) { std::unique_lock sl1(respondLock); - while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) + while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die) { fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); @@ -119,7 +97,7 @@ void BPPSendThread::sendResults(const vector& msgs, bool newConnection) if (sizeTooBig()) { std::unique_lock sl1(respondLock); - while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die) + while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die) { fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); @@ -166,7 +144,6 @@ void BPPSendThread::sendMore(int num) { std::unique_lock sl(ackLock); - // cout << "got an ACK for " << num << " msgsLeft=" << msgsLeft << endl; if (num == -1) fcEnabled = false; else if (num == 0) @@ -256,18 +233,27 @@ void BPPSendThread::mainLoop() bsSize = msg[msgsSent].msg->lengthWithHdrOverhead(); - try + // Same node processing path + if (!sock) { - boost::mutex::scoped_lock sl2(*lock); - sock->write(*msg[msgsSent].msg); - // cout << "sent 1 msg\n"; + auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); + assert(exeMgrDecPtr); + exeMgrDecPtr->addDataToOutput(msg[msgsSent].msg); } - catch (std::exception& e) + else { - sl.lock(); - exceptionString = e.what(); - gotException = true; - return; + try + { + boost::mutex::scoped_lock sl2(*lock); + sock->write(*msg[msgsSent].msg); + } + catch (std::exception& e) + { + sl.lock(); + exceptionString = e.what(); + gotException = true; + return; + } } (void)atomicops::atomicDec(&msgsLeft); @@ -275,7 +261,7 @@ void BPPSendThread::mainLoop() msg[msgsSent].msg.reset(); } - if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < maxByteSize) + if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < queueBytesThresh) { okToRespond.notify_one(); } diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index 7de14f881..88f71456c 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -37,8 +37,7 @@ namespace primitiveprocessor class BPPSendThread { public: - BPPSendThread(); // starts unthrottled - BPPSendThread(uint32_t initMsgsLeft); // starts throttled + BPPSendThread(); // starts unthrottled virtual ~BPPSendThread(); struct Msg_t @@ -71,7 +70,8 @@ class BPPSendThread { // keep the queue size below the 100 msg threshold & below the 250MB mark, // but at least 3 msgs so there is always 1 ready to be sent. - return ((msgQueue.size() > sizeThreshold) || (currentByteSize >= maxByteSize && msgQueue.size() > 3)) && + return ((msgQueue.size() > queueMsgThresh) || + (currentByteSize >= queueBytesThresh && msgQueue.size() > 3)) && !die; } @@ -111,11 +111,13 @@ class BPPSendThread std::queue msgQueue; std::mutex msgQueueLock; std::condition_variable queueNotEmpty; - volatile bool die, gotException, mainThreadWaiting; + volatile bool die = false; + volatile bool gotException = false; + volatile bool mainThreadWaiting = false; std::string exceptionString; - uint32_t sizeThreshold; - volatile int32_t msgsLeft; - bool waiting; + uint32_t queueMsgThresh = 0; + volatile int32_t msgsLeft = -1; + bool waiting = false; std::mutex ackLock; std::condition_variable okToSend; // Condition to prevent run away queue @@ -141,12 +143,12 @@ class BPPSendThread }; std::set connections_s; std::vector connections_v; - bool sawAllConnections; - volatile bool fcEnabled; + bool sawAllConnections = false; + volatile bool fcEnabled = false; /* secondary queue size restriction based on byte size */ - volatile uint64_t currentByteSize; - uint64_t maxByteSize; + volatile uint64_t currentByteSize = 0; + uint64_t queueBytesThresh; // Used to tell the ThreadPool It should consider additional threads because a // queue full event has happened and a thread has been blocked. boost::shared_ptr fProcessorPool;