mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git (synced 2025-04-18 21:44:02 +03:00)

MCOL-5499 Enable ControlFlow for same node communication processing path to avoid DEC queue overloading (#2848)

parent cacbbee1c2
commit 62dc392476
@@ -33,7 +33,6 @@
#include "serializeable.h"
#include "brm.h"
#include "jobstep.h"
// #include "primitiveprocessor.h"
#include "batchprimitiveprocessor-jl.h"
#include "command-jl.h"
#include "columncommand-jl.h"
@@ -241,10 +241,11 @@ int32_t DistributedEngineComm::Setup()
newLocks.clear();

uint32_t newPmCount = fRm->getPsCount();
throttleThreshold = fRm->getDECThrottleThreshold();
tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
fDECConnectionsPerQuery = fRm->getDECConnectionsPerQuery();
unsigned numConnections = getNumConnections();
flowControlEnableBytesThresh = fRm->getDECEnableBytesThresh();
flowControlDisableBytesThresh = fRm->getDECDisableBytesThresh();
oam::Oam oam;
ModuleTypeConfig moduletypeconfig;

@@ -282,6 +283,8 @@ int32_t DistributedEngineComm::Setup()
if (clientAtTheSameHost(cl))
{
cl->atTheSameHost(true);
assert(connectionId <= std::numeric_limits<uint32_t>::max());
localConnectionId_ = connectionId;
}
std::shared_ptr<std::mutex> nl(new std::mutex());
@@ -433,33 +436,6 @@ Error:
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
}

/*
// reset the pmconnection vector
ClientList tempConns;
boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" <<
fPmConnections[i]->moduleName() << endl;
}

if (tempConns.size() == fPmConnections.size()) return;

fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;

// log it
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
*/
}
return;
}
@@ -472,7 +448,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
condition* cond = new condition();
uint32_t firstPMInterleavedConnectionId =
key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size();
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId));
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh));

mqe->queue = StepMsgQueue(lock, cond);
mqe->sendACKs = sendACKs;
@@ -540,7 +516,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
{
std::unique_lock lk(ackLock);

if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
setFlowControl(false, key, mqe);

vector<SBS> v;
@@ -578,7 +554,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
{
std::unique_lock lk(ackLock);

if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
setFlowControl(false, key, mqe);

vector<SBS> v;
@@ -645,7 +621,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
{
std::unique_lock lk(ackLock);

if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
setFlowControl(false, key, mqe);

sendAcks(key, v, mqe, queueSize.size);
@@ -726,12 +702,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,

msg->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
localConnectionId = i;
}
bool sendToLocal = false;
while (l_msgCount > 0)
{
@@ -743,7 +713,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId)
if (sockIndex == localConnectionId_ && fIsExeMgr)
{
sendToLocal = true;
continue;
@@ -751,15 +721,15 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
if (sendToLocal && localConnectionId < fPmConnections.size())
if (sendToLocal)
{
pmAcked[localConnectionId] = true;
writeToClient(localConnectionId, msg);
pmAcked[localConnectionId_] = true;
writeToClient(localConnectionId_, msg);
}

// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
// This is apply to the big message case only. For small messages, the flow control is
// disabled when the queue size is below the disableThreshold.
// disabled when the queue size is below the flowControlDisableBytesThresh.
if (mqe->hasBigMsgs)
{
uint64_t totalUnackedWork = 0;
@@ -775,16 +745,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{
if (!pmAcked[i])
{
if (i == localConnectionId)
if (i == localConnectionId_ && fIsExeMgr)
{
continue;
}
writeToClient(i, msg);
}
}
if (!pmAcked[localConnectionId])
if (!pmAcked[localConnectionId_] && fIsExeMgr)
{
writeToClient(localConnectionId, msg);
writeToClient(localConnectionId_, msg);
}
}
}
@@ -863,28 +833,18 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
ism->Command = BATCH_PRIMITIVE_ACK;
ism->Size = (enabled ? 0 : -1);

#ifdef VALGRIND
/* XXXPAT: For testing in valgrind, init the vars that don't get used */
ism->Flags = 0;
ism->Type = 0;
ism->MsgCount = 0;
ism->Status = 0;
#endif

msg->advanceInputPtr(sizeof(ISMPacketHeader));
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();

for (uint32_t i = 0; i < mqe->pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
if (i == localConnectionId_ && fIsExeMgr)
{
localConnectionId = i;
continue;
}
writeToClient(i, msg);
}
if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg);
if (fIsExeMgr)
writeToClient(localConnectionId_, msg);
}

int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
@@ -911,23 +871,23 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
entries in the config file point to unique PMs */
{
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
int32_t rc = 0;

for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
if (i == localConnectionId_ && fIsExeMgr)
{
localConnectionId = i;
continue;
}

rc =writeToClient(i, msg, senderID);
if (rc)
if ((rc = writeToClient(i, msg, senderID)))
{
return rc;
}
}
if (fIsExeMgr)
{
return writeToClient(localConnectionId_, msg);
}
if (localConnectionId < fPmConnections.size())
rc = writeToClient(localConnectionId, msg);
return rc;
}

@@ -984,12 +944,16 @@ void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueCl
}

void DistributedEngineComm::addDataToOutput(SBS sbs)
{
assert(localConnectionId_ < pmCount);
return addDataToOutput(sbs, localConnectionId_, nullptr);
}

void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;

std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);

@@ -1001,40 +965,7 @@ void DistributedEngineComm::addDataToOutput(SBS sbs)
return;
}

mqe = map_tok->second;
lk.unlock();

if (pmCount > 0)
{
// I hardcoded the unacked Worker id here. ACK isn't important
// for the local exchange b/c there is no need to
// enable flowcontrol localy on PM.
(void)atomicops::atomicInc(&mqe->unackedWork[0]);
}

[[maybe_unused]] TSQSize_t queueSize = mqe->queue.push(sbs);
// There will be no statistics about data transfered
// over the memory.
}

void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);

if (map_tok == fSessionMessages.end())
{
// For debugging...
// cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " <<
// uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
return;
}

mqe = map_tok->second;
auto mqe = map_tok->second;
lk.unlock();

if (pmCount > 0)
@@ -1049,9 +980,9 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
std::lock_guard lk(ackLock);
uint64_t msgSize = sbs->lengthWithHdrOverhead();

if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2))
doHasBigMsgs(mqe, (300 * 1024 * 1024 > 3 * msgSize ? 300 * 1024 * 1024
: 3 * msgSize)); // buffer at least 3 big msgs
if (!mqe->throttled && msgSize > (flowControlEnableBytesThresh / 2))
doHasBigMsgs(
mqe, (bigMessageSize > 3 * msgSize ? bigMessageSize : 3 * msgSize)); // buffer at least 3 big msgs

if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
setFlowControl(true, uniqueId, mqe);
@@ -1271,8 +1202,9 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
return empty;
}

DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue)
: ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(targetRecvQueueSize)
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue,
const uint64_t flowControlEnableBytesThresh)
: ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh)
{
unackedWork.reset(new volatile uint32_t[pmCount]);
interleaver.reset(new uint32_t[pmCount]);
@@ -229,7 +229,7 @@ class DistributedEngineComm
/* To keep some state associated with the connection. These aren't copyable. */
struct MQE : public boost::noncopyable
{
MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue);
MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue, const uint64_t recvQueueSize);
uint32_t getNextConnectionId(const size_t pmIndex, const size_t pmConnectionsNumber,
const uint32_t DECConnectionsPerQuery);
messageqcpp::Stats stats;
@@ -297,9 +297,9 @@ class DistributedEngineComm
bool fIsExeMgr;

// send-side throttling vars
uint64_t throttleThreshold;
static const uint32_t targetRecvQueueSize = 50000000;
static const uint32_t disableThreshold = 10000000;
uint64_t flowControlEnableBytesThresh = 50000000;
uint64_t flowControlDisableBytesThresh = 10000000;
uint64_t bigMessageSize = 300 * 1024 * 1024;
uint32_t tbpsThreadCount;
uint32_t fDECConnectionsPerQuery;

@@ -310,6 +310,8 @@ class DistributedEngineComm
void doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize);
boost::mutex ackLock;

// localConnectionId_ is set running Setup() method
uint32_t localConnectionId_ = std::numeric_limits<uint32_t>::max();
std::vector<struct in_addr> localNetIfaceSins_;
std::mutex inMemoryEM2PPExchMutex_;
std::condition_variable inMemoryEM2PPExchCV_;
@@ -239,8 +239,6 @@ ResourceManager::ResourceManager(bool runningInExeMgr, config::Config* aConfig)
fAllowedDiskAggregation =
getBoolVal(fRowAggregationStr, "AllowDiskBasedAggregation", defaultAllowDiskAggregation);

fMaxBPPSendQueue = getUintVal(fPrimitiveServersStr, "MaxBPPSendQueue", defaultMaxBPPSendQueue);

if (!load_encryption_keys())
{
Logger log;
@@ -39,8 +39,6 @@

#include "atomicops.h"

#define EXPORT

namespace joblist
{
// aggfilterstep
@@ -102,7 +100,10 @@ const uint64_t defaultRowsPerBatch = 10000;
/* HJ CP feedback, see bug #1465 */
const uint32_t defaultHjCPUniqueLimit = 100;

const uint64_t defaultDECThrottleThreshold = 200000000; // ~200 MB
const constexpr uint64_t defaultFlowControlEnableBytesThresh = 50000000; // ~50Mb
const constexpr uint64_t defaultFlowControlDisableBytesThresh = 10000000; // ~10 MB
const constexpr uint64_t defaultBPPSendThreadBytesThresh = 250000000; // ~250 MB
const constexpr uint64_t BPPSendThreadMsgThresh = 100;

const bool defaultAllowDiskAggregation = false;

@@ -116,8 +117,8 @@ class ResourceManager
/** @brief ctor
*
*/
EXPORT ResourceManager(bool runningInExeMgr = false, config::Config *aConfig = nullptr);
static ResourceManager* instance(bool runningInExeMgr = false, config::Config *aConfig = nullptr);
ResourceManager(bool runningInExeMgr = false, config::Config* aConfig = nullptr);
static ResourceManager* instance(bool runningInExeMgr = false, config::Config* aConfig = nullptr);
config::Config* getConfig()
{
return fConfig;
@@ -149,7 +150,7 @@ class ResourceManager
{
return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct);
}
EXPORT int getEmPriority() const; // FOr Windows only
int getEmPriority() const; // FOr Windows only
int getEmExecQueueSize() const
{
return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize);
@@ -236,7 +237,6 @@ class ResourceManager
uint32_t getJlMaxOutstandingRequests() const
{
return fJlMaxOutstandingRequests;
// getUintVal(fJobListStr, "MaxOutstandingRequests", defaultMaxOutstandingRequests);
}
uint32_t getJlJoinerChunkSize() const
{
@@ -284,35 +284,45 @@ class ResourceManager
return getUintVal(fBatchInsertStr, "RowsPerBatch", defaultRowsPerBatch);
}

uint64_t getDECThrottleThreshold() const
uint64_t getDECEnableBytesThresh() const
{
return getUintVal(fJobListStr, "DECThrottleThreshold", defaultDECThrottleThreshold);
return getUintVal(FlowControlStr, "DECFlowControlEnableBytesThresh", defaultFlowControlEnableBytesThresh);
}

uint64_t getMaxBPPSendQueue() const
uint32_t getDECDisableBytesThresh() const
{
return fMaxBPPSendQueue;
return getUintVal(FlowControlStr, "DECFlowControlDisableBytesThresh", defaultFlowControlDisableBytesThresh);
}

EXPORT void emServerThreads();
EXPORT void emServerQueueSize();
EXPORT void emSecondsBetweenMemChecks();
EXPORT void emMaxPct();
EXPORT void emPriority();
EXPORT void emExecQueueSize();
uint32_t getBPPSendThreadBytesThresh() const
{
return getUintVal(FlowControlStr, "BPPSendThreadBytesThresh", defaultBPPSendThreadBytesThresh);
}

EXPORT void hjNumThreads();
EXPORT void hjMaxBuckets();
EXPORT void hjMaxElems();
EXPORT void hjFifoSizeLargeSide();
EXPORT void hjPmMaxMemorySmallSide();
uint32_t getBPPSendThreadMsgThresh() const
{
return getUintVal(FlowControlStr, "BPPSendThreadMsgThresh", BPPSendThreadMsgThresh);
}

void emServerThreads();
void emServerQueueSize();
void emSecondsBetweenMemChecks();
void emMaxPct();
void emPriority();
void emExecQueueSize();

void hjNumThreads();
void hjMaxBuckets();
void hjMaxElems();
void hjFifoSizeLargeSide();
void hjPmMaxMemorySmallSide();

/* new HJ/Union/Aggregation mem interface, used by TupleBPS */
/* sessionLimit is a pointer to the var holding the session-scope limit, should be JobInfo.umMemLimit
for the query. */
/* Temporary parameter 'patience', will wait for up to 10s to get the memory. */
EXPORT bool getMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit, bool patience = true);
EXPORT bool getMemory(int64_t amount, bool patience = true);
bool getMemory(int64_t amount, boost::shared_ptr<int64_t>& sessionLimit, bool patience = true);
bool getMemory(int64_t amount, bool patience = true);
inline void returnMemory(int64_t amount)
{
atomicops::atomicAdd(&totalUmMemLimit, amount);
@@ -341,14 +351,14 @@ class ResourceManager
return fHJUmMaxMemorySmallSideDistributor.getTotalResource();
}

EXPORT void addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
void addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);

void removeHJUmMaxSmallSideMap(uint32_t sessionID)
{
fHJUmMaxMemorySmallSideDistributor.removeSession(sessionID);
}

EXPORT void addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
void addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
void removeHJPmMaxSmallSideMap(uint32_t sessionID)
{
fHJPmMaxMemorySmallSideSessionMap.removeSession(sessionID);
@@ -435,9 +445,9 @@ class ResourceManager
return fUseHdfs;
}

EXPORT bool getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const;
EXPORT bool queryStatsEnabled() const;
EXPORT bool userPriorityEnabled() const;
bool getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const;
bool queryStatsEnabled() const;
bool userPriorityEnabled() const;

uint64_t getConfiguredUMMemLimit() const
{
@@ -471,12 +481,13 @@ class ResourceManager
std::string fExeMgrStr;
inline static const std::string fHashJoinStr = "HashJoin";
inline static const std::string fJobListStr = "JobList";
inline static const std::string FlowControlStr = "FlowControl";

inline static const std::string fPrimitiveServersStr = "PrimitiveServers";
/*static const*/ std::string fSystemConfigStr;
inline static const std::string fExtentMapStr = "ExtentMap";
/*static const*/ std::string fDMLProcStr;
/*static const*/ std::string fBatchInsertStr;
inline static const std::string fOrderByLimitStr = "OrderByLimit";
inline static const std::string fRowAggregationStr = "RowAggregation";
config::Config* fConfig;
static ResourceManager* fInstance;
@@ -509,7 +520,6 @@ class ResourceManager
bool fUseHdfs;
bool fAllowedDiskAggregation{false};
uint64_t fDECConnectionsPerQuery;
uint64_t fMaxBPPSendQueue = 250000000;
};

inline std::string ResourceManager::getStringVal(const std::string& section, const std::string& name,
docs/Control Flow.md (new file, 52 lines)
@@ -0,0 +1,52 @@
Let's begin a third of the way into the journey, namely when TupleBPS starts sending PrimitiveTasks towards the PrimProc (PP) worker process. A task usually causes BPP to process a series of records called a RowGroup. BPP::execute() calls sendResponse(), which serializes the RowGroup buffer class, RGData, into a transmittable ByteStream. This ByteStream is sent back to the ExeMgr facility (the coordinator) either over the network or via a function call (if PP and EM belong to the same process). The ByteStream goes into the DistributedEngineComm (DEC) queue that belongs to that particular uniqueID (a subquery, the small side of a JOIN, or a query); it gets there either via a TCP socket reading thread or via the function call from the previous stage. At this point TBPS has a reading thread that runs the main loop of TBPS::receiveMultiPrimitiveMessage. TBPS reads a batch of ByteStreams from the queue for that uniqueID, turns them into RGData buffers, and either puts them into a DataList (DL, another queue used between JobSteps) or preprocesses the RGDatas first and then puts them into the DL. After the EM query execution pipeline is done, the coordinator sends the results to the plugin, or, for a syscat query, to whatever facility asked for the results.
The bottlenecks and limitations become obvious under load or when computation resources are scarce, e.g. in containers (see [MCOL-5489](https://jira.mariadb.org/browse/MCOL-5489)):
- DEC queues are unlimited, which hides the fact that TBPS can't handle the workload. This also increases latency and consumes RAM to store ByteStreams that TBPS, JobSteps, or MDB can't handle in time, because there are multiple producer threads in PP and a single consumer;
- This behavior was uncovered by the patch that forces the PP worker to put a ByteStream directly into DEC via a method call. Previously PP sent the ByteStream over the local loopback.
- There is a flow control mechanism that forces PP to collect ByteStreams locally; this flow control also asks for a confirmation for every PrimitiveMsg. The mechanism proves ineffective because it is basically the same thing, namely it also has an unlimited queue;
- DataList queues are limited, so DLs back-propagate processing congestion.

```mermaid
stateDiagram-v2
PP --> DEC : ByteStream
PP --> DEC : ByteStream
PP --> DEC : ByteStream
DEC --> TBPS
TBPS --> PP : PrimitiveMsg
TBPS --> PP : PrimitiveMsg
TBPS --> PP : PrimitiveMsg
TBPS --> TNS : DataList
```
There is a mechanism called Control Flow that does its best to prevent ExeMgr facility overload. There are a number of parameters that modify Control Flow behavior. Here are all of them with their defaults.
```xml
<ControlFlow>
  <BPPSendThreadBytesThresh>250000000</BPPSendThreadBytesThresh>
  <BPPSendThreadMsgThresh>100</BPPSendThreadMsgThresh>
  <DECFlowControlEnableBytesThresh>50000000</DECFlowControlEnableBytesThresh>
  <DECFlowControlDisableBytesThresh>10000000</DECFlowControlDisableBytesThresh>
</ControlFlow>
```

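As a rough illustration only, these settings are surfaced in code through ResourceManager. The four getters below are the ones this commit adds; the wrapper function and the printout are hypothetical and not part of the codebase.
```cpp
#include <cstdint>
#include <iostream>

#include "resourcemanager.h"

// Hypothetical helper: read the FlowControl thresholds through ResourceManager.
void logFlowControlSettings()
{
  joblist::ResourceManager* rm = joblist::ResourceManager::instance();

  uint64_t decEnableBytes = rm->getDECEnableBytesThresh();       // DECFlowControlEnableBytesThresh
  uint32_t decDisableBytes = rm->getDECDisableBytesThresh();     // DECFlowControlDisableBytesThresh
  uint32_t sendThreadBytes = rm->getBPPSendThreadBytesThresh();  // BPPSendThreadBytesThresh
  uint32_t sendThreadMsgs = rm->getBPPSendThreadMsgThresh();     // BPPSendThreadMsgThresh

  std::cout << decEnableBytes << " " << decDisableBytes << " " << sendThreadBytes << " "
            << sendThreadMsgs << std::endl;
}
```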
The previous schema had an addition, namely a SendingThread between PP and DEC that has a limited ByteStream queue. If a certain DEC queue's byte size gets over DECFlowControlEnableBytesThresh, DEC signals PP to send ByteStreams via the SendingThread queue only. If the DEC queue size in bytes drops below DECFlowControlDisableBytesThresh, DEC signals PP to stop using the SendingThread. If (SendingThread queue size in bytes > BPPSendThreadBytesThresh and queue length > 3) or (SendingThread queue length > BPPSendThreadMsgThresh), PP begins to reschedule (effectively fail before PP starts executing them) the PrimitiveMsgs it gets from TBPS[1]. The predicate is sketched below.
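A minimal sketch of that rescheduling predicate, modeled on BPPSendThread::sizeTooBig() from this commit; the free-standing function below is illustrative, while in the code the check lives on the BPPSendThread object.
```cpp
#include <cstddef>
#include <cstdint>

// True when the send thread should stop accepting new results and PP should start
// rescheduling PrimitiveMsgs: too many queued messages, or too many queued bytes
// while still keeping at least 3 messages buffered so one is always ready to send.
bool shouldThrottle(std::size_t queuedMsgs, uint64_t queuedBytes,
                    uint32_t queueMsgThresh, uint64_t queueBytesThresh)
{
  return (queuedMsgs > queueMsgThresh) ||
         (queuedBytes >= queueBytesThresh && queuedMsgs > 3);
}
```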
```mermaid
stateDiagram-v2
PP --> DEC : ByteStream
PP --> DEC : ByteStream
PP --> DEC : ByteStream
PP --> SendingThread : if DEC queue is overloaded
SendingThread --> DEC
DEC --> TBPS
DEC --> PP : Signals that specific DEC queue is overloaded
TBPS --> PP : PrimitiveMsg
TBPS --> PP : PrimitiveMsg
TBPS --> PP : PrimitiveMsg
TBPS --> JobStep : DataList
```

There is an unlimited queue in DEC, and IMHO it is hard to put a limit there, so the suggested approach is:
- Move control flow into TBPS.
- Introduce a cumulative (byte size + queue depth) metric.
- The metric caps the number of concurrent outstanding PrimitiveMsgs: the bigger the metric value, the smaller the number of outstanding PrimitiveMsgs (a sketch of this mapping follows the list).
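Purely as a sketch of that idea; nothing below exists in the codebase, and the metric formula, the 1000-message normalizer, and the function name are made up for illustration under those stated assumptions.
```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Combine queued bytes and queue depth into one pressure value in [0, 1], then
// shrink the budget of outstanding PrimitiveMsgs as the pressure grows.
uint32_t maxOutstandingPrimitiveMsgs(uint64_t queuedBytes, std::size_t queuedMsgs,
                                     uint64_t bytesBudget, uint32_t hardCap)
{
  double pressure = std::min(1.0, static_cast<double>(queuedBytes) / static_cast<double>(bytesBudget) +
                                      static_cast<double>(queuedMsgs) / 1000.0);
  // Never drop below one outstanding message so the pipeline keeps moving.
  return std::max<uint32_t>(1, static_cast<uint32_t>((1.0 - pressure) * hardCap));
}
```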

1. Not ideal at all, because there might be multiple ExeMgrs in a cluster; some of them might be overloaded, but the SendThread is shared between all of them.
@@ -33,7 +33,7 @@
#include <stdexcept>
#include <unistd.h>
#include <cstring>
//#define NDEBUG
// #define NDEBUG
#include <cassert>
#include <string>
#include <sstream>
@@ -629,7 +629,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
{
uint64_t key;
uint32_t value;
} * arr;
}* arr;
#pragma pack(pop)

/* skip the header */
@@ -931,7 +931,7 @@ void BatchPrimitiveProcessor::initProcessor()
strValues.reset(new utils::NullString[LOGICAL_BLOCK_RIDS]);

outMsgSize = defaultBufferSize;
outputMsg.reset(new(std::align_val_t(MAXCOLUMNWIDTH)) uint8_t[outMsgSize]);
outputMsg.reset(new (std::align_val_t(MAXCOLUMNWIDTH)) uint8_t[outMsgSize]);

if (ot == ROW_GROUP)
{
@@ -1479,8 +1479,8 @@ void BatchPrimitiveProcessor::execute()

if (!asyncLoaded[p + 1])
{
loadBlockAsync(col->getLBIDAux(), versionInfo, txnID, 2, &cachedIO, &physIO,
LBIDTrace, sessionID, &counterLock, &busyLoaderCount, sendThread, &vssCache);
loadBlockAsync(col->getLBIDAux(), versionInfo, txnID, 2, &cachedIO, &physIO, LBIDTrace, sessionID,
&counterLock, &busyLoaderCount, sendThread, &vssCache);
asyncLoaded[p + 1] = true;
}
}
@@ -2169,8 +2169,18 @@ void BatchPrimitiveProcessor::sendResponse()
// !sock has a 'same host connection' semantics here.
if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock)))
{
exeMgrDecPtr->addDataToOutput(serialized);
serialized.reset();
// Flow Control now handles same node connections so the receiving DEC queue
// is limited.
if (sendThread->flowControlEnabled())
{
sendThread->sendResult({serialized, nullptr, nullptr, 0}, false);
}
else
{
exeMgrDecPtr->addDataToOutput(serialized);
serialized.reset();
}

return;
}

@@ -26,6 +26,7 @@
#include <mutex>
#include "bppsendthread.h"
#include "resourcemanager.h"
#include "serviceexemgr.h"

namespace primitiveprocessor
{
@@ -33,32 +34,9 @@ extern uint32_t connectionsPerUM;
extern uint32_t BPPCount;

BPPSendThread::BPPSendThread()
: die(false)
, gotException(false)
, mainThreadWaiting(false)
, sizeThreshold(100)
, msgsLeft(-1)
, waiting(false)
, sawAllConnections(false)
, fcEnabled(false)
, currentByteSize(0)
{
maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue();
runner = boost::thread(Runner_t(this));
}

BPPSendThread::BPPSendThread(uint32_t initMsgsLeft)
: die(false)
, gotException(false)
, mainThreadWaiting(false)
, sizeThreshold(100)
, msgsLeft(initMsgsLeft)
, waiting(false)
, sawAllConnections(false)
, fcEnabled(false)
, currentByteSize(0)
{
maxByteSize = joblist::ResourceManager::instance()->getMaxBPPSendQueue();
queueBytesThresh = joblist::ResourceManager::instance()->getBPPSendThreadBytesThresh();
queueMsgThresh = joblist::ResourceManager::instance()->getBPPSendThreadMsgThresh();
runner = boost::thread(Runner_t(this));
}

@@ -74,7 +52,7 @@ void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection)
if (sizeTooBig())
{
std::unique_lock<std::mutex> sl1(respondLock);
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die)
{
fProcessorPool->incBlockedThreads();
okToRespond.wait(sl1);
@@ -119,7 +97,7 @@ void BPPSendThread::sendResults(const vector<Msg_t>& msgs, bool newConnection)
if (sizeTooBig())
{
std::unique_lock<std::mutex> sl1(respondLock);
while (currentByteSize >= maxByteSize && msgQueue.size() > 3 && !die)
while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die)
{
fProcessorPool->incBlockedThreads();
okToRespond.wait(sl1);
@@ -166,7 +144,6 @@ void BPPSendThread::sendMore(int num)
{
std::unique_lock<std::mutex> sl(ackLock);

// cout << "got an ACK for " << num << " msgsLeft=" << msgsLeft << endl;
if (num == -1)
fcEnabled = false;
else if (num == 0)
@@ -256,18 +233,27 @@ void BPPSendThread::mainLoop()

bsSize = msg[msgsSent].msg->lengthWithHdrOverhead();

try
// Same node processing path
if (!sock)
{
boost::mutex::scoped_lock sl2(*lock);
sock->write(*msg[msgsSent].msg);
// cout << "sent 1 msg\n";
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
assert(exeMgrDecPtr);
exeMgrDecPtr->addDataToOutput(msg[msgsSent].msg);
}
catch (std::exception& e)
else
{
sl.lock();
exceptionString = e.what();
gotException = true;
return;
try
{
boost::mutex::scoped_lock sl2(*lock);
sock->write(*msg[msgsSent].msg);
}
catch (std::exception& e)
{
sl.lock();
exceptionString = e.what();
gotException = true;
return;
}
}

(void)atomicops::atomicDec(&msgsLeft);
@@ -275,7 +261,7 @@ void BPPSendThread::mainLoop()
msg[msgsSent].msg.reset();
}

if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < maxByteSize)
if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < queueBytesThresh)
{
okToRespond.notify_one();
}
@@ -37,8 +37,7 @@ namespace primitiveprocessor
class BPPSendThread
{
public:
BPPSendThread(); // starts unthrottled
BPPSendThread(uint32_t initMsgsLeft); // starts throttled
BPPSendThread(); // starts unthrottled
virtual ~BPPSendThread();

struct Msg_t
@@ -71,7 +70,8 @@ class BPPSendThread
{
// keep the queue size below the 100 msg threshold & below the 250MB mark,
// but at least 3 msgs so there is always 1 ready to be sent.
return ((msgQueue.size() > sizeThreshold) || (currentByteSize >= maxByteSize && msgQueue.size() > 3)) &&
return ((msgQueue.size() > queueMsgThresh) ||
(currentByteSize >= queueBytesThresh && msgQueue.size() > 3)) &&
!die;
}

@@ -111,11 +111,13 @@ class BPPSendThread
std::queue<Msg_t> msgQueue;
std::mutex msgQueueLock;
std::condition_variable queueNotEmpty;
volatile bool die, gotException, mainThreadWaiting;
volatile bool die = false;
volatile bool gotException = false;
volatile bool mainThreadWaiting = false;
std::string exceptionString;
uint32_t sizeThreshold;
volatile int32_t msgsLeft;
bool waiting;
uint32_t queueMsgThresh = 0;
volatile int32_t msgsLeft = -1;
bool waiting = false;
std::mutex ackLock;
std::condition_variable okToSend;
// Condition to prevent run away queue
@@ -141,12 +143,12 @@ class BPPSendThread
};
std::set<Connection_t> connections_s;
std::vector<Connection_t> connections_v;
bool sawAllConnections;
volatile bool fcEnabled;
bool sawAllConnections = false;
volatile bool fcEnabled = false;

/* secondary queue size restriction based on byte size */
volatile uint64_t currentByteSize;
uint64_t maxByteSize;
volatile uint64_t currentByteSize = 0;
uint64_t queueBytesThresh;
// Used to tell the ThreadPool It should consider additional threads because a
// queue full event has happened and a thread has been blocked.
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;