1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5044 This patch introduces current active jobs estimate counter and replaces some attributes with atomics

This commit is contained in:
Roman Nozdrin
2022-06-08 16:58:50 +00:00
parent fd8ba33f21
commit 2d2a6223f5
2 changed files with 43 additions and 24 deletions

View File

@ -15,6 +15,7 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */ MA 02110-1301, USA. */
#include <atomic>
#include <stdexcept> #include <stdexcept>
#include <unistd.h> #include <unistd.h>
#include <exception> #include <exception>
@ -34,7 +35,7 @@ namespace threadpool
{ {
FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
uint ID) uint ID)
: _stop(false), weightPerRun(targetWeightPerRun), id(ID) : weightPerRun(targetWeightPerRun), id(ID)
{ {
boost::thread* newThread; boost::thread* newThread;
size_t numberOfThreads = highThreads + midThreads + lowThreads; size_t numberOfThreads = highThreads + midThreads + lowThreads;
@ -44,7 +45,8 @@ FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint m
newThread->detach(); newThread->detach();
} }
cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n"; cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n";
defaultThreadCounts = threadCounts = numberOfThreads; threadCounts_.store(numberOfThreads, std::memory_order_relaxed);
defaultThreadCounts = numberOfThreads;
} }
FairThreadPool::~FairThreadPool() FairThreadPool::~FairThreadPool()
@ -62,28 +64,28 @@ void FairThreadPool::addJob_(const Job& job, bool useLock)
boost::thread* newThread; boost::thread* newThread;
std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t()); std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t());
if (useLock)
lk.lock();
// Create any missing threads // Create any missing threads
if (defaultThreadCounts != threadCounts) if (defaultThreadCounts != threadCounts_.load(std::memory_order_relaxed))
{ {
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
newThread->detach(); newThread->detach();
++threadCounts; threadCounts_.fetch_add(1, std::memory_order_relaxed);
} }
if (useLock)
lk.lock();
auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map
{ {
ThreadPoolJobsList* jobsList = new ThreadPoolJobsList; ThreadPoolJobsList* jobsList = new ThreadPoolJobsList;
jobsList->push_back(job); jobsList->push_back(job);
txn2JobsListMap_[job.txnIdx_] = jobsList; txn2JobsListMap_[job.txnIdx_] = jobsList;
weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); weightedTxnsQueue_.push({job.weight_, job.txnIdx_});
} }
else // txn is in the map else // txn is in the map
{ {
if (jobsListMapIter->second->empty()) // there are no jobs for the txn if (jobsListMapIter->second->empty()) // there are no jobs for the txn
{ {
weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); weightedTxnsQueue_.push({job.weight_, job.txnIdx_});
} }
@ -106,7 +108,7 @@ void FairThreadPool::removeJobs(uint32_t id)
{ {
if (job->id_ == id) if (job->id_ == id)
{ {
job = txnJobsList->erase(job); // update the job iter job = txnJobsList->erase(job); // update the job iter
if (txnJobsList->empty()) if (txnJobsList->empty())
{ {
txn2JobsListMap_.erase(txnJobsMapPair.first); txn2JobsListMap_.erase(txnJobsMapPair.first);
@ -114,7 +116,7 @@ void FairThreadPool::removeJobs(uint32_t id)
break; break;
// There is no clean-up for PQ. It will happen later in threadFcn // There is no clean-up for PQ. It will happen later in threadFcn
} }
continue; // go-on skiping job iter increment continue; // go-on skiping job iter increment
} }
++job; ++job;
} }
@ -124,22 +126,22 @@ void FairThreadPool::removeJobs(uint32_t id)
void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue)
{ {
utils::setThreadName("Idle"); utils::setThreadName("Idle");
RunListT runList(1); // This is a vector to allow to grab multiple jobs RunListT runList(1); // This is a vector to allow to grab multiple jobs
RescheduleVecType reschedule; RescheduleVecType reschedule;
bool running = false; bool running = false;
bool rescheduleJob = false; bool rescheduleJob = false;
try try
{ {
while (!_stop) while (!stop_.load(std::memory_order_relaxed))
{ {
runList.clear(); // remove the job runList.clear(); // remove the job
std::unique_lock<std::mutex> lk(mutex); std::unique_lock<std::mutex> lk(mutex);
if (weightedTxnsQueue_.empty()) if (weightedTxnsQueue_.empty())
{ {
newJob.wait(lk); newJob.wait(lk);
continue; // just go on w/o re-taking the lock continue; // just go on w/o re-taking the lock
} }
WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); WeightedTxnT weightedTxn = weightedTxnsQueue_.top();
@ -156,7 +158,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
txn2JobsListMap_.erase(txnAndJobListPair->first); txn2JobsListMap_.erase(txnAndJobListPair->first);
} }
weightedTxnsQueue_.pop(); weightedTxnsQueue_.pop();
if (weightedTxnsQueue_.empty()) // remove the empty if (weightedTxnsQueue_.empty()) // remove the empty
{ {
break; break;
} }
@ -166,7 +168,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
if (weightedTxnsQueue_.empty()) if (weightedTxnsQueue_.empty())
{ {
newJob.wait(lk); // might need a lock here newJob.wait(lk); // might need a lock here
continue; continue;
} }
@ -188,7 +190,9 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
lk.unlock(); lk.unlock();
running = true; running = true;
rescheduleJob = (*(runList[0].functor_))(); // run the functor jobsRunning_.fetch_add(1, std::memory_order_relaxed);
rescheduleJob = (*(runList[0].functor_))(); // run the functor
jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
running = false; running = false;
utils::setThreadName("Idle"); utils::setThreadName("Idle");
@ -206,10 +210,14 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
if (running)
{
jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
}
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
--threadCounts; threadCounts_.fetch_sub(1, std::memory_order_relaxed);
#ifndef NOLOGGING #ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(5); logging::Message message(5);
@ -236,7 +244,12 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
--threadCounts; if (running)
{
jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
}
threadCounts_.fetch_sub(1, std::memory_order_relaxed);
;
#ifndef NOLOGGING #ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(6); logging::Message message(6);
@ -276,7 +289,7 @@ void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor
void FairThreadPool::stop() void FairThreadPool::stop()
{ {
_stop = true; stop_.store(true, std::memory_order_relaxed);
} }
} // namespace threadpool } // namespace threadpool

View File

@ -102,6 +102,11 @@ class FairThreadPool
{ {
return weightedTxnsQueue_.size(); return weightedTxnsQueue_.size();
} }
// This method enables a pool current workload estimate.
size_t jobsRunning() const
{
return jobsRunning_.load(std::memory_order_relaxed);
}
protected: protected:
private: private:
@ -126,12 +131,10 @@ class FairThreadPool
void threadFcn(const PriorityThreadPool::Priority preferredQueue); void threadFcn(const PriorityThreadPool::Priority preferredQueue);
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
uint32_t threadCounts;
uint32_t defaultThreadCounts; uint32_t defaultThreadCounts;
std::mutex mutex; std::mutex mutex;
std::condition_variable newJob; std::condition_variable newJob;
boost::thread_group threads; boost::thread_group threads;
bool _stop;
uint32_t weightPerRun; uint32_t weightPerRun;
volatile uint id; // prevent it from being optimized out volatile uint id; // prevent it from being optimized out
@ -154,6 +157,9 @@ class FairThreadPool
using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>; using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>;
Txn2ThreadPoolJobsListMap txn2JobsListMap_; Txn2ThreadPoolJobsListMap txn2JobsListMap_;
WeightedTxnPrioQueue weightedTxnsQueue_; WeightedTxnPrioQueue weightedTxnsQueue_;
std::atomic<size_t> jobsRunning_{0};
std::atomic<size_t> threadCounts_{0};
std::atomic<bool> stop_{false};
}; };
} // namespace threadpool } // namespace threadpool