You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
fix(pp-threadpool): a workaround for a stuck test001 in CI (#2931)
CI occasionally gets stuck running test001 because the PP threadpool endlessly reschedules meta jobs, e.g. BATCH_PRIMITIVE_CREATE, whose ByteStreams were somehow damaged or already read out. Co-authored-by: Leonid Fedorov <leonid.fedorov@mariadb.com>
This commit is contained in:
@ -31,7 +31,7 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
//#define NDEBUG
|
// #define NDEBUG
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
#include <boost/thread/condition.hpp>
|
#include <boost/thread/condition.hpp>
|
||||||
@ -348,7 +348,6 @@ uint32_t loadBlocks(LBID_t* lbids, QueryContext qc, VER_t txn, int compType, uin
|
|||||||
|
|
||||||
*blocksWereVersioned = false;
|
*blocksWereVersioned = false;
|
||||||
|
|
||||||
|
|
||||||
if (LBIDTrace)
|
if (LBIDTrace)
|
||||||
{
|
{
|
||||||
for (i = 0; i < blockCount; i++)
|
for (i = 0; i < blockCount; i++)
|
||||||
@ -410,7 +409,6 @@ uint32_t loadBlocks(LBID_t* lbids, QueryContext qc, VER_t txn, int compType, uin
|
|||||||
{
|
{
|
||||||
prefetchBlocks(lbids[0], compType, &blksRead);
|
prefetchBlocks(lbids[0], compType, &blksRead);
|
||||||
|
|
||||||
|
|
||||||
if (fPMProfOn)
|
if (fPMProfOn)
|
||||||
pmstats.markEvent(lbids[0], (pthread_t)-1, sessionID, 'M');
|
pmstats.markEvent(lbids[0], (pthread_t)-1, sessionID, 'M');
|
||||||
|
|
||||||
@ -494,11 +492,9 @@ void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bu
|
|||||||
uint32_t blksRead = 0;
|
uint32_t blksRead = 0;
|
||||||
VSSCache::iterator it;
|
VSSCache::iterator it;
|
||||||
|
|
||||||
|
|
||||||
if (LBIDTrace)
|
if (LBIDTrace)
|
||||||
stats.touchedLBID(lbid, pthread_self(), sessionID);
|
stats.touchedLBID(lbid, pthread_self(), sessionID);
|
||||||
|
|
||||||
|
|
||||||
if (vssCache)
|
if (vssCache)
|
||||||
{
|
{
|
||||||
it = vssCache->find(lbid);
|
it = vssCache->find(lbid);
|
||||||
@ -554,14 +550,12 @@ void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bu
|
|||||||
SUMMARY_INFO2("open failed: ", fileNamePtr);
|
SUMMARY_INFO2("open failed: ", fileNamePtr);
|
||||||
char errbuf[80];
|
char errbuf[80];
|
||||||
string errMsg;
|
string errMsg;
|
||||||
//#if STRERROR_R_CHAR_P
|
// #if STRERROR_R_CHAR_P
|
||||||
const char* p;
|
const char* p;
|
||||||
|
|
||||||
if ((p = strerror_r(errCode, errbuf, 80)) != 0)
|
if ((p = strerror_r(errCode, errbuf, 80)) != 0)
|
||||||
errMsg = p;
|
errMsg = p;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (errCode == EINVAL)
|
if (errCode == EINVAL)
|
||||||
{
|
{
|
||||||
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
|
throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
|
||||||
@ -809,7 +803,6 @@ void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bu
|
|||||||
{
|
{
|
||||||
prefetchBlocks(lbid, compType, &blksRead);
|
prefetchBlocks(lbid, compType, &blksRead);
|
||||||
|
|
||||||
|
|
||||||
if (fPMProfOn)
|
if (fPMProfOn)
|
||||||
pmstats.markEvent(lbid, (pthread_t)-1, sessionID, 'M');
|
pmstats.markEvent(lbid, (pthread_t)-1, sessionID, 'M');
|
||||||
|
|
||||||
@ -981,7 +974,7 @@ void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int comp
|
|||||||
|
|
||||||
} // namespace primitiveprocessor
|
} // namespace primitiveprocessor
|
||||||
|
|
||||||
//#define DCT_DEBUG 1
|
// #define DCT_DEBUG 1
|
||||||
#define SETUP_GUARD \
|
#define SETUP_GUARD \
|
||||||
{ \
|
{ \
|
||||||
unsigned char* o = outputp.get(); \
|
unsigned char* o = outputp.get(); \
|
||||||
@ -1563,6 +1556,14 @@ struct BPPHandler
|
|||||||
|
|
||||||
int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
|
int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
|
||||||
{
|
{
|
||||||
|
// There is a corner case that damages bs so that its length becomes less than the header length.
|
||||||
|
// The damaged bs doesn't pass the check below that verifies bs has at least a header + 3x int32_t.
|
||||||
|
// The if block below works around the issue.
|
||||||
|
if (posix_time::second_clock::universal_time() > dieTime)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t uniqueID, sessionID, stepID;
|
uint32_t uniqueID, sessionID, stepID;
|
||||||
BPPMap::iterator it;
|
BPPMap::iterator it;
|
||||||
if (bs.length() < sizeof(ISMPacketHeader) + sizeof(sessionID) + sizeof(stepID) + sizeof(uniqueID))
|
if (bs.length() < sizeof(ISMPacketHeader) + sizeof(sessionID) + sizeof(stepID) + sizeof(uniqueID))
|
||||||
@ -1939,7 +1940,7 @@ struct ReadThread
|
|||||||
const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
|
const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
|
||||||
const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
|
const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
|
||||||
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
|
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
|
||||||
const uint32_t weight = 1;
|
const uint32_t weight = threadpool::MetaJobsInitialWeight;
|
||||||
const uint32_t priority = 0;
|
const uint32_t priority = 0;
|
||||||
uint32_t id = 0;
|
uint32_t id = 0;
|
||||||
boost::shared_ptr<FairThreadPool::Functor> functor;
|
boost::shared_ptr<FairThreadPool::Functor> functor;
|
||||||
|
@ -228,6 +228,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
|
|||||||
{
|
{
|
||||||
// to avoid excessive CPU usage waiting for data from storage
|
// to avoid excessive CPU usage waiting for data from storage
|
||||||
usleep(500);
|
usleep(500);
|
||||||
|
runList[0].weight_ += RescheduleWeightIncrement;
|
||||||
addJob(runList[0]);
|
addJob(runList[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,12 @@
|
|||||||
namespace threadpool
|
namespace threadpool
|
||||||
{
|
{
|
||||||
|
|
||||||
|
// Meta jobs, e.g. BATCH_PRIMITIVE_CREATE, have a very small weight; if a number of such jobs
|
||||||
|
// get stuck in the scheduler queue, they will starve the whole queue so that no job can run
|
||||||
|
// except these meta jobs.
|
||||||
|
constexpr const uint32_t RescheduleWeightIncrement = 10000;
|
||||||
|
constexpr const uint32_t MetaJobsInitialWeight = 1;
|
||||||
|
|
||||||
// The idea of this thread pool is to run morsel jobs (primitive jobs) so as to equally distribute CPU time
|
// The idea of this thread pool is to run morsel jobs (primitive jobs) so as to equally distribute CPU time
|
||||||
// b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight
|
// b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight
|
||||||
// stored in PriorityQueue that thread increases before run another morsel for the query. When query is
|
// stored in PriorityQueue that thread increases before run another morsel for the query. When query is
|
||||||
|
Reference in New Issue
Block a user