From 2d2a6223f51c8af0cced6d55dda00761245d465a Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Wed, 8 Jun 2022 16:58:50 +0000 Subject: [PATCH] MCOL-5044 This patch introduces current active jobs estimate counter and replaces some attributes with atomics --- utils/threadpool/fair_threadpool.cpp | 57 +++++++++++++++++----------- utils/threadpool/fair_threadpool.h | 10 ++++- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index 8e2721804..44d9cd3a9 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include #include #include #include @@ -34,7 +35,7 @@ namespace threadpool { FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint ID) - : _stop(false), weightPerRun(targetWeightPerRun), id(ID) + : weightPerRun(targetWeightPerRun), id(ID) { boost::thread* newThread; size_t numberOfThreads = highThreads + midThreads + lowThreads; @@ -44,7 +45,8 @@ FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint m newThread->detach(); } cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n"; - defaultThreadCounts = threadCounts = numberOfThreads; + threadCounts_.store(numberOfThreads, std::memory_order_relaxed); + defaultThreadCounts = numberOfThreads; } FairThreadPool::~FairThreadPool() @@ -62,28 +64,28 @@ void FairThreadPool::addJob_(const Job& job, bool useLock) boost::thread* newThread; std::unique_lock lk(mutex, std::defer_lock_t()); - if (useLock) - lk.lock(); - // Create any missing threads - if (defaultThreadCounts != threadCounts) + if (defaultThreadCounts != threadCounts_.load(std::memory_order_relaxed)) { newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); newThread->detach(); - ++threadCounts; + threadCounts_.fetch_add(1, std::memory_order_relaxed); } + if (useLock) + lk.lock(); + auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); - if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map + if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map { ThreadPoolJobsList* jobsList = new ThreadPoolJobsList; jobsList->push_back(job); txn2JobsListMap_[job.txnIdx_] = jobsList; weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); } - else // txn is in the map + else // txn is in the map { - if (jobsListMapIter->second->empty()) // there are no jobs for the txn + if (jobsListMapIter->second->empty()) // there are no jobs for the txn { weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); } @@ -106,7 +108,7 @@ void FairThreadPool::removeJobs(uint32_t id) { if (job->id_ == id) { - job = txnJobsList->erase(job); // update the job iter + job = txnJobsList->erase(job); // update the job iter if (txnJobsList->empty()) { txn2JobsListMap_.erase(txnJobsMapPair.first); @@ -114,7 +116,7 @@ void FairThreadPool::removeJobs(uint32_t id) break; // There is no clean-up for PQ. It will happen later in threadFcn } - continue; // go-on skiping job iter increment + continue; // go-on skiping job iter increment } ++job; } @@ -124,22 +126,22 @@ void FairThreadPool::removeJobs(uint32_t id) void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) { utils::setThreadName("Idle"); - RunListT runList(1); // This is a vector to allow to grab multiple jobs + RunListT runList(1); // This is a vector to allow to grab multiple jobs RescheduleVecType reschedule; bool running = false; bool rescheduleJob = false; try { - while (!_stop) + while (!stop_.load(std::memory_order_relaxed)) { - runList.clear(); // remove the job + runList.clear(); // remove the job std::unique_lock lk(mutex); if (weightedTxnsQueue_.empty()) { newJob.wait(lk); - continue; // just go on w/o re-taking the lock + continue; // just go on w/o re-taking the lock } WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); @@ -156,7 +158,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue txn2JobsListMap_.erase(txnAndJobListPair->first); } weightedTxnsQueue_.pop(); - if (weightedTxnsQueue_.empty()) // remove the empty + if (weightedTxnsQueue_.empty()) // remove the empty { break; } @@ -166,7 +168,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue if (weightedTxnsQueue_.empty()) { - newJob.wait(lk); // might need a lock here + newJob.wait(lk); // might need a lock here continue; } @@ -188,7 +190,9 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue lk.unlock(); running = true; - rescheduleJob = (*(runList[0].functor_))(); // run the functor + jobsRunning_.fetch_add(1, std::memory_order_relaxed); + rescheduleJob = (*(runList[0].functor_))(); // run the functor + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); running = false; utils::setThreadName("Idle"); @@ -206,10 +210,14 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } catch (std::exception& ex) { + if (running) + { + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + } // Log the exception and exit this thread try { - --threadCounts; + threadCounts_.fetch_sub(1, std::memory_order_relaxed); #ifndef NOLOGGING logging::Message::Args args; logging::Message message(5); @@ -236,7 +244,12 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue // Log the exception and exit this thread try { - --threadCounts; + if (running) + { + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + } + threadCounts_.fetch_sub(1, std::memory_order_relaxed); + ; #ifndef NOLOGGING logging::Message::Args args; logging::Message message(6); @@ -276,7 +289,7 @@ void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor void FairThreadPool::stop() { - _stop = true; + stop_.store(true, std::memory_order_relaxed); } } // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h index 27a183c74..ab3afaa71 100644 --- a/utils/threadpool/fair_threadpool.h +++ b/utils/threadpool/fair_threadpool.h @@ -102,6 +102,11 @@ class FairThreadPool { return weightedTxnsQueue_.size(); } + // This method enables a pool current workload estimate. + size_t jobsRunning() const + { + return jobsRunning_.load(std::memory_order_relaxed); + } protected: private: @@ -126,12 +131,10 @@ class FairThreadPool void threadFcn(const PriorityThreadPool::Priority preferredQueue); void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); - uint32_t threadCounts; uint32_t defaultThreadCounts; std::mutex mutex; std::condition_variable newJob; boost::thread_group threads; - bool _stop; uint32_t weightPerRun; volatile uint id; // prevent it from being optimized out @@ -154,6 +157,9 @@ class FairThreadPool using Txn2ThreadPoolJobsListMap = std::unordered_map; Txn2ThreadPoolJobsListMap txn2JobsListMap_; WeightedTxnPrioQueue weightedTxnsQueue_; + std::atomic jobsRunning_{0}; + std::atomic threadCounts_{0}; + std::atomic stop_{false}; }; } // namespace threadpool \ No newline at end of file