You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-5044 Initial version of a fair thread pool
PP now uses PriorityThreadPool that arbitrary picks another jobs pack to run. This scheduling discipline tend to run portions of a single query forcing other simultaneous queries to wait. In result parallel queries timings variance is high. The FairThreadPool picks the job with the smallest amount of work done so far(see the code for details)
This commit is contained in:
committed by
Roman Nozdrin
parent
44d326ef93
commit
4b51820db1
@ -4,10 +4,12 @@ include_directories( ${ENGINE_COMMON_INCLUDES} )
|
||||
|
||||
########### next target ###############
|
||||
|
||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp)
|
||||
|
||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp)
|
||||
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
||||
|
||||
add_dependencies(threadpool loggingcpp)
|
||||
|
||||
install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine)
|
||||
|
||||
# set(fair_threadpool_LIB_SRCS fair_threadpool.cpp)
|
||||
# add_library(fair_threadpool STATIC ${fair_threadpool_LIB_SRCS})
|
||||
# add_dependencies(fair_threadpool loggingcpp)
|
||||
# install(TARGETS fair_threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine)
|
302
utils/threadpool/fair_threadpool.cpp
Normal file
302
utils/threadpool/fair_threadpool.cpp
Normal file
@ -0,0 +1,302 @@
|
||||
/* Copyright (c) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
#include <exception>
|
||||
using namespace std;
|
||||
|
||||
#include "messageobj.h"
|
||||
#include "messagelog.h"
|
||||
#include "threadnaming.h"
|
||||
using namespace logging;
|
||||
|
||||
#include "fair_threadpool.h"
|
||||
using namespace boost;
|
||||
|
||||
#include "dbcon/joblist/primitivemsg.h"
|
||||
|
||||
namespace threadpool
|
||||
{
|
||||
FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
|
||||
uint ID)
|
||||
: _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true)
|
||||
{
|
||||
boost::thread* newThread;
|
||||
size_t numberOfThreads = highThreads + midThreads + lowThreads;
|
||||
for (uint32_t i = 0; i < numberOfThreads; ++i)
|
||||
{
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
|
||||
newThread->detach();
|
||||
}
|
||||
cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n";
|
||||
defaultThreadCounts = threadCounts = numberOfThreads;
|
||||
}
|
||||
|
||||
FairThreadPool::~FairThreadPool()
|
||||
{
|
||||
stop();
|
||||
}
|
||||
|
||||
void FairThreadPool::addJob(const Job& job)
|
||||
{
|
||||
addJob_(job);
|
||||
}
|
||||
|
||||
void FairThreadPool::addJob_(const Job& job, bool useLock)
|
||||
{
|
||||
boost::thread* newThread;
|
||||
std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t());
|
||||
|
||||
if (useLock)
|
||||
lk.lock();
|
||||
|
||||
// Create any missing threads
|
||||
if (defaultThreadCounts != threadCounts)
|
||||
{
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
|
||||
newThread->detach();
|
||||
++threadCounts;
|
||||
}
|
||||
|
||||
// If some threads have blocked (because of output queue full)
|
||||
// Temporarily add some extra worker threads to make up for the blocked threads.
|
||||
if (blockedThreads > extraThreads)
|
||||
{
|
||||
stopExtra = false;
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA));
|
||||
newThread->detach();
|
||||
extraThreads++;
|
||||
}
|
||||
else if (blockedThreads == 0)
|
||||
{
|
||||
// Release the temporary threads -- some threads have become unblocked.
|
||||
stopExtra = true;
|
||||
}
|
||||
|
||||
auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
|
||||
if (jobsListMapIter == txn2JobsListMap_.end())
|
||||
{
|
||||
ThreadPoolJobsList* jobsList = new ThreadPoolJobsList;
|
||||
jobsList->push_back(job);
|
||||
txn2JobsListMap_[job.txnIdx_] = jobsList;
|
||||
WeightT currentTopWeight = weightedTxnsQueue_.empty() ? 0 : weightedTxnsQueue_.top().first;
|
||||
weightedTxnsQueue_.push({currentTopWeight, job.txnIdx_});
|
||||
}
|
||||
else
|
||||
{
|
||||
jobsListMapIter->second->push_back(job);
|
||||
}
|
||||
|
||||
if (useLock)
|
||||
newJob.notify_one();
|
||||
}
|
||||
|
||||
void FairThreadPool::removeJobs(uint32_t id)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
||||
for (auto& txnJobsMapPair : txn2JobsListMap_)
|
||||
{
|
||||
ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second;
|
||||
auto job = txnJobsList->begin();
|
||||
while (job != txnJobsList->end())
|
||||
{
|
||||
if (job->id_ == id)
|
||||
{
|
||||
job = txnJobsList->erase(job); // update the job iter
|
||||
if (txnJobsList->empty())
|
||||
{
|
||||
txn2JobsListMap_.erase(txnJobsMapPair.first);
|
||||
delete txnJobsList;
|
||||
break;
|
||||
// There is no clean-up for PQ. It will happen later in threadFcn
|
||||
}
|
||||
continue; // go-on skiping job iter increment
|
||||
}
|
||||
++job;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue)
|
||||
{
|
||||
if (preferredQueue == PriorityThreadPool::Priority::EXTRA)
|
||||
utils::setThreadName("Extra");
|
||||
else
|
||||
utils::setThreadName("Idle");
|
||||
RunListT runList; // This is a vector to allow to grab multiple jobs
|
||||
RescheduleVecType reschedule;
|
||||
bool running = false;
|
||||
bool rescheduleJob = false;
|
||||
|
||||
try
|
||||
{
|
||||
while (!_stop)
|
||||
{
|
||||
runList.clear(); // remove the job
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
||||
if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra)
|
||||
{
|
||||
--extraThreads;
|
||||
return;
|
||||
}
|
||||
|
||||
if (weightedTxnsQueue_.empty())
|
||||
{
|
||||
newJob.wait(lk);
|
||||
continue; // just go on w/o re-taking the lock
|
||||
}
|
||||
|
||||
WeightedTxnT weightedTxn = weightedTxnsQueue_.top();
|
||||
auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
|
||||
// Looking for non-empty jobsList in a loop
|
||||
// Waiting on cond_var if PQ is empty(no jobs in this thread pool)
|
||||
while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty())
|
||||
{
|
||||
// JobList is empty. This can happen when this method pops the last Job.
|
||||
if (txnAndJobListPair != txn2JobsListMap_.end())
|
||||
{
|
||||
ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second;
|
||||
delete txnJobsList;
|
||||
txn2JobsListMap_.erase(txnAndJobListPair->first);
|
||||
}
|
||||
weightedTxnsQueue_.pop();
|
||||
if (weightedTxnsQueue_.empty()) // remove the empty
|
||||
{
|
||||
break;
|
||||
}
|
||||
weightedTxn = weightedTxnsQueue_.top();
|
||||
txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
|
||||
}
|
||||
|
||||
if (weightedTxnsQueue_.empty())
|
||||
{
|
||||
newJob.wait(lk); // might need a lock here
|
||||
continue;
|
||||
}
|
||||
|
||||
// We have non-empty jobsList at this point.
|
||||
// Remove the txn from a queue first to add it later
|
||||
weightedTxnsQueue_.pop();
|
||||
TransactionIdxT txnIdx = txnAndJobListPair->first;
|
||||
ThreadPoolJobsList* jobsList = txnAndJobListPair->second;
|
||||
// Job& job = jobsList->front();
|
||||
runList.push_back(jobsList->front());
|
||||
|
||||
jobsList->pop_front();
|
||||
// Add the jobList back into the PQ adding some weight to it
|
||||
// Current algo doesn't reduce total txn weight if the job is rescheduled.
|
||||
if (!jobsList->empty())
|
||||
{
|
||||
weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx});
|
||||
}
|
||||
|
||||
lk.unlock();
|
||||
|
||||
running = true;
|
||||
rescheduleJob = (*(runList[0].functor_))();
|
||||
running = false;
|
||||
|
||||
utils::setThreadName("Idle");
|
||||
|
||||
if (rescheduleJob)
|
||||
{
|
||||
lk.lock();
|
||||
addJob_(runList[0], false);
|
||||
newJob.notify_one();
|
||||
lk.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
--threadCounts;
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(5);
|
||||
args.add("threadFcn: Caught exception: ");
|
||||
args.add(ex.what());
|
||||
|
||||
message.format(args);
|
||||
|
||||
logging::LoggingID lid(22);
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage(message);
|
||||
#endif
|
||||
|
||||
if (running)
|
||||
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
--threadCounts;
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(6);
|
||||
args.add("threadFcn: Caught unknown exception!");
|
||||
|
||||
message.format(args);
|
||||
|
||||
logging::LoggingID lid(22);
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage(message);
|
||||
#endif
|
||||
|
||||
if (running)
|
||||
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
|
||||
{
|
||||
ISMPacketHeader ism;
|
||||
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
|
||||
|
||||
ism.Status = logging::primitiveServerErr;
|
||||
ph.UniqueID = id;
|
||||
ph.StepID = step;
|
||||
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
||||
msg.append((uint8_t*)&ism, sizeof(ism));
|
||||
msg.append((uint8_t*)&ph, sizeof(ph));
|
||||
|
||||
sock->write(msg);
|
||||
}
|
||||
|
||||
void FairThreadPool::stop()
|
||||
{
|
||||
_stop = true;
|
||||
}
|
||||
|
||||
} // namespace threadpool
|
173
utils/threadpool/fair_threadpool.h
Normal file
173
utils/threadpool/fair_threadpool.h
Normal file
@ -0,0 +1,173 @@
|
||||
/* Copyright (c) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <cstdlib>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition.hpp>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <boost/function.hpp>
|
||||
#include <atomic>
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
#include <list>
|
||||
#include <functional>
|
||||
|
||||
#include "primitives/primproc/umsocketselector.h"
|
||||
#include "prioritythreadpool.h"
|
||||
|
||||
namespace threadpool
|
||||
{
|
||||
class FairThreadPool
|
||||
{
|
||||
public:
|
||||
using Functor = PriorityThreadPool::Functor;
|
||||
|
||||
using TransactionIdxT = uint32_t;
|
||||
struct Job
|
||||
{
|
||||
Job() : weight_(1), priority_(0), id_(0)
|
||||
{
|
||||
}
|
||||
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
|
||||
const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
|
||||
const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
|
||||
: uniqueID_(uniqueID)
|
||||
, stepID_(stepID)
|
||||
, txnIdx_(txnIdx)
|
||||
, functor_(functor)
|
||||
, sock_(sock)
|
||||
, weight_(weight)
|
||||
, priority_(priority)
|
||||
, id_(id)
|
||||
{
|
||||
}
|
||||
uint32_t uniqueID_;
|
||||
uint32_t stepID_;
|
||||
TransactionIdxT txnIdx_;
|
||||
boost::shared_ptr<Functor> functor_;
|
||||
primitiveprocessor::SP_UM_IOSOCK sock_;
|
||||
uint32_t weight_;
|
||||
uint32_t priority_;
|
||||
uint32_t id_;
|
||||
};
|
||||
|
||||
/*********************************************
|
||||
* ctor/dtor
|
||||
*
|
||||
*********************************************/
|
||||
|
||||
/** @brief ctor
|
||||
*/
|
||||
|
||||
FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0);
|
||||
virtual ~FairThreadPool();
|
||||
|
||||
void removeJobs(uint32_t id);
|
||||
void addJob(const Job& job);
|
||||
void stop();
|
||||
|
||||
/** @brief for use in debugging
|
||||
*/
|
||||
void dump();
|
||||
|
||||
// If a job is blocked, we want to temporarily increase the number of threads managed by the pool
|
||||
// A problem can occur if all threads are running long or blocked for a single query. Other
|
||||
// queries won't get serviced, even though there are cpu cycles available.
|
||||
// These calls are currently protected by respondLock in sendThread(). If you call from other
|
||||
// places, you need to consider atomicity.
|
||||
void incBlockedThreads()
|
||||
{
|
||||
blockedThreads++;
|
||||
}
|
||||
void decBlockedThreads()
|
||||
{
|
||||
blockedThreads--;
|
||||
}
|
||||
uint32_t blockedThreadCount() const
|
||||
{
|
||||
return blockedThreads;
|
||||
}
|
||||
size_t queueSize() const
|
||||
{
|
||||
return weightedTxnsQueue_.size();
|
||||
}
|
||||
|
||||
protected:
|
||||
private:
|
||||
struct ThreadHelper
|
||||
{
|
||||
ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue)
|
||||
{
|
||||
}
|
||||
void operator()()
|
||||
{
|
||||
ptp->threadFcn(preferredQueue);
|
||||
}
|
||||
FairThreadPool* ptp;
|
||||
PriorityThreadPool::Priority preferredQueue;
|
||||
};
|
||||
|
||||
explicit FairThreadPool();
|
||||
explicit FairThreadPool(const FairThreadPool&);
|
||||
FairThreadPool& operator=(const FairThreadPool&);
|
||||
|
||||
void addJob_(const Job& job, bool useLock = true);
|
||||
void threadFcn(const PriorityThreadPool::Priority preferredQueue);
|
||||
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
|
||||
|
||||
uint32_t threadCounts;
|
||||
uint32_t defaultThreadCounts;
|
||||
std::mutex mutex;
|
||||
std::condition_variable newJob;
|
||||
boost::thread_group threads;
|
||||
bool _stop;
|
||||
uint32_t weightPerRun;
|
||||
volatile uint id; // prevent it from being optimized out
|
||||
|
||||
using WeightT = uint32_t;
|
||||
using WeightedTxnT = std::pair<WeightT, TransactionIdxT>;
|
||||
using WeightedTxnVec = std::vector<WeightedTxnT>;
|
||||
struct PrioQueueCmp
|
||||
{
|
||||
bool operator()(WeightedTxnT lhs, WeightedTxnT rhs)
|
||||
{
|
||||
if (lhs.first == rhs.first)
|
||||
return lhs.second > rhs.second;
|
||||
return lhs.first > rhs.first;
|
||||
}
|
||||
};
|
||||
using RunListT = std::vector<Job>;
|
||||
using RescheduleVecType = std::vector<bool>;
|
||||
using WeightedTxnPrioQueue = std::priority_queue<WeightedTxnT, WeightedTxnVec, PrioQueueCmp>;
|
||||
using ThreadPoolJobsList = std::list<Job>;
|
||||
using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>;
|
||||
Txn2ThreadPoolJobsListMap txn2JobsListMap_;
|
||||
WeightedTxnPrioQueue weightedTxnsQueue_;
|
||||
std::atomic<uint32_t> blockedThreads;
|
||||
std::atomic<uint32_t> extraThreads;
|
||||
bool stopExtra;
|
||||
};
|
||||
|
||||
} // namespace threadpool
|
@ -54,8 +54,6 @@ class PriorityThreadPool
|
||||
virtual int operator()() = 0;
|
||||
};
|
||||
|
||||
// typedef boost::function0<int> Functor;
|
||||
|
||||
struct Job
|
||||
{
|
||||
Job() : weight(1), priority(0), id(0)
|
||||
|
Reference in New Issue
Block a user