diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index e310d3d14..0b8b276a9 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -154,6 +154,7 @@ CrossEngineStep::CrossEngineStep( fRowsPerGroup(256), fOutputDL(NULL), fOutputIterator(0), + fRunner(0), fEndOfResult(false), fSchema(schema), fTable(table), @@ -439,14 +440,14 @@ void CrossEngineStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void CrossEngineStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/crossenginestep.h b/dbcon/joblist/crossenginestep.h index a0f21cc50..1d4772165 100644 --- a/dbcon/joblist/crossenginestep.h +++ b/dbcon/joblist/crossenginestep.h @@ -181,7 +181,7 @@ protected: CrossEngineStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle OIDVector fOIDVector; bool fEndOfResult; bool fRunExecuted; diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp index 856440ddf..ab125c904 100644 --- a/dbcon/joblist/diskjoinstep.cpp +++ b/dbcon/joblist/diskjoinstep.cpp @@ -58,7 +58,7 @@ namespace joblist { DiskJoinStep::DiskJoinStep() { } DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bool lastOne) : JobStep(*t), thjs(t), - joinerIndex(joinIndex), closedOutput(false) + mainThread(0), joinerIndex(joinIndex), closedOutput(false) { /* grab all relevant vars from THJS @@ -130,9 +130,10 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bo DiskJoinStep::~DiskJoinStep() { abort(); - if (mainThread) { - mainThread->join(); - mainThread.reset(); + if (mainThread) + { + jobstepThreadPool.join(mainThread); + mainThread = 0; } if (jp) atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage()); @@ -151,13 +152,16 @@ void DiskJoinStep::loadExistingData(vector &data) void DiskJoinStep::run() { - mainThread.reset(new boost::thread(Runner(this))); + mainThread = jobstepThreadPool.invoke(Runner(this)); } void DiskJoinStep::join() { if (mainThread) - mainThread->join(); + { + jobstepThreadPool.join(mainThread); + mainThread = 0; + } if (jp) { atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage()); //int64_t memUsage; @@ -479,12 +483,12 @@ void DiskJoinStep::mainRunner() loadFIFO.reset(new FIFO >(1, 1)); // double buffering should be good enough buildFIFO.reset(new FIFO >(1, 1)); - loadThread.reset(new boost::thread(Loader(this))); - buildThread.reset(new boost::thread(Builder(this))); - joinThread.reset(new boost::thread(Joiner(this))); - loadThread->join(); - buildThread->join(); - joinThread->join(); + std::vector thrds; + thrds.reserve(3); + thrds.push_back(jobstepThreadPool.invoke(Loader(this))); + thrds.push_back(jobstepThreadPool.invoke(Builder(this))); + thrds.push_back(jobstepThreadPool.invoke(Joiner(this))); + jobstepThreadPool.join(thrds); } } CATCH_AND_LOG; diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index a0524e805..7b8786bc7 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -49,7 +49,6 @@ class DiskJoinStep : public JobStep boost::shared_array LOMapping, SOMapping, SjoinFEMapping, LjoinFEMapping; TupleHashJoinStep *thjs; - boost::shared_ptr runner; boost::shared_ptr fe; bool typeless; JoinType joinType; @@ -69,7 +68,7 @@ class DiskJoinStep : public JobStep bool lastLargeIteration; uint32_t largeIterationCount; - boost::shared_ptr mainThread; + uint64_t mainThread; // thread handle from thread pool /* Loader structs */ struct LoaderOutput { @@ -86,8 +85,6 @@ class DiskJoinStep : public JobStep }; void loadFcn(); - boost::shared_ptr loadThread; - /* Builder structs */ struct BuilderOutput { boost::shared_ptr tupleJoiner; @@ -104,7 +101,6 @@ class DiskJoinStep : public JobStep DiskJoinStep *djs; }; void buildFcn(); - boost::shared_ptr buildThread; /* Joining structs */ struct Joiner { @@ -113,7 +109,6 @@ class DiskJoinStep : public JobStep DiskJoinStep *djs; }; void joinFcn(); - boost::shared_ptr joinThread; // limits & usage boost::shared_ptr smallUsage; diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index e4da0e568..d82689b06 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -108,6 +108,35 @@ JobList::~JobList() joiners[i]->join(); delete joiners[i]; } +#if 0 + // Stop all the query steps + end = fQuery.end(); + for (iter = fQuery.begin(); iter != end; ++iter) + { + (*iter)->abort(); + } + + // Stop all the projection steps + end = fProject.end(); + for (iter = fProject.begin(); iter != end; ++iter) + { + (*iter)->abort(); + } + + // Wait for all the query steps to end + end = fQuery.end(); + for (iter = fQuery.begin(); iter != end; ++iter) + { + (*iter)->join(); + } + + // Wait for all the projection steps to end + end = fProject.end(); + for (iter = fProject.begin(); iter != end; ++iter) + { + (*iter)->join(); + } +#endif } } catch (exception& ex) diff --git a/dbcon/joblist/joblist.vpj b/dbcon/joblist/joblist.vpj index 20b1d6278..cbf9aa13a 100644 --- a/dbcon/joblist/joblist.vpj +++ b/dbcon/joblist/joblist.vpj @@ -248,6 +248,7 @@ + + toString(); @@ -100,6 +102,7 @@ JobStep::JobStep(const JobInfo& j) : fQtc.serverParms(tsp); //fStepUuid = bu::random_generator()(); fStepUuid = QueryTeleClient::genUUID(); + jobstepThreadPool.setDebug(true); } //------------------------------------------------------------------------------ diff --git a/dbcon/joblist/jobstep.h b/dbcon/joblist/jobstep.h index 7dccd30d3..899a131d7 100644 --- a/dbcon/joblist/jobstep.h +++ b/dbcon/joblist/jobstep.h @@ -42,7 +42,7 @@ #include "timestamp.h" #include "rowgroup.h" #include "querytele.h" - +#include "threadpool.h" #include "atomicops.h" #include "branchpred.h" @@ -53,6 +53,7 @@ # endif #endif +using namespace threadpool; namespace joblist { @@ -234,6 +235,7 @@ public: void onClauseFilter(bool b) { fOnClauseFilter = b; } protected: + static ThreadPool jobstepThreadPool; //@bug6088, for telemetry posting static const int64_t STEP_TELE_INTERVAL = 5000; // now, this is the browser refresh rate diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index f2e09eac2..960458585 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -134,6 +134,8 @@ pDictionaryScan::pDictionaryScan( sendWaiting(false), ridCount(0), ridList(0), + pThread(0), + cThread(0), colType(ct), fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()), fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()), @@ -214,12 +216,12 @@ void pDictionaryScan::initializeConfigParms() void pDictionaryScan::startPrimitiveThread() { - pThread.reset(new boost::thread(pDictionaryScanPrimitive(this))); + pThread = jobstepThreadPool.invoke(pDictionaryScanPrimitive(this)); } void pDictionaryScan::startAggregationThread() { - cThread.reset(new boost::thread(pDictionaryScanAggregator(this))); + cThread = jobstepThreadPool.invoke(pDictionaryScanAggregator(this)); } void pDictionaryScan::run() @@ -243,8 +245,8 @@ void pDictionaryScan::run() void pDictionaryScan::join() { - pThread->join(); - cThread->join(); + jobstepThreadPool.join(pThread); + jobstepThreadPool.join(cThread); if (isEquality && fDec) { destroyEqualityFilter(); isEquality = false; diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index c1594b73a..5e1006e2c 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -321,8 +321,8 @@ private: BRM::DBRM dbrm; - boost::shared_ptr cThread; //consumer thread - boost::shared_ptr pThread; //producer thread + // boost::shared_ptr cThread; //consumer thread + // boost::shared_ptr pThread; //producer thread boost::mutex mutex; boost::condition condvar; boost::condition flushed; @@ -772,8 +772,8 @@ private: DataList *ridList; messageqcpp::ByteStream fFilterString; execplan::CalpontSystemCatalog::ColType colType; - boost::shared_ptr pThread; //producer thread - boost::shared_ptr cThread; //producer thread + uint64_t pThread; //producer thread. thread pool handle + uint64_t cThread; //consumer thread. thread pool handle DataList_t* requestList; //StringDataList* stringList; boost::mutex mutex; @@ -1036,8 +1036,6 @@ protected: private: void formatMiniStats(); - typedef boost::shared_ptr SPTHD; - typedef boost::shared_array SATHD; void startPrimitiveThread(); void startAggregationThread(); void initializeConfigParms(); @@ -1060,7 +1058,7 @@ private: uint32_t fNumThreads; PrimitiveStepType ffirstStepType; bool isFilterFeeder; - SATHD fProducerThread; + std::vector fProducerThreads; // thread pool handles messageqcpp::ByteStream fFilterString; uint32_t fFilterCount; execplan::CalpontSystemCatalog::ColType fColType; @@ -1097,8 +1095,8 @@ private: uint64_t fMsgBytesOut; // total byte count for outcoming messages uint64_t fBlockTouched; // total blocks touched uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File - boost::shared_ptr cThread; //consumer thread - boost::shared_ptr pThread; //producer thread + // uint64_t cThread; //consumer thread. thread handle from thread pool + uint64_t pThread; //producer thread. thread handle from thread pool boost::mutex tplMutex; boost::mutex dlMutex; boost::mutex cpMutex; @@ -1251,7 +1249,7 @@ private: execplan::CalpontSystemCatalog::OID fTableOID; execplan::CalpontSystemCatalog::ColType fColType; int8_t fBOP; - boost::shared_ptr runner; // @bug 686 + // int64_t runner; // thread handle from thread pool // @bug 687 Add data and friend declarations for concurrent filter steps. std::vector fSortedA; // used to internally sort input data diff --git a/dbcon/joblist/subquerystep.cpp b/dbcon/joblist/subquerystep.cpp index fa3c6bfc1..3d227d05d 100644 --- a/dbcon/joblist/subquerystep.cpp +++ b/dbcon/joblist/subquerystep.cpp @@ -150,6 +150,7 @@ SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo) , fEndOfResult(false) , fInputIterator(0) , fOutputIterator(0) + , fRunner(0) { fAlias = s->alias(); fView = s->view(); @@ -191,14 +192,14 @@ void SubAdapterStep::run() if (fDelivery) fOutputIterator = fOutputDL->getIterator(); - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void SubAdapterStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/subquerystep.h b/dbcon/joblist/subquerystep.h index 4455e5915..f97cd501a 100644 --- a/dbcon/joblist/subquerystep.h +++ b/dbcon/joblist/subquerystep.h @@ -230,7 +230,7 @@ protected: SubAdapterStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle boost::scoped_ptr fExpression; }; diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 166842595..ad5e90b82 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -180,14 +180,13 @@ void TupleBPS::initializeConfigParms() else fMaxNumThreads = 1; - fProducerThread.reset(new SPTHD[fMaxNumThreads]); - // Make maxnum thread objects even if they don't get used to make join() safe. - for (uint32_t i = 0; i < fMaxNumThreads; i++) - fProducerThread[i].reset(new thread()); + // Reserve the max number of thread space. A bit of an optimization. + fProducerThreads.clear(); + fProducerThreads.reserve(fMaxNumThreads); } TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) : - BatchPrimitive(jobInfo), fRm(jobInfo.rm) + BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm) { fInputJobStepAssociation = rhs.inputAssociation(); fOutputJobStepAssociation = rhs.outputAssociation(); @@ -800,7 +799,7 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts) void TupleBPS::startPrimitiveThread() { - pThread.reset(new boost::thread(TupleBPSPrimitive(this))); + pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this)); } void TupleBPS::startAggregationThread() @@ -809,13 +808,13 @@ void TupleBPS::startAggregationThread() // fMaxNumThreads = 1; // fNumThreads = fMaxNumThreads; // for (uint32_t i = 0; i < fMaxNumThreads; i++) -// fProducerThread[i].reset(new boost::thread(TupleBPSAggregators(this, i))); +// fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i))); // This block of code starts one thread at a time if (fNumThreads >= fMaxNumThreads) return; fNumThreads++; - fProducerThread[fNumThreads-1].reset(new boost::thread(TupleBPSAggregators(this, fNumThreads-1))); + fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads-1))); } //#include "boost/date_time/posix_time/posix_time.hpp" @@ -1117,6 +1116,8 @@ void TupleBPS::run() serializeJoiner(); prepCasualPartitioning(); startPrimitiveThread(); + fProducerThreads.clear(); + fProducerThreads.reserve(fMaxNumThreads); startAggregationThread(); } catch (const std::exception& e) @@ -1153,10 +1154,10 @@ void TupleBPS::join() } if (pThread) - pThread->join(); + jobstepThreadPool.join(pThread); + + jobstepThreadPool.join(fProducerThreads); - for (uint32_t i = 0; i < fMaxNumThreads; i++) - fProducerThread[i]->join(); if (BPPIsAllocated) { ByteStream bs; fDec->removeDECEventListener(this); diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 5c4bdba0b..8f57d5e95 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -182,6 +182,7 @@ TupleAggregateStep::TupleAggregateStep( fAggregator(agg), fRowGroupOut(rgOut), fRowGroupIn(rgIn), + fRunner(0), fUmOnly(false), fRm(jobInfo.rm), fBucketNum(0), @@ -252,7 +253,7 @@ void TupleAggregateStep::run() { if (fDelivery == false) { - fRunner.reset(new thread(Aggregator(this))); + fRunner = jobstepThreadPool.invoke(Aggregator(this)); } } @@ -260,7 +261,7 @@ void TupleAggregateStep::run() void TupleAggregateStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } @@ -4210,8 +4211,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) // maximum number is reached. if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads && dlIn->more(fInputIter)) { - fFirstPhaseRunners[fFirstPhaseThreadCount].reset - (new boost::thread(ThreadedAggregator(this, fFirstPhaseThreadCount))); + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount))); fFirstPhaseThreadCount++; } @@ -4482,24 +4482,21 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp fFirstPhaseThreadCount = fNumOfThreads; boost::shared_ptr runner; for (i = 0; i < fNumOfThreads; i++) - { - runner.reset(new boost::thread(ThreadedAggregator(this, i))); - fFirstPhaseRunners.push_back(runner); + { + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))) } */ // This block of code starts one thread, relies on doThreadedAggregation() // to start more as needed - fFirstPhaseRunners.resize(fNumOfThreads); // to prevent a resize during use + fFirstPhaseRunners.clear(); + fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use fFirstPhaseThreadCount = 1; - for (i = 1; i < fNumOfThreads; i++) - // fill with valid thread objects to make joining work - fFirstPhaseRunners[i].reset(new boost::thread()); - fFirstPhaseRunners[0].reset(new boost::thread(ThreadedAggregator(this, 0))); + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0))); - for (i = 0; i < fNumOfThreads; i++) - fFirstPhaseRunners[i]->join(); - fFirstPhaseRunners.clear(); + // Now wait for that thread plus all the threads it may have spawned + jobstepThreadPool.join(fFirstPhaseRunners); + fFirstPhaseRunners.clear(); } if (dynamic_cast(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0) @@ -4509,8 +4506,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp { if (!fDoneAggregate) { - vector > runners; - boost::shared_ptr runner; + vector runners; // thread pool handles fRowGroupsDeliveredData.resize(fNumOfBuckets); uint32_t bucketsPerThread = fNumOfBuckets/fNumOfThreads; @@ -4519,13 +4515,12 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp //uint32_t bucketsPerThread = 1; //uint32_t numThreads = fNumOfBuckets; + runners.reserve(numThreads); for (i = 0; i < numThreads; i++) { - runner.reset(new boost::thread(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread))); - runners.push_back(runner); + runners.push_back(jobstepThreadPool.invoke(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread))); } - for (i = 0; i < numThreads; i++) - runners[i]->join(); + jobstepThreadPool.join(runners); } fDoneAggregate = true; diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index c586b4646..0839e8263 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -166,7 +166,7 @@ private: uint32_t bucketCount; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle bool fUmOnly; ResourceManager *fRm; @@ -186,7 +186,7 @@ private: bool fIsMultiThread; int fInputIter; // iterator boost::scoped_array fMemUsage; - vector > fFirstPhaseRunners; + std::vector fFirstPhaseRunners; // thread pool handles uint32_t fFirstPhaseThreadCount; boost::shared_ptr fSessionMemLimit; diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index ea889a14f..7eead22a8 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -106,6 +106,7 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) : fOutputDL(NULL), fInputIterator(0), fOutputIterator(0), + fRunner(0), fRowsProcessed(0), fRowsReturned(0), fLimitStart(0), @@ -205,14 +206,14 @@ void TupleAnnexStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void TupleAnnexStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 69a16734c..2f548f179 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -111,7 +111,7 @@ protected: TupleAnnexStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle uint64_t fRowsProcessed; uint64_t fRowsReturned; diff --git a/dbcon/joblist/tupleconstantstep.cpp b/dbcon/joblist/tupleconstantstep.cpp index 327fd5e77..1b96cd83f 100644 --- a/dbcon/joblist/tupleconstantstep.cpp +++ b/dbcon/joblist/tupleconstantstep.cpp @@ -81,6 +81,7 @@ TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo) : fInputDL(NULL), fOutputDL(NULL), fInputIterator(0), + fRunner(0), fEndOfResult(false) { fExtendedInfo = "TCS: "; @@ -290,7 +291,7 @@ void TupleConstantStep::run() if (fOutputDL == NULL) throw logic_error("Output is not a RowGroup data list."); - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } } @@ -298,7 +299,7 @@ void TupleConstantStep::run() void TupleConstantStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/tupleconstantstep.h b/dbcon/joblist/tupleconstantstep.h index 994e63b05..85472aeca 100644 --- a/dbcon/joblist/tupleconstantstep.h +++ b/dbcon/joblist/tupleconstantstep.h @@ -101,7 +101,7 @@ protected: TupleConstantStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle bool fEndOfResult; }; diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index ef8bb388b..b1b9f8153 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -166,7 +166,7 @@ void TupleHashJoinStep::run() } joiners.resize(smallDLs.size()); - mainRunner.reset(new boost::thread(HJRunner(this))); + mainRunner = jobstepThreadPool.invoke(HJRunner(this)); } void TupleHashJoinStep::join() @@ -175,12 +175,12 @@ void TupleHashJoinStep::join() if (joinRan) return; joinRan = true; - mainRunner->join(); + jobstepThreadPool.join(mainRunner); if (djs) { for (int i = 0; i < (int) djsJoiners.size(); i++) djs[i].join(); - djsReader.join(); - djsRelay.join(); + jobstepThreadPool.join(djsReader); + jobstepThreadPool.join(djsRelay); //cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl; } } @@ -544,9 +544,10 @@ void TupleHashJoinStep::hjRunner() } } + smallRunners.clear(); + smallRunners.reserve(smallDLs.size()); for (i = 0; i < smallDLs.size(); i++) - smallRunners.push_back(boost::shared_ptr - (new boost::thread(SmallRunner(this, i)))); + smallRunners.push_back(jobstepThreadPool.invoke(SmallRunner(this, i))); } catch (thread_resource_error&) { string emsg = "TupleHashJoin caught a thread resource error, aborting...\n"; @@ -557,8 +558,7 @@ void TupleHashJoinStep::hjRunner() deliverMutex.unlock(); } - for (i = 0; i < smallRunners.size(); i++) - smallRunners[i]->join(); + jobstepThreadPool.join(smallRunners); smallRunners.clear(); for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++) @@ -629,9 +629,9 @@ void TupleHashJoinStep::hjRunner() /* If an error happened loading the existing data, these threads are necessary to finish the abort */ try { - djsRelay = boost::thread(DJSRelay(this)); + djsRelay = jobstepThreadPool.invoke(DJSRelay(this)); relay = true; - djsReader = boost::thread(DJSReader(this, smallSideCount)); + djsReader = jobstepThreadPool.invoke(DJSReader(this, smallSideCount)); reader = true; for (i = 0; i < smallSideCount; i++) djs[i].run(); @@ -1091,7 +1091,7 @@ void TupleHashJoinStep::startJoinThreads() bool more = true; RGData oneRG; - if (joinRunners) + if (joinRunners.size() > 0) return; //@bug4836, in error case, stop process, and unblock the next step. @@ -1142,13 +1142,11 @@ void TupleHashJoinStep::startJoinThreads() makeDupList(fe2 ? fe2Output : outputRG); /* Start join runners */ - joinRunners.reset(new boost::shared_ptr[joinThreadCount]); + joinRunners.reserve(joinThreadCount); for (i = 0; i < joinThreadCount; i++) - joinRunners[i].reset(new boost::thread(JoinRunner(this, i))); - + joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i))); /* Join them and call endOfInput */ - for (i = 0; i < joinThreadCount; i++) - joinRunners[i]->join(); + jobstepThreadPool.join(joinRunners); if (lastSmallOuterJoiner != (uint32_t) -1) finishSmallOuterJoin(); diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index d56ad8441..833751396 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -276,8 +276,8 @@ private: uint32_t index; }; - boost::shared_ptr mainRunner; - std::vector > smallRunners; + int64_t mainRunner; // thread handle from thread pool + std::vector smallRunners; // thread handles from thread pool // for notify TupleAggregateStep PM hashjoin // Ideally, hashjoin and delivery communicate with RowGroupDL, @@ -347,7 +347,7 @@ private: void processDupList(uint32_t threadID, rowgroup::RowGroup &ingrp, std::vector *rowData); - boost::scoped_array > joinRunners; + std::vector joinRunners; // thread handles from thread pool boost::mutex inputDLLock, outputDLLock; boost::shared_array > columnMappings, fergMappings; boost::shared_array fe2Mapping; @@ -375,7 +375,7 @@ private: boost::scoped_array djs; boost::scoped_array > fifos; void djsReaderFcn(int index); - boost::thread djsReader; + uint64_t djsReader; // thread handle from thread pool struct DJSReader { DJSReader(TupleHashJoinStep *hj, uint32_t i) : HJ(hj), index(i) { } void operator()() { HJ->djsReaderFcn(index); } @@ -383,7 +383,7 @@ private: uint32_t index; }; - boost::thread djsRelay; + uint64_t djsRelay; // thread handle from thread pool void djsRelayFcn(); struct DJSRelay { DJSRelay(TupleHashJoinStep *hj) : HJ(hj) { } diff --git a/dbcon/joblist/tuplehavingstep.cpp b/dbcon/joblist/tuplehavingstep.cpp index d7196ef85..51e9cf88d 100644 --- a/dbcon/joblist/tuplehavingstep.cpp +++ b/dbcon/joblist/tuplehavingstep.cpp @@ -63,6 +63,7 @@ TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo) : fInputDL(NULL), fOutputDL(NULL), fInputIterator(0), + fRunner(0), fRowsReturned(0), fEndOfResult(false), fFeInstance(funcexp::FuncExp::instance()) @@ -151,7 +152,7 @@ void TupleHavingStep::run() if (fOutputDL == NULL) throw logic_error("Output is not a RowGroup data list."); - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } } @@ -159,7 +160,7 @@ void TupleHavingStep::run() void TupleHavingStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/tuplehavingstep.h b/dbcon/joblist/tuplehavingstep.h index 1e3467b61..e76ca7898 100644 --- a/dbcon/joblist/tuplehavingstep.h +++ b/dbcon/joblist/tuplehavingstep.h @@ -97,7 +97,7 @@ protected: TupleHavingStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle uint64_t fRowsReturned; bool fEndOfResult; diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 370f09d6e..1a2e37d93 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -767,9 +767,9 @@ void TupleUnion::run() } } + runners.reserve(inputs.size()); for (i = 0; i < inputs.size(); i++) { - boost::shared_ptr th(new boost::thread(Runner(this, i))); - runners.push_back(th); + runners.push_back(jobstepThreadPool.invoke(Runner(this, i))); } } @@ -784,8 +784,7 @@ void TupleUnion::join() joinRan = true; lk.unlock(); - for (i = 0; i < runners.size(); i++) - runners[i]->join(); + jobstepThreadPool.join(runners); runners.clear(); uniquer->clear(); rowMemory.clear(); diff --git a/dbcon/joblist/tupleunion.h b/dbcon/joblist/tupleunion.h index 3f4440d11..17498a305 100644 --- a/dbcon/joblist/tupleunion.h +++ b/dbcon/joblist/tupleunion.h @@ -118,7 +118,7 @@ private: Runner(TupleUnion *t, uint32_t in) : tu(t), index(in) { } void operator()() { tu->readInput(index); } }; - std::vector > runners; + std::vector runners; //thread pool handles struct Hasher { TupleUnion *ts; diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp index ec37573d8..7f3a8c29d 100644 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -140,6 +140,7 @@ namespace joblist WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) : JobStep(jobInfo), + fRunner(0), fCatalog(jobInfo.csc), fRowsReturned(0), fEndOfResult(false), @@ -192,14 +193,14 @@ void WindowFunctionStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void WindowFunctionStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } @@ -855,13 +856,13 @@ void WindowFunctionStep::execute() if (fTotalThreads > fFunctionCount) fTotalThreads = fFunctionCount; + fFunctionThreads.clear(); + fFunctionThreads.reserve(fTotalThreads); for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++) - fFunctionThreads.push_back( - boost::shared_ptr(new boost::thread(WFunction(this)))); + fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this))); - // If cancelled, not all thread is started. - for (uint64_t i = 0; i < fFunctionThreads.size(); i++) - fFunctionThreads[i]->join(); + // If cancelled, not all threads are started. + jobstepThreadPool.join(fFunctionThreads); } if (!(cancelled())) diff --git a/dbcon/joblist/windowfunctionstep.h b/dbcon/joblist/windowfunctionstep.h index 027b6eae3..73d47bacd 100644 --- a/dbcon/joblist/windowfunctionstep.h +++ b/dbcon/joblist/windowfunctionstep.h @@ -153,7 +153,7 @@ private: WindowFunctionStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle boost::shared_ptr fCatalog; uint64_t fRowsReturned; @@ -188,7 +188,7 @@ private: WindowFunctionStep* fStep; }; - std::vector > fFunctionThreads; + std::vector fFunctionThreads; std::vector fRows; std::vector > fFunctions; diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 8ccebcba5..2f6a6d431 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1438,7 +1438,8 @@ int main(int argc, char* argv[]) { IOSocket ios; ios = mqs->accept(); - exeMgrThreadPool.invoke(SessionThread(ios, ec, rm)); + boost::thread thd(SessionThread(ios, ec, rm)); + //exeMgrThreadPool.invoke(SessionThread(ios, ec, rm)); } exeMgrThreadPool.wait(); diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index d4ea02565..644e9d098 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -177,10 +177,10 @@ void ThreadPool::join(std::vector thrHandle) } } -int64_t ThreadPool::invoke(const Functor_T &threadfunc) +uint64_t ThreadPool::invoke(const Functor_T &threadfunc) { boost::mutex::scoped_lock lock1(fMutex); - int64_t thrHandle=0; + uint64_t thrHandle=0; for(;;) { @@ -210,6 +210,22 @@ int64_t ThreadPool::invoke(const Functor_T &threadfunc) lock1.unlock(); fThreads.create_thread(beginThreadFunc(*this)); + + if (fDebug) + { + logging::Message::Args args; + logging::Message message(5); + args.add("invoke: Starting thread "); + args.add(fThreadCount); + args.add(" max "); + args.add(fMaxThreads); + args.add(" queue "); + args.add(fQueueSize); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } if (bAdded) break; @@ -227,7 +243,20 @@ int64_t ThreadPool::invoke(const Functor_T &threadfunc) break; } - fThreadAvailable.wait(lock1); + if (fDebug) + { + logging::Message::Args args; + logging::Message message(5); + args.add("invoke: Blocked waiting for thread. Count "); + args.add(fThreadCount); + args.add("max "); + args.add(fMaxThreads); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + fThreadAvailable.wait(lock1); + } } catch(...) { @@ -358,7 +387,7 @@ void ThreadPool::beginThread() throw() } } -int64_t ThreadPool::addFunctor(const Functor_T &func) +uint64_t ThreadPool::addFunctor(const Functor_T &func) { bool bAtEnd = false; diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index dd5f13b56..7616090fe 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -131,7 +131,7 @@ public: * queueSize tasks already waiting, invoke() will block until a slot in the * queue comes free. */ - EXPORT int64_t invoke(const Functor_T &threadfunc); + EXPORT uint64_t invoke(const Functor_T &threadfunc); /** @brief stop the threads */ @@ -153,13 +153,15 @@ public: */ EXPORT void dump(); + EXPORT void setDebug(bool d) {fDebug = d;} + protected: private: // Used internally to keep a handle associated with each functor for join() struct PoolFunction_T { - int64_t hndl; + uint64_t hndl; Functor_T functor; }; @@ -169,7 +171,7 @@ private: /** @brief add a functor to the list */ - int64_t addFunctor(const Functor_T &func); + uint64_t addFunctor(const Functor_T &func); /** @brief thread entry point */ @@ -222,6 +224,7 @@ private: uint32_t waitingFunctorsSize; uint64_t fNextHandle; + bool fDebug; }; } // namespace threadpool