From 3b87532413a9eda06267011a7a9cd81928ca27eb Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Fri, 22 Jul 2022 14:04:06 +0000 Subject: [PATCH] Revert "This patch disables FairThreadPool to double check if this feature contributes to multiple strange side-effects and ocassional failed MTR tests" This reverts commit b78cbffa934794bbbf3f6cd5309bf481e90d6648. --- primitives/primproc/bppseeder.h | 2 +- primitives/primproc/bppsendthread.h | 7 +- primitives/primproc/primitiveserver.cpp | 38 ++-- primitives/primproc/primitiveserver.h | 13 +- .../primproc/primitiveserverthreadpools.h | 17 +- primitives/primproc/primproc.cpp | 1 - primitives/primproc/primproc.h | 14 +- primitives/primproc/sqlfrontsessionthread.cpp | 3 +- tests/CMakeLists.txt | 5 + tests/fair_threadpool.cpp | 202 ++++++++++++++++++ utils/threadpool/CMakeLists.txt | 2 +- utils/threadpool/prioritythreadpool.h | 24 +-- 12 files changed, 252 insertions(+), 76 deletions(-) create mode 100644 tests/fair_threadpool.cpp diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index bb9acb6ad..adda29b6a 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -47,7 +47,7 @@ namespace primitiveprocessor { -class BPPSeeder : public threadpool::PriorityThreadPool::Functor +class BPPSeeder : public threadpool::FairThreadPool::Functor { public: BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads, diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index 6756122ef..7de14f881 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -24,12 +24,13 @@ #pragma once +#include "fair_threadpool.h" #include "umsocketselector.h" #include #include #include #include "threadnaming.h" -#include "prioritythreadpool.h" +#include "fair_threadpool.h" namespace primitiveprocessor { @@ -84,7 +85,7 @@ class BPPSendThread { return die; } - void setProcessorPool(boost::shared_ptr processorPool) + void setProcessorPool(boost::shared_ptr processorPool) { fProcessorPool = processorPool; } @@ -148,7 +149,7 @@ class BPPSendThread uint64_t maxByteSize; // Used to tell the ThreadPool It should consider additional threads because a // queue full event has happened and a thread has been blocked. - boost::shared_ptr fProcessorPool; + boost::shared_ptr fProcessorPool; }; } // namespace primitiveprocessor diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 620419089..fffc0d971 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2016-2022 MariaDB Corporation + Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache(); // FIXME: there is an anon ns burried later in between 2 named namespaces... namespace primitiveprocessor { -boost::shared_ptr OOBPool; + BlockRequestProcessor** BRPp; #ifndef _MSC_VER dbbc::Stats stats; @@ -1049,7 +1049,7 @@ using namespace primitiveprocessor; /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM) * TODO: Move this & the impl into different files */ -class DictScanJob : public threadpool::PriorityThreadPool::Functor +class DictScanJob : public threadpool::FairThreadPool::Functor { public: DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); @@ -1240,7 +1240,7 @@ struct BPPHandler scoped.unlock(); } - struct BPPHandlerFunctor : public PriorityThreadPool::Functor + struct BPPHandlerFunctor : public FairThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1706,7 +1706,7 @@ return 0; PrimitiveServer* fPrimitiveServerPtr; }; -class DictionaryOp : public PriorityThreadPool::Functor +class DictionaryOp : public FairThreadPool::Functor { public: DictionaryOp(SBS cmd) : bs(cmd) @@ -1943,8 +1943,7 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = - fPrimitiveServerPtr->getProcessorThreadPool(); + boost::shared_ptr procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -1995,6 +1994,9 @@ struct ReadThread idbassert(bs->length() >= sizeof(ISMPacketHeader)); const ISMPacketHeader* ismHdr = reinterpret_cast(bs->buf()); + // uint64_t someVal = ismHdr->Command; + // std::cout << " PP read thread Command " << someVal << std::endl; + /* This switch is for the OOB commands */ switch (ismHdr->Command) { @@ -2054,7 +2056,7 @@ struct ReadThread const uint32_t weight = 1; const uint32_t priority = 0; uint32_t id = 0; - boost::shared_ptr functor; + boost::shared_ptr functor; if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) { functor.reset(new CreateEqualityFilter(bs)); @@ -2086,7 +2088,7 @@ struct ReadThread id = fBPPHandler->getUniqueID(bs, ismHdr->Command); functor.reset(new BPPHandler::Abort(fBPPHandler, bs)); } - PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); procPoolPtr->addJob(job); break; } @@ -2095,7 +2097,7 @@ struct ReadThread case BATCH_PRIMITIVE_RUN: { TokenByScanRequestHeader* hdr = nullptr; - boost::shared_ptr functor; + boost::shared_ptr functor; uint32_t id = 0; uint32_t weight = 0; uint32_t priority = 0; @@ -2135,8 +2137,9 @@ struct ReadThread } else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) { - functor.reset(new BPPSeeder(bs, writeLock, outIos, fPrimitiveServerPtr->ProcessorThreads(), - fPrimitiveServerPtr->PTTrace())); + functor.reset(new BPPSeeder(bs, writeLock, outIos, + fPrimitiveServerPtr->ProcessorThreads(), + fPrimitiveServerPtr->PTTrace())); BPPSeeder* bpps = dynamic_cast(functor.get()); id = bpps->getID(); priority = bpps->priority(); @@ -2147,7 +2150,7 @@ struct ReadThread uniqueID = *((uint32_t*)&buf[pos + 10]); weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); } - PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); procPoolPtr->addJob(job); break; @@ -2307,13 +2310,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0)); - // We're not using either the priority or the job-clustering features, just need a threadpool - // that can reschedule jobs, and an unlimited non-blocking queue - OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); - // Initialize a local pointer. - fOOBPool = OOBPool; + fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0)); asyncCounter = 0; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index f69c04e1f..79c1e20ef 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -36,7 +36,7 @@ #include #include "threadpool.h" -#include "prioritythreadpool.h" +#include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" @@ -48,7 +48,6 @@ extern oam::OamCache* oamCache; namespace primitiveprocessor { -extern boost::shared_ptr OOBPool; extern dbbc::BlockRequestProcessor** BRPp; extern BRM::DBRM* brm; extern boost::mutex bppLock; @@ -130,16 +129,11 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline boost::shared_ptr getProcessorThreadPool() const + inline boost::shared_ptr getProcessorThreadPool() const { return fProcessorPool; } - inline boost::shared_ptr getOOBThreadPool() const - { - return fOOBPool; - } - int ReadAheadBlocks() const { return fReadAheadBlocks; @@ -171,8 +165,7 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - boost::shared_ptr fProcessorPool; - boost::shared_ptr fOOBPool; + boost::shared_ptr fProcessorPool; int fServerThreads; int fServerQueueSize; diff --git a/primitives/primproc/primitiveserverthreadpools.h b/primitives/primproc/primitiveserverthreadpools.h index 5297adfe8..db23d290c 100644 --- a/primitives/primproc/primitiveserverthreadpools.h +++ b/primitives/primproc/primitiveserverthreadpools.h @@ -18,29 +18,22 @@ #pragma once #include -#include "prioritythreadpool.h" +#include "fair_threadpool.h" class PrimitiveServerThreadPools { public: PrimitiveServerThreadPools() = default; - PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool, - boost::shared_ptr OOBThreadPool) - : fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool) + PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool) + : fPrimServerThreadPool(primServerThreadPool) { } - boost::shared_ptr getPrimitiveServerThreadPool() + boost::shared_ptr getPrimitiveServerThreadPool() { return fPrimServerThreadPool; } - boost::shared_ptr getOOBThreadPool() - { - return fOOBThreadPool; - } - private: - boost::shared_ptr fPrimServerThreadPool; - boost::shared_ptr fOOBThreadPool; + boost::shared_ptr fPrimServerThreadPool; }; diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp index 599f001d7..7282c0343 100644 --- a/primitives/primproc/primproc.cpp +++ b/primitives/primproc/primproc.cpp @@ -725,7 +725,6 @@ int ServicePrimProc::Child() #endif primServerThreadPool = server.getProcessorThreadPool(); - OOBThreadPool = server.getOOBThreadPool(); server.start(this, startupRaceLock); diff --git a/primitives/primproc/primproc.h b/primitives/primproc/primproc.h index 7180fe922..4fb757c1f 100644 --- a/primitives/primproc/primproc.h +++ b/primitives/primproc/primproc.h @@ -35,7 +35,7 @@ #include #include "service.h" -#include "prioritythreadpool.h" +#include "fair_threadpool.h" #include "pp_logger.h" namespace primitiveprocessor @@ -155,12 +155,12 @@ class ServicePrimProc : public Service, public Opt void LogErrno() override { - std::cerr << strerror(errno) << std::endl; + cerr << strerror(errno) << endl; } void ParentLogChildMessage(const std::string& str) override { - std::cout << str << std::endl; + cout << str << endl; } int Child() override; int Run() @@ -172,12 +172,12 @@ class ServicePrimProc : public Service, public Opt return startupRaceFlag_; } - boost::shared_ptr getPrimitiveServerThreadPool() + boost::shared_ptr getPrimitiveServerThreadPool() { return primServerThreadPool; } - boost::shared_ptr getOOBThreadPool() + boost::shared_ptr getOOBThreadPool() { return OOBThreadPool; } @@ -190,6 +190,6 @@ class ServicePrimProc : public Service, public Opt static ServicePrimProc* fInstance; // Since C++20 flag's init value is false. std::atomic_flag startupRaceFlag_{false}; - boost::shared_ptr primServerThreadPool; - boost::shared_ptr OOBThreadPool; + boost::shared_ptr primServerThreadPool; + boost::shared_ptr OOBThreadPool; }; diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index e0b91f22c..545d10a32 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -520,8 +520,7 @@ namespace exemgr statementsRunningCount->incr(stmtCounted); PrimitiveServerThreadPools primitiveServerThreadPools( - ServicePrimProc::instance()->getPrimitiveServerThreadPool(), - ServicePrimProc::instance()->getOOBThreadPool()); + ServicePrimProc::instance()->getPrimitiveServerThreadPool()); if (tryTuples) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e4e82b864..4f760200f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -59,6 +59,11 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:) + add_executable(fair_threadpool_test fair_threadpool.cpp) + add_dependencies(fair_threadpool_test googletest) + target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) + gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:) + add_executable(comparators_tests comparators-tests.cpp) target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) add_test(NAME columnstore:comparators_tests COMMAND comparators_tests) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp new file mode 100644 index 000000000..a74674516 --- /dev/null +++ b/tests/fair_threadpool.cpp @@ -0,0 +1,202 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include + +#include "utils/threadpool/fair_threadpool.h" + +using namespace primitiveprocessor; +using namespace std; +using namespace threadpool; + +using ResultsType = std::vector; +static ResultsType results; + +class FairThreadPoolTest : public testing::Test +{ + public: + void SetUp() override + { + results.clear(); + threadPool = new FairThreadPool(1, 1, 0, 0); + } + + FairThreadPool* threadPool; +}; + +class TestFunctor : public FairThreadPool::Functor +{ + public: + TestFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay) + { + } + ~TestFunctor(){}; + int operator()() override + { + usleep(delay_); + results.push_back(id_); + return 0; + } + + private: + size_t id_; + size_t delay_; +}; + +class TestRescheduleFunctor : public FairThreadPool::Functor +{ + public: + TestRescheduleFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay) + { + } + ~TestRescheduleFunctor(){}; + int operator()() override + { + if (firstRun) + { + firstRun = false; + return 1; // re-schedule the Job + } + usleep(delay_); + results.push_back(id_); + return 0; + } + + private: + size_t id_; + size_t delay_; + bool firstRun = true; +}; + +testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, + const size_t idxB, const int b) +{ + if (arr.empty() || arr.size() <= max(idxA, idxB)) + return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; + if (arr[idxA] == a && arr[idxB] == b) + return testing::AssertionSuccess(); + if (arr[idxA] == b && arr[idxB] == a) + return testing::AssertionSuccess(); + return testing::AssertionFailure() << "The values at positions " << idxA << " " << idxB << " are not " << a + << " and " << b << std::endl; +} + +TEST_F(FairThreadPoolTest, FairThreadPoolAdd) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); + EXPECT_EQ(results[2], 2); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolRemove) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + threadPool->removeJobs(job2.id_); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 2ULL); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolCleanUp) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->removeJobs(job1.id_); + threadPool->addJob(job2); + threadPool->removeJobs(job2.id_); + threadPool->addJob(job3); + threadPool->removeJobs(job3.id_); + threadPool->removeJobs(job1.id_); + threadPool->removeJobs(job1.id_); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); + EXPECT_EQ(results[0], 1); + EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); +} \ No newline at end of file diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index c8db05b2e..dd96304de 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) add_library(threadpool SHARED ${threadpool_LIB_SRCS}) add_dependencies(threadpool loggingcpp) target_link_libraries(threadpool Boost::chrono) diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index c7f842ac4..fbd479966 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -52,33 +52,19 @@ class PriorityThreadPool // this thread pool to reschedule the job, 0 will throw it away on return. virtual int operator()() = 0; }; - using TransactionIdxT = uint32_t; + struct Job { Job() : weight(1), priority(0), id(0) { } - Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, - const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, - const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) - : uniqueID(uniqueID) - , stepID(stepID) - , txnIdx(txnIdx) - , functor(functor) - , sock(sock) - , weight(weight) - , priority(priority) - , id(id) - { - } - uint32_t uniqueID; - uint32_t stepID; - TransactionIdxT txnIdx; boost::shared_ptr functor; - primitiveprocessor::SP_UM_IOSOCK sock; uint32_t weight; uint32_t priority; uint32_t id; + uint32_t uniqueID; + uint32_t stepID; + primitiveprocessor::SP_UM_IOSOCK sock; }; enum Priority @@ -128,7 +114,7 @@ class PriorityThreadPool { return blockedThreads; } - + protected: private: struct ThreadHelper