From b78cbffa934794bbbf3f6cd5309bf481e90d6648 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Wed, 20 Jul 2022 11:17:19 +0000 Subject: [PATCH] This patch disables FairThreadPool to double check if this feature contributes to multiple strange side-effects and ocassional failed MTR tests --- primitives/primproc/bppseeder.h | 2 +- primitives/primproc/bppsendthread.h | 7 +- primitives/primproc/primitiveserver.cpp | 38 ++-- primitives/primproc/primitiveserver.h | 13 +- .../primproc/primitiveserverthreadpools.h | 17 +- primitives/primproc/primproc.cpp | 1 + primitives/primproc/primproc.h | 14 +- primitives/primproc/sqlfrontsessionthread.cpp | 3 +- tests/CMakeLists.txt | 5 - tests/fair_threadpool.cpp | 202 ------------------ utils/threadpool/CMakeLists.txt | 2 +- utils/threadpool/prioritythreadpool.h | 24 ++- 12 files changed, 76 insertions(+), 252 deletions(-) delete mode 100644 tests/fair_threadpool.cpp diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index adda29b6a..bb9acb6ad 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -47,7 +47,7 @@ namespace primitiveprocessor { -class BPPSeeder : public threadpool::FairThreadPool::Functor +class BPPSeeder : public threadpool::PriorityThreadPool::Functor { public: BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads, diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index 7de14f881..6756122ef 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -24,13 +24,12 @@ #pragma once -#include "fair_threadpool.h" #include "umsocketselector.h" #include #include #include #include "threadnaming.h" -#include "fair_threadpool.h" +#include "prioritythreadpool.h" namespace primitiveprocessor { @@ -85,7 +84,7 @@ class BPPSendThread { return die; } - void setProcessorPool(boost::shared_ptr processorPool) + void setProcessorPool(boost::shared_ptr processorPool) { fProcessorPool = processorPool; } @@ -149,7 +148,7 @@ class BPPSendThread uint64_t maxByteSize; // Used to tell the ThreadPool It should consider additional threads because a // queue full event has happened and a thread has been blocked. - boost::shared_ptr fProcessorPool; + boost::shared_ptr fProcessorPool; }; } // namespace primitiveprocessor diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index fffc0d971..620419089 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2016 MariaDB Corporation + Copyright (C) 2016-2022 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache(); // FIXME: there is an anon ns burried later in between 2 named namespaces... namespace primitiveprocessor { - +boost::shared_ptr OOBPool; BlockRequestProcessor** BRPp; #ifndef _MSC_VER dbbc::Stats stats; @@ -1049,7 +1049,7 @@ using namespace primitiveprocessor; /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM) * TODO: Move this & the impl into different files */ -class DictScanJob : public threadpool::FairThreadPool::Functor +class DictScanJob : public threadpool::PriorityThreadPool::Functor { public: DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); @@ -1240,7 +1240,7 @@ struct BPPHandler scoped.unlock(); } - struct BPPHandlerFunctor : public FairThreadPool::Functor + struct BPPHandlerFunctor : public PriorityThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1706,7 +1706,7 @@ return 0; PrimitiveServer* fPrimitiveServerPtr; }; -class DictionaryOp : public FairThreadPool::Functor +class DictionaryOp : public PriorityThreadPool::Functor { public: DictionaryOp(SBS cmd) : bs(cmd) @@ -1943,7 +1943,8 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); + boost::shared_ptr procPoolPtr = + fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -1994,9 +1995,6 @@ struct ReadThread idbassert(bs->length() >= sizeof(ISMPacketHeader)); const ISMPacketHeader* ismHdr = reinterpret_cast(bs->buf()); - // uint64_t someVal = ismHdr->Command; - // std::cout << " PP read thread Command " << someVal << std::endl; - /* This switch is for the OOB commands */ switch (ismHdr->Command) { @@ -2056,7 +2054,7 @@ struct ReadThread const uint32_t weight = 1; const uint32_t priority = 0; uint32_t id = 0; - boost::shared_ptr functor; + boost::shared_ptr functor; if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) { functor.reset(new CreateEqualityFilter(bs)); @@ -2088,7 +2086,7 @@ struct ReadThread id = fBPPHandler->getUniqueID(bs, ismHdr->Command); functor.reset(new BPPHandler::Abort(fBPPHandler, bs)); } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); procPoolPtr->addJob(job); break; } @@ -2097,7 +2095,7 @@ struct ReadThread case BATCH_PRIMITIVE_RUN: { TokenByScanRequestHeader* hdr = nullptr; - boost::shared_ptr functor; + boost::shared_ptr functor; uint32_t id = 0; uint32_t weight = 0; uint32_t priority = 0; @@ -2137,9 +2135,8 @@ struct ReadThread } else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) { - functor.reset(new BPPSeeder(bs, writeLock, outIos, - fPrimitiveServerPtr->ProcessorThreads(), - fPrimitiveServerPtr->PTTrace())); + functor.reset(new BPPSeeder(bs, writeLock, outIos, fPrimitiveServerPtr->ProcessorThreads(), + fPrimitiveServerPtr->PTTrace())); BPPSeeder* bpps = dynamic_cast(functor.get()); id = bpps->getID(); priority = bpps->priority(); @@ -2150,7 +2147,7 @@ struct ReadThread uniqueID = *((uint32_t*)&buf[pos + 10]); weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); procPoolPtr->addJob(job); break; @@ -2310,8 +2307,13 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0)); + fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0)); + // We're not using either the priority or the job-clustering features, just need a threadpool + // that can reschedule jobs, and an unlimited non-blocking queue + OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); + // Initialize a local pointer. + fOOBPool = OOBPool; asyncCounter = 0; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 79c1e20ef..f69c04e1f 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -36,7 +36,7 @@ #include #include "threadpool.h" -#include "fair_threadpool.h" +#include "prioritythreadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" @@ -48,6 +48,7 @@ extern oam::OamCache* oamCache; namespace primitiveprocessor { +extern boost::shared_ptr OOBPool; extern dbbc::BlockRequestProcessor** BRPp; extern BRM::DBRM* brm; extern boost::mutex bppLock; @@ -129,11 +130,16 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline boost::shared_ptr getProcessorThreadPool() const + inline boost::shared_ptr getProcessorThreadPool() const { return fProcessorPool; } + inline boost::shared_ptr getOOBThreadPool() const + { + return fOOBPool; + } + int ReadAheadBlocks() const { return fReadAheadBlocks; @@ -165,7 +171,8 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - boost::shared_ptr fProcessorPool; + boost::shared_ptr fProcessorPool; + boost::shared_ptr fOOBPool; int fServerThreads; int fServerQueueSize; diff --git a/primitives/primproc/primitiveserverthreadpools.h b/primitives/primproc/primitiveserverthreadpools.h index db23d290c..5297adfe8 100644 --- a/primitives/primproc/primitiveserverthreadpools.h +++ b/primitives/primproc/primitiveserverthreadpools.h @@ -18,22 +18,29 @@ #pragma once #include -#include "fair_threadpool.h" +#include "prioritythreadpool.h" class PrimitiveServerThreadPools { public: PrimitiveServerThreadPools() = default; - PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool) - : fPrimServerThreadPool(primServerThreadPool) + PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool, + boost::shared_ptr OOBThreadPool) + : fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool) { } - boost::shared_ptr getPrimitiveServerThreadPool() + boost::shared_ptr getPrimitiveServerThreadPool() { return fPrimServerThreadPool; } + boost::shared_ptr getOOBThreadPool() + { + return fOOBThreadPool; + } + private: - boost::shared_ptr fPrimServerThreadPool; + boost::shared_ptr fPrimServerThreadPool; + boost::shared_ptr fOOBThreadPool; }; diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 7282c0343..599f001d7 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -725,6 +725,7 @@ int ServicePrimProc::Child() #endif primServerThreadPool = server.getProcessorThreadPool(); + OOBThreadPool = server.getOOBThreadPool(); server.start(this, startupRaceLock); diff --git a/primitives/primproc/primproc.h b/primitives/primproc/primproc.h index 4fb757c1f..7180fe922 100644 --- a/primitives/primproc/primproc.h +++ b/primitives/primproc/primproc.h @@ -35,7 +35,7 @@ #include #include "service.h" -#include "fair_threadpool.h" +#include "prioritythreadpool.h" #include "pp_logger.h" namespace primitiveprocessor @@ -155,12 +155,12 @@ class ServicePrimProc : public Service, public Opt void LogErrno() override { - cerr << strerror(errno) << endl; + std::cerr << strerror(errno) << std::endl; } void ParentLogChildMessage(const std::string& str) override { - cout << str << endl; + std::cout << str << std::endl; } int Child() override; int Run() @@ -172,12 +172,12 @@ class ServicePrimProc : public Service, public Opt return startupRaceFlag_; } - boost::shared_ptr getPrimitiveServerThreadPool() + boost::shared_ptr getPrimitiveServerThreadPool() { return primServerThreadPool; } - boost::shared_ptr getOOBThreadPool() + boost::shared_ptr getOOBThreadPool() { return OOBThreadPool; } @@ -190,6 +190,6 @@ class ServicePrimProc : public Service, public Opt static ServicePrimProc* fInstance; // Since C++20 flag's init value is false. std::atomic_flag startupRaceFlag_{false}; - boost::shared_ptr primServerThreadPool; - boost::shared_ptr OOBThreadPool; + boost::shared_ptr primServerThreadPool; + boost::shared_ptr OOBThreadPool; }; diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index 545d10a32..e0b91f22c 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -520,7 +520,8 @@ namespace exemgr statementsRunningCount->incr(stmtCounted); PrimitiveServerThreadPools primitiveServerThreadPools( - ServicePrimProc::instance()->getPrimitiveServerThreadPool()); + ServicePrimProc::instance()->getPrimitiveServerThreadPool(), + ServicePrimProc::instance()->getOOBThreadPool()); if (tryTuples) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4f760200f..e4e82b864 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -59,11 +59,6 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:) - add_executable(fair_threadpool_test fair_threadpool.cpp) - add_dependencies(fair_threadpool_test googletest) - target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) - gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:) - add_executable(comparators_tests comparators-tests.cpp) target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) add_test(NAME columnstore:comparators_tests COMMAND comparators_tests) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp deleted file mode 100644 index a74674516..000000000 --- a/tests/fair_threadpool.cpp +++ /dev/null @@ -1,202 +0,0 @@ -/* Copyright (C) 2022 MariaDB Corporation - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - as published by the Free Software Foundation; version 2 of - the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, - MA 02110-1301, USA. */ - -#include -#include -#include - -#include "utils/threadpool/fair_threadpool.h" - -using namespace primitiveprocessor; -using namespace std; -using namespace threadpool; - -using ResultsType = std::vector; -static ResultsType results; - -class FairThreadPoolTest : public testing::Test -{ - public: - void SetUp() override - { - results.clear(); - threadPool = new FairThreadPool(1, 1, 0, 0); - } - - FairThreadPool* threadPool; -}; - -class TestFunctor : public FairThreadPool::Functor -{ - public: - TestFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay) - { - } - ~TestFunctor(){}; - int operator()() override - { - usleep(delay_); - results.push_back(id_); - return 0; - } - - private: - size_t id_; - size_t delay_; -}; - -class TestRescheduleFunctor : public FairThreadPool::Functor -{ - public: - TestRescheduleFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay) - { - } - ~TestRescheduleFunctor(){}; - int operator()() override - { - if (firstRun) - { - firstRun = false; - return 1; // re-schedule the Job - } - usleep(delay_); - results.push_back(id_); - return 0; - } - - private: - size_t id_; - size_t delay_; - bool firstRun = true; -}; - -testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, - const size_t idxB, const int b) -{ - if (arr.empty() || arr.size() <= max(idxA, idxB)) - return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; - if (arr[idxA] == a && arr[idxB] == b) - return testing::AssertionSuccess(); - if (arr[idxA] == b && arr[idxB] == a) - return testing::AssertionSuccess(); - return testing::AssertionFailure() << "The values at positions " << idxA << " " << idxB << " are not " << a - << " and " << b << std::endl; -} - -TEST_F(FairThreadPoolTest, FairThreadPoolAdd) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); - FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); - FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); - - threadPool->addJob(job1); - threadPool->addJob(job2); - threadPool->addJob(job3); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); - EXPECT_EQ(results.size(), 3ULL); - EXPECT_EQ(results[0], 1); - EXPECT_EQ(results[1], 3); - EXPECT_EQ(results[2], 2); -} - -TEST_F(FairThreadPoolTest, FairThreadPoolRemove) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); - FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); - FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); - - threadPool->addJob(job1); - threadPool->addJob(job2); - threadPool->addJob(job3); - threadPool->removeJobs(job2.id_); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); - EXPECT_EQ(results.size(), 2ULL); - EXPECT_EQ(results[0], 1); - EXPECT_EQ(results[1], 3); -} - -TEST_F(FairThreadPoolTest, FairThreadPoolCleanUp) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); - FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); - FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); - - threadPool->addJob(job1); - threadPool->removeJobs(job1.id_); - threadPool->addJob(job2); - threadPool->removeJobs(job2.id_); - threadPool->addJob(job3); - threadPool->removeJobs(job3.id_); - threadPool->removeJobs(job1.id_); - threadPool->removeJobs(job1.id_); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); -} - -TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) -{ - SP_UM_IOSOCK sock(new messageqcpp::IOSocket); - auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); - FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); - auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); - FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); - auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); - FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); - - threadPool->addJob(job1); - threadPool->addJob(job2); - threadPool->addJob(job3); - - while (threadPool->queueSize()) - { - usleep(250000); - } - - EXPECT_EQ(threadPool->queueSize(), 0ULL); - EXPECT_EQ(results.size(), 3ULL); - EXPECT_EQ(results[0], 1); - EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); -} \ No newline at end of file diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index dd96304de..c8db05b2e 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) add_library(threadpool SHARED ${threadpool_LIB_SRCS}) add_dependencies(threadpool loggingcpp) target_link_libraries(threadpool Boost::chrono) diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index fbd479966..c7f842ac4 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -52,19 +52,33 @@ class PriorityThreadPool // this thread pool to reschedule the job, 0 will throw it away on return. virtual int operator()() = 0; }; - + using TransactionIdxT = uint32_t; struct Job { Job() : weight(1), priority(0), id(0) { } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : uniqueID(uniqueID) + , stepID(stepID) + , txnIdx(txnIdx) + , functor(functor) + , sock(sock) + , weight(weight) + , priority(priority) + , id(id) + { + } + uint32_t uniqueID; + uint32_t stepID; + TransactionIdxT txnIdx; boost::shared_ptr functor; + primitiveprocessor::SP_UM_IOSOCK sock; uint32_t weight; uint32_t priority; uint32_t id; - uint32_t uniqueID; - uint32_t stepID; - primitiveprocessor::SP_UM_IOSOCK sock; }; enum Priority @@ -114,7 +128,7 @@ class PriorityThreadPool { return blockedThreads; } - + protected: private: struct ThreadHelper