Stars: 1 · Forks: 0
Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git, synced 2025-07-30 19:23:07 +03:00

MCOL-4694 Add the JobList.DECConnectionsPerQuery setting to allow

distributing jobs between DEC connections from EM to PPs
This commit is contained in:
Roman Nozdrin
2021-05-11 12:25:11 +00:00
committed by Roman Nozdrin
parent 5155a08a67
commit 736b9b81fc
4 changed files with 62 additions and 18 deletions

View File

@ -243,6 +243,7 @@ void DistributedEngineComm::Setup()
uint32_t newPmCount = fRm->getPsCount();
throttleThreshold = fRm->getDECThrottleThreshold();
tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
fDECConnectionsPerQuery = fRm->getDECConnectionsPerQuery();
unsigned numConnections = getNumConnections();
oam::Oam oam;
ModuleTypeConfig moduletypeconfig;
@ -458,7 +459,8 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
boost::mutex* lock = new boost::mutex();
condition* cond = new condition();
boost::shared_ptr<MQE> mqe(new MQE(pmCount));
uint32_t firstPMInterleavedConnectionId = key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size();
boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId));
mqe->queue = StepMsgQueue(lock, cond);
mqe->sendACKs = sendACKs;
@ -972,30 +974,31 @@ void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t ta
mqe->targetQueueSize = targetSize;
}
int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving)
int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs, uint32_t senderUniqueID, bool doInterleaving)
{
boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
MessageQueueMap::iterator it;
// Keep mqe's stats from being freed early
boost::shared_ptr<MQE> mqe;
Stats* senderStats = NULL;
uint32_t interleaver = 0;
if (fPmConnections.size() == 0)
return 0;
if (sender != numeric_limits<uint32_t>::max())
uint32_t connectionId = aPMIndex;
if (senderUniqueID != numeric_limits<uint32_t>::max())
{
lk.lock();
it = fSessionMessages.find(sender);
it = fSessionMessages.find(senderUniqueID);
if (it != fSessionMessages.end())
{
mqe = it->second;
senderStats = &(mqe->stats);
if (doInterleaving)
interleaver = it->second->interleaver[index % it->second->pmCount]++;
size_t pmIndex = aPMIndex % mqe->pmCount;
connectionId = it->second->getNextConnectionId(pmIndex,
fPmConnections.size(),
fDECConnectionsPerQuery);
}
lk.unlock();
@ -1003,14 +1006,11 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
try
{
if (doInterleaving)
index = (index + (interleaver * pmCount)) % fPmConnections.size();
ClientList::value_type client = fPmConnections[index];
ClientList::value_type client = fPmConnections[connectionId];
if (!client->isAvailable()) return 0;
boost::mutex::scoped_lock lk(*(fWlock[index]));
boost::mutex::scoped_lock lk(*(fWlock[connectionId]));
client->write(bs, NULL, senderStats);
return 0;
}
@ -1114,13 +1114,40 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
return empty;
}
DistributedEngineComm::MQE::MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false),
targetQueueSize(targetRecvQueueSize)
// Per-query message-queue state for a query spanning pCount PMs.
// Allocates one unacked-work counter and one interleaver slot per PM;
// interleaver[pmId] holds the DEC connection id to use next for that PM,
// seeded with consecutive ids starting at initialInterleaverValue (computed
// in DistributedEngineComm::addQueue from the query key and
// fDECConnectionsPerQuery).
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue)
: ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(targetRecvQueueSize)
{
unackedWork.reset(new volatile uint32_t[pmCount]);
interleaver.reset(new uint32_t[pmCount]);
memset((void*) unackedWork.get(), 0, pmCount * sizeof(uint32_t));
// NOTE(review): this memset is redundant — every interleaver slot is
// overwritten by the loop below.
memset((void*) interleaver.get(), 0, pmCount * sizeof(uint32_t));
uint32_t interleaverValue = initialInterleaverValue;
// Remember the query's first connection id so getNextConnectionId() can
// wrap back to it.
initialConnectionId = initialInterleaverValue;
for (size_t pmId = 0; pmId < pmCount; ++pmId)
{
interleaver.get()[pmId] = interleaverValue++;
}
}
// Here is the assumed connections distribution schema in a pmConnections vector.
// (PM1-PM2...-PMX)-(PM1-PM2..-PMX)...
//
// Advances this PM's interleaved connection id by one "stripe" (pmCount
// entries) modulo the connections vector size, and — when the wrap condition
// below fires — resets it to the first connection assigned to this query for
// this PM (initialConnectionId + pmIndex). The advanced id is both stored in
// interleaver[pmIndex] and returned to the caller (writeToClient).
//
// @param pmIndex               PM slot within this query (caller passes aPMIndex % pmCount)
// @param pmConnectionsNumber   total size of the pmConnections vector
// @param DECConnectionsPerQuery per-query connection budget from
//                               JobList.DECConnectionsPerQuery
// @return the connection id to use for the next write to this PM
//
// NOTE(review): (nextConnectionId - pmIndex) mixes uint32_t with size_t and
// can wrap around zero when nextConnectionId < pmIndex after the modulo above;
// confirm the wrap test is intended to fire on stripe boundaries in that case.
uint32_t DistributedEngineComm::MQE::getNextConnectionId(const size_t pmIndex,
const size_t pmConnectionsNumber,
const uint32_t DECConnectionsPerQuery)
{
// Step to the same PM's slot in the next stripe of connections.
uint32_t nextConnectionId = (interleaver[pmIndex] + pmCount) % pmConnectionsNumber;
// Wrap around the connection id.
if ((nextConnectionId - pmIndex) % DECConnectionsPerQuery == 0)
{
// The sum must be < connections vector size.
nextConnectionId = initialConnectionId + pmIndex;
interleaver[pmIndex] = nextConnectionId;
}
else
{
interleaver[pmIndex] = nextConnectionId;
}
return nextConnectionId;
}
}

