diff --git a/.drone.jsonnet b/.drone.jsonnet index 0f0e7b0a4..e71268154 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -156,7 +156,6 @@ local Pipeline(branch, platform, event, arch='amd64') = { mtr:: { name: 'mtr', image: 'docker:git', - [if arch == 'arm64' then 'failure']: 'ignore', volumes: [pipeline._volumes.docker], commands: [ 'docker run --volume /sys/fs/cgroup:/sys/fs/cgroup:ro --env MYSQL_TEST_DIR=' + mtr_path + ' --env DEBIAN_FRONTEND=noninteractive --env MCS_USE_S3_STORAGE=0 --name mtr$${DRONE_BUILD_NUMBER} --privileged --detach ' + img + ' ' + init + ' --unit=basic.target', diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 4699042bc..af95dae09 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1274,6 +1274,10 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum) bs << uniqueID; bs << _priority; + // The weight is used by PrimProc thread pool algo + uint32_t weight = calculateBPPWeight(); + bs << weight; + bs << dbRoot; bs << count; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index a249b3102..73a636f2f 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -252,8 +252,27 @@ class BatchPrimitiveProcessorJL } private: - // void setLBIDForScan(uint64_t rid, uint32_t dbroot); + const size_t perColumnProjectWeight_ = 10; + const size_t perColumnFilteringWeight_ = 10; + const size_t fe1Weight_ = 10; + const size_t fe2Weight_ = 10; + const size_t joinWeight_ = 500; + const size_t aggregationWeight_ = 500; + // This is simple SQL operations-based model leveraged by + // FairThreadPool run by PP facility. + // Every operation mentioned in this calculation spends + // some CPU so the morsel uses this op weights more. 
+ uint32_t calculateBPPWeight() const + { + uint32_t weight = perColumnProjectWeight_ * projectCount; + weight += filterCount * perColumnFilteringWeight_; + weight += tJoiners.size() * joinWeight_; + weight += (aggregatorPM) ? aggregationWeight_ : 0; + weight += (fe1) ? fe1Weight_ : 0; + weight += (fe2) ? fe2Weight_ : 0; + return weight; + } BPSOutputType ot; bool needToSetLBID; diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index 204a7ef60..1acadfb27 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -20,7 +20,6 @@ #include "tuplehashjoin.h" #include "joinpartition.h" #include "threadnaming.h" -#include "../../utils/threadpool/prioritythreadpool.h" #pragma once diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 172abd861..ba83c5656 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -1478,7 +1478,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector& functionVec[i]->fAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAgg.push_back(oidsProj[j]); keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); @@ -1488,7 +1488,7 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector& widthAgg.push_back(sizeof(long double)); ++lastCol; - // sum(x**2) + // sum(x_i - mean)^2 oidsAgg.push_back(oidsProj[j]); keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); @@ -1910,7 +1910,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = ++colAgg; - // sum(x) + // mean(x) oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); @@ -1920,7 +1920,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAggDist.push_back(oidsAgg[j]); keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); @@ -2591,7 +2591,7 @@ void 
TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector widthAggPm.push_back(sizeof(double)); funct->fAuxColumnIndex = ++colAggPm; - // sum(x) + // mean(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); @@ -3253,7 +3253,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector widthAggPm.push_back(sizeof(long double)); ++colAggPm; - // sum(x**2) + // sum(x_i - mean)^2 oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); @@ -3701,7 +3701,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector functionVecUm[i]->fAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAggUm.push_back(oidsAggPm[j]); keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); @@ -3711,7 +3711,7 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector widthAggUm.push_back(sizeof(long double)); ++lastCol; - // sum(x**2) + // sum(x_i - mean)^2 oidsAggUm.push_back(oidsAggPm[j]); keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); @@ -4152,7 +4152,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = ++colAggPm; - // sum(x) + // mean(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); @@ -4162,7 +4162,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vectorfAuxColumnIndex = lastCol; - // sum(x) + // mean(x) oidsAggDist.push_back(oidsAggPm[j]); keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); @@ -4818,7 +4818,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector /dev/null 2>&1" Restart=on-failure -TimeoutStopSec=120 +TimeoutStopSec=750 EnvironmentFile=-/etc/columnstore/systemd.env diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 99224c7ac..0a0080f61
100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -140,6 +140,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() , ptMask(0) , firstInstance(false) , valuesLBID(0) + , weight_(0) { pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); @@ -193,6 +194,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, // ptMask(processorThreads - 1), , firstInstance(true) , valuesLBID(0) + , weight_(0) { // promote processorThreads to next power of 2. also need to change the name to bucketCount or similar processorThreads = nextPowOf2(processorThreads); @@ -542,6 +544,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con // skip the header, sessionID, stepID, uniqueID, and priority bs.advance(sizeof(ISMPacketHeader) + 16); + bs >> weight_; bs >> dbRoot; bs >> count; bs >> ridCount; diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 7a18c39d5..fc4fcda5b 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -137,6 +137,11 @@ class BatchPrimitiveProcessor fBusy = b; } + size_t getWeight() const + { + return weight_; + } + uint16_t FilterCount() const { return filterCount; @@ -434,6 +439,8 @@ class BatchPrimitiveProcessor uint ptMask; bool firstInstance; uint64_t valuesLBID; + uint32_t weight_; + static const uint64_t maxResultCount = 1048576; // 2^20 diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index 5d56d3559..adda29b6a 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -47,7 +47,7 @@ namespace primitiveprocessor { -class BPPSeeder : public threadpool::PriorityThreadPool::Functor +class BPPSeeder : public threadpool::FairThreadPool::Functor { public: BPPSeeder(const messageqcpp::SBS&, const
SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& ios, const int pmThreads, @@ -71,6 +71,11 @@ class BPPSeeder : public threadpool::PriorityThreadPool::Functor { return _priority; } + size_t getWeight() const + { + assert(bpp); + return bpp->getWeight(); + } private: BPPSeeder(); diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index fa3ec5ebd..35d2cf63a 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -124,7 +124,7 @@ oam::OamCache* oamCache = oam::OamCache::makeOamCache(); // FIXME: there is an anon ns burried later in between 2 named namespaces... namespace primitiveprocessor { -boost::shared_ptr OOBPool; +boost::shared_ptr OOBPool; BlockRequestProcessor** BRPp; #ifndef _MSC_VER @@ -1050,7 +1050,7 @@ using namespace primitiveprocessor; /** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM) * TODO: Move this & the impl into different files */ -class DictScanJob : public threadpool::PriorityThreadPool::Functor +class DictScanJob : public threadpool::FairThreadPool::Functor { public: DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); @@ -1242,7 +1242,7 @@ struct BPPHandler scoped.unlock(); } - struct BPPHandlerFunctor : public PriorityThreadPool::Functor + struct BPPHandlerFunctor : public FairThreadPool::Functor { BPPHandlerFunctor(boost::shared_ptr r, SBS b) : bs(b) { @@ -1710,7 +1710,7 @@ return 0; PrimitiveServer* fPrimitiveServerPtr; }; -class DictionaryOp : public PriorityThreadPool::Functor +class DictionaryOp : public FairThreadPool::Functor { public: DictionaryOp(SBS cmd) : bs(cmd) @@ -1947,7 +1947,7 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - threadpool::PriorityThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); + threadpool::FairThreadPool* procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = 
UmSocketSelector::instance(); @@ -2043,35 +2043,69 @@ struct ReadThread switch (ismHdr->Command) { case DICT_CREATE_EQUALITY_FILTER: - { - PriorityThreadPool::Job job; - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); - OOBPool->addJob(job); - break; - } - case DICT_DESTROY_EQUALITY_FILTER: + case BATCH_PRIMITIVE_CREATE: + case BATCH_PRIMITIVE_ADD_JOINER: + case BATCH_PRIMITIVE_END_JOINER: + case BATCH_PRIMITIVE_DESTROY: + case BATCH_PRIMITIVE_ABORT: { - PriorityThreadPool::Job job; const uint8_t* buf = bs->buf(); uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); + const uint32_t txnId = *((uint32_t*)&buf[pos + 2]); + const uint32_t stepID = *((uint32_t*)&buf[pos + 6]); + const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]); + const uint32_t weight = 1; + const uint32_t priority = 0; + uint32_t id = 0; + boost::shared_ptr functor; + if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) + { + functor.reset(new CreateEqualityFilter(bs)); + } + else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER) + { + functor.reset(new DestroyEqualityFilter(bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE) + { + functor.reset(new BPPHandler::Create(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER) + { + functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER) + { + id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs)); + } + else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY) + { + functor.reset(new BPPHandler::Destroy(fBPPHandler, bs)); + } 
+ else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT) + { + id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + functor.reset(new BPPHandler::Abort(fBPPHandler, bs)); + } + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); OOBPool->addJob(job); break; } case DICT_TOKEN_BY_SCAN_COMPARE: + case BATCH_PRIMITIVE_RUN: { - idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); - TokenByScanRequestHeader* hdr = (TokenByScanRequestHeader*)ismHdr; + TokenByScanRequestHeader* hdr = nullptr; + boost::shared_ptr functor; + uint32_t id = 0; + uint32_t weight = 0; + uint32_t priority = 0; + uint32_t txnId = 0; + uint32_t stepID = 0; + uint32_t uniqueID = 0; + bool isSyscat = false; if (bRotateDest) { @@ -2089,23 +2123,41 @@ struct ReadThread } } - PriorityThreadPool::Job job; - job.functor = boost::shared_ptr(new DictScanJob(outIos, bs, writeLock)); - job.id = hdr->Hdr.UniqueID; - job.weight = LOGICAL_BLOCK_RIDS; - job.priority = hdr->Hdr.Priority; - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - - if (hdr->flags & IS_SYSCAT) + if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE) + { + idbassert(bs->length() >= sizeof(TokenByScanRequestHeader)); + hdr = (TokenByScanRequestHeader*)ismHdr; + functor.reset(new DictScanJob(outIos, bs, writeLock)); + id = hdr->Hdr.UniqueID; + weight = LOGICAL_BLOCK_RIDS; + priority = hdr->Hdr.Priority; + const uint8_t* buf = bs->buf(); + const uint32_t pos = sizeof(ISMPacketHeader) - 2; + txnId = *((uint32_t*)&buf[pos + 2]); + stepID = *((uint32_t*)&buf[pos + 6]); + uniqueID = *((uint32_t*)&buf[pos + 10]); + isSyscat = hdr->flags & IS_SYSCAT; + } + else if (ismHdr->Command == BATCH_PRIMITIVE_RUN) + { + functor.reset(new BPPSeeder(bs, writeLock, outIos, + fPrimitiveServerPtr->ProcessorThreads(), + fPrimitiveServerPtr->PTTrace())); + BPPSeeder* bpps = 
dynamic_cast(functor.get()); + id = bpps->getID(); + priority = bpps->priority(); + const uint8_t* buf = bs->buf(); + const uint32_t pos = sizeof(ISMPacketHeader) - 2; + txnId = *((uint32_t*)&buf[pos + 2]); + stepID = *((uint32_t*)&buf[pos + 6]); + uniqueID = *((uint32_t*)&buf[pos + 10]); + weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); + isSyscat = bpps->isSysCat(); + } + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + + if (isSyscat) { - // boost::thread t(DictScanJob(outIos, bs, writeLock)); - // using already-existing threads may cut latency - // if it's changed back to running in an independent thread - // change the issyscat() checks in BPPSeeder as well OOBPool->addJob(job); } else @@ -2116,146 +2168,11 @@ struct ReadThread break; } - case BATCH_PRIMITIVE_RUN: - { - if (bRotateDest) - { - if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock)) - { - // If we ever fall into this part of the - // code we have a "bug" of some sort. - // See handleUmSockSelErr() for more info. - // We reset ios and mutex to defaults. 
- handleUmSockSelErr(string("BPR cmd")); - outIos = outIosDefault; - writeLock = writeLockDefault; - pUmSocketSelector->delConnection(fIos); - bRotateDest = false; - } - } - - /* Decide whether this is a syscat call and run - right away instead of queueing */ - boost::shared_ptr bpps(new BPPSeeder(bs, writeLock, outIos, - fPrimitiveServerPtr->ProcessorThreads(), - fPrimitiveServerPtr->PTTrace())); - PriorityThreadPool::Job job; - job.functor = bpps; - job.id = bpps->getID(); - job.weight = ismHdr->Size; - job.priority = bpps->priority(); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - - if (bpps->isSysCat()) - { - // boost::thread t(*bpps); - // using already-existing threads may cut latency - // if it's changed back to running in an independent thread - // change the issyscat() checks in BPPSeeder as well - OOBPool->addJob(job); - } - else - { - procPoolPtr->addJob(job); - } - - break; - } - - case BATCH_PRIMITIVE_CREATE: - { - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->createBPP(*bs); - break; - } - - case BATCH_PRIMITIVE_ADD_JOINER: - { - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->addJoinerToBPP(*bs); - break; - } - - case BATCH_PRIMITIVE_END_JOINER: - { - // lastJoinerMsg 
can block; must do this in a different thread - // OOBPool->invoke(BPPHandler::LastJoiner(fBPPHandler, bs)); // needs a threadpool that can - // resched boost::thread tmp(BPPHandler::LastJoiner(fBPPHandler, bs)); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - break; - } - - case BATCH_PRIMITIVE_DESTROY: - { - // OOBPool->invoke(BPPHandler::Destroy(fBPPHandler, bs)); // needs a threadpool that can - // resched boost::thread tmp(BPPHandler::Destroy(fBPPHandler, bs)); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Destroy(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - // fBPPHandler->destroyBPP(*bs); - break; - } - case BATCH_PRIMITIVE_ACK: { fBPPHandler->doAck(*bs); break; } - - case BATCH_PRIMITIVE_ABORT: - { - // OBPool->invoke(BPPHandler::Abort(fBPPHandler, bs)); - // fBPPHandler->doAbort(*bs); - PriorityThreadPool::Job job; - job.functor = - boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); - job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); - const uint8_t* buf = bs->buf(); - uint32_t pos = sizeof(ISMPacketHeader) - 2; - job.stepID = *((uint32_t*)&buf[pos + 6]); - job.uniqueID = *((uint32_t*)&buf[pos + 10]); - job.sock = outIos; - OOBPool->addJob(job); - break; - } - default: { std::ostringstream os; @@ -2405,12 +2322,12 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro 
fServerpool.setQueueSize(fServerQueueSize); fServerpool.setName("PrimitiveServer"); - fProcessorPool = new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, + fProcessorPool = new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, medPriorityThreads, lowPriorityThreads, 0); // We're not using either the priority or the job-clustering features, just need a threadpool // that can reschedule jobs, and an unlimited non-blocking queue - OOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); + OOBPool.reset(new threadpool::FairThreadPool(1, 5, 0, 0, 1)); asyncCounter = 0; diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 6dde07130..0d6c9df2a 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -37,6 +37,7 @@ #include "threadpool.h" #include "../../utils/threadpool/prioritythreadpool.h" +#include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" #include "batchprimitiveprocessor.h" @@ -48,7 +49,7 @@ extern oam::OamCache* oamCache; namespace primitiveprocessor { -extern boost::shared_ptr OOBPool; +extern boost::shared_ptr OOBPool; extern dbbc::BlockRequestProcessor** BRPp; extern BRM::DBRM* brm; extern boost::mutex bppLock; @@ -130,7 +131,7 @@ class PrimitiveServer /** @brief get a pointer the shared processor thread pool */ - inline threadpool::PriorityThreadPool* getProcessorThreadPool() const + inline threadpool::FairThreadPool* getProcessorThreadPool() const { return fProcessorPool; } @@ -167,7 +168,7 @@ class PrimitiveServer /** @brief the thread pool used to process * primitive commands */ - threadpool::PriorityThreadPool* fProcessorPool; + threadpool::FairThreadPool* fProcessorPool; int fServerThreads; int fServerQueueSize; diff --git a/storage-manager/src/CloudStorage.cpp b/storage-manager/src/CloudStorage.cpp index 8e2465583..40eae93ae 100644 --- a/storage-manager/src/CloudStorage.cpp +++ 
b/storage-manager/src/CloudStorage.cpp @@ -56,7 +56,18 @@ CloudStorage* CloudStorage::get() if (inst) return inst; if (type == "s3") - inst = new S3Storage(); + { + try + { + inst = new S3Storage(); + } + catch (exception& e) + { + cout << "S3 Storage Manager Configuration Error:" << endl; + cout << e.what() << endl; + throw runtime_error("S3Storage: Failed"); + } + } else if (type == "local" || type == "localstorage") inst = new LocalStorage(); else diff --git a/storage-manager/src/Config.cpp b/storage-manager/src/Config.cpp index fa9873484..f634ececb 100644 --- a/storage-manager/src/Config.cpp +++ b/storage-manager/src/Config.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -30,7 +31,6 @@ #include #include #include -#include #include "SMLogging.h" @@ -165,13 +165,13 @@ bool Config::reload() return rtn; } -string use_envvar(const std::smatch& envvar) +string use_envvar(const boost::smatch& envvar) { char* env = getenv(envvar[1].str().c_str()); return (env ? 
env : ""); } -string expand_numbers(const std::smatch& match) +string expand_numbers(const boost::smatch& match) { long long num = stol(match[1].str()); char suffix = (char)::tolower(match[2].str()[0]); @@ -187,20 +187,6 @@ string expand_numbers(const std::smatch& match) return ::to_string(num); } -std::string regex_replace_with_format(const std::string& input, - const std::regex& regex, - std::function format) -{ - - std::ostringstream output; - std::sregex_iterator begin(input.begin(), input.end(), regex), end; - for(; begin != end; begin++){ - output << begin->prefix() << format(*begin); - } - output << input.substr(input.size() - begin->position()); - return output.str(); -} - string Config::getValue(const string& section, const string& key) const { // if we care, move this envvar substition stuff to where the file is loaded @@ -216,15 +202,15 @@ string Config::getValue(const string& section, const string& key) const } s.unlock(); - std::regex re("\\$\\{(.+)\\}"); + boost::regex re("\\$\\{(.+)\\}"); - ret = regex_replace_with_format(ret, re, use_envvar); + ret = boost::regex_replace(ret, re, use_envvar); // do the numeric substitutions. ex, the suffixes m, k, g // ehhhhh. going to end up turning a string to a number, to a string, and then to a number again // don't like that. OTOH who cares. 
- std::regex num_re("^([[:digit:]]+)([mMkKgG])$", std::regex::extended); - ret = regex_replace_with_format(ret, num_re, expand_numbers); + boost::regex num_re("^([[:digit:]]+)([mMkKgG])$", boost::regex::extended); + ret = boost::regex_replace(ret, num_re, expand_numbers); return ret; } diff --git a/storage-manager/src/IOCoordinator.cpp b/storage-manager/src/IOCoordinator.cpp index 52c1567f6..b17d502b2 100644 --- a/storage-manager/src/IOCoordinator.cpp +++ b/storage-manager/src/IOCoordinator.cpp @@ -25,6 +25,8 @@ #include #include #include +#define BOOST_SPIRIT_THREADSAFE +#include #include #include "checks.h" #include "vlarray.h" @@ -1264,10 +1266,9 @@ boost::shared_array IOCoordinator::mergeJournal(const char* object, con boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); stringstream ss; ss << headertxt.get(); - - nlohmann::json header = nlohmann::json::parse(ss); - - assert(header["version"] == 1); + boost::property_tree::ptree header; + boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == 1); // start processing the entries while (1) @@ -1352,9 +1353,9 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array& objData, size boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); stringstream ss; ss << headertxt.get(); - - nlohmann::json header = nlohmann::json::parse(ss); - assert(header["version"] == 1); + boost::property_tree::ptree header; + boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == 1); // read the journal file into memory size_t journalBytes = ::lseek(journalFD, 0, SEEK_END) - l_bytesRead; @@ -1432,9 +1433,9 @@ int IOCoordinator::mergeJournalInMem_bigJ(boost::shared_array& objData, boost::shared_array headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead); stringstream ss; ss << headertxt.get(); - - nlohmann::json header = nlohmann::json::parse(ss); - assert(header["version"] == 1); + boost::property_tree::ptree header; 
+ boost::property_tree::json_parser::read_json(ss, header); + assert(header.get("version") == 1); // start processing the entries while (1) diff --git a/storage-manager/src/MetadataFile.cpp b/storage-manager/src/MetadataFile.cpp index 5386beae5..e45c95173 100644 --- a/storage-manager/src/MetadataFile.cpp +++ b/storage-manager/src/MetadataFile.cpp @@ -20,17 +20,20 @@ */ #include "MetadataFile.h" #include +#define BOOST_SPIRIT_THREADSAFE +#include +#include +#include #include #include #include -#include #include -#include #define max(x, y) (x > y ? x : y) #define min(x, y) (x < y ? x : y) using namespace std; +namespace bpt = boost::property_tree; namespace bf = boost::filesystem; namespace @@ -117,13 +120,12 @@ MetadataFile::MetadataFile(const boost::filesystem::path& filename) { if (boost::filesystem::exists(mFilename)) { - std::ifstream i(mFilename.string()); - jsontree.reset(new nlohmann::json); - i >> *jsontree; + jsontree.reset(new bpt::ptree()); + boost::property_tree::read_json(mFilename.string(), *jsontree); jsonCache.put(mFilename, jsontree); s.unlock(); mVersion = 1; - mRevision = (*jsontree)["revision"]; + mRevision = jsontree->get("revision"); } else { @@ -138,7 +140,7 @@ MetadataFile::MetadataFile(const boost::filesystem::path& filename) { s.unlock(); mVersion = 1; - mRevision = (*jsontree)["revision"];; + mRevision = jsontree->get("revision"); } ++metadataFilesAccessed; } @@ -160,13 +162,12 @@ MetadataFile::MetadataFile(const boost::filesystem::path& filename, no_create_t, if (boost::filesystem::exists(mFilename)) { _exists = true; - jsontree.reset(new nlohmann::json); - std::ifstream i(mFilename.string()); - i >> *jsontree; + jsontree.reset(new bpt::ptree()); + boost::property_tree::read_json(mFilename.string(), *jsontree); jsonCache.put(mFilename, jsontree); s.unlock(); mVersion = 1; - mRevision = (*jsontree)["revision"]; + mRevision = jsontree->get("revision"); } else { @@ -181,7 +182,7 @@ MetadataFile::MetadataFile(const boost::filesystem::path& 
filename, no_create_t, s.unlock(); _exists = true; mVersion = 1; - mRevision = (*jsontree)["revision"]; + mRevision = jsontree->get("revision"); } ++metadataFilesAccessed; } @@ -192,10 +193,11 @@ MetadataFile::~MetadataFile() void MetadataFile::makeEmptyJsonTree() { - jsontree.reset(new nlohmann::json); - (*jsontree)["version"] = mVersion; - (*jsontree)["revision"] = mRevision; - (*jsontree)["objects"] = nlohmann::json::array(); + jsontree.reset(new bpt::ptree()); + boost::property_tree::ptree objs; + jsontree->put("version", mVersion); + jsontree->put("revision", mRevision); + jsontree->add_child("objects", objs); } void MetadataFile::printKPIs() @@ -217,11 +219,11 @@ size_t MetadataFile::getLength() const { size_t totalSize = 0; - auto &objects = (*jsontree)["objects"]; + auto& objects = jsontree->get_child("objects"); if (!objects.empty()) { - auto& lastObject = objects.back(); - totalSize = lastObject["offset"].get() + lastObject["length"].get(); + auto& lastObject = objects.back().second; + totalSize = lastObject.get("offset") + lastObject.get("length"); } return totalSize; } @@ -241,9 +243,10 @@ vector MetadataFile::metadataRead(off_t offset, size_t length) c rather than write a new alg. 
*/ set mObjects; - for(const auto &v : (*jsontree)["objects"]) + BOOST_FOREACH (const boost::property_tree::ptree::value_type& v, jsontree->get_child("objects")) { - mObjects.insert(metadataObject(v["offset"], v["length"], v["key"])); + mObjects.insert(metadataObject(v.second.get("offset"), v.second.get("length"), + v.second.get("key"))); } if (mObjects.size() == 0) @@ -285,20 +288,20 @@ metadataObject MetadataFile::addMetadataObject(const boost::filesystem::path& fi // metadataObject addObject; - auto& objects = (*jsontree)["objects"]; + auto& objects = jsontree->get_child("objects"); if (!objects.empty()) { - auto& lastObject = objects.back(); - addObject.offset = lastObject["offset"].get() + mpConfig->mObjectSize; + auto& lastObject = objects.back().second; + addObject.offset = lastObject.get("offset") + mpConfig->mObjectSize; } addObject.length = length; addObject.key = getNewKey(filename.string(), addObject.offset, addObject.length); - nlohmann::json object = nlohmann::json::object(); - object["offset"] = addObject.offset; - object["length"] = addObject.length; - object["key"] = addObject.key; - objects.push_back(object); + boost::property_tree::ptree object; + object.put("offset", addObject.offset); + object.put("length", addObject.length); + object.put("key", addObject.key); + objects.push_back(make_pair("", object)); return addObject; } @@ -309,8 +312,7 @@ int MetadataFile::writeMetadata() if (!boost::filesystem::exists(mFilename.parent_path())) boost::filesystem::create_directories(mFilename.parent_path()); - std::ofstream o(mFilename.c_str()); - o << *jsontree; + write_json(mFilename.string(), *jsontree); _exists = true; boost::unique_lock s(jsonCache.getMutex()); @@ -322,13 +324,13 @@ int MetadataFile::writeMetadata() bool MetadataFile::getEntry(off_t offset, metadataObject* out) const { metadataObject addObject; - for(auto &v: (*jsontree)["objects"]) + BOOST_FOREACH (const boost::property_tree::ptree::value_type& v, jsontree->get_child("objects")) { - 
if (v["offset"].get() == offset) + if (v.second.get("offset") == offset) { out->offset = offset; - out->length = v["length"].get(); - out->key = v["key"]; + out->length = v.second.get("length"); + out->key = v.second.get("key"); return true; } } @@ -337,10 +339,10 @@ bool MetadataFile::getEntry(off_t offset, metadataObject* out) const void MetadataFile::removeEntry(off_t offset) { - auto& objects = (*jsontree)["objects"]; - for (auto it = objects.begin(); it != objects.end(); ++it) + bpt::ptree& objects = jsontree->get_child("objects"); + for (bpt::ptree::iterator it = objects.begin(); it != objects.end(); ++it) { - if ((*it)["offset"].get() == offset) + if (it->second.get("offset") == offset) { objects.erase(it); break; @@ -350,7 +352,7 @@ void MetadataFile::removeEntry(off_t offset) void MetadataFile::removeAllEntries() { - (*jsontree)["objects"] = nlohmann::json::array(); + jsontree->get_child("objects").clear(); } void MetadataFile::deletedMeta(const bf::path& p) @@ -454,21 +456,21 @@ void MetadataFile::setLengthInKey(string& key, size_t newLength) void MetadataFile::printObjects() const { - for (auto& v : (*jsontree)["objects"]) + BOOST_FOREACH (const boost::property_tree::ptree::value_type& v, jsontree->get_child("objects")) { - printf("Name: %s Length: %zu Offset: %lld\n", v["key"].get().c_str(), - v["length"].get(), (long long)v["offset"].get()); + printf("Name: %s Length: %zu Offset: %lld\n", v.second.get("key").c_str(), + v.second.get("length"), (long long)v.second.get("offset")); } } void MetadataFile::updateEntry(off_t offset, const string& newName, size_t newLength) { - for (auto& v : (*jsontree)["objects"]) + for (auto& v : jsontree->get_child("objects")) { - if (v["offset"].get() == offset) + if (v.second.get("offset") == offset) { - v["key"] = newName; - v["length"] = newLength; + v.second.put("key", newName); + v.second.put("length", newLength); return; } } @@ -480,11 +482,11 @@ void MetadataFile::updateEntry(off_t offset, const string& newName, 
size_t newLe void MetadataFile::updateEntryLength(off_t offset, size_t newLength) { - for (auto& v : (*jsontree)["objects"]) + for (auto& v : jsontree->get_child("objects")) { - if (v["offset"].get() == offset) + if (v.second.get("offset") == offset) { - v["length"] = newLength; + v.second.put("length", newLength); return; } } @@ -496,12 +498,11 @@ void MetadataFile::updateEntryLength(off_t offset, size_t newLength) off_t MetadataFile::getMetadataNewObjectOffset() { - auto& objects = (*jsontree)["objects"]; + auto& objects = jsontree->get_child("objects"); if (objects.empty()) return 0; - - auto& lastObject = objects.back(); - return lastObject["offset"].get() + lastObject["length"].get(); + auto& lastObject = jsontree->get_child("objects").back().second; + return lastObject.get("offset") + lastObject.get("length"); } metadataObject::metadataObject() : offset(0), length(0) diff --git a/storage-manager/src/MetadataFile.h b/storage-manager/src/MetadataFile.h index 94d65c404..5c302c618 100644 --- a/storage-manager/src/MetadataFile.h +++ b/storage-manager/src/MetadataFile.h @@ -28,8 +28,6 @@ #include #include -#include - namespace storagemanager { struct metadataObject @@ -112,7 +110,7 @@ class MetadataFile static void printKPIs(); - typedef boost::shared_ptr Jsontree_t; + typedef boost::shared_ptr Jsontree_t; private: MetadataConfig* mpConfig; diff --git a/storage-manager/src/Replicator.cpp b/storage-manager/src/Replicator.cpp index 2a9241306..10fc07002 100644 --- a/storage-manager/src/Replicator.cpp +++ b/storage-manager/src/Replicator.cpp @@ -27,12 +27,12 @@ #include #include #include +#define BOOST_SPIRIT_THREADSAFE +#include #include #include #include -#include - using namespace std; namespace @@ -279,14 +279,12 @@ int Replicator::addJournalEntry(const boost::filesystem::path& filename, const u stringstream ss; ss << headertxt.get(); headerRollback = headertxt.get(); - nlohmann::json header; - + boost::property_tree::ptree header; try { - header = 
nlohmann::json::parse(ss); + boost::property_tree::json_parser::read_json(ss, header); } - - catch (const nlohmann::json::exception& e) + catch (const boost::property_tree::json_parser::json_parser_error& e) { mpLogger->log(LOG_CRIT, "%s", e.what()); errno = EIO; @@ -298,8 +296,8 @@ int Replicator::addJournalEntry(const boost::filesystem::path& filename, const u errno = EIO; return -1; } - assert(header["version"] == 1); - uint64_t currentMaxOffset = header["max_offset"]; + assert(header.get("version") == 1); + uint64_t currentMaxOffset = header.get("max_offset"); if (thisEntryMaxOffset > currentMaxOffset) { bHeaderChanged = true; diff --git a/storage-manager/src/S3Storage.cpp b/storage-manager/src/S3Storage.cpp index 9f14e1562..22560b41a 100644 --- a/storage-manager/src/S3Storage.cpp +++ b/storage-manager/src/S3Storage.cpp @@ -26,9 +26,9 @@ #include #include #include - -#include "utils/json/json.hpp" - +#define BOOST_SPIRIT_THREADSAFE +#include +#include #include "Utilities.h" using namespace std; @@ -52,7 +52,7 @@ static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* use inline bool retryable_error(uint8_t s3err) { return (s3err == MS3_ERR_RESPONSE_PARSE || s3err == MS3_ERR_REQUEST_ERROR || s3err == MS3_ERR_OOM || - s3err == MS3_ERR_IMPOSSIBLE || s3err == MS3_ERR_AUTH || s3err == MS3_ERR_SERVER || + s3err == MS3_ERR_IMPOSSIBLE || s3err == MS3_ERR_SERVER || s3err == MS3_ERR_AUTH_ROLE); } @@ -258,12 +258,12 @@ bool S3Storage::getCredentialsFromMetadataEC2() logger->log(LOG_ERR, "CURL fail %u", curl_res); return false; } - - nlohmann::json pt = nlohmann::json::parse(readBuffer); - key = pt["AccessKeyId"]; - secret = pt["SecretAccessKey"]; - token = pt["Token"]; - + stringstream credentials(readBuffer); + boost::property_tree::ptree pt; + boost::property_tree::read_json(credentials, pt); + key = pt.get("AccessKeyId"); + secret = pt.get("SecretAccessKey"); + token = pt.get("Token"); // logger->log(LOG_INFO, "S3Storage: key = %s secret = %s token 
= // %s",key.c_str(),secret.c_str(),token.c_str()); @@ -294,6 +294,12 @@ void S3Storage::testConnectivityAndPerms() err = deleteObject(testObjKey); if (err) FAIL(DELETE) + err = exists(testObjKey, &_exists); + if (err) + { + logger->log(LOG_CRIT, "S3Storage::exists() failed on nonexistent object. Check 'ListBucket' permissions."); + FAIL(HEAD) + } logger->log(LOG_INFO, "S3Storage: S3 connectivity & permissions are OK"); } diff --git a/storage-manager/src/smcat.cpp b/storage-manager/src/smcat.cpp index 101ab41a7..48dc50772 100644 --- a/storage-manager/src/smcat.cpp +++ b/storage-manager/src/smcat.cpp @@ -87,6 +87,7 @@ void catFileOffline(const char* filename, int prefixlen) catch (exception& e) { cerr << "smcat catFileOffline FAIL: " << e.what() << endl; + exit(1); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a8af32368..b6438868f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -51,6 +51,11 @@ if (WITH_UNITTESTS) target_link_libraries(simd_processors ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) gtest_discover_tests(simd_processors TEST_PREFIX columnstore:) + add_executable(fair_threadpool_test fair_threadpool.cpp) + add_dependencies(fair_threadpool_test googletest) + target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) + gtest_discover_tests(fair_threadpool_test TEST_PREFIX columnstore:) + # CPPUNIT TESTS add_executable(we_shared_components_tests shared_components_tests.cpp) add_dependencies(we_shared_components_tests loggingcpp) diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp new file mode 100644 index 000000000..c4a299bd1 --- /dev/null +++ b/tests/fair_threadpool.cpp @@ -0,0 +1,173 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by 
the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include + +#include "utils/threadpool/fair_threadpool.h" + +using namespace primitiveprocessor; +using namespace std; +using namespace threadpool; + +using ResultsType = std::vector; +static ResultsType results; + +class FairThreadPoolTest : public testing::Test { + public: + + void SetUp() override + { + results.clear(); + threadPool = new FairThreadPool(1, 1, 0, 0); + } + + + FairThreadPool* threadPool; +}; + +class TestFunctor: public FairThreadPool::Functor +{ + public: + TestFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestFunctor() {}; + int operator()() override + { + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; +}; + +class TestRescheduleFunctor: public FairThreadPool::Functor +{ + public: + TestRescheduleFunctor(const size_t id, const size_t delay): id_(id), delay_(delay) + { + } + ~TestRescheduleFunctor() {}; + int operator()() override + { + if (firstRun) + { + firstRun = false; + return 1; // re-schedule the Job + } + usleep(delay_); + results.push_back(id_); + return 0; + } + private: + size_t id_; + size_t delay_; + bool firstRun = true; +}; + +testing::AssertionResult isThisOrThat(const ResultsType& arr, const size_t idxA, const int a, const size_t idxB, const int b) +{ + if (arr.empty() || arr.size() <= max(idxA, idxB)) + return testing::AssertionFailure() << "The supplied vector is either empty or not big enough."; + if 
(arr[idxA] == a && arr[idxB] == b) + return testing::AssertionSuccess(); + if (arr[idxA] == b && arr[idxB] == a) + return testing::AssertionSuccess(); + return testing::AssertionFailure() << "The values at positions "<< idxA << " " << idxB + << " are not " << a << " and " << b << std::endl; +} + +TEST_F(FairThreadPoolTest, FairThreadPoolAdd) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 50000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 5000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 5000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); + EXPECT_EQ(results[2], 2); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolRemove) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 1, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 2, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + threadPool->removeJobs(job2.id_); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 2ULL); + EXPECT_EQ(results[0], 1); + EXPECT_EQ(results[1], 3); +} + +TEST_F(FairThreadPoolTest, FairThreadPoolReschedule) +{ + SP_UM_IOSOCK sock(new messageqcpp::IOSocket); + auto functor1 = 
boost::shared_ptr(new TestFunctor(1, 100000)); + FairThreadPool::Job job1(1, 1, 1, functor1, sock, 1, 0, 1); + auto functor2 = boost::shared_ptr(new TestFunctor(2, 50000)); + FairThreadPool::Job job2(2, 1, 2, functor2, sock, 1, 0, 2); + auto functor3 = boost::shared_ptr(new TestFunctor(3, 50000)); + FairThreadPool::Job job3(3, 1, 3, functor3, sock, 1, 0, 3); + + threadPool->addJob(job1); + threadPool->addJob(job2); + threadPool->addJob(job3); + + while (threadPool->queueSize()) + { + usleep(250000); + } + + EXPECT_EQ(threadPool->queueSize(), 0ULL); + EXPECT_EQ(results.size(), 3ULL); + EXPECT_EQ(results[0], 1); + EXPECT_TRUE(isThisOrThat(results, 1, 2, 2, 3)); +} \ No newline at end of file diff --git a/utils/common/statistics.h b/utils/common/statistics.h index d57f210a1..7ec3e319c 100644 --- a/utils/common/statistics.h +++ b/utils/common/statistics.h @@ -98,7 +98,8 @@ class StatisticsManager std::map keyTypes; StatisticsManager() : epoch(0), version(1) { - IDBPolicy::init(true, false, "", 0); + // Initialize plugins. + IDBPolicy::configIDBPolicy(); } std::unique_ptr convertStatsToDataStream(uint64_t& dataStreamSize); diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 1faf18211..41301535b 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -1900,8 +1900,8 @@ void RowAggregation::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int6 // rowIn(in) - Row to be included in aggregation. 
// colIn(in) - column in the input row group // colOut(in) - column in the output row group stores the count -// colAux(in) - column in the output row group stores the sum(x) -// colAux + 1 - column in the output row group stores the sum(x**2) +// colAux(in) - column in the output row group stores the mean(x) +// colAux + 1 - column in the output row group stores the sum(x_i - mean)^2 //------------------------------------------------------------------------------ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux) { @@ -1960,9 +1960,17 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu break; } - fRow.setDoubleField(fRow.getDoubleField(colOut) + 1.0, colOut); - fRow.setLongDoubleField(fRow.getLongDoubleField(colAux) + valIn, colAux); - fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + valIn * valIn, colAux + 1); + double count = fRow.getDoubleField(colOut) + 1.0; + long double mean = fRow.getLongDoubleField(colAux); + long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1); + volatile long double delta = valIn - mean; + mean += delta/count; + scaledMomentum2 += delta * (valIn - mean); + + + fRow.setDoubleField(count, colOut); + fRow.setLongDoubleField(mean, colAux); + fRow.setLongDoubleField(scaledMomentum2, colAux + 1); } void RowAggregation::mergeStatistics(const Row& rowIn, uint64_t colOut, uint64_t colAux) @@ -3156,31 +3164,26 @@ void RowAggregationUM::calculateStatisticsFunctions() } else // count > 1 { - long double sum1 = fRow.getLongDoubleField(colAux); - long double sum2 = fRow.getLongDoubleField(colAux + 1); + long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1); uint32_t scale = fRow.getScale(colOut); auto factor = datatypes::scaleDivisor(scale); if (scale != 0) // adjust the scale if necessary { - sum1 /= factor; - sum2 /= factor * factor; + scaledMomentum2 /= factor * factor; } - long double stat = sum1 * sum1 / cnt; - stat = sum2 - stat; 
- if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_POP) - stat = sqrt(stat / cnt); + scaledMomentum2 = sqrt(scaledMomentum2 / cnt); else if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_SAMP) - stat = sqrt(stat / (cnt - 1)); + scaledMomentum2 = sqrt(scaledMomentum2 / (cnt - 1)); else if (fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_POP) - stat = stat / cnt; + scaledMomentum2 = scaledMomentum2 / cnt; else if (fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_SAMP) - stat = stat / (cnt - 1); + scaledMomentum2 = scaledMomentum2 / (cnt - 1); - fRow.setDoubleField(stat, colOut); + fRow.setDoubleField(scaledMomentum2, colOut); } } } @@ -4281,18 +4284,39 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, // Update the sum and count fields for stattistics if input is not null. // rowIn(in) - Row to be included in aggregation. // colIn(in) - column in the input row group stores the count/logical block -// colIn + 1 - column in the input row group stores the sum(x)/logical block -// colIn + 2 - column in the input row group stores the sum(x**2)/logical block +// colIn + 1 - column in the input row group stores the mean(x)/logical block +// colIn + 2 - column in the input row group stores the sum(x_i - mean)^2/logical block // colOut(in) - column in the output row group stores the count -// colAux(in) - column in the output row group stores the sum(x) -// colAux + 1 - column in the output row group stores the sum(x**2) +// colAux(in) - column in the output row group stores the mean(x) +// colAux + 1 - column in the output row group stores the sum(x_i - mean)^2 //------------------------------------------------------------------------------ void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux) { - fRow.setDoubleField(fRow.getDoubleField(colOut) + rowIn.getDoubleField(colIn), colOut); - fRow.setLongDoubleField(fRow.getLongDoubleField(colAux) + rowIn.getLongDoubleField(colIn + 1), colAux); 
- fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + rowIn.getLongDoubleField(colIn + 2), - colAux + 1); + double count = fRow.getDoubleField(colOut); + long double mean = fRow.getLongDoubleField(colAux); + long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1); + + double blockCount = rowIn.getDoubleField(colIn); + long double blockMean = rowIn.getLongDoubleField(colIn + 1); + long double blockScaledMomentum2 = rowIn.getLongDoubleField(colIn + 2); + + double nextCount = count + blockCount; + long double nextMean; + long double nextScaledMomentum2; + if (nextCount == 0) + { + nextMean = 0; + nextScaledMomentum2 = 0; + } + else + { + volatile long double delta = mean - blockMean; + nextMean = (mean * count + blockMean * blockCount) / nextCount; + nextScaledMomentum2 = scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount); + } + fRow.setDoubleField(nextCount, colOut); + fRow.setLongDoubleField(nextMean, colAux); + fRow.setLongDoubleField(nextScaledMomentum2, colAux + 1); } //------------------------------------------------------------------------------ diff --git a/utils/threadpool/CMakeLists.txt b/utils/threadpool/CMakeLists.txt index 519ba3b5c..dd96304de 100644 --- a/utils/threadpool/CMakeLists.txt +++ b/utils/threadpool/CMakeLists.txt @@ -4,11 +4,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ) ########### next target ############### -set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp) - +set(threadpool_LIB_SRCS weightedthreadpool.cpp threadpool.cpp prioritythreadpool.cpp fair_threadpool.cpp) add_library(threadpool SHARED ${threadpool_LIB_SRCS}) - add_dependencies(threadpool loggingcpp) target_link_libraries(threadpool Boost::chrono) - -install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) +install(TARGETS threadpool DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) \ No newline at end of file diff --git 
a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp new file mode 100644 index 000000000..44d9cd3a9 --- /dev/null +++ b/utils/threadpool/fair_threadpool.cpp @@ -0,0 +1,295 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +using namespace std; + +#include "messageobj.h" +#include "messagelog.h" +#include "threadnaming.h" +using namespace logging; + +#include "fair_threadpool.h" +using namespace boost; + +#include "dbcon/joblist/primitivemsg.h" + +namespace threadpool +{ +FairThreadPool::FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, + uint ID) + : weightPerRun(targetWeightPerRun), id(ID) +{ + boost::thread* newThread; + size_t numberOfThreads = highThreads + midThreads + lowThreads; + for (uint32_t i = 0; i < numberOfThreads; ++i) + { + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); + newThread->detach(); + } + cout << "FairThreadPool started " << numberOfThreads << " thread/-s.\n"; + threadCounts_.store(numberOfThreads, std::memory_order_relaxed); + defaultThreadCounts = numberOfThreads; +} + +FairThreadPool::~FairThreadPool() +{ + stop(); +} + +void FairThreadPool::addJob(const Job& job) +{ + addJob_(job); +} + +void FairThreadPool::addJob_(const Job& job, bool 
useLock) +{ + boost::thread* newThread; + std::unique_lock lk(mutex, std::defer_lock_t()); + + // Create any missing threads + if (defaultThreadCounts != threadCounts_.load(std::memory_order_relaxed)) + { + newThread = threads.create_thread(ThreadHelper(this, PriorityThreadPool::Priority::HIGH)); + newThread->detach(); + threadCounts_.fetch_add(1, std::memory_order_relaxed); + } + + if (useLock) + lk.lock(); + + auto jobsListMapIter = txn2JobsListMap_.find(job.txnIdx_); + if (jobsListMapIter == txn2JobsListMap_.end()) // there is no txn in the map + { + ThreadPoolJobsList* jobsList = new ThreadPoolJobsList; + jobsList->push_back(job); + txn2JobsListMap_[job.txnIdx_] = jobsList; + weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); + } + else // txn is in the map + { + if (jobsListMapIter->second->empty()) // there are no jobs for the txn + { + weightedTxnsQueue_.push({job.weight_, job.txnIdx_}); + } + jobsListMapIter->second->push_back(job); + } + + if (useLock) + newJob.notify_one(); +} + +void FairThreadPool::removeJobs(uint32_t id) +{ + std::unique_lock lk(mutex); + + for (auto& txnJobsMapPair : txn2JobsListMap_) + { + ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second; + auto job = txnJobsList->begin(); + while (job != txnJobsList->end()) + { + if (job->id_ == id) + { + job = txnJobsList->erase(job); // update the job iter + if (txnJobsList->empty()) + { + txn2JobsListMap_.erase(txnJobsMapPair.first); + delete txnJobsList; + break; + // There is no clean-up for PQ. 
It will happen later in threadFcn + } + continue; // go-on skiping job iter increment + } + ++job; + } + } +} + +void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue) +{ + utils::setThreadName("Idle"); + RunListT runList(1); // This is a vector to allow to grab multiple jobs + RescheduleVecType reschedule; + bool running = false; + bool rescheduleJob = false; + + try + { + while (!stop_.load(std::memory_order_relaxed)) + { + runList.clear(); // remove the job + std::unique_lock lk(mutex); + + if (weightedTxnsQueue_.empty()) + { + newJob.wait(lk); + continue; // just go on w/o re-taking the lock + } + + WeightedTxnT weightedTxn = weightedTxnsQueue_.top(); + auto txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); + // Looking for non-empty jobsList in a loop + // The loop waits on newJob cond_var if PQ is empty(no jobs in this thread pool) + while (txnAndJobListPair == txn2JobsListMap_.end() || txnAndJobListPair->second->empty()) + { + // JobList is empty. This can happen when this method pops the last Job. + if (txnAndJobListPair != txn2JobsListMap_.end()) + { + ThreadPoolJobsList* txnJobsList = txnAndJobListPair->second; + delete txnJobsList; + txn2JobsListMap_.erase(txnAndJobListPair->first); + } + weightedTxnsQueue_.pop(); + if (weightedTxnsQueue_.empty()) // remove the empty + { + break; + } + weightedTxn = weightedTxnsQueue_.top(); + txnAndJobListPair = txn2JobsListMap_.find(weightedTxn.second); + } + + if (weightedTxnsQueue_.empty()) + { + newJob.wait(lk); // might need a lock here + continue; + } + + // We have non-empty jobsList at this point. 
+ // Remove the txn from a queue first to add it later + weightedTxnsQueue_.pop(); + TransactionIdxT txnIdx = txnAndJobListPair->first; + ThreadPoolJobsList* jobsList = txnAndJobListPair->second; + runList.push_back(jobsList->front()); + + jobsList->pop_front(); + // Add the jobList back into the PQ adding some weight to it + // Current algo doesn't reduce total txn weight if the job is rescheduled. + if (!jobsList->empty()) + { + weightedTxnsQueue_.push({weightedTxn.first + runList[0].weight_, txnIdx}); + } + + lk.unlock(); + + running = true; + jobsRunning_.fetch_add(1, std::memory_order_relaxed); + rescheduleJob = (*(runList[0].functor_))(); // run the functor + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + running = false; + + utils::setThreadName("Idle"); + + if (rescheduleJob) + { + // to avoid excessive CPU usage waiting for data from storage + usleep(500); + lk.lock(); + addJob_(runList[0], false); + newJob.notify_one(); + lk.unlock(); + } + } + } + catch (std::exception& ex) + { + if (running) + { + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + } + // Log the exception and exit this thread + try + { + threadCounts_.fetch_sub(1, std::memory_order_relaxed); +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("threadFcn: Caught exception: "); + args.add(ex.what()); + + message.format(args); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage(message); +#endif + + if (running) + sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } + catch (...) + { + } + } + catch (...) 
+ { + // Log the exception and exit this thread + try + { + if (running) + { + jobsRunning_.fetch_sub(1, std::memory_order_relaxed); + } + threadCounts_.fetch_sub(1, std::memory_order_relaxed); + ; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(6); + args.add("threadFcn: Caught unknown exception!"); + + message.format(args); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage(message); +#endif + + if (running) + sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } + catch (...) + { + } + } +} + +void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) +{ + ISMPacketHeader ism; + PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; + + ism.Status = logging::primitiveServerErr; + ph.UniqueID = id; + ph.StepID = step; + messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + msg.append((uint8_t*)&ism, sizeof(ism)); + msg.append((uint8_t*)&ph, sizeof(ph)); + + sock->write(msg); +} + +void FairThreadPool::stop() +{ + stop_.store(true, std::memory_order_relaxed); +} + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h new file mode 100644 index 000000000..ab3afaa71 --- /dev/null +++ b/utils/threadpool/fair_threadpool.h @@ -0,0 +1,165 @@ +/* Copyright (c) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "primitives/primproc/umsocketselector.h" +#include "prioritythreadpool.h" + +namespace threadpool +{ +// The idea of this thread pool is to run morsel jobs(primitive job) is to equaly distribute CPU time +// b/w multiple parallel queries(thread maps morsel to query using txnId). Query(txnId) has its weight +// stored in PriorityQueue that thread increases before run another morsel for the query. When query is +// done(ThreadPoolJobsList is empty) it is removed from PQ and the Map(txn to ThreadPoolJobsList). +// I tested multiple morsels per one loop iteration in ::threadFcn. This approach reduces CPU consumption +// and increases query timings. 
+class FairThreadPool +{ + public: + using Functor = PriorityThreadPool::Functor; + + using TransactionIdxT = uint32_t; + struct Job + { + Job() : weight_(1), priority_(0), id_(0) + { + } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : uniqueID_(uniqueID) + , stepID_(stepID) + , txnIdx_(txnIdx) + , functor_(functor) + , sock_(sock) + , weight_(weight) + , priority_(priority) + , id_(id) + { + } + uint32_t uniqueID_; + uint32_t stepID_; + TransactionIdxT txnIdx_; + boost::shared_ptr functor_; + primitiveprocessor::SP_UM_IOSOCK sock_; + uint32_t weight_; + uint32_t priority_; + uint32_t id_; + }; + + /********************************************* + * ctor/dtor + * + *********************************************/ + + /** @brief ctor + */ + + FairThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads, uint id = 0); + virtual ~FairThreadPool(); + + void removeJobs(uint32_t id); + void addJob(const Job& job); + void stop(); + + /** @brief for use in debugging + */ + void dump(); + + size_t queueSize() const + { + return weightedTxnsQueue_.size(); + } + // This method enables a pool current workload estimate. 
+ size_t jobsRunning() const + { + return jobsRunning_.load(std::memory_order_relaxed); + } + + protected: + private: + struct ThreadHelper + { + ThreadHelper(FairThreadPool* impl, PriorityThreadPool::Priority queue) : ptp(impl), preferredQueue(queue) + { + } + void operator()() + { + ptp->threadFcn(preferredQueue); + } + FairThreadPool* ptp; + PriorityThreadPool::Priority preferredQueue; + }; + + explicit FairThreadPool(); + explicit FairThreadPool(const FairThreadPool&); + FairThreadPool& operator=(const FairThreadPool&); + + void addJob_(const Job& job, bool useLock = true); + void threadFcn(const PriorityThreadPool::Priority preferredQueue); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); + + uint32_t defaultThreadCounts; + std::mutex mutex; + std::condition_variable newJob; + boost::thread_group threads; + uint32_t weightPerRun; + volatile uint id; // prevent it from being optimized out + + using WeightT = uint32_t; + using WeightedTxnT = std::pair; + using WeightedTxnVec = std::vector; + struct PrioQueueCmp + { + bool operator()(WeightedTxnT lhs, WeightedTxnT rhs) + { + if (lhs.first == rhs.first) + return lhs.second > rhs.second; + return lhs.first > rhs.first; + } + }; + using RunListT = std::vector; + using RescheduleVecType = std::vector; + using WeightedTxnPrioQueue = std::priority_queue; + using ThreadPoolJobsList = std::list; + using Txn2ThreadPoolJobsListMap = std::unordered_map; + Txn2ThreadPoolJobsListMap txn2JobsListMap_; + WeightedTxnPrioQueue weightedTxnsQueue_; + std::atomic jobsRunning_{0}; + std::atomic threadCounts_{0}; + std::atomic stop_{false}; +}; + +} // namespace threadpool \ No newline at end of file diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 7608b4166..fbd479966 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -53,8 +53,6 @@ class PriorityThreadPool virtual int operator()() = 0; }; - // typedef 
boost::function0 Functor; - struct Job { Job() : weight(1), priority(0), id(0) diff --git a/utils/windowfunction/wf_stats.cpp b/utils/windowfunction/wf_stats.cpp index d4df1b995..73a668acf 100644 --- a/utils/windowfunction/wf_stats.cpp +++ b/utils/windowfunction/wf_stats.cpp @@ -139,10 +139,10 @@ WindowFunctionType* WF_stats::clone() const template void WF_stats::resetData() { - fSum1 = 0; - fSum2 = 0; - fCount = 0; - fStats = 0.0; + mean_ = 0; + scaledMomentum2_ = 0; + count_ = 0; + stats_ = 0.0; WindowFunctionType::resetData(); } @@ -171,51 +171,46 @@ void WF_stats::operator()(int64_t b, int64_t e, int64_t c) if (fRow.isNullValue(colIn) == true) continue; - + // Welford's single-pass algorithm T valIn; getValue(colIn, valIn, &cdt); long double val = (long double)valIn; - - fSum1 += val; - fSum2 += val * val; - fCount++; + count_++; + long double delta = val - mean_; + mean_ += delta/count_; + scaledMomentum2_ += delta * (val - mean_); } - if (fCount > 1) + if (count_ > 1) { uint32_t scale = fRow.getScale(colIn); auto factor = datatypes::scaleDivisor(scale); - long double ldSum1 = fSum1; - long double ldSum2 = fSum2; + long double stat = scaledMomentum2_; // adjust the scale if necessary if (scale != 0 && cdt != CalpontSystemCatalog::LONGDOUBLE) { - ldSum1 /= factor; - ldSum2 /= factor * factor; + stat /= factor * factor; } - long double stat = ldSum1 * ldSum1 / fCount; - stat = ldSum2 - stat; - if (fFunctionId == WF__STDDEV_POP) - stat = sqrt(stat / fCount); + stat = sqrt(stat / count_); else if (fFunctionId == WF__STDDEV_SAMP) - stat = sqrt(stat / (fCount - 1)); + stat = sqrt(stat / (count_ - 1)); else if (fFunctionId == WF__VAR_POP) - stat = stat / fCount; + stat = stat / count_; else if (fFunctionId == WF__VAR_SAMP) - stat = stat / (fCount - 1); + stat = stat / (count_ - 1); - fStats = (double)stat; + stats_ = (double)stat; } } - if (fCount == 0) + if (count_ == 0) { setValue(CalpontSystemCatalog::DOUBLE, b, e, c, (double*)NULL); } - else if (fCount == 1) + 
else if (count_ == 1) { if (fFunctionId == WF__STDDEV_SAMP || fFunctionId == WF__VAR_SAMP) { @@ -229,7 +224,7 @@ void WF_stats::operator()(int64_t b, int64_t e, int64_t c) } else { - setValue(CalpontSystemCatalog::DOUBLE, b, e, c, &fStats); + setValue(CalpontSystemCatalog::DOUBLE, b, e, c, &stats_); } fPrev = c; diff --git a/utils/windowfunction/wf_stats.h b/utils/windowfunction/wf_stats.h index a443ad322..b5c98830d 100644 --- a/utils/windowfunction/wf_stats.h +++ b/utils/windowfunction/wf_stats.h @@ -40,10 +40,10 @@ class WF_stats : public WindowFunctionType static boost::shared_ptr makeFunction(int, const string&, int, WindowFunctionColumn*); protected: - long double fSum1; - long double fSum2; - uint64_t fCount; - double fStats; + long double mean_; + long double scaledMomentum2_; + uint64_t count_; + double stats_; }; } // namespace windowfunction