You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
This patch disables FairThreadPool to double check if this feature contributes to multiple strange side-effects and ocassional failed MTR tests
This commit is contained in:
@ -47,7 +47,7 @@
|
|||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
class BPPSeeder : public threadpool::FairThreadPool::Functor
|
class BPPSeeder : public threadpool::PriorityThreadPool::Functor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads,
|
BPPSeeder(const messageqcpp::SBS&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads,
|
||||||
|
@ -24,13 +24,12 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "fair_threadpool.h"
|
|
||||||
#include "umsocketselector.h"
|
#include "umsocketselector.h"
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include "threadnaming.h"
|
#include "threadnaming.h"
|
||||||
#include "fair_threadpool.h"
|
#include "prioritythreadpool.h"
|
||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
@ -85,7 +84,7 @@ class BPPSendThread
|
|||||||
{
|
{
|
||||||
return die;
|
return die;
|
||||||
}
|
}
|
||||||
void setProcessorPool(boost::shared_ptr<threadpool::FairThreadPool> processorPool)
|
void setProcessorPool(boost::shared_ptr<threadpool::PriorityThreadPool> processorPool)
|
||||||
{
|
{
|
||||||
fProcessorPool = processorPool;
|
fProcessorPool = processorPool;
|
||||||
}
|
}
|
||||||
@ -149,7 +148,7 @@ class BPPSendThread
|
|||||||
uint64_t maxByteSize;
|
uint64_t maxByteSize;
|
||||||
// Used to tell the ThreadPool It should consider additional threads because a
|
// Used to tell the ThreadPool It should consider additional threads because a
|
||||||
// queue full event has happened and a thread has been blocked.
|
// queue full event has happened and a thread has been blocked.
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
|
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace primitiveprocessor
|
} // namespace primitiveprocessor
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||||
Copyright (C) 2016 MariaDB Corporation
|
Copyright (C) 2016-2022 MariaDB Corporation
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
This program is free software; you can redistribute it and/or
|
||||||
modify it under the terms of the GNU General Public License
|
modify it under the terms of the GNU General Public License
|
||||||
@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|||||||
// FIXME: there is an anon ns burried later in between 2 named namespaces...
|
// FIXME: there is an anon ns burried later in between 2 named namespaces...
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
|
boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;
|
||||||
BlockRequestProcessor** BRPp;
|
BlockRequestProcessor** BRPp;
|
||||||
#ifndef _MSC_VER
|
#ifndef _MSC_VER
|
||||||
dbbc::Stats stats;
|
dbbc::Stats stats;
|
||||||
@ -1049,7 +1049,7 @@ using namespace primitiveprocessor;
|
|||||||
/** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
|
/** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
|
||||||
* TODO: Move this & the impl into different files
|
* TODO: Move this & the impl into different files
|
||||||
*/
|
*/
|
||||||
class DictScanJob : public threadpool::FairThreadPool::Functor
|
class DictScanJob : public threadpool::PriorityThreadPool::Functor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
|
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
|
||||||
@ -1240,7 +1240,7 @@ struct BPPHandler
|
|||||||
scoped.unlock();
|
scoped.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BPPHandlerFunctor : public FairThreadPool::Functor
|
struct BPPHandlerFunctor : public PriorityThreadPool::Functor
|
||||||
{
|
{
|
||||||
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
|
BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : bs(b)
|
||||||
{
|
{
|
||||||
@ -1706,7 +1706,7 @@ return 0;
|
|||||||
PrimitiveServer* fPrimitiveServerPtr;
|
PrimitiveServer* fPrimitiveServerPtr;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DictionaryOp : public FairThreadPool::Functor
|
class DictionaryOp : public PriorityThreadPool::Functor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DictionaryOp(SBS cmd) : bs(cmd)
|
DictionaryOp(SBS cmd) : bs(cmd)
|
||||||
@ -1943,7 +1943,8 @@ struct ReadThread
|
|||||||
void operator()()
|
void operator()()
|
||||||
{
|
{
|
||||||
utils::setThreadName("PPReadThread");
|
utils::setThreadName("PPReadThread");
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
|
boost::shared_ptr<threadpool::PriorityThreadPool> procPoolPtr =
|
||||||
|
fPrimitiveServerPtr->getProcessorThreadPool();
|
||||||
SBS bs;
|
SBS bs;
|
||||||
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
|
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
|
||||||
|
|
||||||
@ -1994,9 +1995,6 @@ struct ReadThread
|
|||||||
idbassert(bs->length() >= sizeof(ISMPacketHeader));
|
idbassert(bs->length() >= sizeof(ISMPacketHeader));
|
||||||
|
|
||||||
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
|
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
|
||||||
// uint64_t someVal = ismHdr->Command;
|
|
||||||
// std::cout << " PP read thread Command " << someVal << std::endl;
|
|
||||||
|
|
||||||
/* This switch is for the OOB commands */
|
/* This switch is for the OOB commands */
|
||||||
switch (ismHdr->Command)
|
switch (ismHdr->Command)
|
||||||
{
|
{
|
||||||
@ -2056,7 +2054,7 @@ struct ReadThread
|
|||||||
const uint32_t weight = 1;
|
const uint32_t weight = 1;
|
||||||
const uint32_t priority = 0;
|
const uint32_t priority = 0;
|
||||||
uint32_t id = 0;
|
uint32_t id = 0;
|
||||||
boost::shared_ptr<FairThreadPool::Functor> functor;
|
boost::shared_ptr<PriorityThreadPool::Functor> functor;
|
||||||
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
|
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
|
||||||
{
|
{
|
||||||
functor.reset(new CreateEqualityFilter(bs));
|
functor.reset(new CreateEqualityFilter(bs));
|
||||||
@ -2088,7 +2086,7 @@ struct ReadThread
|
|||||||
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
|
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
|
||||||
functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
|
functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
|
||||||
}
|
}
|
||||||
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
||||||
procPoolPtr->addJob(job);
|
procPoolPtr->addJob(job);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2097,7 +2095,7 @@ struct ReadThread
|
|||||||
case BATCH_PRIMITIVE_RUN:
|
case BATCH_PRIMITIVE_RUN:
|
||||||
{
|
{
|
||||||
TokenByScanRequestHeader* hdr = nullptr;
|
TokenByScanRequestHeader* hdr = nullptr;
|
||||||
boost::shared_ptr<FairThreadPool::Functor> functor;
|
boost::shared_ptr<PriorityThreadPool::Functor> functor;
|
||||||
uint32_t id = 0;
|
uint32_t id = 0;
|
||||||
uint32_t weight = 0;
|
uint32_t weight = 0;
|
||||||
uint32_t priority = 0;
|
uint32_t priority = 0;
|
||||||
@ -2137,9 +2135,8 @@ struct ReadThread
|
|||||||
}
|
}
|
||||||
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
|
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
|
||||||
{
|
{
|
||||||
functor.reset(new BPPSeeder(bs, writeLock, outIos,
|
functor.reset(new BPPSeeder(bs, writeLock, outIos, fPrimitiveServerPtr->ProcessorThreads(),
|
||||||
fPrimitiveServerPtr->ProcessorThreads(),
|
fPrimitiveServerPtr->PTTrace()));
|
||||||
fPrimitiveServerPtr->PTTrace()));
|
|
||||||
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
|
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
|
||||||
id = bpps->getID();
|
id = bpps->getID();
|
||||||
priority = bpps->priority();
|
priority = bpps->priority();
|
||||||
@ -2150,7 +2147,7 @@ struct ReadThread
|
|||||||
uniqueID = *((uint32_t*)&buf[pos + 10]);
|
uniqueID = *((uint32_t*)&buf[pos + 10]);
|
||||||
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
|
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
|
||||||
}
|
}
|
||||||
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
|
||||||
procPoolPtr->addJob(job);
|
procPoolPtr->addJob(job);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@ -2310,8 +2307,13 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
|
|||||||
fServerpool.setQueueSize(fServerQueueSize);
|
fServerpool.setQueueSize(fServerQueueSize);
|
||||||
fServerpool.setName("PrimitiveServer");
|
fServerpool.setName("PrimitiveServer");
|
||||||
|
|
||||||
fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
|
fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
|
||||||
medPriorityThreads, lowPriorityThreads, 0));
|
medPriorityThreads, lowPriorityThreads, 0));
|
||||||
|
// We're not using either the priority or the job-clustering features, just need a threadpool
|
||||||
|
// that can reschedule jobs, and an unlimited non-blocking queue
|
||||||
|
OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
|
||||||
|
// Initialize a local pointer.
|
||||||
|
fOOBPool = OOBPool;
|
||||||
|
|
||||||
asyncCounter = 0;
|
asyncCounter = 0;
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@
|
|||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
|
|
||||||
#include "threadpool.h"
|
#include "threadpool.h"
|
||||||
#include "fair_threadpool.h"
|
#include "prioritythreadpool.h"
|
||||||
#include "messagequeue.h"
|
#include "messagequeue.h"
|
||||||
#include "blockrequestprocessor.h"
|
#include "blockrequestprocessor.h"
|
||||||
#include "batchprimitiveprocessor.h"
|
#include "batchprimitiveprocessor.h"
|
||||||
@ -48,6 +48,7 @@ extern oam::OamCache* oamCache;
|
|||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
{
|
{
|
||||||
|
extern boost::shared_ptr<threadpool::PriorityThreadPool> OOBPool;
|
||||||
extern dbbc::BlockRequestProcessor** BRPp;
|
extern dbbc::BlockRequestProcessor** BRPp;
|
||||||
extern BRM::DBRM* brm;
|
extern BRM::DBRM* brm;
|
||||||
extern boost::mutex bppLock;
|
extern boost::mutex bppLock;
|
||||||
@ -129,11 +130,16 @@ class PrimitiveServer
|
|||||||
|
|
||||||
/** @brief get a pointer the shared processor thread pool
|
/** @brief get a pointer the shared processor thread pool
|
||||||
*/
|
*/
|
||||||
inline boost::shared_ptr<threadpool::FairThreadPool> getProcessorThreadPool() const
|
inline boost::shared_ptr<threadpool::PriorityThreadPool> getProcessorThreadPool() const
|
||||||
{
|
{
|
||||||
return fProcessorPool;
|
return fProcessorPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool() const
|
||||||
|
{
|
||||||
|
return fOOBPool;
|
||||||
|
}
|
||||||
|
|
||||||
int ReadAheadBlocks() const
|
int ReadAheadBlocks() const
|
||||||
{
|
{
|
||||||
return fReadAheadBlocks;
|
return fReadAheadBlocks;
|
||||||
@ -165,7 +171,8 @@ class PrimitiveServer
|
|||||||
/** @brief the thread pool used to process
|
/** @brief the thread pool used to process
|
||||||
* primitive commands
|
* primitive commands
|
||||||
*/
|
*/
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
|
boost::shared_ptr<threadpool::PriorityThreadPool> fProcessorPool;
|
||||||
|
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBPool;
|
||||||
|
|
||||||
int fServerThreads;
|
int fServerThreads;
|
||||||
int fServerQueueSize;
|
int fServerQueueSize;
|
||||||
|
@ -18,22 +18,29 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
#include "fair_threadpool.h"
|
#include "prioritythreadpool.h"
|
||||||
|
|
||||||
class PrimitiveServerThreadPools
|
class PrimitiveServerThreadPools
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
PrimitiveServerThreadPools() = default;
|
PrimitiveServerThreadPools() = default;
|
||||||
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool)
|
PrimitiveServerThreadPools(boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool,
|
||||||
: fPrimServerThreadPool(primServerThreadPool)
|
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool)
|
||||||
|
: fPrimServerThreadPool(primServerThreadPool), fOOBThreadPool(OOBThreadPool)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> getPrimitiveServerThreadPool()
|
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool()
|
||||||
{
|
{
|
||||||
return fPrimServerThreadPool;
|
return fPrimServerThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool()
|
||||||
|
{
|
||||||
|
return fOOBThreadPool;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> fPrimServerThreadPool;
|
boost::shared_ptr<threadpool::PriorityThreadPool> fPrimServerThreadPool;
|
||||||
|
boost::shared_ptr<threadpool::PriorityThreadPool> fOOBThreadPool;
|
||||||
};
|
};
|
||||||
|
@ -725,6 +725,7 @@ int ServicePrimProc::Child()
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
primServerThreadPool = server.getProcessorThreadPool();
|
primServerThreadPool = server.getProcessorThreadPool();
|
||||||
|
OOBThreadPool = server.getOOBThreadPool();
|
||||||
|
|
||||||
server.start(this, startupRaceLock);
|
server.start(this, startupRaceLock);
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
#include <map>
|
#include <map>
|
||||||
|
|
||||||
#include "service.h"
|
#include "service.h"
|
||||||
#include "fair_threadpool.h"
|
#include "prioritythreadpool.h"
|
||||||
#include "pp_logger.h"
|
#include "pp_logger.h"
|
||||||
|
|
||||||
namespace primitiveprocessor
|
namespace primitiveprocessor
|
||||||
@ -155,12 +155,12 @@ class ServicePrimProc : public Service, public Opt
|
|||||||
|
|
||||||
void LogErrno() override
|
void LogErrno() override
|
||||||
{
|
{
|
||||||
cerr << strerror(errno) << endl;
|
std::cerr << strerror(errno) << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ParentLogChildMessage(const std::string& str) override
|
void ParentLogChildMessage(const std::string& str) override
|
||||||
{
|
{
|
||||||
cout << str << endl;
|
std::cout << str << std::endl;
|
||||||
}
|
}
|
||||||
int Child() override;
|
int Child() override;
|
||||||
int Run()
|
int Run()
|
||||||
@ -172,12 +172,12 @@ class ServicePrimProc : public Service, public Opt
|
|||||||
return startupRaceFlag_;
|
return startupRaceFlag_;
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> getPrimitiveServerThreadPool()
|
boost::shared_ptr<threadpool::PriorityThreadPool> getPrimitiveServerThreadPool()
|
||||||
{
|
{
|
||||||
return primServerThreadPool;
|
return primServerThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> getOOBThreadPool()
|
boost::shared_ptr<threadpool::PriorityThreadPool> getOOBThreadPool()
|
||||||
{
|
{
|
||||||
return OOBThreadPool;
|
return OOBThreadPool;
|
||||||
}
|
}
|
||||||
@ -190,6 +190,6 @@ class ServicePrimProc : public Service, public Opt
|
|||||||
static ServicePrimProc* fInstance;
|
static ServicePrimProc* fInstance;
|
||||||
// Since C++20 flag's init value is false.
|
// Since C++20 flag's init value is false.
|
||||||
std::atomic_flag startupRaceFlag_{false};
|
std::atomic_flag startupRaceFlag_{false};
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> primServerThreadPool;
|
boost::shared_ptr<threadpool::PriorityThreadPool> primServerThreadPool;
|
||||||
boost::shared_ptr<threadpool::FairThreadPool> OOBThreadPool;
|
boost::shared_ptr<threadpool::PriorityThreadPool> OOBThreadPool;
|
||||||
};
|
};
|
||||||
|
@ -520,7 +520,8 @@ namespace exemgr
|
|||||||
statementsRunningCount->incr(stmtCounted);
|
statementsRunningCount->incr(stmtCounted);
|
||||||
|
|
||||||
PrimitiveServerThreadPools primitiveServerThreadPools(
|
PrimitiveServerThreadPools primitiveServerThreadPools(
|
||||||
ServicePrimProc::instance()->getPrimitiveServerThreadPool());
|
ServicePrimProc::instance()->getPrimitiveServerThreadPool(),
|
||||||
|
ServicePrimProc::instance()->getOOBThreadPool());
|
||||||
|
|
||||||
if (tryTuples)
|
if (tryTuples)
|
||||||
{
|
{
|
||||||
|
@ -59,11 +59,6 @@ if (WITH_UNITTESTS)
|
|||||||
target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
||||||
gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:)
|
gtest_add_tests(TARGET simd_processors TEST_PREFIX columnstore:)
|
||||||
|
|
||||||
add_executable(fair_threadpool_test fair_threadpool.cpp)
|
|
||||||
add_dependencies(fair_threadpool_test googletest)
|
|
||||||
target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
|
|
||||||
gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:)
|
|
||||||
|
|
||||||
add_executable(comparators_tests comparators-tests.cpp)
|
add_executable(comparators_tests comparators-tests.cpp)
|
||||||
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
||||||
add_test(NAME columnstore:comparators_tests COMMAND comparators_tests)
|
add_test(NAME columnstore:comparators_tests COMMAND comparators_tests)
|
||||||
|
@ -1,202 +0,0 @@
|
|||||||
/* Copyright (C) 2022 MariaDB Corporation
|
|
||||||
|
|
||||||
This program is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU General Public License
|
|
||||||
as published by the Free Software Foundation; version 2 of
|
|
||||||
the License.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
||||||
MA 02110-1301, USA. */
|
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include <gtest/gtest.h>
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "utils/threadpool/fair_threadpool.h"
|
|
||||||
|
|
||||||
using namespace primitiveprocessor;
|
|
||||||
using namespace std;
|
|
||||||
using namespace threadpool;
|
|
||||||
|
|
||||||
using ResultsType = std::vector<int>;
|
|
||||||
static ResultsType results;
|
|
||||||
|
|
||||||
class FairThreadPoolTest : public testing::Test
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
void SetUp() override
|
|
||||||
{
|
|
||||||
results.clear();
|
|
||||||
threadPool = new FairThreadPool(1, 1, 0, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
FairThreadPool* threadPool;
|
|
||||||
};
|
|
||||||
|
|
||||||
class TestFunctor : public FairThreadPool::Functor
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
TestFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
~TestFunctor(){};
|
|
||||||
int operator()() override
|
|
||||||
{
|
|
||||||
usleep(delay_);
|
|
||||||
results.push_back(id_);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
size_t id_;
|
|
||||||
size_t delay_;
|
|
||||||
};
|
|
||||||
|
|
||||||
class TestRescheduleFunctor : public FairThreadPool::Functor
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
TestRescheduleFunctor(const size_t id, const size_t delay) : id_(id), delay_(delay)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
~TestRescheduleFunctor(){};
|
|
||||||
int operator()() override
|
|
||||||
{
|
|
||||||
if (firstRun)
|
|
||||||
{
|
|
||||||
firstRun = false;
|
|
||||||
return 1; // re-schedule the Job
|
|
||||||
}
|
|
||||||
usleep(delay_);
|
|
||||||
results.push_back(id_);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
size_t id_;
|
|
||||||
size_t delay_;
|
|
||||||
bool firstRun = true;
|
|
||||||
};
|
|
||||||
|
|
||||||
testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a,
|
|
||||||
const size_t idxB, const int b)
|
|
||||||
{
|
|
||||||
if (arr.empty() || arr.size() <= max(idxA, idxB))
|
|
||||||
return testing::AssertionFailure() << "The supplied vector is either empty or not big enough.";
|
|
||||||
if (arr[idxA] == a && arr[idxB] == b)
|
|
||||||
return testing::AssertionSuccess();
|
|
||||||
if (arr[idxA] == b && arr[idxB] == a)
|
|
||||||
return testing::AssertionSuccess();
|
|
||||||
return testing::AssertionFailure() << "The values at positions " << idxA << " " << idxB << " are not " << a
|
|
||||||
<< " and " << b << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(FairThreadPoolTest, FairThreadPoolAdd)
|
|
||||||
{
|
|
||||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
|
||||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 50000));
|
|
||||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1);
|
|
||||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 5000));
|
|
||||||
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1);
|
|
||||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 5000));
|
|
||||||
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1);
|
|
||||||
|
|
||||||
threadPool->addJob(job1);
|
|
||||||
threadPool->addJob(job2);
|
|
||||||
threadPool->addJob(job3);
|
|
||||||
|
|
||||||
while (threadPool->queueSize())
|
|
||||||
{
|
|
||||||
usleep(250000);
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
|
||||||
EXPECT_EQ(results.size(), 3ULL);
|
|
||||||
EXPECT_EQ(results[0], 1);
|
|
||||||
EXPECT_EQ(results[1], 3);
|
|
||||||
EXPECT_EQ(results[2], 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(FairThreadPoolTest, FairThreadPoolRemove)
|
|
||||||
{
|
|
||||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
|
||||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
|
||||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
|
||||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
|
||||||
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2);
|
|
||||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
|
||||||
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3);
|
|
||||||
|
|
||||||
threadPool->addJob(job1);
|
|
||||||
threadPool->addJob(job2);
|
|
||||||
threadPool->addJob(job3);
|
|
||||||
threadPool->removeJobs(job2.id_);
|
|
||||||
|
|
||||||
while (threadPool->queueSize())
|
|
||||||
{
|
|
||||||
usleep(250000);
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
|
||||||
EXPECT_EQ(results.size(), 2ULL);
|
|
||||||
EXPECT_EQ(results[0], 1);
|
|
||||||
EXPECT_EQ(results[1], 3);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(FairThreadPoolTest, FairThreadPoolCleanUp)
|
|
||||||
{
|
|
||||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
|
||||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
|
||||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
|
||||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
|
||||||
FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2);
|
|
||||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
|
||||||
FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3);
|
|
||||||
|
|
||||||
threadPool->addJob(job1);
|
|
||||||
threadPool->removeJobs(job1.id_);
|
|
||||||
threadPool->addJob(job2);
|
|
||||||
threadPool->removeJobs(job2.id_);
|
|
||||||
threadPool->addJob(job3);
|
|
||||||
threadPool->removeJobs(job3.id_);
|
|
||||||
threadPool->removeJobs(job1.id_);
|
|
||||||
threadPool->removeJobs(job1.id_);
|
|
||||||
|
|
||||||
while (threadPool->queueSize())
|
|
||||||
{
|
|
||||||
usleep(250000);
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(FairThreadPoolTest, FairThreadPoolReschedule)
|
|
||||||
{
|
|
||||||
SP_UM_IOSOCK sock(new messageqcpp::IOSocket);
|
|
||||||
auto functor1 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(1, 100000));
|
|
||||||
FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1);
|
|
||||||
auto functor2 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(2, 50000));
|
|
||||||
FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2);
|
|
||||||
auto functor3 = boost::shared_ptr<FairThreadPool::Functor>(new TestFunctor(3, 50000));
|
|
||||||
FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3);
|
|
||||||
|
|
||||||
threadPool->addJob(job1);
|
|
||||||
threadPool->addJob(job2);
|
|
||||||
threadPool->addJob(job3);
|
|
||||||
|
|
||||||
while (threadPool->queueSize())
|
|
||||||
{
|
|
||||||
usleep(250000);
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECT_EQ(threadPool->queueSize(), 0ULL);
|
|
||||||
EXPECT_EQ(results.size(), 3ULL);
|
|
||||||
EXPECT_EQ(results[0], 1);
|
|
||||||
EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3));
|
|
||||||
}
|
|
@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} )
|
|||||||
|
|
||||||
########### next target ###############
|
########### next target ###############
|
||||||
|
|
||||||
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp)
|
set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp)
|
||||||
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
add_library(threadpool SHARED ${threadpool_LIB_SRCS})
|
||||||
add_dependencies(threadpool loggingcpp)
|
add_dependencies(threadpool loggingcpp)
|
||||||
target_link_libraries(threadpool Boost::chrono)
|
target_link_libraries(threadpool Boost::chrono)
|
||||||
|
@ -52,19 +52,33 @@ class PriorityThreadPool
|
|||||||
// this thread pool to reschedule the job, 0 will throw it away on return.
|
// this thread pool to reschedule the job, 0 will throw it away on return.
|
||||||
virtual int operator()() = 0;
|
virtual int operator()() = 0;
|
||||||
};
|
};
|
||||||
|
using TransactionIdxT = uint32_t;
|
||||||
struct Job
|
struct Job
|
||||||
{
|
{
|
||||||
Job() : weight(1), priority(0), id(0)
|
Job() : weight(1), priority(0), id(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
|
||||||
|
const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
|
||||||
|
const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
|
||||||
|
: uniqueID(uniqueID)
|
||||||
|
, stepID(stepID)
|
||||||
|
, txnIdx(txnIdx)
|
||||||
|
, functor(functor)
|
||||||
|
, sock(sock)
|
||||||
|
, weight(weight)
|
||||||
|
, priority(priority)
|
||||||
|
, id(id)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
uint32_t uniqueID;
|
||||||
|
uint32_t stepID;
|
||||||
|
TransactionIdxT txnIdx;
|
||||||
boost::shared_ptr<Functor> functor;
|
boost::shared_ptr<Functor> functor;
|
||||||
|
primitiveprocessor::SP_UM_IOSOCK sock;
|
||||||
uint32_t weight;
|
uint32_t weight;
|
||||||
uint32_t priority;
|
uint32_t priority;
|
||||||
uint32_t id;
|
uint32_t id;
|
||||||
uint32_t uniqueID;
|
|
||||||
uint32_t stepID;
|
|
||||||
primitiveprocessor::SP_UM_IOSOCK sock;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
enum Priority
|
enum Priority
|
||||||
@ -114,7 +128,7 @@ class PriorityThreadPool
|
|||||||
{
|
{
|
||||||
return blockedThreads;
|
return blockedThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
private:
|
private:
|
||||||
struct ThreadHelper
|
struct ThreadHelper
|
||||||
|
Reference in New Issue
Block a user