1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

MCOL-5044 Adding EXTRA thread logic into FairThreadPool

This commit is contained in:
Roman Nozdrin
2022-06-21 19:17:03 +00:00
committed by Roman Nozdrin
parent 6cff14997d
commit 0e8014db02
7 changed files with 64 additions and 21 deletions

View File

@@ -24,12 +24,13 @@
#pragma once #pragma once
#include "fair_threadpool.h"
#include "umsocketselector.h" #include "umsocketselector.h"
#include <queue> #include <queue>
#include <set> #include <set>
#include <condition_variable> #include <condition_variable>
#include "threadnaming.h" #include "threadnaming.h"
#include "prioritythreadpool.h" #include "fair_threadpool.h"
namespace primitiveprocessor namespace primitiveprocessor
{ {
@@ -84,7 +85,7 @@ class BPPSendThread
{ {
return die; return die;
} }
void setProcessorPool(boost::shared_ptr<threadpool::PriorityThreadPool> processorPool) void setProcessorPool(boost::shared_ptr<threadpool::FairThreadPool> processorPool)
{ {
fProcessorPool = processorPool; fProcessorPool = processorPool;
} }
@@ -146,9 +147,9 @@ class BPPSendThread
/* secondary queue size restriction based on byte size */ /* secondary queue size restriction based on byte size */
volatile uint64_t currentByteSize; volatile uint64_t currentByteSize;
uint64_t maxByteSize; uint64_t maxByteSize;
// Used to tell the PriorityThreadPool It should consider additional threads because a // Used to tell the ThreadPool It should consider additional threads because a
// queue full event has happened and a thread has been blocked. // queue full event has happened and a thread has been blocked.
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool; boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
}; };
} // namespace primitiveprocessor } // namespace primitiveprocessor

View File

@@ -1947,7 +1947,7 @@ struct ReadThread
void operator()() void operator()()
{ {
utils::setThreadName("PPReadThread"); utils::setThreadName("PPReadThread");
threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); boost::shared_ptr<threadpool::FairThreadPool> procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
SBS bs; SBS bs;
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
@@ -2322,8 +2322,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
fServerpool.setQueueSize(fServerQueueSize); fServerpool.setQueueSize(fServerQueueSize);
fServerpool.setName("PrimitiveServer"); fServerpool.setName("PrimitiveServer");
fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0); medPriorityThreads, lowPriorityThreads, 0));
// We're not using either the priority or the job-clustering features, just need a threadpool // We're not using either the priority or the job-clustering features, just need a threadpool
// that can reschedule jobs, and an unlimited non-blocking queue // that can reschedule jobs, and an unlimited non-blocking queue

View File

@@ -36,7 +36,7 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include "threadpool.h" #include "threadpool.h"
#include "../../utils/threadpool/prioritythreadpool.h" #include "../../utils/threadpool/fair_threadpool.h"
#include "fair_threadpool.h" #include "fair_threadpool.h"
#include "messagequeue.h" #include "messagequeue.h"
#include "blockrequestprocessor.h" #include "blockrequestprocessor.h"

View File

@@ -18,29 +18,29 @@
#pragma once #pragma once
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include "prioritythreadpool.h" #include "fair_threadpool.h"
class PrimitiveServerThreadPools class PrimitiveServerThreadPools
{ {
public: public:
PrimitiveServerThreadPools() = default; PrimitiveServerThreadPools() = default;
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool, PrimitiveServerThreadPools(boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool,
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool) boost::shared_ptr<threadpool::FairThreadPool> OOBThreadPool)
: fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool) : fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool)
{ {
} }
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool() boost::shared_ptr<threadpool::FairThreadPool> getPrimitiveServerThreadPool()
{ {
return fPrimServerThreadPool; return fPrimServerThreadPool;
} }
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool() boost::shared_ptr<threadpool::FairThreadPool> getOOBThreadPool()
{ {
return fOOBThreadPool; return fOOBThreadPool;
} }
private: private:
boost::shared_ptr<threadpool::PriorityThreadPool> fPrimServerThreadPool; boost::shared_ptr<threadpool::FairThreadPool> fPrimServerThreadPool;
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBThreadPool; boost::shared_ptr<threadpool::FairThreadPool> fOOBThreadPool;
}; };

