1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5044 This patch simplifies the addJob interface, removing the extra bool that controls mutex locking,

adds an additional nullptr dereference check in removeJobs, and fixes FairThreadPool
hashmap iterator invalidation issues
This commit is contained in:
Roman Nozdrin
2022-07-04 09:18:26 +00:00
committed by Roman Nozdrin
parent 4d41a945db
commit 0907ca414f
4 changed files with 69 additions and 47 deletions

View File

@ -55,7 +55,7 @@ if (WITH_UNITTESTS)
add_executable(fair_threadpool_test fair_threadpool.cpp) add_executable(fair_threadpool_test fair_threadpool.cpp)
add_dependencies(fair_threadpool_test googletest) add_dependencies(fair_threadpool_test googletest)
target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:)
# CPPUNIT TESTS # CPPUNIT TESTS
add_executable(we_shared_components_tests shared_components_tests.cpp) add_executable(we_shared_components_tests shared_components_tests.cpp)

View File

@ -28,62 +28,64 @@ using namespace threadpool;
using ResultsType = std::vector<int>; using ResultsType = std::vector<int>;
static ResultsType results; static ResultsType results;
class FairThreadPoolTest : public testing::Test { class FairThreadPoolTest : public testing::Test
public: {
public:
void SetUp() override void SetUp() override
{ {
results.clear(); results.clear();
threadPool = new FairThreadPool(1, 1, 0, 0); threadPool = new FairThreadPool(1, 1, 0, 0);
} }
FairThreadPool* threadPool; FairThreadPool* threadPool;
}; };
class TestFunctor: public FairThreadPool::Functor class TestFunctor : public FairThreadPool::Functor
{ {
public: public:
TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) TestFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay)
{ {
} }
~TestFunctor() {}; ~TestFunctor(){};
int operator()() override int operator()() override
{ {
usleep(delay_); usleep(delay_);
results.push_back(id_); results.push_back(id_);
return 0; return 0;
} }
private: private:
size_t id_; size_t id_;
size_t delay_; size_t delay_;
}; };
class TestRescheduleFunctor: public FairThreadPool::Functor class TestRescheduleFunctor : public FairThreadPool::Functor
{ {
public: public:
TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) TestRescheduleFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay)
{ {
} }
~TestRescheduleFunctor() {}; ~TestRescheduleFunctor(){};
int operator()() override int operator()() override
{ {
if (firstRun) if (firstRun)
{ {
firstRun = false; firstRun = false;
return 1; // re-schedule the Job return 1; // re-schedule the Job
} }
usleep(delay_); usleep(delay_);
results.push_back(id_); results.push_back(id_);
return 0; return 0;
} }
private: private:
size_t id_; size_t id_;
size_t delay_; size_t delay_;
bool firstRun = true; bool firstRun = true;
}; };
testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, const size_t idxB, const int b) testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a,
const size_t idxB, const int b)
{ {
if (arr.empty() || arr.size() <= max(idxA, idxB)) if (arr.empty() || arr.size() <= max(idxA, idxB))
return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; return testing::AssertionFailure() << "The supplied vector is either empty or not big enough.";
@ -91,8 +93,8 @@ testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA,
return testing::AssertionSuccess(); return testing::AssertionSuccess();
if (arr[idxA] == b && arr[idxB] == a) if (arr[idxA] == b && arr[idxB] == a)
return testing::AssertionSuccess(); return testing::AssertionSuccess();
return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB return testing::AssertionFailure() << "The values at positions " << idxA << " " << idxB << " are not " << a
<< " are not " << a << " and " << b << std::endl; << " and " << b << std::endl;
} }
TEST_F(FairThreadPoolTest, FairThreadPoolAdd) TEST_F(FairThreadPoolTest, FairThreadPoolAdd)
@ -147,6 +149,33 @@ TEST_F(FairThreadPoolTest, FairThreadPoolRemove)
EXPECT_EQ(results[1], 3); EXPECT_EQ(results[1], 3);
} }
// Regression test: interleaving addJob/removeJobs — including removing an id
// that has already been removed — must drain cleanly and leave the queue empty
// (i.e. the pool's internal per-transaction jobs-list bookkeeping must not
// crash or leak entries).
TEST_F(FairThreadPoolTest, FairThreadPoolCleanUp)
{
  SP_UM_IOSOCK ioSock(new messageqcpp::IOSocket);

  auto slowFunctor = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
  FairThreadPool::Job job1(1, 1, 1, slowFunctor, ioSock, 1, 0, 1);
  auto fastFunctorA = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
  FairThreadPool::Job job2(2, 1, 1, fastFunctorA, ioSock, 1, 0, 2);
  auto fastFunctorB = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
  FairThreadPool::Job job3(3, 1, 2, fastFunctorB, ioSock, 1, 0, 3);

  // Add each job and immediately remove it again.
  threadPool->addJob(job1);
  threadPool->removeJobs(job1.id_);
  threadPool->addJob(job2);
  threadPool->removeJobs(job2.id_);
  threadPool->addJob(job3);
  threadPool->removeJobs(job3.id_);
  // Removing an id that is already gone must be a harmless no-op.
  threadPool->removeJobs(job1.id_);
  threadPool->removeJobs(job1.id_);

  // Wait for any in-flight work to drain out of the queue.
  while (threadPool->queueSize() != 0)
  {
    usleep(250000);
  }

  EXPECT_EQ(threadPool->queueSize(), 0ULL);
}
TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) TEST_F(FairThreadPoolTest, FairThreadPoolReschedule)
{ {
SP_UM_IOSOCK sock(new messageqcpp::IOSocket); SP_UM_IOSOCK sock(new messageqcpp::IOSocket);

View File

@ -55,11 +55,6 @@ FairThreadPool::~FairThreadPool()
} }
void FairThreadPool::addJob(const Job& job) void FairThreadPool::addJob(const Job& job)
{
addJob_(job);
}
void FairThreadPool::addJob_(const Job& job, bool useLock)
{ {
boost::thread* newThread; boost::thread* newThread;
std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t()); std::unique_lock<std::mutex> lk(mutex, std::defer_lock_t());
@ -72,8 +67,7 @@ void FairThreadPool::addJob_(const Job& job, bool useLock)
threadCounts_.fetch_add(1, std::memory_order_relaxed); threadCounts_.fetch_add(1, std::memory_order_relaxed);
} }
if (useLock) lk.lock();
lk.lock();
// If some threads have blocked (because of output queue full) // If some threads have blocked (because of output queue full)
// Temporarily add some extra worker threads to make up for the blocked threads. // Temporarily add some extra worker threads to make up for the blocked threads.
if (blockedThreads_ > extraThreads_) if (blockedThreads_ > extraThreads_)
@ -106,21 +100,22 @@ void FairThreadPool::addJob_(const Job& job, bool useLock)
jobsListMapIter->second->push_back(job); jobsListMapIter->second->push_back(job);
} }
if (useLock) newJob.notify_one();
newJob.notify_one();
} }
void FairThreadPool::removeJobs(uint32_t id) void FairThreadPool::removeJobs(uint32_t id)
{ {
// std::cout << "FairThreadPool::removeJobs id " << id << std::endl;
std::unique_lock<std::mutex> lk(mutex); std::unique_lock<std::mutex> lk(mutex);
for (auto& txnJobsMapPair : txn2JobsListMap_) auto txnJobsMapIter = txn2JobsListMap_.begin();
while (txnJobsMapIter != txn2JobsListMap_.end())
{ {
auto& txnJobsMapPair = *txnJobsMapIter;
ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second; ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second;
if (txnJobsList->empty()) // txnJobsList must not be nullptr
if (txnJobsList && txnJobsList->empty())
{ {
txn2JobsListMap_.erase(txnJobsMapPair.first); txnJobsMapIter = txn2JobsListMap_.erase(txnJobsMapIter);
delete txnJobsList; delete txnJobsList;
continue; continue;
// There is no clean-up for PQ. It will happen later in threadFcn // There is no clean-up for PQ. It will happen later in threadFcn
@ -128,21 +123,22 @@ void FairThreadPool::removeJobs(uint32_t id)
auto job = txnJobsList->begin(); auto job = txnJobsList->begin();
while (job != txnJobsList->end()) while (job != txnJobsList->end())
{ {
// std::cout << "removeJobs() job->id_ " << job->id_ << std::endl;
if (job->id_ == id) if (job->id_ == id)
{ {
job = txnJobsList->erase(job); // update the job iter job = txnJobsList->erase(job); // update the job iter
if (txnJobsList->empty()) continue; // go-on skiping job iter increment
{
txn2JobsListMap_.erase(txnJobsMapPair.first);
delete txnJobsList;
break;
// There is no clean-up for PQ. It will happen later in threadFcn
}
continue; // go-on skiping job iter increment
} }
++job; ++job;
} }
if (txnJobsList->empty())
{
txnJobsMapIter = txn2JobsListMap_.erase(txnJobsMapIter);
delete txnJobsList;
continue;
// There is no clean-up for PQ. It will happen later in threadFcn
}
++txnJobsMapIter;
} }
} }
@ -185,6 +181,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
{ {
ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second; ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second;
delete txnJobsList; delete txnJobsList;
// !txnAndJobListPair is invalidated after this!
txn2JobsListMap_.erase(txnAndJobListPair->first); txn2JobsListMap_.erase(txnAndJobListPair->first);
} }
weightedTxnsQueue_.pop(); weightedTxnsQueue_.pop();
@ -231,16 +228,12 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
{ {
// to avoid excessive CPU usage waiting for data from storage // to avoid excessive CPU usage waiting for data from storage
usleep(500); usleep(500);
lk.lock(); addJob(runList[0]);
addJob_(runList[0], false);
newJob.notify_one();
lk.unlock();
} }
} }
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
// std::cout << "FairThreadPool::threadFcn(): std::exception - no reschedule but send an error" << std::endl;
if (running) if (running)
{ {
jobsRunning_.fetch_sub(1, std::memory_order_relaxed); jobsRunning_.fetch_sub(1, std::memory_order_relaxed);
@ -270,13 +263,12 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
} }
catch (...) catch (...)
{ {
std::cout << "FairThreadPool::threadFcn(): std::exception - double exception: failed to send an error" << std::endl; std::cout << "FairThreadPool::threadFcn(): std::exception - double exception: failed to send an error"
<< std::endl;
} }
} }
catch (...) catch (...)
{ {
// std::cout << "FairThreadPool::threadFcn(): ... exception - no reschedule but send an error" << std::endl;
// Log the exception and exit this thread
try try
{ {
if (running) if (running)
@ -302,7 +294,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
} }
catch (...) catch (...)
{ {
std::cout << "FairThreadPool::threadFcn(): ... exception - double exception: failed to send an error" << std::endl; std::cout << "FairThreadPool::threadFcn(): ... exception - double exception: failed to send an error"
<< std::endl;
} }
} }
} }

View File

@ -39,6 +39,7 @@
namespace threadpool namespace threadpool
{ {
// The idea of this thread pool is to run morsel jobs (primitive jobs) so as to equally distribute CPU time
// b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight // b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight
// stored in PriorityQueue that thread increases before run another morsel for the query. When query is // stored in PriorityQueue that thread increases before run another morsel for the query. When query is
@ -144,7 +145,6 @@ class FairThreadPool
explicit FairThreadPool(const FairThreadPool&); explicit FairThreadPool(const FairThreadPool&);
FairThreadPool& operator=(const FairThreadPool&); FairThreadPool& operator=(const FairThreadPool&);
void addJob_(const Job& job, bool useLock = true);
void threadFcn(const PriorityThreadPool::Priority preferredQueue); void threadFcn(const PriorityThreadPool::Priority preferredQueue);
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);