diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index af95dae09..4699042bc 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1274,10 +1274,6 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum) bs << uniqueID; bs << _priority; - // The weight is used by PrimProc thread pool algo - uint32_t weight = calculateBPPWeight(); - bs << weight; - bs << dbRoot; bs << count; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 73a636f2f..a249b3102 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -252,27 +252,8 @@ class BatchPrimitiveProcessorJL } private: - const size_t perColumnProjectWeight_ = 10; - const size_t perColumnFilteringWeight_ = 10; - const size_t fe1Weight_ = 10; - const size_t fe2Weight_ = 10; - const size_t joinWeight_ = 500; - const size_t aggregationWeight_ = 500; + // void setLBIDForScan(uint64_t rid, uint32_t dbroot); - // This is simple SQL operations-based model leveraged by - // FairThreadPool run by PP facility. - // Every operation mentioned in this calculation spends - // some CPU so the morsel uses this op weights more. - uint32_t calculateBPPWeight() const - { - uint32_t weight = perColumnProjectWeight_ * projectCount; - weight += filterCount * perColumnFilteringWeight_; - weight += tJoiners.size() * joinWeight_; - weight += (aggregatorPM) ? aggregationWeight_ : 0; - weight += (fe1) ? fe1Weight_ : 0; - weight += (fe2) ? 
fe2Weight_ : 0; - return weight; - } BPSOutputType ot; bool needToSetLBID; diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index 1acadfb27..204a7ef60 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -20,6 +20,7 @@ #include "tuplehashjoin.h" #include "joinpartition.h" #include "threadnaming.h" +#include "../../utils/threadpool/prioritythreadpool.h" #pragma once diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 0a0080f61..99224c7ac 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -140,7 +140,6 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() , ptMask(0) , firstInstance(false) , valuesLBID(0) - , weight_(0) { pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); @@ -194,7 +193,6 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, // ptMask(processorThreads - 1), , firstInstance(true) , valuesLBID(0) - , weight_(0) { // promote processorThreads to next power of 2. 
also need to change the name to bucketCount or similar processorThreads = nextPowOf2(processorThreads); @@ -544,7 +542,6 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con // skip the header, sessionID, stepID, uniqueID, and priority bs.advance(sizeof(ISMPacketHeader) + 16); - bs >> weight_; bs >> dbRoot; bs >> count; bs >> ridCount; diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 70ab4c837..07261fb45 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -137,11 +137,6 @@ class BatchPrimitiveProcessor fBusy = b; } - size_t getWeight() const - { - return weight_; - } - uint16_t FilterCount() const { return filterCount; @@ -439,9 +434,6 @@ class BatchPrimitiveProcessor uint ptMask; bool firstInstance; uint64_t valuesLBID; - uint32_t weight_; - - static const uint64_t maxResultCount = 1048576; // 2^20 friend class Command; friend class ColumnCommand; diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index adda29b6a..5d56d3559 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -47,7 +47,7 @@ namespace primitiveprocessor { -class BPPSeeder : public threadpool::FairThreadPool::Functor +class BPPSeeder : public threadpool::PriorityThreadPool::Functor { public: BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads, @@ -71,11 +71,6 @@ class BPPSeeder : public threadpool::FairThreadPool::Functor { return _priority; } - size_t getWeight() const - { - assert(bpp); - return bpp->getWeight(); - } private: BPPSeeder(); diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 35d2cf63a..fd13ba90d 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache(); 
// FIXME: there is an anon ns burried later in between 2 named namespaces... namespace primitiveprocessor { -boost::shared_ptr OOBPool; +boost::shared_ptr OOBPool; BlockRequestProcessor** BRPp; #ifndef _MSC_VER @@ -1050,7 +1050,7 @@ using namespace primitiveprocessor; /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM) * TODO: Move this & the impl into different files */ -class DictScanJob : public threadpool::FairThreadPool::Functor +class DictScanJob : public threadpool::PriorityThreadPool::Functor { public: DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); @@ -1242,7 +1242,7 @@ struct BPPHandler scoped.unlock(); } - struct BPPHandlerFunctor : public FairThreadPool::Functor + struct BPPHandlerFunctor : public PriorityThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1710,7 +1710,7 @@ return 0; PrimitiveServer* fPrimitiveServerPtr; }; -class DictionaryOp : public FairThreadPool::Functor +class DictionaryOp : public PriorityThreadPool::Functor { public: DictionaryOp(SBS cmd) : bs(cmd) @@ -1947,7 +1947,8 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); + boost::shared_ptr procPoolPtr = + fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2043,69 +2044,35 @@ struct ReadThread switch (ismHdr->Command) { case DICT_CREATE_EQUALITY_FILTER: - case DICT_DESTROY_EQUALITY_FILTER: - case BATCH_PRIMITIVE_CREATE: - case BATCH_PRIMITIVE_ADD_JOINER: - case BATCH_PRIMITIVE_END_JOINER: - case BATCH_PRIMITIVE_DESTROY: - case BATCH_PRIMITIVE_ABORT: { + PriorityThreadPool::Job job; const uint8_t* buf = bs->buf(); uint32_t pos = sizeof(ISMPacketHeader) - 2; - const uint32_t txnId = *((uint32_t*)&buf[pos + 2]); - const uint32_t stepID = *((uint32_t*)&buf[pos + 6]); - const uint32_t uniqueID = *((uint32_t*)&buf[pos + 
10]); - const uint32_t weight = 1; - const uint32_t priority = 0; - uint32_t id = 0; - boost::shared_ptr functor; - if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) - { - functor.reset(new CreateEqualityFilter(bs)); - } - else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER) - { - functor.reset(new DestroyEqualityFilter(bs)); - } - else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE) - { - functor.reset(new BPPHandler::Create(fBPPHandler, bs)); - } - else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER) - { - functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs)); - } - else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER) - { - id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs)); - } - else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY) - { - functor.reset(new BPPHandler::Destroy(fBPPHandler, bs)); - } - else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT) - { - id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - functor.reset(new BPPHandler::Abort(fBPPHandler, bs)); - } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); + OOBPool->addJob(job); + break; + } + + case DICT_DESTROY_EQUALITY_FILTER: + { + PriorityThreadPool::Job job; + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); OOBPool->addJob(job); break; } case DICT_TOKEN_BY_SCAN_COMPARE: - case BATCH_PRIMITIVE_RUN: { - TokenByScanRequestHeader* hdr = nullptr; - boost::shared_ptr functor; - uint32_t id = 0; - uint32_t weight = 0; - uint32_t priority = 0; - uint32_t txnId = 0; - uint32_t stepID = 0; - uint32_t uniqueID = 
0; - bool isSyscat = false; + idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); + TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*)ismHdr; if (bRotateDest) { @@ -2123,41 +2090,23 @@ struct ReadThread } } - if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE) - { - idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); - hdr = (TokenByScanRequestHeader*)ismHdr; - functor.reset(new DictScanJob(outIos, bs, writeLock)); - id = hdr->Hdr.UniqueID; - weight = LOGICAL_BLOCK_RIDS; - priority = hdr->Hdr.Priority; - const uint8_t* buf = bs->buf(); - const uint32_t pos = sizeof(ISMPacketHeader) - 2; - txnId = *((uint32_t*)&buf[pos + 2]); - stepID = *((uint32_t*)&buf[pos + 6]); - uniqueID = *((uint32_t*)&buf[pos + 10]); - isSyscat = hdr->flags & IS_SYSCAT; - } - else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) - { - functor.reset(new BPPSeeder(bs, writeLock, outIos, - fPrimitiveServerPtr->ProcessorThreads(), - fPrimitiveServerPtr->PTTrace())); - BPPSeeder* bpps = dynamic_cast(functor.get()); - id = bpps->getID(); - priority = bpps->priority(); - const uint8_t* buf = bs->buf(); - const uint32_t pos = sizeof(ISMPacketHeader) - 2; - txnId = *((uint32_t*)&buf[pos + 2]); - stepID = *((uint32_t*)&buf[pos + 6]); - uniqueID = *((uint32_t*)&buf[pos + 10]); - weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); - isSyscat = bpps->isSysCat(); - } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + PriorityThreadPool::Job job; + job.functor = boost::shared_ptr(new DictScanJob(outIos, bs, writeLock)); + job.id = hdr->Hdr.UniqueID; + job.weight = LOGICAL_BLOCK_RIDS; + job.priority = hdr->Hdr.Priority; + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; - if (isSyscat) + if (hdr->flags & IS_SYSCAT) { + // boost::thread t(DictScanJob(outIos, bs, writeLock)); + // using already-existing 
threads may cut latency + // if it's changed back to running in an independent thread + // change the issyscat() checks in BPPSeeder as well OOBPool->addJob(job); } else @@ -2168,11 +2117,146 @@ struct ReadThread break; } + case BATCH_PRIMITIVE_RUN: + { + if (bRotateDest) + { + if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock)) + { + // If we ever fall into this part of the + // code we have a "bug" of some sort. + // See handleUmSockSelErr() for more info. + // We reset ios and mutex to defaults. + handleUmSockSelErr(string("BPR cmd")); + outIos = outIosDefault; + writeLock = writeLockDefault; + pUmSocketSelector->delConnection(fIos); + bRotateDest = false; + } + } + + /* Decide whether this is a syscat call and run + right away instead of queueing */ + boost::shared_ptr bpps(new BPPSeeder(bs, writeLock, outIos, + fPrimitiveServerPtr->ProcessorThreads(), + fPrimitiveServerPtr->PTTrace())); + PriorityThreadPool::Job job; + job.functor = bpps; + job.id = bpps->getID(); + job.weight = ismHdr->Size; + job.priority = bpps->priority(); + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + + if (bpps->isSysCat()) + { + // boost::thread t(*bpps); + // using already-existing threads may cut latency + // if it's changed back to running in an independent thread + // change the issyscat() checks in BPPSeeder as well + OOBPool->addJob(job); + } + else + { + procPoolPtr->addJob(job); + } + + break; + } + + case BATCH_PRIMITIVE_CREATE: + { + PriorityThreadPool::Job job; + job.functor = + boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + OOBPool->addJob(job); + // fBPPHandler->createBPP(*bs); + break; + } + + case 
BATCH_PRIMITIVE_ADD_JOINER: + { + PriorityThreadPool::Job job; + job.functor = + boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); + job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + OOBPool->addJob(job); + // fBPPHandler->addJoinerToBPP(*bs); + break; + } + + case BATCH_PRIMITIVE_END_JOINER: + { + // lastJoinerMsg can block; must do this in a different thread + // OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs)); // needs a threadpool that can + // resched boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs)); + PriorityThreadPool::Job job; + job.functor = + boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); + job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + OOBPool->addJob(job); + break; + } + + case BATCH_PRIMITIVE_DESTROY: + { + // OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs)); // needs a threadpool that can + // resched boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs)); + PriorityThreadPool::Job job; + job.functor = + boost::shared_ptr(new BPPHandler::Destroy(fBPPHandler, bs)); + job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + OOBPool->addJob(job); + // fBPPHandler->destroyBPP(*bs); + break; + } + case BATCH_PRIMITIVE_ACK: { fBPPHandler->doAck(*bs); break; } + + case BATCH_PRIMITIVE_ABORT: + { + // OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs)); + // fBPPHandler->doAbort(*bs); + PriorityThreadPool::Job job; + 
job.functor = + boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); + job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t* buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t*)&buf[pos + 6]); + job.uniqueID = *((uint32_t*)&buf[pos + 10]); + job.sock = outIos; + OOBPool->addJob(job); + break; + } + default: { std::ostringstream os; @@ -2322,12 +2406,12 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0); + fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0)); // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue - OOBPool.reset(new threadpool::FairThreadPool(1, 5, 0, 0, 1)); + OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); asyncCounter = 0; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 0d6c9df2a..bc58e7a7e 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -37,7 +37,6 @@ #include "threadpool.h" #include "../../utils/threadpool/prioritythreadpool.h" -#include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" @@ -49,7 +48,7 @@ extern oam::OamCache* oamCache; namespace primitiveprocessor { -extern boost::shared_ptr OOBPool; +extern boost::shared_ptr OOBPool; extern dbbc::BlockRequestProcessor** BRPp; extern BRM::DBRM* brm; extern boost::mutex bppLock; @@ -131,7 +130,7 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline threadpool::FairThreadPool* 
getProcessorThreadPool() const + inline boost::shared_ptr getProcessorThreadPool() const { return fProcessorPool; } @@ -168,7 +167,7 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - threadpool::FairThreadPool* fProcessorPool; + boost::shared_ptr fProcessorPool; int fServerThreads; int fServerQueueSize; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b6438868f..4ac31e2c1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,4 +1,5 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ${ENGINE_BLOCKCACHE_INCLUDE} ${ENGINE_PRIMPROC_INCLUDE} ) +cmake_policy(SET CMP0054 NEW) if (WITH_UNITTESTS) set(EXTERNAL_INSTALL_LOCATION ${CMAKE_BINARY_DIR}/external) @@ -18,43 +19,38 @@ if (WITH_UNITTESTS) add_executable(rowgroup_tests rowgroup-tests.cpp) add_dependencies(rowgroup_tests googletest) target_link_libraries(rowgroup_tests ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS} ${MARIADB_CLIENT_LIBS}) - gtest_discover_tests(rowgroup_tests TEST_PREFIX columnstore:) + gtest_add_tests(TARGET rowgroup_tests TEST_PREFIX columnstore:) add_executable(mcs_decimal_tests mcs_decimal-tests.cpp) add_dependencies(mcs_decimal_tests googletest) target_link_libraries(mcs_decimal_tests ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS} ${MARIADB_CLIENT_LIBS}) - gtest_discover_tests(mcs_decimal_tests TEST_PREFIX columnstore:) + gtest_add_tests(TARGET mcs_decimal_tests TEST_PREFIX columnstore:) add_executable(dataconvert_tests dataconvert-tests.cpp) add_dependencies(dataconvert_tests googletest) target_link_libraries(dataconvert_tests ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS} ${MARIADB_CLIENT_LIBS}) - gtest_discover_tests(dataconvert_tests TEST_PREFIX columnstore:) + gtest_add_tests(TARGET dataconvert_tests TEST_PREFIX columnstore:) add_executable(rebuild_em_tests rebuild-em-tests.cpp) add_dependencies(rebuild_em_tests googletest) target_link_libraries(rebuild_em_tests ${ENGINE_LDFLAGS} 
${GTEST_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS}) - gtest_discover_tests(rebuild_em_tests TEST_PREFIX columnstore:) + gtest_add_tests(TARGET rebuild_em_tests TEST_PREFIX columnstore:) add_executable(compression_tests compression-tests.cpp) add_dependencies(compression_tests googletest) target_link_libraries(compression_tests ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS}) - gtest_discover_tests(compression_tests TEST_PREFIX columnstore:) + gtest_add_tests(TARGET compression_tests TEST_PREFIX columnstore:) add_executable(column_scan_filter_tests primitives_column_scan_and_filter.cpp) target_compile_options(column_scan_filter_tests PRIVATE -Wno-error -Wno-sign-compare) add_dependencies(column_scan_filter_tests googletest) target_link_libraries(column_scan_filter_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) - gtest_discover_tests(column_scan_filter_tests TEST_PREFIX columnstore:) + gtest_add_tests(TARGET column_scan_filter_tests TEST_PREFIX columnstore:) add_executable(simd_processors simd_processors.cpp) add_dependencies(simd_processors googletest) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) - gtest_discover_tests(simd_processors TEST_PREFIX columnstore:) - - add_executable(fair_threadpool_test fair_threadpool.cpp) - add_dependencies(fair_threadpool_test googletest) - target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) - gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:) # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp deleted file mode 100644 index c4a299bd1..000000000 --- a/tests/fair_threadpool.cpp 
+++ /dev/null @@ -1,173 +0,0 @@ -/* Copyright (C) 2022 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#include -#include -#include - -#include "utils/threadpool/fair_threadpool.h" - -using namespace primitiveprocessor; -using namespace std; -using namespace threadpool; - -using ResultsType = std::vector; -static ResultsType results; - -class FairThreadPoolTest : public testing::Test { - public: - - void SetUp() override - { - results.clear(); - threadPool = new FairThreadPool(1, 1, 0, 0); - } - - - FairThreadPool* threadPool; -}; - -class TestFunctor: public FairThreadPool::Functor -{ - public: - TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) - { - } - ~TestFunctor() {}; - int operator()() override - { - usleep(delay_); - results.push_back(id_); - return 0; - } - private: - size_t id_; - size_t delay_; -}; - -class TestRescheduleFunctor: public FairThreadPool::Functor -{ - public: - TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) - { - } - ~TestRescheduleFunctor() {}; - int operator()() override - { - if (firstRun) - { - firstRun = false; - return 1; // re-schedule the Job - } - usleep(delay_); - results.push_back(id_); - return 0; - } - private: - size_t id_; - size_t delay_; - bool firstRun = true; -}; - -testing::AssertionResult isThisOrThat(const ResultsType& arr, const 
size_t idxA, const int a, const size_t idxB, const int b) -{ - if (arr.empty() || arr.size() <= max(idxA, idxB)) - return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; - if (arr[idxA] == a && arr[idxB] == b) - return testing::AssertionSuccess(); - if (arr[idxA] == b && arr[idxB] == a) - return testing::AssertionSuccess(); - return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB - << " are not " << a << " and " << b << std::endl; -} - -TEST_F(FairThreadPoolTest, FairThreadPoolAdd) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); - FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); - FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); - - threadPool->addJob(job1); - threadPool->addJob(job2); - threadPool->addJob(job3); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); - EXPECT_EQ(results.size(), 3ULL); - EXPECT_EQ(results[0], 1); - EXPECT_EQ(results[1], 3); - EXPECT_EQ(results[2], 2); -} - -TEST_F(FairThreadPoolTest, FairThreadPoolRemove) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); - FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); - FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); - - threadPool->addJob(job1); - threadPool->addJob(job2); - threadPool->addJob(job3); - threadPool->removeJobs(job2.id_); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); - 
EXPECT_EQ(results.size(), 2ULL); - EXPECT_EQ(results[0], 1); - EXPECT_EQ(results[1], 3); -} - -TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); - FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); - FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); - - threadPool->addJob(job1); - threadPool->addJob(job2); - threadPool->addJob(job3); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); - EXPECT_EQ(results.size(), 3ULL); - EXPECT_EQ(results[0], 1); - EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); -} \ No newline at end of file diff --git a/utils/common/pipe.h b/utils/common/pipe.h index de25dd1ae..17ac16330 100644 --- a/utils/common/pipe.h +++ b/utils/common/pipe.h @@ -20,6 +20,11 @@ /* A helper class to hold the file descriptors returned from a pipe() call. */ +#include +#include +#include +#include + class Pipe { int fd[2]; @@ -60,7 +65,7 @@ class Pipe FD_ZERO(&rfds); FD_SET(fd[0], &rfds); struct timeval tmptv = tv; - int retval = select(fd[0] + 1, &rfds, NULL, NULL, &tmptv); + int retval = select(fd[0] + 1, &rfds, nullptr, nullptr, &tmptv); if (retval == -1) return -1; if (!retval) diff --git a/utils/common/service.h b/utils/common/service.h index a4c25f296..898b587ec 100644 --- a/utils/common/service.h +++ b/utils/common/service.h @@ -18,11 +18,16 @@ #pragma once #include +#include +#include #include "pipe.h" class Service { protected: + // The read operation implicitly controls how long binary waits + // before it starts. This is important for DMLProc to survive rollbacks. See MCOL-5105. 
+ static constexpr const size_t PipeReadTimeout = 1200; // The service name, for logging const std::string m_name; // The pipe to send messages from the child to the parent @@ -62,7 +67,7 @@ class Service { char str[100]; // Read the message from the child - ssize_t nbytes = m_pipe.readtm({120, 0}, str, sizeof(str)); + ssize_t nbytes = m_pipe.readtm({PipeReadTimeout, 0}, str, sizeof(str)); if (nbytes >= 0) { ParentLogChildMessage(std::string(str, nbytes)); diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index dd96304de..519ba3b5c 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,8 +4,11 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) + add_library(threadpool SHARED ${threadpool_LIB_SRCS}) + add_dependencies(threadpool loggingcpp) target_link_libraries(threadpool Boost::chrono) -install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) \ No newline at end of file + +install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp deleted file mode 100644 index 44d9cd3a9..000000000 --- a/utils/threadpool/fair_threadpool.cpp +++ /dev/null @@ -1,295 +0,0 @@ -/* Copyright (c) 2022 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#include -#include -#include -#include -using namespace std; - -#include "messageobj.h" -#include "messagelog.h" -#include "threadnaming.h" -using namespace logging; - -#include "fair_threadpool.h" -using namespace boost; - -#include "dbcon/joblist/primitivemsg.h" - -namespace threadpool -{ -FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, - uint ID) - : weightPerRun(targetWeightPerRun), id(ID) -{ - boost::thread* newThread; - size_t numberOfThreads = highThreads + midThreads + lowThreads; - for (uint32_t i = 0; i < numberOfThreads; ++i) - { - newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); - newThread->detach(); - } - cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n"; - threadCounts_.store(numberOfThreads, std::memory_order_relaxed); - defaultThreadCounts = numberOfThreads; -} - -FairThreadPool::~FairThreadPool() -{ - stop(); -} - -void FairThreadPool::addJob(const Job& job) -{ - addJob_(job); -} - -void FairThreadPool::addJob_(const Job& job, bool useLock) -{ - boost::thread* newThread; - std::unique_lock lk(mutex, std::defer_lock_t()); - - // Create any missing threads - if (defaultThreadCounts != threadCounts_.load(std::memory_order_relaxed)) - { - newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); - newThread->detach(); - threadCounts_.fetch_add(1, std::memory_order_relaxed); - } - - if (useLock) - lk.lock(); - - auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); - if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map - { - ThreadPoolJobsList* jobsList = new ThreadPoolJobsList; - 
jobsList->push_back(job); - txn2JobsListMap_[job.txnIdx_] = jobsList; - weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); - } - else // txn is in the map - { - if (jobsListMapIter->second->empty()) // there are no jobs for the txn - { - weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); - } - jobsListMapIter->second->push_back(job); - } - - if (useLock) - newJob.notify_one(); -} - -void FairThreadPool::removeJobs(uint32_t id) -{ - std::unique_lock lk(mutex); - - for (auto& txnJobsMapPair : txn2JobsListMap_) - { - ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second; - auto job = txnJobsList->begin(); - while (job != txnJobsList->end()) - { - if (job->id_ == id) - { - job = txnJobsList->erase(job); // update the job iter - if (txnJobsList->empty()) - { - txn2JobsListMap_.erase(txnJobsMapPair.first); - delete txnJobsList; - break; - // There is no clean-up for PQ. It will happen later in threadFcn - } - continue; // go-on skiping job iter increment - } - ++job; - } - } -} - -void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) -{ - utils::setThreadName("Idle"); - RunListT runList(1); // This is a vector to allow to grab multiple jobs - RescheduleVecType reschedule; - bool running = false; - bool rescheduleJob = false; - - try - { - while (!stop_.load(std::memory_order_relaxed)) - { - runList.clear(); // remove the job - std::unique_lock lk(mutex); - - if (weightedTxnsQueue_.empty()) - { - newJob.wait(lk); - continue; // just go on w/o re-taking the lock - } - - WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); - auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); - // Looking for non-empty jobsList in a loop - // The loop waits on newJob cond_var if PQ is empty(no jobs in this thread pool) - while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty()) - { - // JobList is empty. This can happen when this method pops the last Job. 
- if (txnAndJobListPair != txn2JobsListMap_.end()) - { - ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second; - delete txnJobsList; - txn2JobsListMap_.erase(txnAndJobListPair->first); - } - weightedTxnsQueue_.pop(); - if (weightedTxnsQueue_.empty()) // remove the empty - { - break; - } - weightedTxn = weightedTxnsQueue_.top(); - txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); - } - - if (weightedTxnsQueue_.empty()) - { - newJob.wait(lk); // might need a lock here - continue; - } - - // We have non-empty jobsList at this point. - // Remove the txn from a queue first to add it later - weightedTxnsQueue_.pop(); - TransactionIdxT txnIdx = txnAndJobListPair->first; - ThreadPoolJobsList* jobsList = txnAndJobListPair->second; - runList.push_back(jobsList->front()); - - jobsList->pop_front(); - // Add the jobList back into the PQ adding some weight to it - // Current algo doesn't reduce total txn weight if the job is rescheduled. - if (!jobsList->empty()) - { - weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx}); - } - - lk.unlock(); - - running = true; - jobsRunning_.fetch_add(1, std::memory_order_relaxed); - rescheduleJob = (*(runList[0].functor_))(); // run the functor - jobsRunning_.fetch_sub(1, std::memory_order_relaxed); - running = false; - - utils::setThreadName("Idle"); - - if (rescheduleJob) - { - // to avoid excessive CPU usage waiting for data from storage - usleep(500); - lk.lock(); - addJob_(runList[0], false); - newJob.notify_one(); - lk.unlock(); - } - } - } - catch (std::exception& ex) - { - if (running) - { - jobsRunning_.fetch_sub(1, std::memory_order_relaxed); - } - // Log the exception and exit this thread - try - { - threadCounts_.fetch_sub(1, std::memory_order_relaxed); -#ifndef NOLOGGING - logging::Message::Args args; - logging::Message message(5); - args.add("threadFcn: Caught exception: "); - args.add(ex.what()); - - message.format(args); - - logging::LoggingID lid(22); - logging::MessageLog ml(lid); - 
- ml.logErrorMessage(message); -#endif - - if (running) - sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); - } - catch (...) - { - } - } - catch (...) - { - // Log the exception and exit this thread - try - { - if (running) - { - jobsRunning_.fetch_sub(1, std::memory_order_relaxed); - } - threadCounts_.fetch_sub(1, std::memory_order_relaxed); - ; -#ifndef NOLOGGING - logging::Message::Args args; - logging::Message message(6); - args.add("threadFcn: Caught unknown exception!"); - - message.format(args); - - logging::LoggingID lid(22); - logging::MessageLog ml(lid); - - ml.logErrorMessage(message); -#endif - - if (running) - sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); - } - catch (...) - { - } - } -} - -void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = logging::primitiveServerErr; - ph.UniqueID = id; - ph.StepID = step; - messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - sock->write(msg); -} - -void FairThreadPool::stop() -{ - stop_.store(true, std::memory_order_relaxed); -} - -} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h deleted file mode 100644 index ab3afaa71..000000000 --- a/utils/threadpool/fair_threadpool.h +++ /dev/null @@ -1,165 +0,0 @@ -/* Copyright (c) 2022 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "primitives/primproc/umsocketselector.h" -#include "prioritythreadpool.h" - -namespace threadpool -{ -// The idea of this thread pool is to run morsel jobs(primitive job) is to equaly distribute CPU time -// b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight -// stored in PriorityQueue that thread increases before run another morsel for the query. When query is -// done(ThreadPoolJobsList is empty) it is removed from PQ and the Map(txn to ThreadPoolJobsList). -// I tested multiple morsels per one loop iteration in ::threadFcn. This approach reduces CPU consumption -// and increases query timings. 
-class FairThreadPool -{ - public: - using Functor = PriorityThreadPool::Functor; - - using TransactionIdxT = uint32_t; - struct Job - { - Job() : weight_(1), priority_(0), id_(0) - { - } - Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, - const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, - const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) - : uniqueID_(uniqueID) - , stepID_(stepID) - , txnIdx_(txnIdx) - , functor_(functor) - , sock_(sock) - , weight_(weight) - , priority_(priority) - , id_(id) - { - } - uint32_t uniqueID_; - uint32_t stepID_; - TransactionIdxT txnIdx_; - boost::shared_ptr functor_; - primitiveprocessor::SP_UM_IOSOCK sock_; - uint32_t weight_; - uint32_t priority_; - uint32_t id_; - }; - - /********************************************* - * ctor/dtor - * - *********************************************/ - - /** @brief ctor - */ - - FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0); - virtual ~FairThreadPool(); - - void removeJobs(uint32_t id); - void addJob(const Job& job); - void stop(); - - /** @brief for use in debugging - */ - void dump(); - - size_t queueSize() const - { - return weightedTxnsQueue_.size(); - } - // This method enables a pool current workload estimate. 
- size_t jobsRunning() const - { - return jobsRunning_.load(std::memory_order_relaxed); - } - - protected: - private: - struct ThreadHelper - { - ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue) - { - } - void operator()() - { - ptp->threadFcn(preferredQueue); - } - FairThreadPool* ptp; - PriorityThreadPool::Priority preferredQueue; - }; - - explicit FairThreadPool(); - explicit FairThreadPool(const FairThreadPool&); - FairThreadPool& operator=(const FairThreadPool&); - - void addJob_(const Job& job, bool useLock = true); - void threadFcn(const PriorityThreadPool::Priority preferredQueue); - void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); - - uint32_t defaultThreadCounts; - std::mutex mutex; - std::condition_variable newJob; - boost::thread_group threads; - uint32_t weightPerRun; - volatile uint id; // prevent it from being optimized out - - using WeightT = uint32_t; - using WeightedTxnT = std::pair; - using WeightedTxnVec = std::vector; - struct PrioQueueCmp - { - bool operator()(WeightedTxnT lhs, WeightedTxnT rhs) - { - if (lhs.first == rhs.first) - return lhs.second > rhs.second; - return lhs.first > rhs.first; - } - }; - using RunListT = std::vector; - using RescheduleVecType = std::vector; - using WeightedTxnPrioQueue = std::priority_queue; - using ThreadPoolJobsList = std::list; - using Txn2ThreadPoolJobsListMap = std::unordered_map; - Txn2ThreadPoolJobsListMap txn2JobsListMap_; - WeightedTxnPrioQueue weightedTxnsQueue_; - std::atomic jobsRunning_{0}; - std::atomic threadCounts_{0}; - std::atomic stop_{false}; -}; - -} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 7ba2b8a96..82caac6b3 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -54,6 +54,8 @@ class PriorityThreadPool virtual int operator()() = 0; }; + // typedef 
boost::function0 Functor; + struct Job { Job() : weight(1), priority(0), id(0)