View File

@ -217,12 +217,16 @@ private:
/* To keep some state associated with the connection. These aren't copyable. */
struct MQE : public boost::noncopyable
{
MQE(uint32_t pmCount);
MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue);
uint32_t getNextConnectionId(const size_t pmIndex,
const size_t pmConnectionsNumber,
const uint32_t DECConnectionsPerQuery);
messageqcpp::Stats stats;
StepMsgQueue queue;
uint32_t ackSocketIndex;
boost::scoped_array<volatile uint32_t> unackedWork;
boost::scoped_array<uint32_t> interleaver;
uint32_t initialConnectionId;
uint32_t pmCount;
// non-BPP primitives don't do ACKs
bool sendACKs;
@ -286,6 +290,7 @@ private:
static const uint32_t targetRecvQueueSize = 50000000;
static const uint32_t disableThreshold = 10000000;
uint32_t tbpsThreadCount;
uint32_t fDECConnectionsPerQuery;
void sendAcks(uint32_t uniqueID, const std::vector<messageqcpp::SBS>& msgs,
boost::shared_ptr<MQE> mqe, size_t qSize);

View File

@ -152,6 +152,11 @@ ResourceManager::ResourceManager(bool runningInExeMgr) :
if (temp > 0)
fJlNumScanReceiveThreads = temp;
fDECConnectionsPerQuery = getUintVal(fJobListStr, "DECConnectionsPerQuery", 0);
fDECConnectionsPerQuery = (fDECConnectionsPerQuery)
? fDECConnectionsPerQuery
: getPsConnectionsPerPrimProc();
temp = getIntVal(fTupleWSDLStr, "NumThreads", -1);
if (temp > 0)

View File

@ -128,6 +128,7 @@ const uint64_t defaultDECThrottleThreshold = 200000000; // ~200 MB
const uint8_t defaultUseCpimport = 1;
const bool defaultAllowDiskAggregation = false;
/** @brief ResourceManager
* Returns requested values from Config
*
@ -179,11 +180,16 @@ public:
return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize);
}
bool getAllowDiskAggregation() const
bool getAllowDiskAggregation() const
{
return fAllowedDiskAggregation;
}
/** @brief Returns the cached JobList.DECConnectionsPerQuery value.
 *
 *  Read once in the ResourceManager ctor; when the config value is 0/unset
 *  it falls back to getPsConnectionsPerPrimProc().
 */
uint64_t getDECConnectionsPerQuery() const
{
return fDECConnectionsPerQuery;
}
int getHjMaxBuckets() const
{
return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets);
@ -615,6 +621,7 @@ private:
bool isExeMgr;
bool fUseHdfs;
bool fAllowedDiskAggregation{false};
uint64_t fDECConnectionsPerQuery;
};