diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 4699042bc..af95dae09 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1274,6 +1274,10 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum) bs << uniqueID; bs << _priority; + // The weight is used by the PrimProc thread pool scheduling algorithm. + uint32_t weight = calculateBPPWeight(); + bs << weight; + bs << dbRoot; bs << count; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index a249b3102..73a636f2f 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -252,8 +252,27 @@ class BatchPrimitiveProcessorJL } private: - // void setLBIDForScan(uint64_t rid, uint32_t dbroot); + const size_t perColumnProjectWeight_ = 10; + const size_t perColumnFilteringWeight_ = 10; + const size_t fe1Weight_ = 10; + const size_t fe2Weight_ = 10; + const size_t joinWeight_ = 500; + const size_t aggregationWeight_ = 500; + // This is a simple model based on the weights of the SQL operations involved, + // used by the FairThreadPool run by the PrimProc facility. + // Every operation included in this calculation consumes some CPU, + // so the more of these operations a morsel has, the larger its weight. + uint32_t calculateBPPWeight() const + { + uint32_t weight = perColumnProjectWeight_ * projectCount; + weight += filterCount * perColumnFilteringWeight_; + weight += tJoiners.size() * joinWeight_; + weight += (aggregatorPM) ? aggregationWeight_ : 0; + weight += (fe1) ? fe1Weight_ : 0; + weight += (fe2) ? fe2Weight_ : 0; + return weight; + } BPSOutputType ot; bool needToSetLBID; diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index 204a7ef60..1acadfb27 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -20,7 +20,6 @@ #include "tuplehashjoin.h" #include "joinpartition.h" #include "threadnaming.h" -#include "../../utils/threadpool/prioritythreadpool.h" #pragma once diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 50306b335..5b599e6d9 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -140,6 +140,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() , ptMask(0) , firstInstance(false) , valuesLBID(0) + , weight_(0) { pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); @@ -193,6 +194,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, // ptMask(processorThreads - 1), , firstInstance(true) , valuesLBID(0) + , weight_(0) { // promote processorThreads to next power of 2.
also need to change the name to bucketCount or similar processorThreads = nextPowOf2(processorThreads); @@ -542,6 +544,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con // skip the header, sessionID, stepID, uniqueID, and priority bs.advance(sizeof(ISMPacketHeader) + 16); + bs >> weight_; bs >> dbRoot; bs >> count; bs >> ridCount; diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index d930099a7..1f1f326f3 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -137,6 +137,11 @@ class BatchPrimitiveProcessor fBusy = b; } + size_t getWeight() const + { + return weight_; + } + uint16_t FilterCount() const { return filterCount; @@ -433,6 +438,9 @@ class BatchPrimitiveProcessor uint ptMask; bool firstInstance; uint64_t valuesLBID; + uint32_t weight_; + + static const uint64_t maxResultCount = 1048576; // 2^20 friend class Command; friend class ColumnCommand; diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index 5d56d3559..adda29b6a 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -47,7 +47,7 @@ namespace primitiveprocessor { -class BPPSeeder : public threadpool::PriorityThreadPool::Functor +class BPPSeeder : public threadpool::FairThreadPool::Functor { public: BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads, @@ -71,6 +71,11 @@ class BPPSeeder : public threadpool::PriorityThreadPool::Functor { return _priority; } + size_t getWeight() const + { + assert(bpp); + return bpp->getWeight(); + } private: BPPSeeder(); diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 89c6b70f5..bbefa8e20 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache(); // FIXME: there is an anon ns burried later in between 2 named namespaces... 
namespace primitiveprocessor { -boost::shared_ptr OOBPool; +boost::shared_ptr OOBPool; BlockRequestProcessor** BRPp; #ifndef _MSC_VER @@ -1050,7 +1050,7 @@ using namespace primitiveprocessor; /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM) * TODO: Move this & the impl into different files */ -class DictScanJob : public threadpool::PriorityThreadPool::Functor +class DictScanJob : public threadpool::FairThreadPool::Functor { public: DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); @@ -1242,7 +1242,7 @@ struct BPPHandler scoped.unlock(); } - struct BPPHandlerFunctor : public PriorityThreadPool::Functor + struct BPPHandlerFunctor : public FairThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1710,7 +1710,7 @@ return 0; PrimitiveServer* fPrimitiveServerPtr; }; -class DictionaryOp : public PriorityThreadPool::Functor +class DictionaryOp : public FairThreadPool::Functor { public: DictionaryOp(SBS cmd) : bs(cmd) @@ -1947,8 +1947,7 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = - fPrimitiveServerPtr->getProcessorThreadPool(); + threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2044,35 +2043,69 @@ struct ReadThread switch (ismHdr->Command) { case DICT_CREATE_EQUALITY_FILTER: - { - PriorityThreadPool::Job job; - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); - OOBPool->addJob(job); - break; - } - case DICT_DESTROY_EQUALITY_FILTER: + case BATCH_PRIMITIVE_CREATE: + case BATCH_PRIMITIVE_ADD_JOINER: + case BATCH_PRIMITIVE_END_JOINER: + case BATCH_PRIMITIVE_DESTROY: + case BATCH_PRIMITIVE_ABORT: { - PriorityThreadPool::Job job; const uint8_t* buf = bs->buf(); uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); + const uint32_t txnId = *((uint32_t*)&buf[pos + 2]); + const uint32_t stepID = *((uint32_t*)&buf[pos + 6]); + const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]); + const uint32_t weight = 1; + const uint32_t priority = 0; + uint32_t id = 0; + boost::shared_ptr functor; + if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) + { + functor.reset(new CreateEqualityFilter(bs)); + } + else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER) + { + functor.reset(new DestroyEqualityFilter(bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE) + { + functor.reset(new BPPHandler::Create(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER) + { + functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER) + { + id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY) + { + functor.reset(new BPPHandler::Destroy(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT) + { + id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + functor.reset(new BPPHandler::Abort(fBPPHandler, bs)); + } + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, 
outIos, weight, priority, id); OOBPool->addJob(job); break; } case DICT_TOKEN_BY_SCAN_COMPARE: + case BATCH_PRIMITIVE_RUN: { - idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); - TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*)ismHdr; + TokenByScanRequestHeader* hdr = nullptr; + boost::shared_ptr functor; + uint32_t id = 0; + uint32_t weight = 0; + uint32_t priority = 0; + uint32_t txnId = 0; + uint32_t stepID = 0; + uint32_t uniqueID = 0; + bool isSyscat = false; if (bRotateDest) { @@ -2090,23 +2123,41 @@ struct ReadThread } } - PriorityThreadPool::Job job; - job.functor = boost::shared_ptr(new DictScanJob(outIos, bs, writeLock)); - job.id = hdr->Hdr.UniqueID; - job.weight = LOGICAL_BLOCK_RIDS; - job.priority = hdr->Hdr.Priority; - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - - if (hdr->flags & IS_SYSCAT) + if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE) + { + idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); + hdr = (TokenByScanRequestHeader*)ismHdr; + functor.reset(new DictScanJob(outIos, bs, writeLock)); + id = hdr->Hdr.UniqueID; + weight = LOGICAL_BLOCK_RIDS; + priority = hdr->Hdr.Priority; + const uint8_t* buf = bs->buf(); + const uint32_t pos = sizeof(ISMPacketHeader) - 2; + txnId = *((uint32_t*)&buf[pos + 2]); + stepID = *((uint32_t*)&buf[pos + 6]); + uniqueID = *((uint32_t*)&buf[pos + 10]); + isSyscat = hdr->flags & IS_SYSCAT; + } + else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) + { + functor.reset(new BPPSeeder(bs, writeLock, outIos, + fPrimitiveServerPtr->ProcessorThreads(), + fPrimitiveServerPtr->PTTrace())); + BPPSeeder* bpps = dynamic_cast(functor.get()); + id = bpps->getID(); + priority = bpps->priority(); + const uint8_t* buf = bs->buf(); + const uint32_t pos = sizeof(ISMPacketHeader) - 2; + txnId = *((uint32_t*)&buf[pos + 2]); + stepID = *((uint32_t*)&buf[pos + 6]); + uniqueID = *((uint32_t*)&buf[pos + 10]); + weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); + isSyscat = bpps->isSysCat(); + } + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + + if (isSyscat) { - // boost::thread t(DictScanJob(outIos, bs, writeLock)); - // using already-existing threads may cut latency - // if it's changed back to running in an independent thread - // change the issyscat() checks in BPPSeeder as well OOBPool->addJob(job); } else @@ -2117,146 +2168,11 @@ struct ReadThread break; } - case BATCH_PRIMITIVE_RUN: - { - if (bRotateDest) - { - if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock)) - { - // If we ever fall into this part of the - // code we have a "bug" of some sort. - // See handleUmSockSelErr() for more info. - // We reset ios and mutex to defaults. 
- handleUmSockSelErr(string("BPR cmd")); - outIos = outIosDefault; - writeLock = writeLockDefault; - pUmSocketSelector->delConnection(fIos); - bRotateDest = false; - } - } - - /* Decide whether this is a syscat call and run - right away instead of queueing */ - boost::shared_ptr bpps(new BPPSeeder(bs, writeLock, outIos, - fPrimitiveServerPtr->ProcessorThreads(), - fPrimitiveServerPtr->PTTrace())); - PriorityThreadPool::Job job; - job.functor = bpps; - job.id = bpps->getID(); - job.weight = ismHdr->Size; - job.priority = bpps->priority(); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - - if (bpps->isSysCat()) - { - // boost::thread t(*bpps); - // using already-existing threads may cut latency - // if it's changed back to running in an independent thread - // change the issyscat() checks in BPPSeeder as well - OOBPool->addJob(job); - } - else - { - procPoolPtr->addJob(job); - } - - break; - } - - case BATCH_PRIMITIVE_CREATE: - { - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->createBPP(*bs); - break; - } - - case BATCH_PRIMITIVE_ADD_JOINER: - { - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->addJoinerToBPP(*bs); - break; - } - - case BATCH_PRIMITIVE_END_JOINER: - { - // lastJoinerMsg can block; must do this in a different thread - // OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs)); // needs a threadpool that can - // resched boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs)); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - break; - } - - case BATCH_PRIMITIVE_DESTROY: - { - // OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs)); // needs a threadpool that can - // resched boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs)); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Destroy(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->destroyBPP(*bs); - break; - } - case BATCH_PRIMITIVE_ACK: { fBPPHandler->doAck(*bs); break; } - - case BATCH_PRIMITIVE_ABORT: - { - // OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs)); - // fBPPHandler->doAbort(*bs); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); - job.id = 
fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - break; - } - default: { std::ostringstream os; @@ -2406,12 +2322,12 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0)); + fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0); // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue - OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); + OOBPool.reset(new threadpool::FairThreadPool(1, 5, 0, 0, 1)); asyncCounter = 0; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index c79c0f807..9f6f60589 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -37,6 +37,7 @@ #include "threadpool.h" #include "../../utils/threadpool/prioritythreadpool.h" +#include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" @@ -48,7 +49,7 @@ extern oam::OamCache* oamCache; namespace primitiveprocessor { -extern boost::shared_ptr OOBPool; +extern boost::shared_ptr OOBPool; extern dbbc::BlockRequestProcessor** BRPp; extern BRM::DBRM* brm; extern boost::mutex bppLock; @@ -128,7 +129,7 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline boost::shared_ptr getProcessorThreadPool() const + inline threadpool::FairThreadPool* getProcessorThreadPool() const { return fProcessorPool; } @@ -165,7 +166,7 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - boost::shared_ptr fProcessorPool; + threadpool::FairThreadPool* fProcessorPool; int fServerThreads; int fServerQueueSize; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a24366a69..b6438868f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -51,11 +51,10 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_discover_tests(simd_processors TEST_PREFIX columnstore:) - # Comment this out b/c FairThreadPoolRemove segfaults in containers. Moreover this code isn't - # in production yet. 
- # add_executable(fair_threadpool_test fair_threadpool.cpp) - # target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) - # gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + add_executable(fair_threadpool_test fair_threadpool.cpp) + add_dependencies(fair_threadpool_test googletest) + target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) + gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp index b4a215992..c4a299bd1 100644 --- a/tests/fair_threadpool.cpp +++ b/tests/fair_threadpool.cpp @@ -111,12 +111,11 @@ TEST_F(FairThreadPoolTest, FairThreadPoolAdd) while (threadPool->queueSize()) { - usleep(2500000); + usleep(250000); } - usleep(2500000); - EXPECT_EQ(threadPool->queueSize(), 0); - EXPECT_EQ(results.size(), 3); + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); EXPECT_EQ(results[0], 1); EXPECT_EQ(results[1], 3); EXPECT_EQ(results[2], 2); @@ -139,11 +138,11 @@ TEST_F(FairThreadPoolTest, FairThreadPoolRemove) while (threadPool->queueSize()) { - usleep(1500000); + usleep(250000); } - EXPECT_EQ(threadPool->queueSize(), 0); - EXPECT_EQ(results.size(), 2); + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 2ULL); EXPECT_EQ(results[0], 1); EXPECT_EQ(results[1], 3); } @@ -164,11 +163,11 @@ TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) while (threadPool->queueSize()) { - usleep(1500000); + usleep(250000); } - EXPECT_EQ(threadPool->queueSize(), 0); - EXPECT_EQ(results.size(), 3); + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); EXPECT_EQ(results[0], 1); EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); } \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index 631c4ac49..8e2721804 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -34,7 +34,7 @@ namespace threadpool { FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint ID) - : _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true) + : _stop(false), weightPerRun(targetWeightPerRun), id(ID) { boost::thread* newThread; size_t numberOfThreads = highThreads + midThreads + lowThreads; @@ -73,32 +73,20 @@ void FairThreadPool::addJob_(const Job& job, bool useLock) ++threadCounts; } - // If some threads have blocked (because of output queue full) - // Temporarily add some extra worker threads to make up for the blocked threads. - if (blockedThreads > extraThreads) - { - stopExtra = false; - newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA)); - newThread->detach(); - extraThreads++; - } - else if (blockedThreads == 0) - { - // Release the temporary threads -- some threads have become unblocked. 
- stopExtra = true; - } - auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); - if (jobsListMapIter == txn2JobsListMap_.end()) + if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map { ThreadPoolJobsList* jobsList = new ThreadPoolJobsList; jobsList->push_back(job); txn2JobsListMap_[job.txnIdx_] = jobsList; - WeightT currentTopWeight = weightedTxnsQueue_.empty() ? 0 : weightedTxnsQueue_.top().first; - weightedTxnsQueue_.push({currentTopWeight, job.txnIdx_}); + weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); } - else + else // txn is in the map { + if (jobsListMapIter->second->empty()) // there are no jobs for the txn + { + weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); + } jobsListMapIter->second->push_back(job); } @@ -135,11 +123,8 @@ void FairThreadPool::removeJobs(uint32_t id) void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) { - if (preferredQueue == PriorityThreadPool::Priority::EXTRA) - utils::setThreadName("Extra"); - else - utils::setThreadName("Idle"); - RunListT runList; // This is a vector to allow to grab multiple jobs + utils::setThreadName("Idle"); + RunListT runList(1); // This is a vector to allow to grab multiple jobs RescheduleVecType reschedule; bool running = false; bool rescheduleJob = false; @@ -151,12 +136,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue runList.clear(); // remove the job std::unique_lock lk(mutex); - if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra) - { - --extraThreads; - return; - } - if (weightedTxnsQueue_.empty()) { newJob.wait(lk); @@ -166,7 +145,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); // Looking for non-empty jobsList in a loop - // Waiting on cond_var if PQ is empty(no jobs in this thread pool) + // The loop waits on the newJob cond_var if the PQ is empty (no jobs in this thread pool) while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty()) { // JobList is empty. This can happen when this method pops the last Job. @@ -196,7 +175,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue weightedTxnsQueue_.pop(); TransactionIdxT txnIdx = txnAndJobListPair->first; ThreadPoolJobsList* jobsList = txnAndJobListPair->second; - // Job& job = jobsList->front(); runList.push_back(jobsList->front()); jobsList->pop_front(); @@ -210,13 +188,15 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue lk.unlock(); running = true; - rescheduleJob = (*(runList[0].functor_))(); + rescheduleJob = (*(runList[0].functor_))(); // run the functor running = false; utils::setThreadName("Idle"); if (rescheduleJob) { + // to avoid excessive CPU usage waiting for data from storage + usleep(500); lk.lock(); addJob_(runList[0], false); newJob.notify_one(); diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h index f39f865a3..27a183c74 100644 --- a/utils/threadpool/fair_threadpool.h +++ b/utils/threadpool/fair_threadpool.h @@ -39,6 +39,12 @@ namespace threadpool { +// The idea of this thread pool is to distribute CPU time equally between multiple parallel queries +// when running morsel (primitive) jobs; a thread maps a morsel to its query via txnId. Each query (txnId) +// has a weight stored in a priority queue, and a thread increases that weight before it runs another morsel for the query.
+// When a query is done (its ThreadPoolJobsList is empty), it is removed from the priority queue and from the map (txn to ThreadPoolJobsList). +// Running multiple morsels per ::threadFcn loop iteration was also tested: that approach reduces CPU consumption +// but increases query times. class FairThreadPool { public: @@ -92,23 +98,6 @@ class FairThreadPool */ void dump(); - // If a job is blocked, we want to temporarily increase the number of threads managed by the pool - // A problem can occur if all threads are running long or blocked for a single query. Other - // queries won't get serviced, even though there are cpu cycles available. - // These calls are currently protected by respondLock in sendThread(). If you call from other - // places, you need to consider atomicity. - void incBlockedThreads() - { - blockedThreads++; - } - void decBlockedThreads() - { - blockedThreads--; - } - uint32_t blockedThreadCount() const - { - return blockedThreads; - } size_t queueSize() const { return weightedTxnsQueue_.size(); @@ -165,9 +154,6 @@ class FairThreadPool using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>; Txn2ThreadPoolJobsListMap txn2JobsListMap_; WeightedTxnPrioQueue weightedTxnsQueue_; - std::atomic blockedThreads; - std::atomic extraThreads; - bool stopExtra; }; } // namespace threadpool \ No newline at end of file
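
As an illustration of the scheduling policy described in the fair_threadpool.h comment above, here is a minimal, single-threaded sketch: a min-heap keyed by the CPU weight a transaction has consumed so far, plus a per-transaction job list, so the least-served query runs its next morsel first. The names (MiniFairScheduler, MiniJob) and the exact weight bookkeeping are illustrative only; this is not the FairThreadPool implementation, which is multi-threaded and uses a condition variable and job rescheduling.

```cpp
// Minimal, single-threaded sketch of weighted fair scheduling: each transaction
// owns a FIFO of jobs, and a min-heap keyed by consumed weight decides which
// transaction runs its next morsel. Illustrative only, not ColumnStore code.
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>

using TxnId = uint32_t;
using Weight = uint64_t;

struct MiniJob
{
  Weight weight;                 // e.g. what calculateBPPWeight() would estimate
  std::function<void()> work;
};

class MiniFairScheduler
{
 public:
  void addJob(TxnId txn, MiniJob job)
  {
    auto& list = jobs_[txn];
    if (list.empty())
    {
      // A transaction (re-)enters the heap with the weight of its next job,
      // so a freshly arrived query competes with long-running ones.
      heap_.push({job.weight, txn});
    }
    list.push_back(std::move(job));
  }

  // Pops the least-served transaction, runs one of its jobs ("morsel"), and
  // re-queues the transaction with its accumulated weight.
  bool runOne()
  {
    while (!heap_.empty())
    {
      auto [consumed, txn] = heap_.top();
      heap_.pop();
      auto it = jobs_.find(txn);
      if (it == jobs_.end() || it->second.empty())
      {
        jobs_.erase(txn);          // stale heap entry, transaction is done
        continue;
      }
      MiniJob job = std::move(it->second.front());
      it->second.pop_front();
      if (!it->second.empty())
        heap_.push({consumed + job.weight, txn});   // more morsels pending
      job.work();
      return true;
    }
    return false;                  // nothing left to run
  }

 private:
  using Entry = std::pair<Weight, TxnId>;            // {consumed weight, txn}
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap_;
  std::unordered_map<TxnId, std::deque<MiniJob>> jobs_;
};

int main()
{
  MiniFairScheduler sched;
  // A "heavy" query (txn 1) and a "light" one (txn 2): the light query's
  // morsels run before the heavy query accumulates more weight, so it is not
  // starved even though the heavy one was queued first.
  for (int i = 0; i < 3; ++i)
    sched.addJob(1, {500, [i] { std::cout << "txn1 morsel " << i << '\n'; }});
  for (int i = 0; i < 3; ++i)
    sched.addJob(2, {10, [i] { std::cout << "txn2 morsel " << i << '\n'; }});
  while (sched.runOne())
    ;
  return 0;
}
```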