diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2b45e83c3..36efa797a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,6 +38,10 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_discover_tests(simd_processors TEST_PREFIX columnstore:) + add_executable(fair_threadpool_test fair_threadpool.cpp) + target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) + gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) add_dependencies(we_shared_components_tests loggingcpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp new file mode 100644 index 000000000..b4a215992 --- /dev/null +++ b/tests/fair_threadpool.cpp @@ -0,0 +1,174 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#include +#include +#include + +#include "utils/threadpool/fair_threadpool.h" + +using namespace primitiveprocessor; +using namespace std; +using namespace threadpool; + +using ResultsType = std::vector; +static ResultsType results; + +class FairThreadPoolTest : public testing::Test { + public: + + void SetUp() override + { + results.clear(); + threadPool = new FairThreadPool(1, 1, 0, 0); + } + + + FairThreadPool* threadPool; +}; + +class TestFunctor: public FairThreadPool::Functor +{ + public: + TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestFunctor() {}; + int operator()() override + { + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; +}; + +class TestRescheduleFunctor: public FairThreadPool::Functor +{ + public: + TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestRescheduleFunctor() {}; + int operator()() override + { + if (firstRun) + { + firstRun = false; + return 1; // re-schedule the Job + } + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; + bool firstRun = true; +}; + +testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, const size_t idxB, const int b) +{ + if (arr.empty() || arr.size() <= max(idxA, idxB)) + return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; + if (arr[idxA] == a && arr[idxB] == b) + return testing::AssertionSuccess(); + if (arr[idxA] == b && arr[idxB] == a) + return testing::AssertionSuccess(); + return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB + << " are not " << a << " and " << b << std::endl; +} + +TEST_F(FairThreadPoolTest, FairThreadPoolAdd) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); + auto 
functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(2500000); + } + usleep(2500000); + + EXPECT_EQ(threadPool->queueSize(), 0); + EXPECT_EQ(results.size(), 3); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); + EXPECT_EQ(results[2], 2); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolRemove) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + threadPool->removeJobs(job2.id_); + + while (threadPool->queueSize()) + { + usleep(1500000); + } + + EXPECT_EQ(threadPool->queueSize(), 0); + EXPECT_EQ(results.size(), 2); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(1500000); + } + 
+ EXPECT_EQ(threadPool->queueSize(), 0); + EXPECT_EQ(results.size(), 3); + EXPECT_EQ(results[0], 1); + EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); +} \ No newline at end of file diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index d20887ed6..10891e63f 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,10 +4,12 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) - +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) add_library(threadpool SHARED ${threadpool_LIB_SRCS}) - add_dependencies(threadpool loggingcpp) - install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) + +# set(fair_threadpool_LIB_SRCS fair_threadpool.cpp) +# add_library(fair_threadpool STATIC ${fair_threadpool_LIB_SRCS}) +# add_dependencies(fair_threadpool loggingcpp) +# install(TARGETS fair_threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp new file mode 100644 index 000000000..631c4ac49 --- /dev/null +++ b/utils/threadpool/fair_threadpool.cpp @@ -0,0 +1,302 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA. */
+
+#include
+#include
+#include
+using namespace std;
+
+#include "messageobj.h"
+#include "messagelog.h"
+#include "threadnaming.h"
+using namespace logging;
+
+#include "fair_threadpool.h"
+using namespace boost;
+
+#include "dbcon/joblist/primitivemsg.h"
+
+namespace threadpool
+{
+/** @brief Builds the pool and starts its worker threads.
+ *
+ *  The three per-priority thread counts are summed and that many detached
+ *  workers are started, all of them with Priority::HIGH.
+ *
+ *  @param targetWeightPerRun stored in weightPerRun; not read by the code in this file
+ *  @param highThreads        added to the number of workers
+ *  @param midThreads         added to the number of workers
+ *  @param lowThreads         added to the number of workers
+ *  @param ID                 stored in id
+ */
+FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
+                               uint ID)
+ : _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true)
+{
+  const size_t numberOfThreads = highThreads + midThreads + lowThreads;
+  // Set the counters before the first worker exists: a worker that dies
+  // decrements threadCounts, and assigning afterwards would overwrite that.
+  defaultThreadCounts = threadCounts = numberOfThreads;
+  for (size_t i = 0; i < numberOfThreads; ++i)
+  {
+    boost::thread* newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
+    newThread->detach();
+  }
+  cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n";
+}
+
+/** @brief Requests the workers to stop; see stop().
+ */
+FairThreadPool::~FairThreadPool()
+{
+  stop();
+}
+
+/** @brief Queues a job, taking the pool mutex and waking one worker.
+ *
+ *  @param job the job to queue; it is copied into the pool
+ */
+void FairThreadPool::addJob(const Job& job)
+{
+  addJob_(job);
+}
+
+/** @brief Queues a job under its transaction and keeps the thread set topped up.
+ *
+ *  @param job     the job to queue; it is copied into the transaction's list
+ *  @param useLock when true this call locks the mutex and notifies one worker;
+ *                 when false the caller already holds the mutex and does the
+ *                 notification itself (this is how threadFcn re-queues a job)
+ */
+void FairThreadPool::addJob_(const Job& job, bool useLock)
+{
+  std::unique_lock<std::mutex> lk(mutex, std::defer_lock);
+
+  if (useLock)
+    lk.lock();
+
+  // Create any missing threads (threadCounts drops when a worker exits on an exception)
+  if (defaultThreadCounts != threadCounts)
+  {
+    boost::thread* replacement = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
+    replacement->detach();
+    ++threadCounts;
+  }
+
+  // If some threads have blocked (because of output queue full)
+  // temporarily add some extra worker threads to make up for the blocked threads.
+  if (blockedThreads > extraThreads)
+  {
+    stopExtra = false;
+    boost::thread* extra = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA));
+    extra->detach();
+    extraThreads++;
+  }
+  else if (blockedThreads == 0)
+  {
+    // Release the temporary threads -- some threads have become unblocked.
+    stopExtra = true;
+  }
+
+  ThreadPoolJobsList* jobsList = nullptr;
+  auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
+  if (jobsListMapIter == txn2JobsListMap_.end())
+  {
+    jobsList = new ThreadPoolJobsList;
+    txn2JobsListMap_[job.txnIdx_] = jobsList;
+  }
+  else
+  {
+    jobsList = jobsListMapIter->second;
+  }
+
+  // A txn gets a queue entry whenever its list goes from empty to non-empty.
+  // This covers a brand new txn and also a known txn whose last job was taken
+  // by threadFcn: threadFcn does not re-queue a txn with an empty list, so
+  // without this entry the job appended below would never be picked up.
+  if (jobsList->empty())
+  {
+    const WeightT currentTopWeight = weightedTxnsQueue_.empty() ? 0 : weightedTxnsQueue_.top().first;
+    weightedTxnsQueue_.push({currentTopWeight, job.txnIdx_});
+  }
+  jobsList->push_back(job);
+
+  if (useLock)
+    newJob.notify_one();
+}
+
+/** @brief Drops every queued job whose id_ equals id.
+ *
+ *  Only jobs still waiting in a transaction list are affected; a job already
+ *  copied into a worker's run list is not touched. Lists that are empty
+ *  afterwards are deleted and their map entries erased.
+ *
+ *  @param id value compared against Job::id_
+ */
+void FairThreadPool::removeJobs(uint32_t id)
+{
+  std::unique_lock<std::mutex> lk(mutex);
+
+  // Explicit iterator loop: erase() hands back the next valid iterator, so the
+  // walk never continues from an entry that has just been removed.
+  for (auto txnIter = txn2JobsListMap_.begin(); txnIter != txn2JobsListMap_.end();)
+  {
+    ThreadPoolJobsList* txnJobsList = txnIter->second;
+    txnJobsList->remove_if([id](const auto& queuedJob) { return queuedJob.id_ == id; });
+
+    if (txnJobsList->empty())
+    {
+      // There is no clean-up for PQ. It will happen later in threadFcn
+      delete txnJobsList;
+      txnIter = txn2JobsListMap_.erase(txnIter);
+    }
+    else
+    {
+      ++txnIter;
+    }
+  }
+}
+
+/** @brief Worker loop: repeatedly takes one job from the txn on top of the queue and runs it.
+ *
+ *  The txn is popped from the priority queue, its first job is run outside the
+ *  lock, and the txn is pushed back with its weight increased by the job's
+ *  weight_ if it still has jobs. A job whose functor returns non-zero is queued
+ *  again through addJob_. An exception ends the worker: threadCounts is
+ *  decremented so addJob_ can start a replacement, the failure is logged, and
+ *  an error message is sent if a job was running.
+ *
+ *  @param preferredQueue Priority::EXTRA marks a temporary worker that exits
+ *                        once stopExtra is set; other values only affect the
+ *                        initial thread name
+ */
+void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue)
+{
+  if (preferredQueue == PriorityThreadPool::Priority::EXTRA)
+    utils::setThreadName("Extra");
+  else
+    utils::setThreadName("Idle");
+  RunListT runList;  // This is a vector to allow to grab multiple jobs
+  bool running = false;
+  bool rescheduleJob = false;
+
+  try
+  {
+    while (!_stop)
+    {
+      runList.clear();  // remove the job
+      std::unique_lock<std::mutex> lk(mutex);
+
+      if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra)
+      {
+        --extraThreads;
+        return;
+      }
+
+      if (weightedTxnsQueue_.empty())
+      {
+        newJob.wait(lk);
+        continue;  // start over; the lock is released and re-taken by the next iteration
+      }
+
+      WeightedTxnT weightedTxn = weightedTxnsQueue_.top();
+      auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
+      // Looking for non-empty jobsList in a loop, dropping queue entries whose
+      // txn is unknown to the map or has no jobs left.
+      while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty())
+      {
+        // JobList is empty. This can happen when this method pops the last Job.
+        if (txnAndJobListPair != txn2JobsListMap_.end())
+        {
+          delete txnAndJobListPair->second;
+          txn2JobsListMap_.erase(txnAndJobListPair);
+        }
+        weightedTxnsQueue_.pop();
+        if (weightedTxnsQueue_.empty())
+        {
+          break;
+        }
+        weightedTxn = weightedTxnsQueue_.top();
+        txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
+      }
+
+      // Every queue entry was stale: wait for a new job (the lock is held here).
+      if (weightedTxnsQueue_.empty())
+      {
+        newJob.wait(lk);
+        continue;
+      }
+
+      // We have non-empty jobsList at this point.
+      // Remove the txn from a queue first to add it later
+      weightedTxnsQueue_.pop();
+      TransactionIdxT txnIdx = txnAndJobListPair->first;
+      ThreadPoolJobsList* jobsList = txnAndJobListPair->second;
+      runList.push_back(jobsList->front());
+
+      jobsList->pop_front();
+      // Add the jobList back into the PQ adding some weight to it
+      // Current algo doesn't reduce total txn weight if the job is rescheduled.
+      if (!jobsList->empty())
+      {
+        weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx});
+      }
+
+      lk.unlock();
+
+      running = true;
+      rescheduleJob = (*(runList[0].functor_))();
+      running = false;
+
+      utils::setThreadName("Idle");
+
+      if (rescheduleJob)
+      {
+        lk.lock();
+        addJob_(runList[0], false);
+        newJob.notify_one();
+        lk.unlock();
+      }
+    }
+  }
+  catch (std::exception& ex)
+  {
+    // Log the exception and exit this thread
+    try
+    {
+      {
+        // addJob_ reads and writes threadCounts under the mutex, so take it
+        // here as well. The loop's lock object is already gone at this point.
+        std::lock_guard<std::mutex> countGuard(mutex);
+        --threadCounts;
+      }
+#ifndef NOLOGGING
+      logging::Message::Args args;
+      logging::Message message(5);
+      args.add("threadFcn: Caught exception: ");
+      args.add(ex.what());
+
+      message.format(args);
+
+      logging::LoggingID lid(22);
+      logging::MessageLog ml(lid);
+
+      ml.logErrorMessage(message);
+#endif
+
+      if (running)
+        sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
+    }
+    catch (...)
+    {
+    }
+  }
+  catch (...)
+  {
+    // Log the exception and exit this thread
+    try
+    {
+      {
+        // Same locking rule as in the std::exception handler.
+        std::lock_guard<std::mutex> countGuard(mutex);
+        --threadCounts;
+      }
+#ifndef NOLOGGING
+      logging::Message::Args args;
+      logging::Message message(6);
+      args.add("threadFcn: Caught unknown exception!");
+
+      message.format(args);
+
+      logging::LoggingID lid(22);
+      logging::MessageLog ml(lid);
+
+      ml.logErrorMessage(message);
+#endif
+
+      if (running)
+        sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
+    }
+    catch (...)
+    {
+    }
+  }
+}
+
+/** @brief Writes an error reply for a failed job to its socket.
+ *
+ *  The message is an ISMPacketHeader with Status set to
+ *  logging::primitiveServerErr followed by a PrimitiveHeader carrying the
+ *  given ids.
+ *
+ *  @param id   copied into PrimitiveHeader::UniqueID
+ *  @param step copied into PrimitiveHeader::StepID
+ *  @param sock socket the message is written to
+ */
+void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
+{
+  ISMPacketHeader ism;
+  PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
+
+  ism.Status = logging::primitiveServerErr;
+  ph.UniqueID = id;
+  ph.StepID = step;
+  messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
+  msg.append(reinterpret_cast<uint8_t*>(&ism), sizeof(ism));
+  msg.append(reinterpret_cast<uint8_t*>(&ph), sizeof(ph));
+
+  sock->write(msg);
+}
+
+/** @brief Sets the stop flag that the worker loop tests before each iteration.
+ *
+ *  NOTE(review): workers parked in newJob.wait() are not notified here, so they
+ *  only see the flag after their next wake-up. Waking them would need a way to
+ *  wait for the detached workers to finish before the pool is destroyed.
+ */
+void FairThreadPool::stop()
+{
+  _stop = true;
+}
+
+}  // namespace threadpool
\ No newline at end of file
diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h
new file mode 100644
index 000000000..f39f865a3
--- /dev/null
+++ b/utils/threadpool/fair_threadpool.h
@@ -0,0 +1,173 @@
+/* Copyright (c) 2022 MariaDB Corporation
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA.
*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "primitives/primproc/umsocketselector.h" +#include "prioritythreadpool.h" + +namespace threadpool +{ +class FairThreadPool +{ + public: + using Functor = PriorityThreadPool::Functor; + + using TransactionIdxT = uint32_t; + struct Job + { + Job() : weight_(1), priority_(0), id_(0) + { + } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : uniqueID_(uniqueID) + , stepID_(stepID) + , txnIdx_(txnIdx) + , functor_(functor) + , sock_(sock) + , weight_(weight) + , priority_(priority) + , id_(id) + { + } + uint32_t uniqueID_; + uint32_t stepID_; + TransactionIdxT txnIdx_; + boost::shared_ptr functor_; + primitiveprocessor::SP_UM_IOSOCK sock_; + uint32_t weight_; + uint32_t priority_; + uint32_t id_; + }; + + /********************************************* + * ctor/dtor + * + *********************************************/ + + /** @brief ctor + */ + + FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0); + virtual ~FairThreadPool(); + + void removeJobs(uint32_t id); + void addJob(const Job& job); + void stop(); + + /** @brief for use in debugging + */ + void dump(); + + // If a job is blocked, we want to temporarily increase the number of threads managed by the pool + // A problem can occur if all threads are running long or blocked for a single query. Other + // queries won't get serviced, even though there are cpu cycles available. + // These calls are currently protected by respondLock in sendThread(). If you call from other + // places, you need to consider atomicity. 
+ void incBlockedThreads() + { + blockedThreads++; + } + void decBlockedThreads() + { + blockedThreads--; + } + uint32_t blockedThreadCount() const + { + return blockedThreads; + } + size_t queueSize() const + { + return weightedTxnsQueue_.size(); + } + + protected: + private: + struct ThreadHelper + { + ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue) + { + } + void operator()() + { + ptp->threadFcn(preferredQueue); + } + FairThreadPool* ptp; + PriorityThreadPool::Priority preferredQueue; + }; + + explicit FairThreadPool(); + explicit FairThreadPool(const FairThreadPool&); + FairThreadPool& operator=(const FairThreadPool&); + + void addJob_(const Job& job, bool useLock = true); + void threadFcn(const PriorityThreadPool::Priority preferredQueue); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); + + uint32_t threadCounts; + uint32_t defaultThreadCounts; + std::mutex mutex; + std::condition_variable newJob; + boost::thread_group threads; + bool _stop; + uint32_t weightPerRun; + volatile uint id; // prevent it from being optimized out + + using WeightT = uint32_t; + using WeightedTxnT = std::pair; + using WeightedTxnVec = std::vector; + struct PrioQueueCmp + { + bool operator()(WeightedTxnT lhs, WeightedTxnT rhs) + { + if (lhs.first == rhs.first) + return lhs.second > rhs.second; + return lhs.first > rhs.first; + } + }; + using RunListT = std::vector; + using RescheduleVecType = std::vector; + using WeightedTxnPrioQueue = std::priority_queue; + using ThreadPoolJobsList = std::list; + using Txn2ThreadPoolJobsListMap = std::unordered_map; + Txn2ThreadPoolJobsListMap txn2JobsListMap_; + WeightedTxnPrioQueue weightedTxnsQueue_; + std::atomic blockedThreads; + std::atomic extraThreads; + bool stopExtra; +}; + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index c37990416..dbaa71894 
100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -54,8 +54,6 @@ class PriorityThreadPool virtual int operator()() = 0; }; - // typedef boost::function0 Functor; - struct Job { Job() : weight(1), priority(0), id(0)