From 765dd46b61334c750c6fa5e5f22e9761950ec29d Mon Sep 17 00:00:00 2001 From: drrtuy Date: Thu, 17 Aug 2023 23:02:31 +0200 Subject: [PATCH] fix(pp-threadpool): the workaround for a stuck tests001 in CI (#2931) CI ocassionaly stuck running test001 b/c PP threadpool endlessly reschedules meta jobs, e.g. BATCH_PRIMITIVE_CREATE, which ByteStreams were somehow damaged or read out. Co-authored-by: Leonid Fedorov --- primitives/primproc/primitiveserver.cpp | 23 ++++++++++++----------- utils/threadpool/fair_threadpool.cpp | 1 + utils/threadpool/fair_threadpool.h | 6 ++++++ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 8a3fc4182..4819200a0 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -31,7 +31,7 @@ #include #include -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -348,7 +348,6 @@ uint32_t loadBlocks(LBID_t* lbids, QueryContext qc, VER_t txn, int compType, uin *blocksWereVersioned = false; - if (LBIDTrace) { for (i = 0; i < blockCount; i++) @@ -410,7 +409,6 @@ uint32_t loadBlocks(LBID_t* lbids, QueryContext qc, VER_t txn, int compType, uin { prefetchBlocks(lbids[0], compType, &blksRead); - if (fPMProfOn) pmstats.markEvent(lbids[0], (pthread_t)-1, sessionID, 'M'); @@ -494,11 +492,9 @@ void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bu uint32_t blksRead = 0; VSSCache::iterator it; - if (LBIDTrace) stats.touchedLBID(lbid, pthread_self(), sessionID); - if (vssCache) { it = vssCache->find(lbid); @@ -554,14 +550,12 @@ void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bu SUMMARY_INFO2("open failed: ", fileNamePtr); char errbuf[80]; string errMsg; - //#if STRERROR_R_CHAR_P + // #if STRERROR_R_CHAR_P const char* p; if ((p = strerror_r(errCode, errbuf, 80)) != 0) errMsg = p; - - if (errCode == EINVAL) { throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT), @@ -809,7 +803,6 @@ void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bu { prefetchBlocks(lbid, compType, &blksRead); - if (fPMProfOn) pmstats.markEvent(lbid, (pthread_t)-1, sessionID, 'M'); @@ -981,7 +974,7 @@ void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int comp } // namespace primitiveprocessor -//#define DCT_DEBUG 1 +// #define DCT_DEBUG 1 #define SETUP_GUARD \ { \ unsigned char* o = outputp.get(); \ @@ -1563,6 +1556,14 @@ struct BPPHandler int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime) { + // This is a corner case that damages bs so its length becomes less than a header length. + // The damaged bs doesn't pass the if that checks bs at least has header + 3x int32_t. + // The if block below works around the issue. + if (posix_time::second_clock::universal_time() > dieTime) + { + return 0; + } + uint32_t uniqueID, sessionID, stepID; BPPMap::iterator it; if (bs.length() < sizeof(ISMPacketHeader) + sizeof(sessionID) + sizeof(stepID) + sizeof(uniqueID)) @@ -1939,7 +1940,7 @@ struct ReadThread const uint32_t txnId = *((uint32_t*)&buf[pos + 2]); const uint32_t stepID = *((uint32_t*)&buf[pos + 6]); const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]); - const uint32_t weight = 1; + const uint32_t weight = threadpool::MetaJobsInitialWeight; const uint32_t priority = 0; uint32_t id = 0; boost::shared_ptr functor; diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index 7c267e42c..cfd0350fd 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -228,6 +228,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue { // to avoid excessive CPU usage waiting for data from storage usleep(500); + runList[0].weight_ += RescheduleWeightIncrement; addJob(runList[0]); } } diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h index 42a9f7fb4..24adb1aed 100644 --- a/utils/threadpool/fair_threadpool.h +++ b/utils/threadpool/fair_threadpool.h @@ -40,6 +40,12 @@ namespace threadpool { +// Meta Jobs, e.g. BATCH_PRIMITIVE_CREATE has very small weight if a number of such Jobs +// stuck in the scheduler queue they will starve the whole queue so that no Job could be run +// except these meta jobs. +constexpr const uint32_t RescheduleWeightIncrement = 10000; +constexpr const uint32_t MetaJobsInitialWeight = 1; + // The idea of this thread pool is to run morsel jobs(primitive job) is to equaly distribute CPU time // b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight // stored in PriorityQueue that thread increases before run another morsel for the query. When query is