From 0e8014db02e537f598fe4845f98f4d374cafa287 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 21 Jun 2022 19:17:03 +0000 Subject: [PATCH] MCOL-5044 Adding EXTRA thread logic into FairThreadPool --- primitives/primproc/bppsendthread.h | 9 ++++---- primitives/primproc/primitiveserver.cpp | 6 ++--- primitives/primproc/primitiveserver.h | 2 +- .../primproc/primitiveserverthreadpools.h | 14 +++++------ primitives/primproc/primproc.h | 10 ++++---- utils/threadpool/fair_threadpool.cpp | 23 ++++++++++++++++++- utils/threadpool/fair_threadpool.h | 21 +++++++++++++++++ 7 files changed, 64 insertions(+), 21 deletions(-) diff --git a/primitives/primproc/bppsendthread.h b/primitives/primproc/bppsendthread.h index 615b075c7..7de14f881 100644 --- a/primitives/primproc/bppsendthread.h +++ b/primitives/primproc/bppsendthread.h @@ -24,12 +24,13 @@ #pragma once +#include "fair_threadpool.h" #include "umsocketselector.h" #include #include #include #include "threadnaming.h" -#include "prioritythreadpool.h" +#include "fair_threadpool.h" namespace primitiveprocessor { @@ -84,7 +85,7 @@ class BPPSendThread { return die; } - void setProcessorPool(boost::shared_ptr processorPool) + void setProcessorPool(boost::shared_ptr processorPool) { fProcessorPool = processorPool; } @@ -146,9 +147,9 @@ class BPPSendThread /* secondary queue size restriction based on byte size */ volatile uint64_t currentByteSize; uint64_t maxByteSize; - // Used to tell the PriorityThreadPool It should consider additional threads because a + // Used to tell the ThreadPool It should consider additional threads because a // queue full event has happened and a thread has been blocked. - boost::shared_ptr fProcessorPool; + boost::shared_ptr fProcessorPool; }; } // namespace primitiveprocessor diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 271963629..7d15c359e 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1947,7 +1947,7 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); + boost::shared_ptr procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2322,8 +2322,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, - medPriorityThreads, lowPriorityThreads, 0); + fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, + medPriorityThreads, lowPriorityThreads, 0)); // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 3c3817405..eb50d0791 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -36,7 +36,7 @@ #include #include "threadpool.h" -#include "../../utils/threadpool/prioritythreadpool.h" +#include "../../utils/threadpool/fair_threadpool.h" #include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" diff --git a/primitives/primproc/primitiveserverthreadpools.h b/primitives/primproc/primitiveserverthreadpools.h index 5297adfe8..9c0a95a08 100644 --- a/primitives/primproc/primitiveserverthreadpools.h +++ b/primitives/primproc/primitiveserverthreadpools.h @@ -18,29 +18,29 @@ #pragma once #include -#include "prioritythreadpool.h" +#include "fair_threadpool.h" class PrimitiveServerThreadPools { public: PrimitiveServerThreadPools() = default; - PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool, - boost::shared_ptr OOBThreadPool) + PrimitiveServerThreadPools(boost::shared_ptr primServerThreadPool, + boost::shared_ptr OOBThreadPool) : fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool) { } - boost::shared_ptr getPrimitiveServerThreadPool() + boost::shared_ptr getPrimitiveServerThreadPool() { return fPrimServerThreadPool; } - boost::shared_ptr getOOBThreadPool() + boost::shared_ptr getOOBThreadPool() { return fOOBThreadPool; } private: - boost::shared_ptr fPrimServerThreadPool; - boost::shared_ptr fOOBThreadPool; + boost::shared_ptr fPrimServerThreadPool; + boost::shared_ptr fOOBThreadPool; }; diff --git a/primitives/primproc/primproc.h b/primitives/primproc/primproc.h index 328f004c0..4fb757c1f 100644 --- a/primitives/primproc/primproc.h +++ b/primitives/primproc/primproc.h @@ -35,7 +35,7 @@ #include #include "service.h" -#include "prioritythreadpool.h" +#include "fair_threadpool.h" #include "pp_logger.h" namespace primitiveprocessor @@ -172,12 +172,12 @@ class ServicePrimProc : public Service, public Opt return startupRaceFlag_; } - boost::shared_ptr getPrimitiveServerThreadPool() + boost::shared_ptr getPrimitiveServerThreadPool() { return primServerThreadPool; } - boost::shared_ptr getOOBThreadPool() + boost::shared_ptr getOOBThreadPool() { return OOBThreadPool; } @@ -190,6 +190,6 @@ class ServicePrimProc : public Service, public Opt static ServicePrimProc* fInstance; // Since C++20 flag's init value is false. std::atomic_flag startupRaceFlag_{false}; - boost::shared_ptr primServerThreadPool; - boost::shared_ptr OOBThreadPool; + boost::shared_ptr primServerThreadPool; + boost::shared_ptr OOBThreadPool; }; diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index 44d9cd3a9..f35781854 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -35,7 +35,7 @@ namespace threadpool { FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint ID) - : weightPerRun(targetWeightPerRun), id(ID) + : weightPerRun(targetWeightPerRun), id(ID), stopExtra_(false) { boost::thread* newThread; size_t numberOfThreads = highThreads + midThreads + lowThreads; @@ -74,6 +74,20 @@ void FairThreadPool::addJob_(const Job& job, bool useLock) if (useLock) lk.lock(); + // If some threads have blocked (because of output queue full) + // Temporarily add some extra worker threads to make up for the blocked threads. + if (blockedThreads_ > extraThreads_) + { + stopExtra_ = false; + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA)); + newThread->detach(); + ++extraThreads_; + } + else if (blockedThreads_ == 0) + { + // Release the temporary threads -- some threads have become unblocked. + stopExtra_ = true; + } auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map @@ -140,6 +154,13 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue if (weightedTxnsQueue_.empty()) { + // If this is an EXTRA thread due toother threads blocking, and all blockers are unblocked, + // we don't want this one any more. + if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra_) + { + --extraThreads_; + return; + } newJob.wait(lk); continue; // just go on w/o re-taking the lock } diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h index ab3afaa71..311b3b754 100644 --- a/utils/threadpool/fair_threadpool.h +++ b/utils/threadpool/fair_threadpool.h @@ -107,6 +107,23 @@ class FairThreadPool { return jobsRunning_.load(std::memory_order_relaxed); } + // If a job is blocked, we want to temporarily increase the number of threads managed by the pool + // A problem can occur if all threads are running long or blocked for a single query. Other + // queries won't get serviced, even though there are cpu cycles available. + // These calls are currently protected by respondLock in sendThread(). If you call from other + // places, you need to consider atomicity. + void incBlockedThreads() + { + ++blockedThreads_; + } + void decBlockedThreads() + { + --blockedThreads_; + } + uint32_t blockedThreadCount() + { + return blockedThreads_; + } protected: private: @@ -160,6 +177,10 @@ class FairThreadPool std::atomic jobsRunning_{0}; std::atomic threadCounts_{0}; std::atomic stop_{false}; + + std::atomic blockedThreads_{0}; + std::atomic extraThreads_{0}; + bool stopExtra_; }; } // namespace threadpool \ No newline at end of file