View File

@@ -35,7 +35,7 @@
#include <map> #include <map>
#include "service.h" #include "service.h"
#include "prioritythreadpool.h" #include "fair_threadpool.h"
#include "pp_logger.h" #include "pp_logger.h"
namespace primitiveprocessor namespace primitiveprocessor
@@ -172,12 +172,12 @@ class ServicePrimProc : public Service, public Opt
return startupRaceFlag_; return startupRaceFlag_;
} }
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool() boost::shared_ptr<threadpool::FairThreadPool> getPrimitiveServerThreadPool()
{ {
return primServerThreadPool; return primServerThreadPool;
} }
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool() boost::shared_ptr<threadpool::FairThreadPool> getOOBThreadPool()
{ {
return OOBThreadPool; return OOBThreadPool;
} }
@@ -190,6 +190,6 @@ class ServicePrimProc : public Service, public Opt
static ServicePrimProc* fInstance; static ServicePrimProc* fInstance;
// Since C++20 flag's init value is false. // Since C++20 flag's init value is false.
std::atomic_flag startupRaceFlag_{false}; std::atomic_flag startupRaceFlag_{false};
boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool; boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool;
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool; boost::shared_ptr<threadpool::FairThreadPool> OOBThreadPool;
}; };

View File

@@ -35,7 +35,7 @@ namespace threadpool
{ {
FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
uint ID) uint ID)
: weightPerRun(targetWeightPerRun), id(ID) : weightPerRun(targetWeightPerRun), id(ID), stopExtra_(false)
{ {
boost::thread* newThread; boost::thread* newThread;
size_t numberOfThreads = highThreads + midThreads + lowThreads; size_t numberOfThreads = highThreads + midThreads + lowThreads;
@@ -74,6 +74,20 @@ void FairThreadPool::addJob_(const Job& job, bool useLock)
if (useLock) if (useLock)
lk.lock(); lk.lock();
// If some threads have blocked (because of output queue full)
// Temporarily add some extra worker threads to make up for the blocked threads.
if (blockedThreads_ > extraThreads_)
{
stopExtra_ = false;
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA));
newThread->detach();
++extraThreads_;
}
else if (blockedThreads_ == 0)
{
// Release the temporary threads -- some threads have become unblocked.
stopExtra_ = true;
}
auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map
@@ -140,6 +154,13 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
if (weightedTxnsQueue_.empty()) if (weightedTxnsQueue_.empty())
{ {
// If this is an EXTRA thread due toother threads blocking, and all blockers are unblocked,
// we don't want this one any more.
if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra_)
{
--extraThreads_;
return;
}
newJob.wait(lk); newJob.wait(lk);
continue; // just go on w/o re-taking the lock continue; // just go on w/o re-taking the lock
} }

View File

@@ -107,6 +107,23 @@ class FairThreadPool
{ {
return jobsRunning_.load(std::memory_order_relaxed); return jobsRunning_.load(std::memory_order_relaxed);
} }
// If a job is blocked, we want to temporarily increase the number of threads managed by the pool
// A problem can occur if all threads are running long or blocked for a single query. Other
// queries won't get serviced, even though there are cpu cycles available.
// These calls are currently protected by respondLock in sendThread(). If you call from other
// places, you need to consider atomicity.
void incBlockedThreads()
{
++blockedThreads_;
}
void decBlockedThreads()
{
--blockedThreads_;
}
uint32_t blockedThreadCount()
{
return blockedThreads_;
}
protected: protected:
private: private:
@@ -160,6 +177,10 @@ class FairThreadPool
std::atomic<size_t> jobsRunning_{0}; std::atomic<size_t> jobsRunning_{0};
std::atomic<size_t> threadCounts_{0}; std::atomic<size_t> threadCounts_{0};
std::atomic<bool> stop_{false}; std::atomic<bool> stop_{false};
std::atomic<uint32_t> blockedThreads_{0};
std::atomic<uint32_t> extraThreads_{0};
bool stopExtra_;
}; };
} // namespace threadpool } // namespace threadpool