You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-5044 Initial version of a fair thread pool
PP now uses PriorityThreadPool that arbitrary picks another jobs pack to run. This scheduling discipline tend to run portions of a single query forcing other simultaneous queries to wait. In result parallel queries timings variance is high. The FairThreadPool picks the job with the smallest amount of work done so far(see the code for details)
This commit is contained in:
@ -51,6 +51,10 @@ if (WITH_UNITTESTS)
|
||||
target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
||||
gtest_discover_tests(simd_processors TEST_PREFIX columnstore:)
|
||||
|
||||
add_executable(fair_threadpool_test fair_threadpool.cpp)
|
||||
target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
||||
gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:)
|
||||
|
||||
# CPPUNIT TESTS
|
||||
add_executable(we_shared_components_tests shared_components_tests.cpp)
|
||||
add_dependencies(we_shared_components_tests loggingcpp)
|
||||
|
174
tests/fair_threadpool.cpp
Normal file
174
tests/fair_threadpool.cpp
Normal file
@ -0,0 +1,174 @@
|
||||
/* Copyright (C) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <iostream>
|
||||
#include <gtest/gtest.h>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/threadpool/fair_threadpool.h"
|
||||
|
||||
using namespace primitiveprocessor;
|
||||
using namespace std;
|
||||
using namespace threadpool;
|
||||
|
||||
using ResultsType = std::vector<int>;
|
||||
static ResultsType results;
|
||||
|
||||
class FairThreadPoolTest : public testing::Test {
|
||||
public:
|
||||
|
||||
void SetUp() override
|
||||
{
|
||||
results.clear();
|
||||
threadPool = new FairThreadPool(1, 1, 0, 0);
|
||||
}
|
||||
|
||||
|
||||
FairThreadPool* threadPool;
|
||||
};
|
||||
|
||||
class TestFunctor: public FairThreadPool::Functor
|
||||
{
|
||||
public:
|
||||
TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay)
|
||||
{
|
||||
}
|
||||
~TestFunctor() {};
|
||||
int operator()() override
|
||||
{
|
||||
usleep(delay_);
|
||||
results.push_back(id_);
|
||||
return 0;
|
||||
}
|
||||
private:
|
||||
size_t id_;
|
||||
size_t delay_;
|
||||
};
|
||||
|
||||
class TestRescheduleFunctor: public FairThreadPool::Functor
|
||||
{
|
||||
public:
|
||||
TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay)
|
||||
{
|
||||
}
|
||||
~TestRescheduleFunctor() {};
|
||||
int operator()() override
|
||||
{
|
||||
if (firstRun)
|
||||
{
|
||||
firstRun = false;
|
||||
return 1; // re-schedule the Job
|
||||
}
|
||||
usleep(delay_);
|
||||
results.push_back(id_);
|
||||
return 0;
|
||||
}
|
||||
private:
|
||||
size_t id_;
|
||||
size_t delay_;
|
||||
bool firstRun = true;
|
||||
};
|
||||
|
||||
testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, const size_t idxB, const int b)
|
||||
{
|
||||
if (arr.empty() || arr.size() <= max(idxA, idxB))
|
||||
return testing::AssertionFailure() << "The supplied vector is either empty or not big enough.";
|
||||
if (arr[idxA] == a && arr[idxB] == b)
|
||||
return testing::AssertionSuccess();
|
||||
if (arr[idxA] == b && arr[idxB] == a)
|
||||
return testing::AssertionSuccess();
|
||||
return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB
|
||||
<< " are not " << a << " and " << b << std::endl;
|
||||
}
|
||||
|
||||
TEST_F(FairThreadPoolTest, FairThreadPoolAdd)
|
||||
{
|
||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 50000));
|
||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1);
|
||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 5000));
|
||||
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1);
|
||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 5000));
|
||||
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1);
|
||||
|
||||
threadPool->addJob(job1);
|
||||
threadPool->addJob(job2);
|
||||
threadPool->addJob(job3);
|
||||
|
||||
while (threadPool->queueSize())
|
||||
{
|
||||
usleep(2500000);
|
||||
}
|
||||
usleep(2500000);
|
||||
|
||||
EXPECT_EQ(threadPool->queueSize(), 0);
|
||||
EXPECT_EQ(results.size(), 3);
|
||||
EXPECT_EQ(results[0], 1);
|
||||
EXPECT_EQ(results[1], 3);
|
||||
EXPECT_EQ(results[2], 2);
|
||||
}
|
||||
|
||||
TEST_F(FairThreadPoolTest, FairThreadPoolRemove)
|
||||
{
|
||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
||||
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2);
|
||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
||||
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3);
|
||||
|
||||
threadPool->addJob(job1);
|
||||
threadPool->addJob(job2);
|
||||
threadPool->addJob(job3);
|
||||
threadPool->removeJobs(job2.id_);
|
||||
|
||||
while (threadPool->queueSize())
|
||||
{
|
||||
usleep(1500000);
|
||||
}
|
||||
|
||||
EXPECT_EQ(threadPool->queueSize(), 0);
|
||||
EXPECT_EQ(results.size(), 2);
|
||||
EXPECT_EQ(results[0], 1);
|
||||
EXPECT_EQ(results[1], 3);
|
||||
}
|
||||
|
||||
TEST_F(FairThreadPoolTest, FairThreadPoolReschedule)
|
||||
{
|
||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
||||
FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2);
|
||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
||||
FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3);
|
||||
|
||||
threadPool->addJob(job1);
|
||||
threadPool->addJob(job2);
|
||||
threadPool->addJob(job3);
|
||||
|
||||
while (threadPool->queueSize())
|
||||
{
|
||||
usleep(1500000);
|
||||
}
|
||||
|
||||
EXPECT_EQ(threadPool->queueSize(), 0);
|
||||
EXPECT_EQ(results.size(), 3);
|
||||
EXPECT_EQ(results[0], 1);
|
||||
EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3));
|
||||
}
|
@ -4,11 +4,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES} )
|
||||
|
||||
########### next target ###############
|
||||
|
||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp)
|
||||
|
||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp)
|
||||
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
||||
|
||||
add_dependencies(threadpool loggingcpp)
|
||||
target_link_libraries(threadpool Boost::chrono)
|
||||
|
||||
install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine)
|
302
utils/threadpool/fair_threadpool.cpp
Normal file
302
utils/threadpool/fair_threadpool.cpp
Normal file
@ -0,0 +1,302 @@
|
||||
/* Copyright (c) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
#include <exception>
|
||||
using namespace std;
|
||||
|
||||
#include "messageobj.h"
|
||||
#include "messagelog.h"
|
||||
#include "threadnaming.h"
|
||||
using namespace logging;
|
||||
|
||||
#include "fair_threadpool.h"
|
||||
using namespace boost;
|
||||
|
||||
#include "dbcon/joblist/primitivemsg.h"
|
||||
|
||||
namespace threadpool
|
||||
{
|
||||
FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
|
||||
uint ID)
|
||||
: _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true)
|
||||
{
|
||||
boost::thread* newThread;
|
||||
size_t numberOfThreads = highThreads + midThreads + lowThreads;
|
||||
for (uint32_t i = 0; i < numberOfThreads; ++i)
|
||||
{
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
|
||||
newThread->detach();
|
||||
}
|
||||
cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n";
|
||||
defaultThreadCounts = threadCounts = numberOfThreads;
|
||||
}
|
||||
|
||||
FairThreadPool::~FairThreadPool()
|
||||
{
|
||||
stop();
|
||||
}
|
||||
|
||||
void FairThreadPool::addJob(const Job& job)
|
||||
{
|
||||
addJob_(job);
|
||||
}
|
||||
|
||||
void FairThreadPool::addJob_(const Job& job, bool useLock)
|
||||
{
|
||||
boost::thread* newThread;
|
||||
std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t());
|
||||
|
||||
if (useLock)
|
||||
lk.lock();
|
||||
|
||||
// Create any missing threads
|
||||
if (defaultThreadCounts != threadCounts)
|
||||
{
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH));
|
||||
newThread->detach();
|
||||
++threadCounts;
|
||||
}
|
||||
|
||||
// If some threads have blocked (because of output queue full)
|
||||
// Temporarily add some extra worker threads to make up for the blocked threads.
|
||||
if (blockedThreads > extraThreads)
|
||||
{
|
||||
stopExtra = false;
|
||||
newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::EXTRA));
|
||||
newThread->detach();
|
||||
extraThreads++;
|
||||
}
|
||||
else if (blockedThreads == 0)
|
||||
{
|
||||
// Release the temporary threads -- some threads have become unblocked.
|
||||
stopExtra = true;
|
||||
}
|
||||
|
||||
auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_);
|
||||
if (jobsListMapIter == txn2JobsListMap_.end())
|
||||
{
|
||||
ThreadPoolJobsList* jobsList = new ThreadPoolJobsList;
|
||||
jobsList->push_back(job);
|
||||
txn2JobsListMap_[job.txnIdx_] = jobsList;
|
||||
WeightT currentTopWeight = weightedTxnsQueue_.empty() ? 0 : weightedTxnsQueue_.top().first;
|
||||
weightedTxnsQueue_.push({currentTopWeight, job.txnIdx_});
|
||||
}
|
||||
else
|
||||
{
|
||||
jobsListMapIter->second->push_back(job);
|
||||
}
|
||||
|
||||
if (useLock)
|
||||
newJob.notify_one();
|
||||
}
|
||||
|
||||
void FairThreadPool::removeJobs(uint32_t id)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
||||
for (auto& txnJobsMapPair : txn2JobsListMap_)
|
||||
{
|
||||
ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second;
|
||||
auto job = txnJobsList->begin();
|
||||
while (job != txnJobsList->end())
|
||||
{
|
||||
if (job->id_ == id)
|
||||
{
|
||||
job = txnJobsList->erase(job); // update the job iter
|
||||
if (txnJobsList->empty())
|
||||
{
|
||||
txn2JobsListMap_.erase(txnJobsMapPair.first);
|
||||
delete txnJobsList;
|
||||
break;
|
||||
// There is no clean-up for PQ. It will happen later in threadFcn
|
||||
}
|
||||
continue; // go-on skiping job iter increment
|
||||
}
|
||||
++job;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue)
|
||||
{
|
||||
if (preferredQueue == PriorityThreadPool::Priority::EXTRA)
|
||||
utils::setThreadName("Extra");
|
||||
else
|
||||
utils::setThreadName("Idle");
|
||||
RunListT runList; // This is a vector to allow to grab multiple jobs
|
||||
RescheduleVecType reschedule;
|
||||
bool running = false;
|
||||
bool rescheduleJob = false;
|
||||
|
||||
try
|
||||
{
|
||||
while (!_stop)
|
||||
{
|
||||
runList.clear(); // remove the job
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
||||
if (preferredQueue == PriorityThreadPool::Priority::EXTRA && stopExtra)
|
||||
{
|
||||
--extraThreads;
|
||||
return;
|
||||
}
|
||||
|
||||
if (weightedTxnsQueue_.empty())
|
||||
{
|
||||
newJob.wait(lk);
|
||||
continue; // just go on w/o re-taking the lock
|
||||
}
|
||||
|
||||
WeightedTxnT weightedTxn = weightedTxnsQueue_.top();
|
||||
auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
|
||||
// Looking for non-empty jobsList in a loop
|
||||
// Waiting on cond_var if PQ is empty(no jobs in this thread pool)
|
||||
while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty())
|
||||
{
|
||||
// JobList is empty. This can happen when this method pops the last Job.
|
||||
if (txnAndJobListPair != txn2JobsListMap_.end())
|
||||
{
|
||||
ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second;
|
||||
delete txnJobsList;
|
||||
txn2JobsListMap_.erase(txnAndJobListPair->first);
|
||||
}
|
||||
weightedTxnsQueue_.pop();
|
||||
if (weightedTxnsQueue_.empty()) // remove the empty
|
||||
{
|
||||
break;
|
||||
}
|
||||
weightedTxn = weightedTxnsQueue_.top();
|
||||
txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second);
|
||||
}
|
||||
|
||||
if (weightedTxnsQueue_.empty())
|
||||
{
|
||||
newJob.wait(lk); // might need a lock here
|
||||
continue;
|
||||
}
|
||||
|
||||
// We have non-empty jobsList at this point.
|
||||
// Remove the txn from a queue first to add it later
|
||||
weightedTxnsQueue_.pop();
|
||||
TransactionIdxT txnIdx = txnAndJobListPair->first;
|
||||
ThreadPoolJobsList* jobsList = txnAndJobListPair->second;
|
||||
// Job& job = jobsList->front();
|
||||
runList.push_back(jobsList->front());
|
||||
|
||||
jobsList->pop_front();
|
||||
// Add the jobList back into the PQ adding some weight to it
|
||||
// Current algo doesn't reduce total txn weight if the job is rescheduled.
|
||||
if (!jobsList->empty())
|
||||
{
|
||||
weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx});
|
||||
}
|
||||
|
||||
lk.unlock();
|
||||
|
||||
running = true;
|
||||
rescheduleJob = (*(runList[0].functor_))();
|
||||
running = false;
|
||||
|
||||
utils::setThreadName("Idle");
|
||||
|
||||
if (rescheduleJob)
|
||||
{
|
||||
lk.lock();
|
||||
addJob_(runList[0], false);
|
||||
newJob.notify_one();
|
||||
lk.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
--threadCounts;
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(5);
|
||||
args.add("threadFcn: Caught exception: ");
|
||||
args.add(ex.what());
|
||||
|
||||
message.format(args);
|
||||
|
||||
logging::LoggingID lid(22);
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage(message);
|
||||
#endif
|
||||
|
||||
if (running)
|
||||
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
--threadCounts;
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(6);
|
||||
args.add("threadFcn: Caught unknown exception!");
|
||||
|
||||
message.format(args);
|
||||
|
||||
logging::LoggingID lid(22);
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage(message);
|
||||
#endif
|
||||
|
||||
if (running)
|
||||
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
|
||||
{
|
||||
ISMPacketHeader ism;
|
||||
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
|
||||
|
||||
ism.Status = logging::primitiveServerErr;
|
||||
ph.UniqueID = id;
|
||||
ph.StepID = step;
|
||||
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
|
||||
msg.append((uint8_t*)&ism, sizeof(ism));
|
||||
msg.append((uint8_t*)&ph, sizeof(ph));
|
||||
|
||||
sock->write(msg);
|
||||
}
|
||||
|
||||
void FairThreadPool::stop()
|
||||
{
|
||||
_stop = true;
|
||||
}
|
||||
|
||||
} // namespace threadpool
|
173
utils/threadpool/fair_threadpool.h
Normal file
173
utils/threadpool/fair_threadpool.h
Normal file
@ -0,0 +1,173 @@
|
||||
/* Copyright (c) 2022 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <cstdlib>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition.hpp>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <boost/function.hpp>
|
||||
#include <atomic>
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
#include <list>
|
||||
#include <functional>
|
||||
|
||||
#include "primitives/primproc/umsocketselector.h"
|
||||
#include "prioritythreadpool.h"
|
||||
|
||||
namespace threadpool
|
||||
{
|
||||
class FairThreadPool
|
||||
{
|
||||
public:
|
||||
using Functor = PriorityThreadPool::Functor;
|
||||
|
||||
using TransactionIdxT = uint32_t;
|
||||
struct Job
|
||||
{
|
||||
Job() : weight_(1), priority_(0), id_(0)
|
||||
{
|
||||
}
|
||||
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
|
||||
const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
|
||||
const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
|
||||
: uniqueID_(uniqueID)
|
||||
, stepID_(stepID)
|
||||
, txnIdx_(txnIdx)
|
||||
, functor_(functor)
|
||||
, sock_(sock)
|
||||
, weight_(weight)
|
||||
, priority_(priority)
|
||||
, id_(id)
|
||||
{
|
||||
}
|
||||
uint32_t uniqueID_;
|
||||
uint32_t stepID_;
|
||||
TransactionIdxT txnIdx_;
|
||||
boost::shared_ptr<Functor> functor_;
|
||||
primitiveprocessor::SP_UM_IOSOCK sock_;
|
||||
uint32_t weight_;
|
||||
uint32_t priority_;
|
||||
uint32_t id_;
|
||||
};
|
||||
|
||||
/*********************************************
|
||||
* ctor/dtor
|
||||
*
|
||||
*********************************************/
|
||||
|
||||
/** @brief ctor
|
||||
*/
|
||||
|
||||
FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0);
|
||||
virtual ~FairThreadPool();
|
||||
|
||||
void removeJobs(uint32_t id);
|
||||
void addJob(const Job& job);
|
||||
void stop();
|
||||
|
||||
/** @brief for use in debugging
|
||||
*/
|
||||
void dump();
|
||||
|
||||
// If a job is blocked, we want to temporarily increase the number of threads managed by the pool
|
||||
// A problem can occur if all threads are running long or blocked for a single query. Other
|
||||
// queries won't get serviced, even though there are cpu cycles available.
|
||||
// These calls are currently protected by respondLock in sendThread(). If you call from other
|
||||
// places, you need to consider atomicity.
|
||||
void incBlockedThreads()
|
||||
{
|
||||
blockedThreads++;
|
||||
}
|
||||
void decBlockedThreads()
|
||||
{
|
||||
blockedThreads--;
|
||||
}
|
||||
uint32_t blockedThreadCount() const
|
||||
{
|
||||
return blockedThreads;
|
||||
}
|
||||
size_t queueSize() const
|
||||
{
|
||||
return weightedTxnsQueue_.size();
|
||||
}
|
||||
|
||||
protected:
|
||||
private:
|
||||
struct ThreadHelper
|
||||
{
|
||||
ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue)
|
||||
{
|
||||
}
|
||||
void operator()()
|
||||
{
|
||||
ptp->threadFcn(preferredQueue);
|
||||
}
|
||||
FairThreadPool* ptp;
|
||||
PriorityThreadPool::Priority preferredQueue;
|
||||
};
|
||||
|
||||
explicit FairThreadPool();
|
||||
explicit FairThreadPool(const FairThreadPool&);
|
||||
FairThreadPool& operator=(const FairThreadPool&);
|
||||
|
||||
void addJob_(const Job& job, bool useLock = true);
|
||||
void threadFcn(const PriorityThreadPool::Priority preferredQueue);
|
||||
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
|
||||
|
||||
uint32_t threadCounts;
|
||||
uint32_t defaultThreadCounts;
|
||||
std::mutex mutex;
|
||||
std::condition_variable newJob;
|
||||
boost::thread_group threads;
|
||||
bool _stop;
|
||||
uint32_t weightPerRun;
|
||||
volatile uint id; // prevent it from being optimized out
|
||||
|
||||
using WeightT = uint32_t;
|
||||
using WeightedTxnT = std::pair<WeightT, TransactionIdxT>;
|
||||
using WeightedTxnVec = std::vector<WeightedTxnT>;
|
||||
struct PrioQueueCmp
|
||||
{
|
||||
bool operator()(WeightedTxnT lhs, WeightedTxnT rhs)
|
||||
{
|
||||
if (lhs.first == rhs.first)
|
||||
return lhs.second > rhs.second;
|
||||
return lhs.first > rhs.first;
|
||||
}
|
||||
};
|
||||
using RunListT = std::vector<Job>;
|
||||
using RescheduleVecType = std::vector<bool>;
|
||||
using WeightedTxnPrioQueue = std::priority_queue<WeightedTxnT, WeightedTxnVec, PrioQueueCmp>;
|
||||
using ThreadPoolJobsList = std::list<Job>;
|
||||
using Txn2ThreadPoolJobsListMap = std::unordered_map<TransactionIdxT, ThreadPoolJobsList*>;
|
||||
Txn2ThreadPoolJobsListMap txn2JobsListMap_;
|
||||
WeightedTxnPrioQueue weightedTxnsQueue_;
|
||||
std::atomic<uint32_t> blockedThreads;
|
||||
std::atomic<uint32_t> extraThreads;
|
||||
bool stopExtra;
|
||||
};
|
||||
|
||||
} // namespace threadpool
|
@ -51,8 +51,6 @@ class PriorityThreadPool
|
||||
virtual int operator()() = 0;
|
||||
};
|
||||
|
||||
// typedef boost::function0<int> Functor;
|
||||
|
||||
struct Job
|
||||
{
|
||||
Job() : weight(1), priority(0), id(0)
|
||||
|
Reference in New Issue
Block a user