From 0907ca414f89dbd7b0561fc9faff660cfe514480 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Mon, 4 Jul 2022 09:18:26 +0000 Subject: [PATCH] MCOL-5044 This patch simplifies addJob interfaces removing extra bool that control mutex locking, adds additional nullptr dereference check in removeJobs and fixes FairThreadPool hashmap iter invalidation issues --- tests/CMakeLists.txt | 2 +- tests/fair_threadpool.cpp | 57 +++++++++++++++++++++------- utils/threadpool/fair_threadpool.cpp | 55 ++++++++++++--------------- utils/threadpool/fair_threadpool.h | 2 +- 4 files changed, 69 insertions(+), 47 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index aaf40fc4a..b9b464aa6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,7 +55,7 @@ if (WITH_UNITTESTS) add_executable(fair_threadpool_test fair_threadpool.cpp) add_dependencies(fair_threadpool_test googletest) target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) - gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:) # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp index c4a299bd1..a74674516 100644 --- a/tests/fair_threadpool.cpp +++ b/tests/fair_threadpool.cpp @@ -28,62 +28,64 @@ using namespace threadpool; using ResultsType = std::vector; static ResultsType results; -class FairThreadPoolTest : public testing::Test { - public: - +class FairThreadPoolTest : public testing::Test +{ + public: void SetUp() override { results.clear(); threadPool = new FairThreadPool(1, 1, 0, 0); } - FairThreadPool* threadPool; }; -class TestFunctor: public FairThreadPool::Functor +class TestFunctor : public FairThreadPool::Functor { public: - TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + TestFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay) { } - ~TestFunctor() {}; + ~TestFunctor(){}; int operator()() override { usleep(delay_); results.push_back(id_); return 0; } + private: size_t id_; size_t delay_; }; -class TestRescheduleFunctor: public FairThreadPool::Functor +class TestRescheduleFunctor : public FairThreadPool::Functor { public: - TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + TestRescheduleFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay) { } - ~TestRescheduleFunctor() {}; + ~TestRescheduleFunctor(){}; int operator()() override { if (firstRun) { firstRun = false; - return 1; // re-schedule the Job + return 1; // re-schedule the Job } usleep(delay_); results.push_back(id_); return 0; } + private: size_t id_; size_t delay_; bool firstRun = true; }; -testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, const size_t idxB, const int b) +testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, + const size_t idxB, const int b) { if (arr.empty() || arr.size() <= max(idxA, idxB)) return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; @@ -91,8 +93,8 @@ testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, return testing::AssertionSuccess(); if (arr[idxA] == b && arr[idxB] == a) return testing::AssertionSuccess(); - return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB - << " are not " << a << " and " << b << std::endl; + return testing::AssertionFailure() << "The values at positions " << idxA << " " << idxB << " are not " << a + << " and " << b << std::endl; } TEST_F(FairThreadPoolTest, FairThreadPoolAdd) @@ -147,6 +149,33 @@ TEST_F(FairThreadPoolTest, FairThreadPoolRemove) EXPECT_EQ(results[1], 3); } +TEST_F(FairThreadPoolTest, FairThreadPoolCleanUp) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->removeJobs(job1.id_); + threadPool->addJob(job2); + threadPool->removeJobs(job2.id_); + threadPool->addJob(job3); + threadPool->removeJobs(job3.id_); + threadPool->removeJobs(job1.id_); + threadPool->removeJobs(job1.id_); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); +} + TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) { SP_UM_IOSOCK sock(new messageqcpp::IOSocket); diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index 094d6681e..7c267e42c 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -55,11 +55,6 @@ FairThreadPool::~FairThreadPool() } void FairThreadPool::addJob(const Job& job) -{ - addJob_(job); -} - -void FairThreadPool::addJob_(const Job& job, bool useLock) { boost::thread* newThread; std::unique_lock lk(mutex, std::defer_lock_t()); @@ -72,8 +67,7 @@ void FairThreadPool::addJob_(const Job& job, bool useLock) threadCounts_.fetch_add(1, std::memory_order_relaxed); } - if (useLock) - lk.lock(); + lk.lock(); // If some threads have blocked (because of output queue full) // Temporarily add some extra worker threads to make up for the blocked threads. if (blockedThreads_ > extraThreads_) @@ -106,21 +100,22 @@ void FairThreadPool::addJob_(const Job& job, bool useLock) jobsListMapIter->second->push_back(job); } - if (useLock) - newJob.notify_one(); + newJob.notify_one(); } void FairThreadPool::removeJobs(uint32_t id) { - // std::cout << "FairThreadPool::removeJobs id " << id << std::endl; std::unique_lock lk(mutex); - for (auto& txnJobsMapPair : txn2JobsListMap_) + auto txnJobsMapIter = txn2JobsListMap_.begin(); + while (txnJobsMapIter != txn2JobsListMap_.end()) { + auto& txnJobsMapPair = *txnJobsMapIter; ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second; - if (txnJobsList->empty()) + // txnJobsList must not be nullptr + if (txnJobsList && txnJobsList->empty()) { - txn2JobsListMap_.erase(txnJobsMapPair.first); + txnJobsMapIter = txn2JobsListMap_.erase(txnJobsMapIter); delete txnJobsList; continue; // There is no clean-up for PQ. It will happen later in threadFcn @@ -128,21 +123,22 @@ void FairThreadPool::removeJobs(uint32_t id) auto job = txnJobsList->begin(); while (job != txnJobsList->end()) { - // std::cout << "removeJobs() job->id_ " << job->id_ << std::endl; if (job->id_ == id) { job = txnJobsList->erase(job); // update the job iter - if (txnJobsList->empty()) - { - txn2JobsListMap_.erase(txnJobsMapPair.first); - delete txnJobsList; - break; - // There is no clean-up for PQ. It will happen later in threadFcn - } - continue; // go-on skiping job iter increment + continue; // go-on skiping job iter increment } ++job; } + + if (txnJobsList->empty()) + { + txnJobsMapIter = txn2JobsListMap_.erase(txnJobsMapIter); + delete txnJobsList; + continue; + // There is no clean-up for PQ. It will happen later in threadFcn + } + ++txnJobsMapIter; } } @@ -185,6 +181,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue { ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second; delete txnJobsList; + // !txnAndJobListPair is invalidated after this! txn2JobsListMap_.erase(txnAndJobListPair->first); } weightedTxnsQueue_.pop(); @@ -231,16 +228,12 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue { // to avoid excessive CPU usage waiting for data from storage usleep(500); - lk.lock(); - addJob_(runList[0], false); - newJob.notify_one(); - lk.unlock(); + addJob(runList[0]); } } } catch (std::exception& ex) { - // std::cout << "FairThreadPool::threadFcn(): std::exception - no reschedule but send an error" << std::endl; if (running) { jobsRunning_.fetch_sub(1, std::memory_order_relaxed); @@ -270,13 +263,12 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } catch (...) { - std::cout << "FairThreadPool::threadFcn(): std::exception - double exception: failed to send an error" << std::endl; + std::cout << "FairThreadPool::threadFcn(): std::exception - double exception: failed to send an error" + << std::endl; } } catch (...) { - // std::cout << "FairThreadPool::threadFcn(): ... exception - no reschedule but send an error" << std::endl; - // Log the exception and exit this thread try { if (running) @@ -302,7 +294,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } catch (...) { - std::cout << "FairThreadPool::threadFcn(): ... exception - double exception: failed to send an error" << std::endl; + std::cout << "FairThreadPool::threadFcn(): ... exception - double exception: failed to send an error" + << std::endl; } } } diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h index 311b3b754..72c4fb0ce 100644 --- a/utils/threadpool/fair_threadpool.h +++ b/utils/threadpool/fair_threadpool.h @@ -39,6 +39,7 @@ namespace threadpool { + // The idea of this thread pool is to run morsel jobs(primitive job) is to equaly distribute CPU time // b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight // stored in PriorityQueue that thread increases before run another morsel for the query. When query is @@ -144,7 +145,6 @@ class FairThreadPool explicit FairThreadPool(const FairThreadPool&); FairThreadPool& operator=(const FairThreadPool&); - void addJob_(const Job& job, bool useLock = true); void threadFcn(const PriorityThreadPool::Priority preferredQueue); void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);