You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Revert "This reverts MCOL-5044 AKA FairThreadPool that breaks regr test002"
This reverts commit 61359119ad
.
This commit is contained in:
committed by
Roman Nozdrin
parent
658bacf01d
commit
6cff14997d
@ -4,11 +4,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES} )
|
||||
|
||||
########### next target ###############
|
||||
|
||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp)
|
||||
|
||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp)
|
||||
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
||||
|
||||
add_dependencies(threadpool loggingcpp)
|
||||
target_link_libraries(threadpool Boost::chrono)
|
||||
|
||||
install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine)
|
||||
install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine)
|
295
utils/threadpool/fair_threadpool.cpp
Normal file
295
utils/threadpool/fair_threadpool.cpp
Normal file
@ -0,0 +1,295 @@
|
||||
/* Copyright (c) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <atomic>
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
#include <exception>
|
||||
using namespace std;
|
||||
|
||||
#include "messageobj.h"
|
||||
#include "messagelog.h"
|
||||
#include "threadnaming.h"
|
||||
using namespace logging;
|
||||
|
||||
#include "fair_threadpool.h"
|
||||
using namespace boost;
|
||||
|
||||
#include "dbcon/joblist/primitivemsg.h"
|
||||
|
||||
namespace threadpool
|
||||
{
|
||||
FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
|
||||
uint ID)
|
||||
: weightPerRun(targetWeightPerRun), id(ID)
|
||||
{
|
||||
boost::thread* newThread;
|
||||
size_t numberOfThreads = highThreads + midThreads + lowThreads;
|
||||
for (uint32_t i = 0; i < numberOfThreads; ++i)
|
||||
{
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
|
||||
newThread->detach();
|
||||
}
|
||||
cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n";
|
||||
threadCounts_.store(numberOfThreads, std::memory_order_relaxed);
|
||||
defaultThreadCounts = numberOfThreads;
|
||||
}
|
||||
|
||||
FairThreadPool::~FairThreadPool()
|
||||
{
|
||||
stop();
|
||||
}
|
||||
|
||||
void FairThreadPool::addJob(const Job& job)
|
||||
{
|
||||
addJob_(job);
|
||||
}
|
||||
|
||||
void FairThreadPool::addJob_(const Job& job, bool useLock)
|
||||
{
|
||||
boost::thread* newThread;
|
||||
std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t());
|
||||
|
||||
// Create any missing threads
|
||||
if (defaultThreadCounts != threadCounts_.load(std::memory_order_relaxed))
|
||||
{
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
|
||||
newThread->detach();
|
||||
threadCounts_.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
if (useLock)
|
||||
lk.lock();
|
||||
|
||||
auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
|
||||
if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map
|
||||
{
|
||||
ThreadPoolJobsList* jobsList = new ThreadPoolJobsList;
|
||||
jobsList->push_back(job);
|
||||
txn2JobsListMap_[job.txnIdx_] = jobsList;
|
||||
weightedTxnsQueue_.push({job.weight_, job.txnIdx_});
|
||||
}
|
||||
else // txn is in the map
|
||||
{
|
||||
if (jobsListMapIter->second->empty()) // there are no jobs for the txn
|
||||
{
|
||||
weightedTxnsQueue_.push({job.weight_, job.txnIdx_});
|
||||
}
|
||||
jobsListMapIter->second->push_back(job);
|
||||
}
|
||||
|
||||
if (useLock)
|
||||
newJob.notify_one();
|
||||
}
|
||||
|
||||
void FairThreadPool::removeJobs(uint32_t id)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
||||
for (auto& txnJobsMapPair : txn2JobsListMap_)
|
||||
{
|
||||
ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second;
|
||||
auto job = txnJobsList->begin();
|
||||
while (job != txnJobsList->end())
|
||||
{
|
||||
if (job->id_ == id)
|
||||
{
|
||||
job = txnJobsList->erase(job); // update the job iter
|
||||
if (txnJobsList->empty())
|
||||
{
|
||||
txn2JobsListMap_.erase(txnJobsMapPair.first);
|
||||
delete txnJobsList;
|
||||
break;
|
||||
// There is no clean-up for PQ. It will happen later in threadFcn
|
||||
}
|
||||
continue; // go-on skiping job iter increment
|
||||
}
|
||||
++job;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue)
|
||||
{
|
||||
utils::setThreadName("Idle");
|
||||
RunListT runList(1); // This is a vector to allow to grab multiple jobs
|
||||
RescheduleVecType reschedule;
|
||||
bool running = false;
|
||||
bool rescheduleJob = false;
|
||||
|
||||
try
|
||||
{
|
||||
while (!stop_.load(std::memory_order_relaxed))
|
||||
{
|
||||
runList.clear(); // remove the job
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
||||
if (weightedTxnsQueue_.empty())
|
||||
{
|
||||
newJob.wait(lk);
|
||||
continue; // just go on w/o re-taking the lock
|
||||
}
|
||||
|
||||
WeightedTxnT weightedTxn = weightedTxnsQueue_.top();
|
||||
auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
|
||||
// Looking for non-empty jobsList in a loop
|
||||
// The loop waits on newJob cond_var if PQ is empty(no jobs in this thread pool)
|
||||
while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty())
|
||||
{
|
||||
// JobList is empty. This can happen when this method pops the last Job.
|
||||
if (txnAndJobListPair != txn2JobsListMap_.end())
|
||||
{
|
||||
ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second;
|
||||
delete txnJobsList;
|
||||
txn2JobsListMap_.erase(txnAndJobListPair->first);
|
||||
}
|
||||
weightedTxnsQueue_.pop();
|
||||
if (weightedTxnsQueue_.empty()) // remove the empty
|
||||
{
|
||||
break;
|
||||
}
|
||||
weightedTxn = weightedTxnsQueue_.top();
|
||||
txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
|
||||
}
|
||||
|
||||
if (weightedTxnsQueue_.empty())
|
||||
{
|
||||
newJob.wait(lk); // might need a lock here
|
||||
continue;
|
||||
}
|
||||
|
||||
// We have non-empty jobsList at this point.
|
||||
// Remove the txn from a queue first to add it later
|
||||
weightedTxnsQueue_.pop();
|
||||
TransactionIdxT txnIdx = txnAndJobListPair->first;
|
||||
ThreadPoolJobsList* jobsList = txnAndJobListPair->second;
|
||||
runList.push_back(jobsList->front());
|
||||
|
||||
jobsList->pop_front();
|
||||
// Add the jobList back into the PQ adding some weight to it
|
||||
// Current algo doesn't reduce total txn weight if the job is rescheduled.
|
||||
if (!jobsList->empty())
|
||||
{
|
||||
weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx});
|
||||
}
|
||||
|
||||
lk.unlock();
|
||||
|
||||
running = true;
|
||||
jobsRunning_.fetch_add(1, std::memory_order_relaxed);
|
||||
rescheduleJob = (*(runList[0].functor_))(); // run the functor
|
||||
jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
|
||||
running = false;
|
||||
|
||||
utils::setThreadName("Idle");
|
||||
|
||||
if (rescheduleJob)
|
||||
{
|
||||
// to avoid excessive CPU usage waiting for data from storage
|
||||
usleep(500);
|
||||
lk.lock();
|
||||
addJob_(runList[0], false);
|
||||
newJob.notify_one();
|
||||
lk.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
if (running)
|
||||
{
|
||||
jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
|
||||
}
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
threadCounts_.fetch_sub(1, std::memory_order_relaxed);
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(5);
|
||||
args.add("threadFcn: Caught exception: ");
|
||||
args.add(ex.what());
|
||||
|
||||
message.format(args);
|
||||
|
||||
logging::LoggingID lid(22);
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage(message);
|
||||
#endif
|
||||
|
||||
if (running)
|
||||
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
if (running)
|
||||
{
|
||||
jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
|
||||
}
|
||||
threadCounts_.fetch_sub(1, std::memory_order_relaxed);
|
||||
;
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(6);
|
||||
args.add("threadFcn: Caught unknown exception!");
|
||||
|
||||
message.format(args);
|
||||
|
||||
logging::LoggingID lid(22);
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage(message);
|
||||
#endif
|
||||
|
||||
if (running)
|
||||
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
|
||||
{
|
||||
ISMPacketHeader ism;
|
||||
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
|
||||
|
||||
ism.Status = logging::primitiveServerErr;
|
||||
ph.UniqueID = id;
|
||||
ph.StepID = step;
|
||||
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
||||
msg.append((uint8_t*)&ism, sizeof(ism));
|
||||
msg.append((uint8_t*)&ph, sizeof(ph));
|
||||
|
||||
sock->write(msg);
|
||||
}
|
||||
|
||||
void FairThreadPool::stop()
|
||||
{
|
||||
stop_.store(true, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
} // namespace threadpool
|
165
utils/threadpool/fair_threadpool.h
Normal file
165
utils/threadpool/fair_threadpool.h
Normal file
@ -0,0 +1,165 @@
|
||||
/* Copyright (c) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <cstdlib>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition.hpp>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <boost/function.hpp>
|
||||
#include <atomic>
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
#include <list>
|
||||
#include <functional>
|
||||
|
||||
#include "primitives/primproc/umsocketselector.h"
|
||||
#include "prioritythreadpool.h"
|
||||
|
||||
namespace threadpool
|
||||
{
|
||||
// The idea of this thread pool is to run morsel jobs(primitive job) is to equaly distribute CPU time
|
||||
// b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight
|
||||
// stored in PriorityQueue that thread increases before run another morsel for the query. When query is
|
||||
// done(ThreadPoolJobsList is empty) it is removed from PQ and the Map(txn to ThreadPoolJobsList).
|
||||
// I tested multiple morsels per one loop iteration in ::threadFcn. This approach reduces CPU consumption
|
||||
// and increases query timings.
|
||||
class FairThreadPool
|
||||
{
|
||||
public:
|
||||
using Functor = PriorityThreadPool::Functor;
|
||||
|
||||
using TransactionIdxT = uint32_t;
|
||||
struct Job
|
||||
{
|
||||
Job() : weight_(1), priority_(0), id_(0)
|
||||
{
|
||||
}
|
||||
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
|
||||
const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
|
||||
const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
|
||||
: uniqueID_(uniqueID)
|
||||
, stepID_(stepID)
|
||||
, txnIdx_(txnIdx)
|
||||
, functor_(functor)
|
||||
, sock_(sock)
|
||||
, weight_(weight)
|
||||
, priority_(priority)
|
||||
, id_(id)
|
||||
{
|
||||
}
|
||||
uint32_t uniqueID_;
|
||||
uint32_t stepID_;
|
||||
TransactionIdxT txnIdx_;
|
||||
boost::shared_ptr<Functor> functor_;
|
||||
primitiveprocessor::SP_UM_IOSOCK sock_;
|
||||
uint32_t weight_;
|
||||
uint32_t priority_;
|
||||
uint32_t id_;
|
||||
};
|
||||
|
||||
/*********************************************
|
||||
* ctor/dtor
|
||||
*
|
||||
*********************************************/
|
||||
|
||||
/** @brief ctor
|
||||
*/
|
||||
|
||||
FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0);
|
||||
virtual ~FairThreadPool();
|
||||
|
||||
void removeJobs(uint32_t id);
|
||||
void addJob(const Job& job);
|
||||
void stop();
|
||||
|
||||
/** @brief for use in debugging
|
||||
*/
|
||||
void dump();
|
||||
|
||||
size_t queueSize() const
|
||||
{
|
||||
return weightedTxnsQueue_.size();
|
||||
}
|
||||
// This method enables a pool current workload estimate.
|
||||
size_t jobsRunning() const
|
||||
{
|
||||
return jobsRunning_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
protected:
|
||||
private:
|
||||
struct ThreadHelper
|
||||
{
|
||||
ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue)
|
||||
{
|
||||
}
|
||||
void operator()()
|
||||
{
|
||||
ptp->threadFcn(preferredQueue);
|
||||
}
|
||||
FairThreadPool* ptp;
|
||||
PriorityThreadPool::Priority preferredQueue;
|
||||
};
|
||||
|
||||
explicit FairThreadPool();
|
||||
explicit FairThreadPool(const FairThreadPool&);
|
||||
FairThreadPool& operator=(const FairThreadPool&);
|
||||
|
||||
void addJob_(const Job& job, bool useLock = true);
|
||||
void threadFcn(const PriorityThreadPool::Priority preferredQueue);
|
||||
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
|
||||
|
||||
uint32_t defaultThreadCounts;
|
||||
std::mutex mutex;
|
||||
std::condition_variable newJob;
|
||||
boost::thread_group threads;
|
||||
uint32_t weightPerRun;
|
||||
volatile uint id; // prevent it from being optimized out
|
||||
|
||||
using WeightT = uint32_t;
|
||||
using WeightedTxnT = std::pair<WeightT, TransactionIdxT>;
|
||||
using WeightedTxnVec = std::vector<WeightedTxnT>;
|
||||
struct PrioQueueCmp
|
||||
{
|
||||
bool operator()(WeightedTxnT lhs, WeightedTxnT rhs)
|
||||
{
|
||||
if (lhs.first == rhs.first)
|
||||
return lhs.second > rhs.second;
|
||||
return lhs.first > rhs.first;
|
||||
}
|
||||
};
|
||||
using RunListT = std::vector<Job>;
|
||||
using RescheduleVecType = std::vector<bool>;
|
||||
using WeightedTxnPrioQueue = std::priority_queue<WeightedTxnT, WeightedTxnVec, PrioQueueCmp>;
|
||||
using ThreadPoolJobsList = std::list<Job>;
|
||||
using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>;
|
||||
Txn2ThreadPoolJobsListMap txn2JobsListMap_;
|
||||
WeightedTxnPrioQueue weightedTxnsQueue_;
|
||||
std::atomic<size_t> jobsRunning_{0};
|
||||
std::atomic<size_t> threadCounts_{0};
|
||||
std::atomic<bool> stop_{false};
|
||||
};
|
||||
|
||||
} // namespace threadpool
|
@ -53,8 +53,6 @@ class PriorityThreadPool
|
||||
virtual int operator()() = 0;
|
||||
};
|
||||
|
||||
// typedef boost::function0<int> Functor;
|
||||
|
||||
struct Job
|
||||
{
|
||||
Job() : weight(1), priority(0), id(0)
|
||||
|
Reference in New Issue
Block a user