1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/utils/threadpool/fair_threadpool.cpp
Roman Nozdrin 4b51820db1 MCOL-5044 Initial version of a fair thread pool
PP now uses PriorityThreadPool that arbitrary picks another jobs pack
    to run. This scheduling discipline tend to run portions of a single query
    forcing other simultaneous queries to wait. In result parallel queries
    timings variance is high. The FairThreadPool picks the job with the smallest
    amount of work done so far(see the code for details)
2022-05-27 12:37:21 +00:00

302 lines
8.1 KiB
C++

/* Copyright (c) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <stdexcept>
#include <unistd.h>
#include <exception>
using namespace std;
#include "messageobj.h"
#include "messagelog.h"
#include "threadnaming.h"
using namespace logging;
#include "fair_threadpool.h"
using namespace boost;
#include "dbcon/joblist/primitivemsg.h"
namespace threadpool
{
FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
uint ID)
: _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true)
{
boost::thread* newThread;
size_t numberOfThreads = highThreads + midThreads + lowThreads;
for (uint32_t i = 0; i < numberOfThreads; ++i)
{
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
newThread->detach();
}
cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n";
defaultThreadCounts = threadCounts = numberOfThreads;
}
FairThreadPool::~FairThreadPool()
{
stop();
}
void FairThreadPool::addJob(const Job& job)
{
addJob_(job);
}
void FairThreadPool::addJob_(const Job& job, bool useLock)
{
boost::thread* newThread;
std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t());
if (useLock)
lk.lock();
// Create any missing threads
if (defaultThreadCounts != threadCounts)
{
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
newThread->detach();
++threadCounts;
}
// If some threads have blocked (because of output queue full)
// Temporarily add some extra worker threads to make up for the blocked threads.
if (blockedThreads > extraThreads)
{
stopExtra = false;
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA));
newThread->detach();
extraThreads++;
}
else if (blockedThreads == 0)
{
// Release the temporary threads -- some threads have become unblocked.
stopExtra = true;
}
auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
if (jobsListMapIter == txn2JobsListMap_.end())
{
ThreadPoolJobsList* jobsList = new ThreadPoolJobsList;
jobsList->push_back(job);
txn2JobsListMap_[job.txnIdx_] = jobsList;
WeightT currentTopWeight = weightedTxnsQueue_.empty() ? 0 : weightedTxnsQueue_.top().first;
weightedTxnsQueue_.push({currentTopWeight, job.txnIdx_});
}
else
{
jobsListMapIter->second->push_back(job);
}
if (useLock)
newJob.notify_one();
}
void FairThreadPool::removeJobs(uint32_t id)
{
std::unique_lock<std::mutex> lk(mutex);
for (auto& txnJobsMapPair : txn2JobsListMap_)
{
ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second;
auto job = txnJobsList->begin();
while (job != txnJobsList->end())
{
if (job->id_ == id)
{
job = txnJobsList->erase(job); // update the job iter
if (txnJobsList->empty())
{
txn2JobsListMap_.erase(txnJobsMapPair.first);
delete txnJobsList;
break;
// There is no clean-up for PQ. It will happen later in threadFcn
}
continue; // go-on skiping job iter increment
}
++job;
}
}
}
void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue)
{
if (preferredQueue == PriorityThreadPool::Priority::EXTRA)
utils::setThreadName("Extra");
else
utils::setThreadName("Idle");
RunListT runList; // This is a vector to allow to grab multiple jobs
RescheduleVecType reschedule;
bool running = false;
bool rescheduleJob = false;
try
{
while (!_stop)
{
runList.clear(); // remove the job
std::unique_lock<std::mutex> lk(mutex);
if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra)
{
--extraThreads;
return;
}
if (weightedTxnsQueue_.empty())
{
newJob.wait(lk);
continue; // just go on w/o re-taking the lock
}
WeightedTxnT weightedTxn = weightedTxnsQueue_.top();
auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
// Looking for non-empty jobsList in a loop
// Waiting on cond_var if PQ is empty(no jobs in this thread pool)
while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty())
{
// JobList is empty. This can happen when this method pops the last Job.
if (txnAndJobListPair != txn2JobsListMap_.end())
{
ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second;
delete txnJobsList;
txn2JobsListMap_.erase(txnAndJobListPair->first);
}
weightedTxnsQueue_.pop();
if (weightedTxnsQueue_.empty()) // remove the empty
{
break;
}
weightedTxn = weightedTxnsQueue_.top();
txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
}
if (weightedTxnsQueue_.empty())
{
newJob.wait(lk); // might need a lock here
continue;
}
// We have non-empty jobsList at this point.
// Remove the txn from a queue first to add it later
weightedTxnsQueue_.pop();
TransactionIdxT txnIdx = txnAndJobListPair->first;
ThreadPoolJobsList* jobsList = txnAndJobListPair->second;
// Job& job = jobsList->front();
runList.push_back(jobsList->front());
jobsList->pop_front();
// Add the jobList back into the PQ adding some weight to it
// Current algo doesn't reduce total txn weight if the job is rescheduled.
if (!jobsList->empty())
{
weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx});
}
lk.unlock();
running = true;
rescheduleJob = (*(runList[0].functor_))();
running = false;
utils::setThreadName("Idle");
if (rescheduleJob)
{
lk.lock();
addJob_(runList[0], false);
newJob.notify_one();
lk.unlock();
}
}
}
catch (std::exception& ex)
{
// Log the exception and exit this thread
try
{
--threadCounts;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("threadFcn: Caught exception: ");
args.add(ex.what());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
#endif
if (running)
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
}
catch (...)
{
}
}
catch (...)
{
// Log the exception and exit this thread
try
{
--threadCounts;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(6);
args.add("threadFcn: Caught unknown exception!");
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
#endif
if (running)
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
}
catch (...)
{
}
}
}
void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
sock->write(msg);
}
void FairThreadPool::stop()
{
_stop = true;
}
} // namespace threadpool