You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
Merge pull request #2468 from drrtuy/MCOL-5044-reenable-fair
Revert "This patch disables FairThreadPool to double check if this fe…
This commit is contained in:
@ -47,7 +47,7 @@
|
|||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
class BPPSeeder : public threadpool::PriorityThreadPool::Functor
|
class BPPSeeder : public threadpool::FairThreadPool::Functor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads,
|
BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads,
|
||||||
|
@ -24,12 +24,13 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "fair_threadpool.h"
|
||||||
#include "umsocketselector.h"
|
#include "umsocketselector.h"
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include "threadnaming.h"
|
#include "threadnaming.h"
|
||||||
#include "prioritythreadpool.h"
|
#include "fair_threadpool.h"
|
||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
@ -84,7 +85,7 @@ class BPPSendThread
|
|||||||
{
|
{
|
||||||
return die;
|
return die;
|
||||||
}
|
}
|
||||||
void setProcessorPool(boost::shared_ptr<threadpool::PriorityThreadPool> processorPool)
|
void setProcessorPool(boost::shared_ptr<threadpool::FairThreadPool> processorPool)
|
||||||
{
|
{
|
||||||
fProcessorPool = processorPool;
|
fProcessorPool = processorPool;
|
||||||
}
|
}
|
||||||
@ -148,7 +149,7 @@ class BPPSendThread
|
|||||||
uint64_t maxByteSize;
|
uint64_t maxByteSize;
|
||||||
// Used to tell the ThreadPool It should consider additional threads because a
|
// Used to tell the ThreadPool It should consider additional threads because a
|
||||||
// queue full event has happened and a thread has been blocked.
|
// queue full event has happened and a thread has been blocked.
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
|
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace primitiveprocessor
|
} // namespace primitiveprocessor
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||||
Copyright (C) 2016-2022 MariaDB Corporation
|
Copyright (C) 2016 MariaDB Corporation
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
This program is free software; you can redistribute it and/or
|
||||||
modify it under the terms of the GNU General Public License
|
modify it under the terms of the GNU General Public License
|
||||||
@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|||||||
// FIXME: there is an anon ns burried later in between 2 named namespaces...
|
// FIXME: there is an anon ns burried later in between 2 named namespaces...
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;
|
|
||||||
BlockRequestProcessor** BRPp;
|
BlockRequestProcessor** BRPp;
|
||||||
#ifndef _MSC_VER
|
#ifndef _MSC_VER
|
||||||
dbbc::Stats stats;
|
dbbc::Stats stats;
|
||||||
@ -1049,7 +1049,7 @@ using namespace primitiveprocessor;
|
|||||||
/** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
|
/** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
|
||||||
* TODO: Move this & the impl into different files
|
* TODO: Move this & the impl into different files
|
||||||
*/
|
*/
|
||||||
class DictScanJob : public threadpool::PriorityThreadPool::Functor
|
class DictScanJob : public threadpool::FairThreadPool::Functor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
|
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
|
||||||
@ -1240,7 +1240,7 @@ struct BPPHandler
|
|||||||
scoped.unlock();
|
scoped.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BPPHandlerFunctor : public PriorityThreadPool::Functor
|
struct BPPHandlerFunctor : public FairThreadPool::Functor
|
||||||
{
|
{
|
||||||
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
|
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
|
||||||
{
|
{
|
||||||
@ -1706,7 +1706,7 @@ return 0;
|
|||||||
PrimitiveServer* fPrimitiveServerPtr;
|
PrimitiveServer* fPrimitiveServerPtr;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DictionaryOp : public PriorityThreadPool::Functor
|
class DictionaryOp : public FairThreadPool::Functor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DictionaryOp(SBS cmd) : bs(cmd)
|
DictionaryOp(SBS cmd) : bs(cmd)
|
||||||
@ -1943,8 +1943,7 @@ struct ReadThread
|
|||||||
void operator()()
|
void operator()()
|
||||||
{
|
{
|
||||||
utils::setThreadName("PPReadThread");
|
utils::setThreadName("PPReadThread");
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> procPoolPtr =
|
boost::shared_ptr<threadpool::FairThreadPool> procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
|
||||||
fPrimitiveServerPtr->getProcessorThreadPool();
|
|
||||||
SBS bs;
|
SBS bs;
|
||||||
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
|
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
|
||||||
|
|
||||||
@ -1995,6 +1994,9 @@ struct ReadThread
|
|||||||
idbassert(bs->length() >= sizeof(ISMPacketHeader));
|
idbassert(bs->length() >= sizeof(ISMPacketHeader));
|
||||||
|
|
||||||
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
|
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
|
||||||
|
// uint64_t someVal = ismHdr->Command;
|
||||||
|
// std::cout << " PP read thread Command " << someVal << std::endl;
|
||||||
|
|
||||||
/* This switch is for the OOB commands */
|
/* This switch is for the OOB commands */
|
||||||
switch (ismHdr->Command)
|
switch (ismHdr->Command)
|
||||||
{
|
{
|
||||||
@ -2054,7 +2056,7 @@ struct ReadThread
|
|||||||
const uint32_t weight = 1;
|
const uint32_t weight = 1;
|
||||||
const uint32_t priority = 0;
|
const uint32_t priority = 0;
|
||||||
uint32_t id = 0;
|
uint32_t id = 0;
|
||||||
boost::shared_ptr<PriorityThreadPool::Functor> functor;
|
boost::shared_ptr<FairThreadPool::Functor> functor;
|
||||||
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
|
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
|
||||||
{
|
{
|
||||||
functor.reset(new CreateEqualityFilter(bs));
|
functor.reset(new CreateEqualityFilter(bs));
|
||||||
@ -2086,7 +2088,7 @@ struct ReadThread
|
|||||||
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
|
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
|
||||||
functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
|
functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
|
||||||
}
|
}
|
||||||
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
||||||
procPoolPtr->addJob(job);
|
procPoolPtr->addJob(job);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2095,7 +2097,7 @@ struct ReadThread
|
|||||||
case BATCH_PRIMITIVE_RUN:
|
case BATCH_PRIMITIVE_RUN:
|
||||||
{
|
{
|
||||||
TokenByScanRequestHeader* hdr = nullptr;
|
TokenByScanRequestHeader* hdr = nullptr;
|
||||||
boost::shared_ptr<PriorityThreadPool::Functor> functor;
|
boost::shared_ptr<FairThreadPool::Functor> functor;
|
||||||
uint32_t id = 0;
|
uint32_t id = 0;
|
||||||
uint32_t weight = 0;
|
uint32_t weight = 0;
|
||||||
uint32_t priority = 0;
|
uint32_t priority = 0;
|
||||||
@ -2135,8 +2137,9 @@ struct ReadThread
|
|||||||
}
|
}
|
||||||
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
|
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
|
||||||
{
|
{
|
||||||
functor.reset(new BPPSeeder(bs, writeLock, outIos, fPrimitiveServerPtr->ProcessorThreads(),
|
functor.reset(new BPPSeeder(bs, writeLock, outIos,
|
||||||
fPrimitiveServerPtr->PTTrace()));
|
fPrimitiveServerPtr->ProcessorThreads(),
|
||||||
|
fPrimitiveServerPtr->PTTrace()));
|
||||||
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
|
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
|
||||||
id = bpps->getID();
|
id = bpps->getID();
|
||||||
priority = bpps->priority();
|
priority = bpps->priority();
|
||||||
@ -2147,7 +2150,7 @@ struct ReadThread
|
|||||||
uniqueID = *((uint32_t*)&buf[pos + 10]);
|
uniqueID = *((uint32_t*)&buf[pos + 10]);
|
||||||
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
|
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
|
||||||
}
|
}
|
||||||
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
||||||
procPoolPtr->addJob(job);
|
procPoolPtr->addJob(job);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@ -2307,13 +2310,8 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
|
|||||||
fServerpool.setQueueSize(fServerQueueSize);
|
fServerpool.setQueueSize(fServerQueueSize);
|
||||||
fServerpool.setName("PrimitiveServer");
|
fServerpool.setName("PrimitiveServer");
|
||||||
|
|
||||||
fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
|
fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
|
||||||
medPriorityThreads, lowPriorityThreads, 0));
|
medPriorityThreads, lowPriorityThreads, 0));
|
||||||
// We're not using either the priority or the job-clustering features, just need a threadpool
|
|
||||||
// that can reschedule jobs, and an unlimited non-blocking queue
|
|
||||||
OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
|
|
||||||
// Initialize a local pointer.
|
|
||||||
fOOBPool = OOBPool;
|
|
||||||
|
|
||||||
asyncCounter = 0;
|
asyncCounter = 0;
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@
|
|||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
|
|
||||||
#include "threadpool.h"
|
#include "threadpool.h"
|
||||||
#include "prioritythreadpool.h"
|
#include "fair_threadpool.h"
|
||||||
#include "messagequeue.h"
|
#include "messagequeue.h"
|
||||||
#include "blockrequestprocessor.h"
|
#include "blockrequestprocessor.h"
|
||||||
#include "batchprimitiveprocessor.h"
|
#include "batchprimitiveprocessor.h"
|
||||||
@ -48,7 +48,6 @@ extern oam::OamCache* oamCache;
|
|||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
extern boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;
|
|
||||||
extern dbbc::BlockRequestProcessor** BRPp;
|
extern dbbc::BlockRequestProcessor** BRPp;
|
||||||
extern BRM::DBRM* brm;
|
extern BRM::DBRM* brm;
|
||||||
extern boost::mutex bppLock;
|
extern boost::mutex bppLock;
|
||||||
@ -130,16 +129,11 @@ class PrimitiveServer
|
|||||||
|
|
||||||
/** @brief get a pointer the shared processor thread pool
|
/** @brief get a pointer the shared processor thread pool
|
||||||
*/
|
*/
|
||||||
inline boost::shared_ptr<threadpool::PriorityThreadPool> getProcessorThreadPool() const
|
inline boost::shared_ptr<threadpool::FairThreadPool> getProcessorThreadPool() const
|
||||||
{
|
{
|
||||||
return fProcessorPool;
|
return fProcessorPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool() const
|
|
||||||
{
|
|
||||||
return fOOBPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ReadAheadBlocks() const
|
int ReadAheadBlocks() const
|
||||||
{
|
{
|
||||||
return fReadAheadBlocks;
|
return fReadAheadBlocks;
|
||||||
@ -171,8 +165,7 @@ class PrimitiveServer
|
|||||||
/** @brief the thread pool used to process
|
/** @brief the thread pool used to process
|
||||||
* primitive commands
|
* primitive commands
|
||||||
*/
|
*/
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
|
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBPool;
|
|
||||||
|
|
||||||
int fServerThreads;
|
int fServerThreads;
|
||||||
int fServerQueueSize;
|
int fServerQueueSize;
|
||||||
|
@ -18,29 +18,22 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
#include "prioritythreadpool.h"
|
#include "fair_threadpool.h"
|
||||||
|
|
||||||
class PrimitiveServerThreadPools
|
class PrimitiveServerThreadPools
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
PrimitiveServerThreadPools() = default;
|
PrimitiveServerThreadPools() = default;
|
||||||
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool,
|
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool)
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool)
|
: fPrimServerThreadPool(primServerThreadPool)
|
||||||
: fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool()
|
boost::shared_ptr<threadpool::FairThreadPool> getPrimitiveServerThreadPool()
|
||||||
{
|
{
|
||||||
return fPrimServerThreadPool;
|
return fPrimServerThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool()
|
|
||||||
{
|
|
||||||
return fOOBThreadPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> fPrimServerThreadPool;
|
boost::shared_ptr<threadpool::FairThreadPool> fPrimServerThreadPool;
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBThreadPool;
|
|
||||||
};
|
};
|
||||||
|
@ -725,7 +725,6 @@ int ServicePrimProc::Child()
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
primServerThreadPool = server.getProcessorThreadPool();
|
primServerThreadPool = server.getProcessorThreadPool();
|
||||||
OOBThreadPool = server.getOOBThreadPool();
|
|
||||||
|
|
||||||
server.start(this, startupRaceLock);
|
server.start(this, startupRaceLock);
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
#include <map>
|
#include <map>
|
||||||
|
|
||||||
#include "service.h"
|
#include "service.h"
|
||||||
#include "prioritythreadpool.h"
|
#include "fair_threadpool.h"
|
||||||
#include "pp_logger.h"
|
#include "pp_logger.h"
|
||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
@ -155,12 +155,12 @@ class ServicePrimProc : public Service, public Opt
|
|||||||
|
|
||||||
void LogErrno() override
|
void LogErrno() override
|
||||||
{
|
{
|
||||||
std::cerr << strerror(errno) << std::endl;
|
cerr << strerror(errno) << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ParentLogChildMessage(const std::string& str) override
|
void ParentLogChildMessage(const std::string& str) override
|
||||||
{
|
{
|
||||||
std::cout << str << std::endl;
|
cout << str << endl;
|
||||||
}
|
}
|
||||||
int Child() override;
|
int Child() override;
|
||||||
int Run()
|
int Run()
|
||||||
@ -172,12 +172,12 @@ class ServicePrimProc : public Service, public Opt
|
|||||||
return startupRaceFlag_;
|
return startupRaceFlag_;
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool()
|
boost::shared_ptr<threadpool::FairThreadPool> getPrimitiveServerThreadPool()
|
||||||
{
|
{
|
||||||
return primServerThreadPool;
|
return primServerThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool()
|
boost::shared_ptr<threadpool::FairThreadPool> getOOBThreadPool()
|
||||||
{
|
{
|
||||||
return OOBThreadPool;
|
return OOBThreadPool;
|
||||||
}
|
}
|
||||||
@ -190,6 +190,6 @@ class ServicePrimProc : public Service, public Opt
|
|||||||
static ServicePrimProc* fInstance;
|
static ServicePrimProc* fInstance;
|
||||||
// Since C++20 flag's init value is false.
|
// Since C++20 flag's init value is false.
|
||||||
std::atomic_flag startupRaceFlag_{false};
|
std::atomic_flag startupRaceFlag_{false};
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool;
|
boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool;
|
||||||
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool;
|
boost::shared_ptr<threadpool::FairThreadPool> OOBThreadPool;
|
||||||
};
|
};
|
||||||
|
@ -520,8 +520,7 @@ namespace exemgr
|
|||||||
statementsRunningCount->incr(stmtCounted);
|
statementsRunningCount->incr(stmtCounted);
|
||||||
|
|
||||||
PrimitiveServerThreadPools primitiveServerThreadPools(
|
PrimitiveServerThreadPools primitiveServerThreadPools(
|
||||||
ServicePrimProc::instance()->getPrimitiveServerThreadPool(),
|
ServicePrimProc::instance()->getPrimitiveServerThreadPool());
|
||||||
ServicePrimProc::instance()->getOOBThreadPool());
|
|
||||||
|
|
||||||
if (tryTuples)
|
if (tryTuples)
|
||||||
{
|
{
|
||||||
|
@ -59,6 +59,11 @@ if (WITH_UNITTESTS)
|
|||||||
target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
||||||
gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:)
|
gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:)
|
||||||
|
|
||||||
|
add_executable(fair_threadpool_test fair_threadpool.cpp)
|
||||||
|
add_dependencies(fair_threadpool_test googletest)
|
||||||
|
target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
||||||
|
gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:)
|
||||||
|
|
||||||
add_executable(comparators_tests comparators-tests.cpp)
|
add_executable(comparators_tests comparators-tests.cpp)
|
||||||
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
||||||
add_test(NAME columnstore:comparators_tests COMMAND comparators_tests)
|
add_test(NAME columnstore:comparators_tests COMMAND comparators_tests)
|
||||||
|
202
tests/fair_threadpool.cpp
Normal file
202
tests/fair_threadpool.cpp
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
/* Copyright (C) 2022 MariaDB Corporation
|
||||||
|
|
||||||
|
This program is free software; you can redistribute it and/or
|
||||||
|
modify it under the terms of the GNU General Public License
|
||||||
|
as published by the Free Software Foundation; version 2 of
|
||||||
|
the License.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program; if not, write to the Free Software
|
||||||
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||||
|
MA 02110-1301, USA. */
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "utils/threadpool/fair_threadpool.h"
|
||||||
|
|
||||||
|
using namespace primitiveprocessor;
|
||||||
|
using namespace std;
|
||||||
|
using namespace threadpool;
|
||||||
|
|
||||||
|
using ResultsType = std::vector<int>;
|
||||||
|
static ResultsType results;
|
||||||
|
|
||||||
|
class FairThreadPoolTest : public testing::Test
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void SetUp() override
|
||||||
|
{
|
||||||
|
results.clear();
|
||||||
|
threadPool = new FairThreadPool(1, 1, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
FairThreadPool* threadPool;
|
||||||
|
};
|
||||||
|
|
||||||
|
class TestFunctor : public FairThreadPool::Functor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TestFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
~TestFunctor(){};
|
||||||
|
int operator()() override
|
||||||
|
{
|
||||||
|
usleep(delay_);
|
||||||
|
results.push_back(id_);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
size_t id_;
|
||||||
|
size_t delay_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class TestRescheduleFunctor : public FairThreadPool::Functor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TestRescheduleFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
~TestRescheduleFunctor(){};
|
||||||
|
int operator()() override
|
||||||
|
{
|
||||||
|
if (firstRun)
|
||||||
|
{
|
||||||
|
firstRun = false;
|
||||||
|
return 1; // re-schedule the Job
|
||||||
|
}
|
||||||
|
usleep(delay_);
|
||||||
|
results.push_back(id_);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
size_t id_;
|
||||||
|
size_t delay_;
|
||||||
|
bool firstRun = true;
|
||||||
|
};
|
||||||
|
|
||||||
|
testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a,
|
||||||
|
const size_t idxB, const int b)
|
||||||
|
{
|
||||||
|
if (arr.empty() || arr.size() <= max(idxA, idxB))
|
||||||
|
return testing::AssertionFailure() << "The supplied vector is either empty or not big enough.";
|
||||||
|
if (arr[idxA] == a && arr[idxB] == b)
|
||||||
|
return testing::AssertionSuccess();
|
||||||
|
if (arr[idxA] == b && arr[idxB] == a)
|
||||||
|
return testing::AssertionSuccess();
|
||||||
|
return testing::AssertionFailure() << "The values at positions " << idxA << " " << idxB << " are not " << a
|
||||||
|
<< " and " << b << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(FairThreadPoolTest, FairThreadPoolAdd)
|
||||||
|
{
|
||||||
|
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||||
|
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 50000));
|
||||||
|
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1);
|
||||||
|
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 5000));
|
||||||
|
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1);
|
||||||
|
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 5000));
|
||||||
|
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1);
|
||||||
|
|
||||||
|
threadPool->addJob(job1);
|
||||||
|
threadPool->addJob(job2);
|
||||||
|
threadPool->addJob(job3);
|
||||||
|
|
||||||
|
while (threadPool->queueSize())
|
||||||
|
{
|
||||||
|
usleep(250000);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
||||||
|
EXPECT_EQ(results.size(), 3ULL);
|
||||||
|
EXPECT_EQ(results[0], 1);
|
||||||
|
EXPECT_EQ(results[1], 3);
|
||||||
|
EXPECT_EQ(results[2], 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(FairThreadPoolTest, FairThreadPoolRemove)
|
||||||
|
{
|
||||||
|
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||||
|
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
||||||
|
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
||||||
|
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
||||||
|
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2);
|
||||||
|
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
||||||
|
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3);
|
||||||
|
|
||||||
|
threadPool->addJob(job1);
|
||||||
|
threadPool->addJob(job2);
|
||||||
|
threadPool->addJob(job3);
|
||||||
|
threadPool->removeJobs(job2.id_);
|
||||||
|
|
||||||
|
while (threadPool->queueSize())
|
||||||
|
{
|
||||||
|
usleep(250000);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
||||||
|
EXPECT_EQ(results.size(), 2ULL);
|
||||||
|
EXPECT_EQ(results[0], 1);
|
||||||
|
EXPECT_EQ(results[1], 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(FairThreadPoolTest, FairThreadPoolCleanUp)
|
||||||
|
{
|
||||||
|
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||||
|
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
||||||
|
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
||||||
|
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
||||||
|
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2);
|
||||||
|
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
||||||
|
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3);
|
||||||
|
|
||||||
|
threadPool->addJob(job1);
|
||||||
|
threadPool->removeJobs(job1.id_);
|
||||||
|
threadPool->addJob(job2);
|
||||||
|
threadPool->removeJobs(job2.id_);
|
||||||
|
threadPool->addJob(job3);
|
||||||
|
threadPool->removeJobs(job3.id_);
|
||||||
|
threadPool->removeJobs(job1.id_);
|
||||||
|
threadPool->removeJobs(job1.id_);
|
||||||
|
|
||||||
|
while (threadPool->queueSize())
|
||||||
|
{
|
||||||
|
usleep(250000);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(FairThreadPoolTest, FairThreadPoolReschedule)
|
||||||
|
{
|
||||||
|
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
||||||
|
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
||||||
|
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
||||||
|
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
||||||
|
FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2);
|
||||||
|
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
||||||
|
FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3);
|
||||||
|
|
||||||
|
threadPool->addJob(job1);
|
||||||
|
threadPool->addJob(job2);
|
||||||
|
threadPool->addJob(job3);
|
||||||
|
|
||||||
|
while (threadPool->queueSize())
|
||||||
|
{
|
||||||
|
usleep(250000);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
||||||
|
EXPECT_EQ(results.size(), 3ULL);
|
||||||
|
EXPECT_EQ(results[0], 1);
|
||||||
|
EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3));
|
||||||
|
}
|
@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} )
|
|||||||
|
|
||||||
########### next target ###############
|
########### next target ###############
|
||||||
|
|
||||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp)
|
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp)
|
||||||
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
||||||
add_dependencies(threadpool loggingcpp)
|
add_dependencies(threadpool loggingcpp)
|
||||||
target_link_libraries(threadpool Boost::chrono)
|
target_link_libraries(threadpool Boost::chrono)
|
||||||
|
@ -52,33 +52,19 @@ class PriorityThreadPool
|
|||||||
// this thread pool to reschedule the job, 0 will throw it away on return.
|
// this thread pool to reschedule the job, 0 will throw it away on return.
|
||||||
virtual int operator()() = 0;
|
virtual int operator()() = 0;
|
||||||
};
|
};
|
||||||
using TransactionIdxT = uint32_t;
|
|
||||||
struct Job
|
struct Job
|
||||||
{
|
{
|
||||||
Job() : weight(1), priority(0), id(0)
|
Job() : weight(1), priority(0), id(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
|
|
||||||
const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
|
|
||||||
const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
|
|
||||||
: uniqueID(uniqueID)
|
|
||||||
, stepID(stepID)
|
|
||||||
, txnIdx(txnIdx)
|
|
||||||
, functor(functor)
|
|
||||||
, sock(sock)
|
|
||||||
, weight(weight)
|
|
||||||
, priority(priority)
|
|
||||||
, id(id)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
uint32_t uniqueID;
|
|
||||||
uint32_t stepID;
|
|
||||||
TransactionIdxT txnIdx;
|
|
||||||
boost::shared_ptr<Functor> functor;
|
boost::shared_ptr<Functor> functor;
|
||||||
primitiveprocessor::SP_UM_IOSOCK sock;
|
|
||||||
uint32_t weight;
|
uint32_t weight;
|
||||||
uint32_t priority;
|
uint32_t priority;
|
||||||
uint32_t id;
|
uint32_t id;
|
||||||
|
uint32_t uniqueID;
|
||||||
|
uint32_t stepID;
|
||||||
|
primitiveprocessor::SP_UM_IOSOCK sock;
|
||||||
};
|
};
|
||||||
|
|
||||||
enum Priority
|
enum Priority
|
||||||
@ -128,7 +114,7 @@ class PriorityThreadPool
|
|||||||
{
|
{
|
||||||
return blockedThreads;
|
return blockedThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
private:
|
private:
|
||||||
struct ThreadHelper
|
struct ThreadHelper
|
||||||
|
Reference in New Issue
Block a user