From 4b51820db136a38c12119e55d4c6ae5ac961238f Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Sun, 17 Apr 2022 17:32:33 +0000 Subject: [PATCH] MCOL-5044 Initial version of a fair thread pool PP now uses PriorityThreadPool, which arbitrarily picks the next pack of jobs to run. This scheduling discipline tends to run portions of a single query, forcing other simultaneous queries to wait. As a result, the variance of parallel query timings is high. The FairThreadPool picks the job with the smallest amount of work done so far (see the code for details). --- tests/CMakeLists.txt | 4 + tests/fair_threadpool.cpp | 174 +++++++++++++++ utils/threadpool/CMakeLists.txt | 10 +- utils/threadpool/fair_threadpool.cpp | 302 ++++++++++++++++++++++++++ utils/threadpool/fair_threadpool.h | 173 +++++++++++++++ utils/threadpool/prioritythreadpool.h | 2 - 6 files changed, 659 insertions(+), 6 deletions(-) create mode 100644 tests/fair_threadpool.cpp create mode 100644 utils/threadpool/fair_threadpool.cpp create mode 100644 utils/threadpool/fair_threadpool.h diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2b45e83c3..36efa797a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,6 +38,10 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_discover_tests(simd_processors TEST_PREFIX columnstore:) + add_executable(fair_threadpool_test fair_threadpool.cpp) + target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) + gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) add_dependencies(we_shared_components_tests loggingcpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp new file mode 100644 index 000000000..b4a215992 --- /dev/null +++ b/tests/fair_threadpool.cpp @@ 
-0,0 +1,174 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include + +#include "utils/threadpool/fair_threadpool.h" + +using namespace primitiveprocessor; +using namespace std; +using namespace threadpool; + +using ResultsType = std::vector; +static ResultsType results; + +class FairThreadPoolTest : public testing::Test { + public: + + void SetUp() override + { + results.clear(); + threadPool = new FairThreadPool(1, 1, 0, 0); + } + + + FairThreadPool* threadPool; +}; + +class TestFunctor: public FairThreadPool::Functor +{ + public: + TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestFunctor() {}; + int operator()() override + { + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; +}; + +class TestRescheduleFunctor: public FairThreadPool::Functor +{ + public: + TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestRescheduleFunctor() {}; + int operator()() override + { + if (firstRun) + { + firstRun = false; + return 1; // re-schedule the Job + } + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; + bool firstRun = true; +}; + +testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const 
int a, const size_t idxB, const int b) +{ + if (arr.empty() || arr.size() <= max(idxA, idxB)) + return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; + if (arr[idxA] == a && arr[idxB] == b) + return testing::AssertionSuccess(); + if (arr[idxA] == b && arr[idxB] == a) + return testing::AssertionSuccess(); + return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB + << " are not " << a << " and " << b << std::endl; +} + +TEST_F(FairThreadPoolTest, FairThreadPoolAdd) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(2500000); + } + usleep(2500000); + + EXPECT_EQ(threadPool->queueSize(), 0); + EXPECT_EQ(results.size(), 3); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); + EXPECT_EQ(results[2], 2); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolRemove) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + threadPool->removeJobs(job2.id_); + + while (threadPool->queueSize()) + { + usleep(1500000); + } + + EXPECT_EQ(threadPool->queueSize(), 0); + 
EXPECT_EQ(results.size(), 2); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(1500000); + } + + EXPECT_EQ(threadPool->queueSize(), 0); + EXPECT_EQ(results.size(), 3); + EXPECT_EQ(results[0], 1); + EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); +} \ No newline at end of file diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index d20887ed6..10891e63f 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,10 +4,12 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) - +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) add_library(threadpool SHARED ${threadpool_LIB_SRCS}) - add_dependencies(threadpool loggingcpp) - install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) + +# set(fair_threadpool_LIB_SRCS fair_threadpool.cpp) +# add_library(fair_threadpool STATIC ${fair_threadpool_LIB_SRCS}) +# add_dependencies(fair_threadpool loggingcpp) +# install(TARGETS fair_threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp new file mode 100644 index 
000000000..631c4ac49 --- /dev/null +++ b/utils/threadpool/fair_threadpool.cpp @@ -0,0 +1,302 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +using namespace std; + +#include "messageobj.h" +#include "messagelog.h" +#include "threadnaming.h" +using namespace logging; + +#include "fair_threadpool.h" +using namespace boost; + +#include "dbcon/joblist/primitivemsg.h" + +namespace threadpool +{ +FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, + uint ID) + : _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true) +{ + boost::thread* newThread; + size_t numberOfThreads = highThreads + midThreads + lowThreads; + for (uint32_t i = 0; i < numberOfThreads; ++i) + { + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); + newThread->detach(); + } + cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n"; + defaultThreadCounts = threadCounts = numberOfThreads; +} + +FairThreadPool::~FairThreadPool() +{ + stop(); +} + +void FairThreadPool::addJob(const Job& job) +{ + addJob_(job); +} + +void FairThreadPool::addJob_(const Job& job, bool useLock) +{ + boost::thread* newThread; + std::unique_lock lk(mutex, std::defer_lock_t()); + + if 
(useLock) + lk.lock(); + + // Create any missing threads + if (defaultThreadCounts != threadCounts) + { + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); + newThread->detach(); + ++threadCounts; + } + + // If some threads have blocked (because of output queue full) + // Temporarily add some extra worker threads to make up for the blocked threads. + if (blockedThreads > extraThreads) + { + stopExtra = false; + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA)); + newThread->detach(); + extraThreads++; + } + else if (blockedThreads == 0) + { + // Release the temporary threads -- some threads have become unblocked. + stopExtra = true; + } + + auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); + if (jobsListMapIter == txn2JobsListMap_.end()) + { + jobsListMapIter = txn2JobsListMap_.emplace(job.txnIdx_, new ThreadPoolJobsList).first; + } + // A txn whose jobs list is empty has no entry in the PQ: threadFcn doesn't push a txn back + // after popping its last job. (Re-)queue it here, otherwise its jobs would never be run. + if (jobsListMapIter->second->empty()) + { + WeightT currentTopWeight = weightedTxnsQueue_.empty() ? 0 : weightedTxnsQueue_.top().first; + weightedTxnsQueue_.push({currentTopWeight, job.txnIdx_}); + } + jobsListMapIter->second->push_back(job); + + if (useLock) + newJob.notify_one(); +} + +// Removes all the queued jobs with the given id from every txn's jobs list. +// A job already taken by a worker thread is no longer in a list and is not affected. +void FairThreadPool::removeJobs(uint32_t id) +{ + std::unique_lock lk(mutex); + + // The map is walked with an explicit iterator because entries can be erased on the way: + // erasing by key inside a range-based for invalidates the iterator the loop advances. + for (auto txnJobsMapIter = txn2JobsListMap_.begin(); txnJobsMapIter != txn2JobsListMap_.end();) + { + ThreadPoolJobsList* txnJobsList = txnJobsMapIter->second; + txnJobsList->remove_if([id](const Job& job) { return job.id_ == id; }); + if (txnJobsList->empty()) + { + // There is no clean-up for PQ. It will happen later in threadFcn + delete txnJobsList; + txnJobsMapIter = txn2JobsListMap_.erase(txnJobsMapIter); + } + else + { + ++txnJobsMapIter; + } + } +} +
+// Worker thread main loop: takes the first job of the txn on top of the PQ, runs it w/o +// holding the lock and adds the job back if its functor asks for a re-schedule. +void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) +{ + if (preferredQueue == PriorityThreadPool::Priority::EXTRA) + utils::setThreadName("Extra"); + else + utils::setThreadName("Idle"); + RunListT runList; // This is a vector to allow to grab multiple jobs + RescheduleVecType reschedule; + bool running = false; + bool rescheduleJob = false; + + try + { + while (!_stop) + { + runList.clear(); // remove the job + std::unique_lock lk(mutex); + + if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra) + { + --extraThreads; + return; + } + + if (weightedTxnsQueue_.empty()) + { + newJob.wait(lk); + continue; // just go on w/o re-taking the lock + } + + WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); + auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); + // Looking for non-empty jobsList in a loop + // Waiting on cond_var if PQ is empty(no jobs in this thread pool) + while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty()) + { + // JobList is empty. This can happen when this method pops the last Job. + if (txnAndJobListPair != txn2JobsListMap_.end()) + { + ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second; + delete txnJobsList; + txn2JobsListMap_.erase(txnAndJobListPair->first); + } + weightedTxnsQueue_.pop(); + if (weightedTxnsQueue_.empty()) // remove the empty + { + break; + } + weightedTxn = weightedTxnsQueue_.top(); + txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); + } + + if (weightedTxnsQueue_.empty()) + { + newJob.wait(lk); // might need a lock here + continue; + } + + // We have non-empty jobsList at this point. 
+ // Remove the txn from a queue first to add it later + weightedTxnsQueue_.pop(); + TransactionIdxT txnIdx = txnAndJobListPair->first; + ThreadPoolJobsList* jobsList = txnAndJobListPair->second; + // Job& job = jobsList->front(); + runList.push_back(jobsList->front()); + + jobsList->pop_front(); + // Add the jobList back into the PQ adding some weight to it + // Current algo doesn't reduce total txn weight if the job is rescheduled. + if (!jobsList->empty()) + { + weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx}); + } + + lk.unlock(); + + running = true; + rescheduleJob = (*(runList[0].functor_))(); + running = false; + + utils::setThreadName("Idle"); + + if (rescheduleJob) + { + lk.lock(); + addJob_(runList[0], false); + newJob.notify_one(); + lk.unlock(); + } + } + } + catch (std::exception& ex) + { + // Log the exception and exit this thread + try + { + --threadCounts; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("threadFcn: Caught exception: "); + args.add(ex.what()); + + message.format(args); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage(message); +#endif + + if (running) + sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } + catch (...) + { + } + } + catch (...) + { + // Log the exception and exit this thread + try + { + --threadCounts; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(6); + args.add("threadFcn: Caught unknown exception!"); + + message.format(args); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage(message); +#endif + + if (running) + sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } + catch (...) 
+ { + } + } +} + +void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) +{ + ISMPacketHeader ism; + PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; + + ism.Status = logging::primitiveServerErr; + ph.UniqueID = id; + ph.StepID = step; + messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + msg.append((uint8_t*)&ism, sizeof(ism)); + msg.append((uint8_t*)&ph, sizeof(ph)); + + sock->write(msg); +} + +void FairThreadPool::stop() +{ + _stop = true; +} + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h new file mode 100644 index 000000000..f39f865a3 --- /dev/null +++ b/utils/threadpool/fair_threadpool.h @@ -0,0 +1,173 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "primitives/primproc/umsocketselector.h" +#include "prioritythreadpool.h" + +namespace threadpool +{ +class FairThreadPool +{ + public: + using Functor = PriorityThreadPool::Functor; + + using TransactionIdxT = uint32_t; + struct Job + { + Job() : weight_(1), priority_(0), id_(0) + { + } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : uniqueID_(uniqueID) + , stepID_(stepID) + , txnIdx_(txnIdx) + , functor_(functor) + , sock_(sock) + , weight_(weight) + , priority_(priority) + , id_(id) + { + } + uint32_t uniqueID_; + uint32_t stepID_; + TransactionIdxT txnIdx_; + boost::shared_ptr functor_; + primitiveprocessor::SP_UM_IOSOCK sock_; + uint32_t weight_; + uint32_t priority_; + uint32_t id_; + }; + + /********************************************* + * ctor/dtor + * + *********************************************/ + + /** @brief ctor + */ + + FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0); + virtual ~FairThreadPool(); + + void removeJobs(uint32_t id); + void addJob(const Job& job); + void stop(); + + /** @brief for use in debugging + */ + void dump(); + + // If a job is blocked, we want to temporarily increase the number of threads managed by the pool + // A problem can occur if all threads are running long or blocked for a single query. Other + // queries won't get serviced, even though there are cpu cycles available. + // These calls are currently protected by respondLock in sendThread(). If you call from other + // places, you need to consider atomicity. 
+ void incBlockedThreads() + { + blockedThreads++; + } + void decBlockedThreads() + { + blockedThreads--; + } + uint32_t blockedThreadCount() const + { + return blockedThreads; + } + size_t queueSize() const + { + std::lock_guard<std::mutex> lk(mutex); // the PQ is modified by the worker threads + return weightedTxnsQueue_.size(); + } + + private: + struct ThreadHelper + { + ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue) + { + } + void operator()() + { + ptp->threadFcn(preferredQueue); + } + FairThreadPool* ptp; + PriorityThreadPool::Priority preferredQueue; + }; + + explicit FairThreadPool(); + explicit FairThreadPool(const FairThreadPool&); + FairThreadPool& operator=(const FairThreadPool&); + + void addJob_(const Job& job, bool useLock = true); + void threadFcn(const PriorityThreadPool::Priority preferredQueue); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); + + std::atomic<uint32_t> threadCounts; // decremented by a failing worker w/o taking the lock + uint32_t defaultThreadCounts; + mutable std::mutex mutex; // mutable: also locked by the const queueSize() + std::condition_variable newJob; + boost::thread_group threads; + std::atomic<bool> _stop; // set by stop(), polled by the worker threads w/o the lock + uint32_t weightPerRun; + volatile uint id; // prevent it from being optimized out + + using WeightT = uint32_t; + using WeightedTxnT = std::pair<WeightT, TransactionIdxT>; + using WeightedTxnVec = std::vector<WeightedTxnT>; + struct PrioQueueCmp + { + bool operator()(WeightedTxnT lhs, WeightedTxnT rhs) + { + if (lhs.first == rhs.first) + return lhs.second > rhs.second; + return lhs.first > rhs.first; + } + }; + using RunListT = std::vector<Job>; + using RescheduleVecType = std::vector<bool>; // NOTE(review): element type presumed, only an unused local has this type - confirm + using WeightedTxnPrioQueue = std::priority_queue<WeightedTxnT, WeightedTxnVec, PrioQueueCmp>; + using ThreadPoolJobsList = std::list<Job>; + using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>; + Txn2ThreadPoolJobsListMap txn2JobsListMap_; + WeightedTxnPrioQueue weightedTxnsQueue_; + std::atomic<uint32_t> blockedThreads; + std::atomic<uint32_t> extraThreads; + bool stopExtra; +}; + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index c37990416..dbaa71894 
100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -54,8 +54,6 @@ class PriorityThreadPool virtual int operator()() = 0; }; - // typedef boost::function0 Functor; - struct Job { Job() : weight(1), priority(0), id(0)