diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 483db763e..7b258c4de 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1274,6 +1274,10 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isEx bs << uniqueID; bs << _priority; + // The weight is used by PrimProc thread pool algo + uint32_t weight = calculateBPPWeight(); + bs << weight; + bs << dbRoot; bs << count; uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 5418dc169..b0b9a7ae6 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -252,8 +252,27 @@ class BatchPrimitiveProcessorJL } private: - // void setLBIDForScan(uint64_t rid, uint32_t dbroot); + const size_t perColumnProjectWeight_ = 10; + const size_t perColumnFilteringWeight_ = 10; + const size_t fe1Weight_ = 10; + const size_t fe2Weight_ = 10; + const size_t joinWeight_ = 500; + const size_t aggregationWeight_ = 500; + // This is simple SQL operations-based model leveraged by + // FairThreadPool run by PP facility. + // Every operation mentioned in this calculation spends + // some CPU so the morsel uses this op weights more. + uint32_t calculateBPPWeight() const + { + uint32_t weight = perColumnProjectWeight_ * projectCount; + weight += filterCount * perColumnFilteringWeight_; + weight += tJoiners.size() * joinWeight_; + weight += (aggregatorPM) ? aggregationWeight_ : 0; + weight += (fe1) ? fe1Weight_ : 0; + weight += (fe2) ? 
fe2Weight_ : 0; + return weight; + } BPSOutputType ot; bool needToSetLBID; diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index 204a7ef60..1acadfb27 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -20,7 +20,6 @@ #include "tuplehashjoin.h" #include "joinpartition.h" #include "threadnaming.h" -#include "../../utils/threadpool/prioritythreadpool.h" #pragma once diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 59f298566..1c800ea84 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -142,6 +142,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() , firstInstance(false) , valuesLBID(0) , initiatedByEM_(false) + , weight_(0) { pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); @@ -196,6 +197,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, , firstInstance(true) , valuesLBID(0) , initiatedByEM_(false) + , weight_(0) { // promote processorThreads to next power of 2. 
also need to change the name to bucketCount or similar processorThreads = nextPowOf2(processorThreads); @@ -545,6 +547,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con // skip the header, sessionID, stepID, uniqueID, and priority bs.advance(sizeof(ISMPacketHeader) + 16); + bs >> weight_; bs >> dbRoot; bs >> count; uint8_t u8 = 0; diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index e7cd20a89..292a6cf39 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -135,6 +135,12 @@ class BatchPrimitiveProcessor { fBusy = b; } + + size_t getWeight() const + { + return weight_; + } + uint16_t FilterCount() const { return filterCount; @@ -433,9 +439,9 @@ class BatchPrimitiveProcessor bool firstInstance; uint64_t valuesLBID; bool initiatedByEM_; + uint32_t weight_; static const uint64_t maxResultCount = 1048576; // 2^20 - friend class Command; friend class ColumnCommand; friend class DictStep; diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index 5d56d3559..adda29b6a 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -47,7 +47,7 @@ namespace primitiveprocessor { -class BPPSeeder : public threadpool::PriorityThreadPool::Functor +class BPPSeeder : public threadpool::FairThreadPool::Functor { public: BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads, @@ -71,6 +71,11 @@ class BPPSeeder : public threadpool::PriorityThreadPool::Functor { return _priority; } + size_t getWeight() const + { + assert(bpp); + return bpp->getWeight(); + } private: BPPSeeder(); diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 059dab8dd..271963629 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -124,7 +124,7 @@ oam::OamCache* 
oamCache = oam::OamCache::makeOamCache(); // FIXME: there is an anon ns burried later in between 2 named namespaces... namespace primitiveprocessor { -boost::shared_ptr OOBPool; +boost::shared_ptr OOBPool; BlockRequestProcessor** BRPp; #ifndef _MSC_VER @@ -1050,7 +1050,7 @@ using namespace primitiveprocessor; /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM) * TODO: Move this & the impl into different files */ -class DictScanJob : public threadpool::PriorityThreadPool::Functor +class DictScanJob : public threadpool::FairThreadPool::Functor { public: DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); @@ -1242,7 +1242,7 @@ struct BPPHandler scoped.unlock(); } - struct BPPHandlerFunctor : public PriorityThreadPool::Functor + struct BPPHandlerFunctor : public FairThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1710,7 +1710,7 @@ return 0; PrimitiveServer* fPrimitiveServerPtr; }; -class DictionaryOp : public PriorityThreadPool::Functor +class DictionaryOp : public FairThreadPool::Functor { public: DictionaryOp(SBS cmd) : bs(cmd) @@ -1947,8 +1947,7 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = - fPrimitiveServerPtr->getProcessorThreadPool(); + threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2044,35 +2043,69 @@ struct ReadThread switch (ismHdr->Command) { case DICT_CREATE_EQUALITY_FILTER: - { - PriorityThreadPool::Job job; - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); - OOBPool->addJob(job); - break; - } - case DICT_DESTROY_EQUALITY_FILTER: + case BATCH_PRIMITIVE_CREATE: + case 
BATCH_PRIMITIVE_ADD_JOINER: + case BATCH_PRIMITIVE_END_JOINER: + case BATCH_PRIMITIVE_DESTROY: + case BATCH_PRIMITIVE_ABORT: { - PriorityThreadPool::Job job; const uint8_t* buf = bs->buf(); uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); + const uint32_t txnId = *((uint32_t*)&buf[pos + 2]); + const uint32_t stepID = *((uint32_t*)&buf[pos + 6]); + const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]); + const uint32_t weight = 1; + const uint32_t priority = 0; + uint32_t id = 0; + boost::shared_ptr functor; + if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) + { + functor.reset(new CreateEqualityFilter(bs)); + } + else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER) + { + functor.reset(new DestroyEqualityFilter(bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE) + { + functor.reset(new BPPHandler::Create(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER) + { + functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER) + { + id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY) + { + functor.reset(new BPPHandler::Destroy(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT) + { + id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + functor.reset(new BPPHandler::Abort(fBPPHandler, bs)); + } + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); OOBPool->addJob(job); break; } case DICT_TOKEN_BY_SCAN_COMPARE: + case BATCH_PRIMITIVE_RUN: { - idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); - TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*)ismHdr; + TokenByScanRequestHeader* hdr = nullptr; + 
boost::shared_ptr functor; + uint32_t id = 0; + uint32_t weight = 0; + uint32_t priority = 0; + uint32_t txnId = 0; + uint32_t stepID = 0; + uint32_t uniqueID = 0; + bool isSyscat = false; if (bRotateDest) { @@ -2090,23 +2123,41 @@ struct ReadThread } } - PriorityThreadPool::Job job; - job.functor = boost::shared_ptr(new DictScanJob(outIos, bs, writeLock)); - job.id = hdr->Hdr.UniqueID; - job.weight = LOGICAL_BLOCK_RIDS; - job.priority = hdr->Hdr.Priority; - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - - if (hdr->flags & IS_SYSCAT) + if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE) + { + idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); + hdr = (TokenByScanRequestHeader*)ismHdr; + functor.reset(new DictScanJob(outIos, bs, writeLock)); + id = hdr->Hdr.UniqueID; + weight = LOGICAL_BLOCK_RIDS; + priority = hdr->Hdr.Priority; + const uint8_t* buf = bs->buf(); + const uint32_t pos = sizeof(ISMPacketHeader) - 2; + txnId = *((uint32_t*)&buf[pos + 2]); + stepID = *((uint32_t*)&buf[pos + 6]); + uniqueID = *((uint32_t*)&buf[pos + 10]); + isSyscat = hdr->flags & IS_SYSCAT; + } + else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) + { + functor.reset(new BPPSeeder(bs, writeLock, outIos, + fPrimitiveServerPtr->ProcessorThreads(), + fPrimitiveServerPtr->PTTrace())); + BPPSeeder* bpps = dynamic_cast(functor.get()); + id = bpps->getID(); + priority = bpps->priority(); + const uint8_t* buf = bs->buf(); + const uint32_t pos = sizeof(ISMPacketHeader) - 2; + txnId = *((uint32_t*)&buf[pos + 2]); + stepID = *((uint32_t*)&buf[pos + 6]); + uniqueID = *((uint32_t*)&buf[pos + 10]); + weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); + isSyscat = bpps->isSysCat(); + } + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + + if (isSyscat) { - // boost::thread t(DictScanJob(outIos, bs, writeLock)); 
- // using already-existing threads may cut latency - // if it's changed back to running in an independent thread - // change the issyscat() checks in BPPSeeder as well OOBPool->addJob(job); } else @@ -2117,146 +2168,11 @@ struct ReadThread break; } - case BATCH_PRIMITIVE_RUN: - { - if (bRotateDest) - { - if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock)) - { - // If we ever fall into this part of the - // code we have a "bug" of some sort. - // See handleUmSockSelErr() for more info. - // We reset ios and mutex to defaults. - handleUmSockSelErr(string("BPR cmd")); - outIos = outIosDefault; - writeLock = writeLockDefault; - pUmSocketSelector->delConnection(fIos); - bRotateDest = false; - } - } - - /* Decide whether this is a syscat call and run - right away instead of queueing */ - boost::shared_ptr bpps(new BPPSeeder(bs, writeLock, outIos, - fPrimitiveServerPtr->ProcessorThreads(), - fPrimitiveServerPtr->PTTrace())); - PriorityThreadPool::Job job; - job.functor = bpps; - job.id = bpps->getID(); - job.weight = ismHdr->Size; - job.priority = bpps->priority(); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - - if (bpps->isSysCat()) - { - // boost::thread t(*bpps); - // using already-existing threads may cut latency - // if it's changed back to running in an independent thread - // change the issyscat() checks in BPPSeeder as well - OOBPool->addJob(job); - } - else - { - procPoolPtr->addJob(job); - } - - break; - } - - case BATCH_PRIMITIVE_CREATE: - { - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->createBPP(*bs); - break; - 
} - - case BATCH_PRIMITIVE_ADD_JOINER: - { - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->addJoinerToBPP(*bs); - break; - } - - case BATCH_PRIMITIVE_END_JOINER: - { - // lastJoinerMsg can block; must do this in a different thread - // OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs)); // needs a threadpool that can - // resched boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs)); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - break; - } - - case BATCH_PRIMITIVE_DESTROY: - { - // OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs)); // needs a threadpool that can - // resched boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs)); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Destroy(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->destroyBPP(*bs); - break; - } - case BATCH_PRIMITIVE_ACK: { fBPPHandler->doAck(*bs); break; } - - case BATCH_PRIMITIVE_ABORT: - { - // OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs)); - // fBPPHandler->doAbort(*bs); - PriorityThreadPool::Job 
job; - job.functor = - boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - break; - } - default: { std::ostringstream os; @@ -2406,12 +2322,12 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0)); + fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0); // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue - OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); + OOBPool.reset(new threadpool::FairThreadPool(1, 5, 0, 0, 1)); // Initialize a local pointer. 
fOOBPool = OOBPool; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 2b1fe2939..3c3817405 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -37,6 +37,7 @@ #include "threadpool.h" #include "../../utils/threadpool/prioritythreadpool.h" +#include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" @@ -48,7 +49,7 @@ extern oam::OamCache* oamCache; namespace primitiveprocessor { -extern boost::shared_ptr OOBPool; +extern boost::shared_ptr OOBPool; extern dbbc::BlockRequestProcessor** BRPp; extern BRM::DBRM* brm; extern boost::mutex bppLock; @@ -130,12 +131,12 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline boost::shared_ptr getProcessorThreadPool() const + inline boost::shared_ptr getProcessorThreadPool() const { return fProcessorPool; } - inline boost::shared_ptr getOOBThreadPool() const + inline boost::shared_ptr getOOBThreadPool() const { return fOOBPool; } @@ -172,8 +173,8 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - boost::shared_ptr fProcessorPool; - boost::shared_ptr fOOBPool; + boost::shared_ptr fProcessorPool; + boost::shared_ptr fOOBPool; int fServerThreads; int fServerQueueSize; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4ac31e2c1..aaf40fc4a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,6 +52,11 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:) + add_executable(fair_threadpool_test fair_threadpool.cpp) + add_dependencies(fair_threadpool_test googletest) + target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) + 
gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) add_dependencies(we_shared_components_tests loggingcpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp new file mode 100644 index 000000000..c4a299bd1 --- /dev/null +++ b/tests/fair_threadpool.cpp @@ -0,0 +1,173 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#include +#include +#include + +#include "utils/threadpool/fair_threadpool.h" + +using namespace primitiveprocessor; +using namespace std; +using namespace threadpool; + +using ResultsType = std::vector; +static ResultsType results; + +class FairThreadPoolTest : public testing::Test { + public: + + void SetUp() override + { + results.clear(); + threadPool = new FairThreadPool(1, 1, 0, 0); + } + + + FairThreadPool* threadPool; +}; + +class TestFunctor: public FairThreadPool::Functor +{ + public: + TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestFunctor() {}; + int operator()() override + { + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; +}; + +class TestRescheduleFunctor: public FairThreadPool::Functor +{ + public: + TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestRescheduleFunctor() {}; + int operator()() override + { + if (firstRun) + { + firstRun = false; + return 1; // re-schedule the Job + } + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; + bool firstRun = true; +}; + +testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, const size_t idxB, const int b) +{ + if (arr.empty() || arr.size() <= max(idxA, idxB)) + return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; + if (arr[idxA] == a && arr[idxB] == b) + return testing::AssertionSuccess(); + if (arr[idxA] == b && arr[idxB] == a) + return testing::AssertionSuccess(); + return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB + << " are not " << a << " and " << b << std::endl; +} + +TEST_F(FairThreadPoolTest, FairThreadPoolAdd) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); + auto 
functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); + EXPECT_EQ(results[2], 2); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolRemove) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + threadPool->removeJobs(job2.id_); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 2ULL); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + 
EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); + EXPECT_EQ(results[0], 1); + EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); +} \ No newline at end of file diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index 519ba3b5c..dd96304de 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,11 +4,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) - +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) add_library(threadpool SHARED ${threadpool_LIB_SRCS}) - add_dependencies(threadpool loggingcpp) target_link_libraries(threadpool Boost::chrono) - -install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) +install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp new file mode 100644 index 000000000..44d9cd3a9 --- /dev/null +++ b/utils/threadpool/fair_threadpool.cpp @@ -0,0 +1,295 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#include +#include +#include +#include +using namespace std; + +#include "messageobj.h" +#include "messagelog.h" +#include "threadnaming.h" +using namespace logging; + +#include "fair_threadpool.h" +using namespace boost; + +#include "dbcon/joblist/primitivemsg.h" + +namespace threadpool +{ +FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, + uint ID) + : weightPerRun(targetWeightPerRun), id(ID) +{ + boost::thread* newThread; + size_t numberOfThreads = highThreads + midThreads + lowThreads; + for (uint32_t i = 0; i < numberOfThreads; ++i) + { + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); + newThread->detach(); + } + cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n"; + threadCounts_.store(numberOfThreads, std::memory_order_relaxed); + defaultThreadCounts = numberOfThreads; +} + +FairThreadPool::~FairThreadPool() +{ + stop(); +} + +void FairThreadPool::addJob(const Job& job) +{ + addJob_(job); +} + +void FairThreadPool::addJob_(const Job& job, bool useLock) +{ + boost::thread* newThread; + std::unique_lock lk(mutex, std::defer_lock_t()); + + // Create any missing threads + if (defaultThreadCounts != threadCounts_.load(std::memory_order_relaxed)) + { + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); + newThread->detach(); + threadCounts_.fetch_add(1, std::memory_order_relaxed); + } + + if (useLock) + lk.lock(); + + auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); + if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map + { + ThreadPoolJobsList* jobsList = new ThreadPoolJobsList; + jobsList->push_back(job); + txn2JobsListMap_[job.txnIdx_] = jobsList; + weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); + } + else // txn is in the map + { + if (jobsListMapIter->second->empty()) // there are no jobs for the txn + { + weightedTxnsQueue_.push({job.weight_, 
job.txnIdx_}); + } + jobsListMapIter->second->push_back(job); + } + + if (useLock) + newJob.notify_one(); +} + +// Removes all queued jobs whose id_ equals id and drops txn lists that end up empty. +void FairThreadPool::removeJobs(uint32_t id) +{ + std::unique_lock lk(mutex); + + // Walk the map with an explicit iterator: erasing inside a range-for leaves the loop iterator dangling. + for (auto txnIter = txn2JobsListMap_.begin(); txnIter != txn2JobsListMap_.end();) + { + ThreadPoolJobsList* txnJobsList = txnIter->second; + for (auto job = txnJobsList->begin(); job != txnJobsList->end();) + { + if (job->id_ == id) + job = txnJobsList->erase(job); // erase() returns the job that follows + else + ++job; + } + if (!txnJobsList->empty()) + { + ++txnIter; + continue; + } + // No jobs left for the txn: drop its list. There is no clean-up for PQ. It will happen later in threadFcn + txnIter = txn2JobsListMap_.erase(txnIter); // erase() returns the txn that follows + delete txnJobsList; + } +} + +void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) +{ + utils::setThreadName("Idle"); + RunListT runList(1); // This is a vector to allow to grab multiple jobs + RescheduleVecType reschedule; + bool running = false; + bool rescheduleJob = false; + + try + { + while (!stop_.load(std::memory_order_relaxed)) + { + runList.clear(); // remove the job + std::unique_lock lk(mutex); + + if (weightedTxnsQueue_.empty()) + { + newJob.wait(lk); + continue; // just go on w/o re-taking the lock + } + + WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); + auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); + // Looking for non-empty jobsList in a loop + // The loop waits on newJob cond_var if PQ is empty(no jobs in this thread pool) + while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty()) + { + // JobList is empty. This can happen when this method pops the last Job.
+ if (txnAndJobListPair != txn2JobsListMap_.end()) + { + ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second; + delete txnJobsList; + txn2JobsListMap_.erase(txnAndJobListPair->first); + } + weightedTxnsQueue_.pop(); + if (weightedTxnsQueue_.empty()) // remove the empty + { + break; + } + weightedTxn = weightedTxnsQueue_.top(); + txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); + } + + if (weightedTxnsQueue_.empty()) + { + newJob.wait(lk); // might need a lock here + continue; + } + + // We have non-empty jobsList at this point. + // Remove the txn from a queue first to add it later + weightedTxnsQueue_.pop(); + TransactionIdxT txnIdx = txnAndJobListPair->first; + ThreadPoolJobsList* jobsList = txnAndJobListPair->second; + runList.push_back(jobsList->front()); + + jobsList->pop_front(); + // Add the jobList back into the PQ adding some weight to it + // Current algo doesn't reduce total txn weight if the job is rescheduled. + if (!jobsList->empty()) + { + weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx}); + } + + lk.unlock(); + + running = true; + jobsRunning_.fetch_add(1, std::memory_order_relaxed); + rescheduleJob = (*(runList[0].functor_))(); // run the functor + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + running = false; + + utils::setThreadName("Idle"); + + if (rescheduleJob) + { + // to avoid excessive CPU usage waiting for data from storage + usleep(500); + lk.lock(); + addJob_(runList[0], false); + newJob.notify_one(); + lk.unlock(); + } + } + } + catch (std::exception& ex) + { + if (running) + { + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + } + // Log the exception and exit this thread + try + { + threadCounts_.fetch_sub(1, std::memory_order_relaxed); +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("threadFcn: Caught exception: "); + args.add(ex.what()); + + message.format(args); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + 
+ ml.logErrorMessage(message); +#endif + + if (running) + sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } + catch (...) + { + } + } + catch (...) + { + // Log the exception and exit this thread + try + { + if (running) + { + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + } + threadCounts_.fetch_sub(1, std::memory_order_relaxed); + ; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(6); + args.add("threadFcn: Caught unknown exception!"); + + message.format(args); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage(message); +#endif + + if (running) + sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } + catch (...) + { + } + } +} + +void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) +{ + ISMPacketHeader ism; + PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; + + ism.Status = logging::primitiveServerErr; + ph.UniqueID = id; + ph.StepID = step; + messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + msg.append((uint8_t*)&ism, sizeof(ism)); + msg.append((uint8_t*)&ph, sizeof(ph)); + + sock->write(msg); +} + +void FairThreadPool::stop() +{ + stop_.store(true, std::memory_order_relaxed); +} + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h new file mode 100644 index 000000000..ab3afaa71 --- /dev/null +++ b/utils/threadpool/fair_threadpool.h @@ -0,0 +1,165 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "primitives/primproc/umsocketselector.h" +#include "prioritythreadpool.h" + +namespace threadpool +{ +// The idea of this thread pool is to distribute CPU time equally between multiple parallel queries +// by running their morsel jobs (primitive jobs); a thread maps a morsel to its query using txnId. +// Each query (txnId) has a weight stored in a priority queue, and a thread increases that weight +// before it runs another morsel for the query. When a query is done (its ThreadPoolJobsList is empty), +// it is removed from the priority queue and from the map (txn to ThreadPoolJobsList). +// I tested running multiple morsels per loop iteration in ::threadFcn. This approach reduces CPU +// consumption and increases query timings. 
+class FairThreadPool +{ + public: + using Functor = PriorityThreadPool::Functor; + + using TransactionIdxT = uint32_t; + struct Job + { + Job() : weight_(1), priority_(0), id_(0) + { + } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : uniqueID_(uniqueID) + , stepID_(stepID) + , txnIdx_(txnIdx) + , functor_(functor) + , sock_(sock) + , weight_(weight) + , priority_(priority) + , id_(id) + { + } + uint32_t uniqueID_; + uint32_t stepID_; + TransactionIdxT txnIdx_; + boost::shared_ptr functor_; + primitiveprocessor::SP_UM_IOSOCK sock_; + uint32_t weight_; + uint32_t priority_; + uint32_t id_; + }; + + /********************************************* + * ctor/dtor + * + *********************************************/ + + /** @brief ctor + */ + + FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0); + virtual ~FairThreadPool(); + + void removeJobs(uint32_t id); + void addJob(const Job& job); + void stop(); + + /** @brief for use in debugging + */ + void dump(); + + size_t queueSize() const + { + return weightedTxnsQueue_.size(); + } + // This method enables a pool current workload estimate. 
+ size_t jobsRunning() const + { + return jobsRunning_.load(std::memory_order_relaxed); + } + + protected: + private: + struct ThreadHelper + { + ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue) + { + } + void operator()() + { + ptp->threadFcn(preferredQueue); + } + FairThreadPool* ptp; + PriorityThreadPool::Priority preferredQueue; + }; + + explicit FairThreadPool(); + explicit FairThreadPool(const FairThreadPool&); + FairThreadPool& operator=(const FairThreadPool&); + + void addJob_(const Job& job, bool useLock = true); + void threadFcn(const PriorityThreadPool::Priority preferredQueue); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); + + uint32_t defaultThreadCounts; + std::mutex mutex; + std::condition_variable newJob; + boost::thread_group threads; + uint32_t weightPerRun; + volatile uint id; // prevent it from being optimized out + + using WeightT = uint32_t; + using WeightedTxnT = std::pair; + using WeightedTxnVec = std::vector; + struct PrioQueueCmp + { + bool operator()(WeightedTxnT lhs, WeightedTxnT rhs) + { + if (lhs.first == rhs.first) + return lhs.second > rhs.second; + return lhs.first > rhs.first; + } + }; + using RunListT = std::vector; + using RescheduleVecType = std::vector; + using WeightedTxnPrioQueue = std::priority_queue; + using ThreadPoolJobsList = std::list; + using Txn2ThreadPoolJobsListMap = std::unordered_map; + Txn2ThreadPoolJobsListMap txn2JobsListMap_; + WeightedTxnPrioQueue weightedTxnsQueue_; + std::atomic jobsRunning_{0}; + std::atomic threadCounts_{0}; + std::atomic stop_{false}; +}; + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 7608b4166..fbd479966 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -53,8 +53,6 @@ class PriorityThreadPool virtual int operator()() = 0; }; - // typedef 
boost::function0 Functor; - struct Job { Job() : weight(1), priority(0), id(0)