1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5044 This patch replaces PriorityThreadPool with FairThreadPool that uses a simple

operations + morsel size weight model to equally allocate CPU b/w parallel query morsels.
This patch delivers better parallel query timings distribution(timings graph resembles normal
distribution with a bigger left side thus more queries runs faster comparing with PrioThreadPool-based
single-node installation).
See changes in batchprimitiveprocessor-jl.h and comments in fair_threadpool.h for
important implementation details
This commit is contained in:
Roman Nozdrin
2022-05-24 17:57:40 +00:00
committed by Roman Nozdrin
parent 0f0b3a2bed
commit fd8ba33f21
12 changed files with 173 additions and 254 deletions

View File

@ -140,6 +140,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor()
, ptMask(0)
, firstInstance(false)
, valuesLBID(0)
, weight_(0)
{
pp.setLogicalBlockMode(true);
pp.setBlockPtr((int*)blockData);
@ -193,6 +194,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
// ptMask(processorThreads - 1),
, firstInstance(true)
, valuesLBID(0)
, weight_(0)
{
// promote processorThreads to next power of 2. also need to change the name to bucketCount or similar
processorThreads = nextPowOf2(processorThreads);
@ -542,6 +544,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
// skip the header, sessionID, stepID, uniqueID, and priority
bs.advance(sizeof(ISMPacketHeader) + 16);
bs >> weight_;
bs >> dbRoot;
bs >> count;
bs >> ridCount;

View File

@ -137,6 +137,11 @@ class BatchPrimitiveProcessor
fBusy = b;
}
size_t getWeight() const
{
return weight_;
}
uint16_t FilterCount() const
{
return filterCount;
@ -433,6 +438,9 @@ class BatchPrimitiveProcessor
uint ptMask;
bool firstInstance;
uint64_t valuesLBID;
uint32_t weight_;
static const uint64_t maxResultCount = 1048576; // 2^20
friend class Command;
friend class ColumnCommand;

View File

@ -47,7 +47,7 @@
namespace primitiveprocessor
{
class BPPSeeder : public threadpool::PriorityThreadPool::Functor
class BPPSeeder : public threadpool::FairThreadPool::Functor
{
public:
BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads,
@ -71,6 +71,11 @@ class BPPSeeder : public threadpool::PriorityThreadPool::Functor
{
return _priority;
}
size_t getWeight() const
{
assert(bpp);
return bpp->getWeight();
}
private:
BPPSeeder();

View File

@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache();
// FIXME: there is an anon ns burried later in between 2 named namespaces...
namespace primitiveprocessor
{
boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;
boost::shared_ptr<threadpool::FairThreadPool> OOBPool;
BlockRequestProcessor** BRPp;
#ifndef _MSC_VER
@ -1050,7 +1050,7 @@ using namespace primitiveprocessor;
/** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
* TODO: Move this & the impl into different files
*/
class DictScanJob : public threadpool::PriorityThreadPool::Functor
class DictScanJob : public threadpool::FairThreadPool::Functor
{
public:
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
@ -1242,7 +1242,7 @@ struct BPPHandler
scoped.unlock();
}
struct BPPHandlerFunctor : public PriorityThreadPool::Functor
struct BPPHandlerFunctor : public FairThreadPool::Functor
{
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
{
@ -1710,7 +1710,7 @@ return 0;
PrimitiveServer* fPrimitiveServerPtr;
};
class DictionaryOp : public PriorityThreadPool::Functor
class DictionaryOp : public FairThreadPool::Functor
{
public:
DictionaryOp(SBS cmd) : bs(cmd)
@ -1947,8 +1947,7 @@ struct ReadThread
void operator()()
{
utils::setThreadName("PPReadThread");
boost::shared_ptr<threadpool::PriorityThreadPool> procPoolPtr =
fPrimitiveServerPtr->getProcessorThreadPool();
threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
SBS bs;
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
@ -2044,35 +2043,69 @@ struct ReadThread
switch (ismHdr->Command)
{
case DICT_CREATE_EQUALITY_FILTER:
{
PriorityThreadPool::Job job;
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new CreateEqualityFilter(bs));
OOBPool->addJob(job);
break;
}
case DICT_DESTROY_EQUALITY_FILTER:
case BATCH_PRIMITIVE_CREATE:
case BATCH_PRIMITIVE_ADD_JOINER:
case BATCH_PRIMITIVE_END_JOINER:
case BATCH_PRIMITIVE_DESTROY:
case BATCH_PRIMITIVE_ABORT:
{
PriorityThreadPool::Job job;
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
job.functor = boost::shared_ptr<PriorityThreadPool::Functor>(new DestroyEqualityFilter(bs));
const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
const uint32_t weight = 1;
const uint32_t priority = 0;
uint32_t id = 0;
boost::shared_ptr<FairThreadPool::Functor> functor;
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
{
functor.reset(new CreateEqualityFilter(bs));
}
else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER)
{
functor.reset(new DestroyEqualityFilter(bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE)
{
functor.reset(new BPPHandler::Create(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER)
{
functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER)
{
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY)
{
functor.reset(new BPPHandler::Destroy(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT)
{
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
OOBPool->addJob(job);
break;
}
case DICT_TOKEN_BY_SCAN_COMPARE:
case BATCH_PRIMITIVE_RUN:
{
idbassert(bs->length() >= sizeof(TokenByScanRequestHeader));
TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*)ismHdr;
TokenByScanRequestHeader* hdr = nullptr;
boost::shared_ptr<FairThreadPool::Functor> functor;
uint32_t id = 0;
uint32_t weight = 0;
uint32_t priority = 0;
uint32_t txnId = 0;
uint32_t stepID = 0;
uint32_t uniqueID = 0;
bool isSyscat = false;
if (bRotateDest)
{
@ -2090,23 +2123,41 @@ struct ReadThread
}
}
PriorityThreadPool::Job job;
job.functor = boost::shared_ptr<DictScanJob>(new DictScanJob(outIos, bs, writeLock));
job.id = hdr->Hdr.UniqueID;
job.weight = LOGICAL_BLOCK_RIDS;
job.priority = hdr->Hdr.Priority;
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
if (hdr->flags & IS_SYSCAT)
if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE)
{
idbassert(bs->length() >= sizeof(TokenByScanRequestHeader));
hdr = (TokenByScanRequestHeader*)ismHdr;
functor.reset(new DictScanJob(outIos, bs, writeLock));
id = hdr->Hdr.UniqueID;
weight = LOGICAL_BLOCK_RIDS;
priority = hdr->Hdr.Priority;
const uint8_t* buf = bs->buf();
const uint32_t pos = sizeof(ISMPacketHeader) - 2;
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
isSyscat = hdr->flags & IS_SYSCAT;
}
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
{
functor.reset(new BPPSeeder(bs, writeLock, outIos,
fPrimitiveServerPtr->ProcessorThreads(),
fPrimitiveServerPtr->PTTrace()));
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
id = bpps->getID();
priority = bpps->priority();
const uint8_t* buf = bs->buf();
const uint32_t pos = sizeof(ISMPacketHeader) - 2;
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
isSyscat = bpps->isSysCat();
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
if (isSyscat)
{
// boost::thread t(DictScanJob(outIos, bs, writeLock));
// using already-existing threads may cut latency
// if it's changed back to running in an independent thread
// change the issyscat() checks in BPPSeeder as well
OOBPool->addJob(job);
}
else
@ -2117,146 +2168,11 @@ struct ReadThread
break;
}
case BATCH_PRIMITIVE_RUN:
{
if (bRotateDest)
{
if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock))
{
// If we ever fall into this part of the
// code we have a "bug" of some sort.
// See handleUmSockSelErr() for more info.
// We reset ios and mutex to defaults.
handleUmSockSelErr(string("BPR cmd"));
outIos = outIosDefault;
writeLock = writeLockDefault;
pUmSocketSelector->delConnection(fIos);
bRotateDest = false;
}
}
/* Decide whether this is a syscat call and run
right away instead of queueing */
boost::shared_ptr<BPPSeeder> bpps(new BPPSeeder(bs, writeLock, outIos,
fPrimitiveServerPtr->ProcessorThreads(),
fPrimitiveServerPtr->PTTrace()));
PriorityThreadPool::Job job;
job.functor = bpps;
job.id = bpps->getID();
job.weight = ismHdr->Size;
job.priority = bpps->priority();
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
if (bpps->isSysCat())
{
// boost::thread t(*bpps);
// using already-existing threads may cut latency
// if it's changed back to running in an independent thread
// change the issyscat() checks in BPPSeeder as well
OOBPool->addJob(job);
}
else
{
procPoolPtr->addJob(job);
}
break;
}
case BATCH_PRIMITIVE_CREATE:
{
PriorityThreadPool::Job job;
job.functor =
boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Create(fBPPHandler, bs));
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
OOBPool->addJob(job);
// fBPPHandler->createBPP(*bs);
break;
}
case BATCH_PRIMITIVE_ADD_JOINER:
{
PriorityThreadPool::Job job;
job.functor =
boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::AddJoiner(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
OOBPool->addJob(job);
// fBPPHandler->addJoinerToBPP(*bs);
break;
}
case BATCH_PRIMITIVE_END_JOINER:
{
// lastJoinerMsg can block; must do this in a different thread
// OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs)); // needs a threadpool that can
// resched boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs));
PriorityThreadPool::Job job;
job.functor =
boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::LastJoiner(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
OOBPool->addJob(job);
break;
}
case BATCH_PRIMITIVE_DESTROY:
{
// OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs)); // needs a threadpool that can
// resched boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs));
PriorityThreadPool::Job job;
job.functor =
boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Destroy(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
OOBPool->addJob(job);
// fBPPHandler->destroyBPP(*bs);
break;
}
case BATCH_PRIMITIVE_ACK:
{
fBPPHandler->doAck(*bs);
break;
}
case BATCH_PRIMITIVE_ABORT:
{
// OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs));
// fBPPHandler->doAbort(*bs);
PriorityThreadPool::Job job;
job.functor =
boost::shared_ptr<PriorityThreadPool::Functor>(new BPPHandler::Abort(fBPPHandler, bs));
job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
job.stepID = *((uint32_t*)&buf[pos + 6]);
job.uniqueID = *((uint32_t*)&buf[pos + 10]);
job.sock = outIos;
OOBPool->addJob(job);
break;
}
default:
{
std::ostringstream os;
@ -2406,12 +2322,12 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
fServerpool.setQueueSize(fServerQueueSize);
fServerpool.setName("PrimitiveServer");
fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0));
fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0);
// We're not using either the priority or the job-clustering features, just need a threadpool
// that can reschedule jobs, and an unlimited non-blocking queue
OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
OOBPool.reset(new threadpool::FairThreadPool(1, 5, 0, 0, 1));
asyncCounter = 0;

View File

@ -37,6 +37,7 @@
#include "threadpool.h"
#include "../../utils/threadpool/prioritythreadpool.h"
#include "fair_threadpool.h"
#include "messagequeue.h"
#include "blockrequestprocessor.h"
#include "batchprimitiveprocessor.h"
@ -48,7 +49,7 @@ extern oam::OamCache* oamCache;
namespace primitiveprocessor
{
extern boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;
extern boost::shared_ptr<threadpool::FairThreadPool> OOBPool;
extern dbbc::BlockRequestProcessor** BRPp;
extern BRM::DBRM* brm;
extern boost::mutex bppLock;
@ -128,7 +129,7 @@ class PrimitiveServer
/** @brief get a pointer the shared processor thread pool
*/
inline boost::shared_ptr<threadpool::PriorityThreadPool> getProcessorThreadPool() const
inline threadpool::FairThreadPool* getProcessorThreadPool() const
{
return fProcessorPool;
}
@ -165,7 +166,7 @@ class PrimitiveServer
/** @brief the thread pool used to process
* primitive commands
*/
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
threadpool::FairThreadPool* fProcessorPool;
int fServerThreads;
int fServerQueueSize;