diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 9238b80d7..9abfcfe11 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -137,6 +137,7 @@ CrossEngineStep::CrossEngineStep( fRowsPerGroup(256), fOutputDL(NULL), fOutputIterator(0), + fRunner(0), fEndOfResult(false), fSchema(schema), fTable(table), @@ -422,14 +423,14 @@ void CrossEngineStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void CrossEngineStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/crossenginestep.h b/dbcon/joblist/crossenginestep.h index be83f6c2e..bdbba656b 100644 --- a/dbcon/joblist/crossenginestep.h +++ b/dbcon/joblist/crossenginestep.h @@ -182,7 +182,7 @@ protected: CrossEngineStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle OIDVector fOIDVector; bool fEndOfResult; bool fRunExecuted; diff --git a/dbcon/joblist/diskjoinstep.cpp b/dbcon/joblist/diskjoinstep.cpp index 856440ddf..ab125c904 100644 --- a/dbcon/joblist/diskjoinstep.cpp +++ b/dbcon/joblist/diskjoinstep.cpp @@ -58,7 +58,7 @@ namespace joblist { DiskJoinStep::DiskJoinStep() { } DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bool lastOne) : JobStep(*t), thjs(t), - joinerIndex(joinIndex), closedOutput(false) + mainThread(0), joinerIndex(joinIndex), closedOutput(false) { /* grab all relevant vars from THJS @@ -130,9 +130,10 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bo DiskJoinStep::~DiskJoinStep() { abort(); - if (mainThread) { - mainThread->join(); - mainThread.reset(); + if (mainThread) + { + jobstepThreadPool.join(mainThread); + mainThread = 0; } if (jp) atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage()); @@ -151,13 +152,16 @@ void DiskJoinStep::loadExistingData(vector &data) void DiskJoinStep::run() { - mainThread.reset(new boost::thread(Runner(this))); + mainThread = jobstepThreadPool.invoke(Runner(this)); } void DiskJoinStep::join() { if (mainThread) - mainThread->join(); + { + jobstepThreadPool.join(mainThread); + mainThread = 0; + } if (jp) { atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage()); //int64_t memUsage; @@ -479,12 +483,12 @@ void DiskJoinStep::mainRunner() loadFIFO.reset(new FIFO >(1, 1)); // double buffering should be good enough buildFIFO.reset(new FIFO >(1, 1)); - loadThread.reset(new boost::thread(Loader(this))); - buildThread.reset(new boost::thread(Builder(this))); - joinThread.reset(new boost::thread(Joiner(this))); - loadThread->join(); - buildThread->join(); - joinThread->join(); + std::vector thrds; + thrds.reserve(3); + thrds.push_back(jobstepThreadPool.invoke(Loader(this))); + thrds.push_back(jobstepThreadPool.invoke(Builder(this))); + thrds.push_back(jobstepThreadPool.invoke(Joiner(this))); + jobstepThreadPool.join(thrds); } } CATCH_AND_LOG; diff --git a/dbcon/joblist/diskjoinstep.h b/dbcon/joblist/diskjoinstep.h index a0524e805..7b8786bc7 100644 --- a/dbcon/joblist/diskjoinstep.h +++ b/dbcon/joblist/diskjoinstep.h @@ -49,7 +49,6 @@ class DiskJoinStep : public JobStep boost::shared_array LOMapping, SOMapping, SjoinFEMapping, LjoinFEMapping; TupleHashJoinStep *thjs; - boost::shared_ptr runner; boost::shared_ptr fe; bool typeless; JoinType joinType; @@ -69,7 +68,7 @@ class DiskJoinStep : public JobStep bool lastLargeIteration; uint32_t 
largeIterationCount; - boost::shared_ptr mainThread; + uint64_t mainThread; // thread handle from thread pool /* Loader structs */ struct LoaderOutput { @@ -86,8 +85,6 @@ class DiskJoinStep : public JobStep }; void loadFcn(); - boost::shared_ptr loadThread; - /* Builder structs */ struct BuilderOutput { boost::shared_ptr tupleJoiner; @@ -104,7 +101,6 @@ class DiskJoinStep : public JobStep DiskJoinStep *djs; }; void buildFcn(); - boost::shared_ptr buildThread; /* Joining structs */ struct Joiner { @@ -113,7 +109,6 @@ class DiskJoinStep : public JobStep DiskJoinStep *djs; }; void joinFcn(); - boost::shared_ptr joinThread; // limits & usage boost::shared_ptr smallUsage; diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index f70b81541..e5eb86b26 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -28,7 +28,6 @@ using namespace std; #include "joblist.h" - #include "calpontsystemcatalog.h" using namespace execplan; @@ -73,13 +72,17 @@ JobList::JobList(bool isEM) : JobList::~JobList() { - vector joiners; - boost::thread *tmp; try { if (fIsRunning) { - JobStepVector::iterator iter; +#if 0 + // This logic creates a set of threads to wind down the query + vector joiners; + joiners.reserve(20); + NullStep nullStep; // For access to the static jobstepThreadPool. + + JobStepVector::iterator iter; JobStepVector::iterator end; iter = fQuery.begin(); @@ -88,8 +91,7 @@ JobList::~JobList() // Wait for all the query steps to finish while (iter != end) { - tmp = new boost::thread(JSJoiner(iter->get())); - joiners.push_back(tmp); + joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get()))); ++iter; } @@ -99,14 +101,40 @@ JobList::~JobList() // wait for the projection steps while (iter != end) { - tmp = new boost::thread(JSJoiner(iter->get())); - joiners.push_back(tmp); + joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get()))); ++iter; } - for (uint32_t i = 0; i < joiners.size(); i++) { - joiners[i]->join(); - delete joiners[i]; + nullStep.jobstepThreadPool.join(joiners); +#endif + // This logic stops the query steps one at a time + JobStepVector::iterator iter; + JobStepVector::iterator end; + end = fQuery.end(); + for (iter = fQuery.begin(); iter != end; ++iter) + { + (*iter)->abort(); + } + + // Stop all the projection steps + end = fProject.end(); + for (iter = fProject.begin(); iter != end; ++iter) + { + (*iter)->abort(); + } + + // Wait for all the query steps to end + end = fQuery.end(); + for (iter = fQuery.begin(); iter != end; ++iter) + { + (*iter)->join(); + } + + // Wait for all the projection steps to end + end = fProject.end(); + for (iter = fProject.begin(); iter != end; ++iter) + { + (*iter)->join(); } } } diff --git a/dbcon/joblist/joblist.vpj b/dbcon/joblist/joblist.vpj index 20b1d6278..cbf9aa13a 100644 --- a/dbcon/joblist/joblist.vpj +++ b/dbcon/joblist/joblist.vpj @@ -248,6 +248,7 @@ + + using namespace std; +#include #include #include #include @@ -55,6 +56,8 @@ namespace joblist { boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER; +ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0); + ostream& operator<<(ostream& os, const JobStep* rhs) { os << rhs->toString(); diff --git a/dbcon/joblist/jobstep.h b/dbcon/joblist/jobstep.h index 7dccd30d3..00bf243d7 100644 --- a/dbcon/joblist/jobstep.h +++ b/dbcon/joblist/jobstep.h @@ -42,7 +42,7 @@ #include "timestamp.h" #include "rowgroup.h" #include "querytele.h" - +#include "threadpool.h" #include "atomicops.h" #include "branchpred.h" @@ -53,6 +53,7 
@@ # endif #endif +using namespace threadpool; namespace joblist { @@ -233,6 +234,7 @@ public: bool onClauseFilter() const { return fOnClauseFilter; } void onClauseFilter(bool b) { fOnClauseFilter = b; } + static ThreadPool jobstepThreadPool; protected: //@bug6088, for telemetry posting @@ -328,6 +330,20 @@ public: virtual bool deliverStringTableRowGroup() const = 0; }; +class NullStep : public JobStep +{ +public: + /** @brief virtual void Run method + */ + virtual void run(){} + /** @brief virtual void join method + */ + virtual void join(){} + /** @brief virtual string toString method + */ + virtual const std::string toString() const {return "NullStep";} +}; + // calls rhs->toString() std::ostream& operator<<(std::ostream& os, const JobStep* rhs); diff --git a/dbcon/joblist/pdictionary.cpp b/dbcon/joblist/pdictionary.cpp index 5397974c0..c85240064 100644 --- a/dbcon/joblist/pdictionary.cpp +++ b/dbcon/joblist/pdictionary.cpp @@ -105,6 +105,8 @@ pDictionaryStep::pDictionaryStep( recvWaiting(false), ridCount(0), fColType(ct), + pThread(0), + cThread(0), fFilterCount(0), requestList(0), fInterval(jobInfo.flushInterval), diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp index f2e09eac2..bfb0c27d2 100644 --- a/dbcon/joblist/pdictionaryscan.cpp +++ b/dbcon/joblist/pdictionaryscan.cpp @@ -134,7 +134,9 @@ pDictionaryScan::pDictionaryScan( sendWaiting(false), ridCount(0), ridList(0), - colType(ct), + colType(ct), + pThread(0), + cThread(0), fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()), fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()), fStopSending(false), @@ -214,12 +216,12 @@ void pDictionaryScan::initializeConfigParms() void pDictionaryScan::startPrimitiveThread() { - pThread.reset(new boost::thread(pDictionaryScanPrimitive(this))); + pThread = jobstepThreadPool.invoke(pDictionaryScanPrimitive(this)); } void pDictionaryScan::startAggregationThread() { - cThread.reset(new boost::thread(pDictionaryScanAggregator(this))); + cThread = jobstepThreadPool.invoke(pDictionaryScanAggregator(this)); } void pDictionaryScan::run() @@ -243,8 +245,8 @@ void pDictionaryScan::run() void pDictionaryScan::join() { - pThread->join(); - cThread->join(); + jobstepThreadPool.join(pThread); + jobstepThreadPool.join(cThread); if (isEquality && fDec) { destroyEqualityFilter(); isEquality = false; diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index c1594b73a..a0f48c3c4 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -321,8 +321,8 @@ private: BRM::DBRM dbrm; - boost::shared_ptr cThread; //consumer thread - boost::shared_ptr pThread; //producer thread + // boost::shared_ptr cThread; //consumer thread + // boost::shared_ptr pThread; //producer thread boost::mutex mutex; boost::condition condvar; boost::condition flushed; @@ -636,8 +636,8 @@ private: uint32_t recvWaiting; int64_t ridCount; execplan::CalpontSystemCatalog::ColType fColType; - boost::shared_ptr pThread; //producer thread - boost::shared_ptr cThread; //producer thread + uint64_t pThread; //producer thread + uint64_t cThread; //producer thread messageqcpp::ByteStream fFilterString; uint32_t fFilterCount; @@ -772,8 +772,8 @@ private: DataList *ridList; messageqcpp::ByteStream fFilterString; execplan::CalpontSystemCatalog::ColType colType; - boost::shared_ptr pThread; //producer thread - boost::shared_ptr cThread; //producer thread + uint64_t pThread; //producer thread. thread pool handle + uint64_t cThread; //consumer thread. 
thread pool handle DataList_t* requestList; //StringDataList* stringList; boost::mutex mutex; @@ -1036,8 +1036,6 @@ protected: private: void formatMiniStats(); - typedef boost::shared_ptr SPTHD; - typedef boost::shared_array SATHD; void startPrimitiveThread(); void startAggregationThread(); void initializeConfigParms(); @@ -1060,7 +1058,7 @@ private: uint32_t fNumThreads; PrimitiveStepType ffirstStepType; bool isFilterFeeder; - SATHD fProducerThread; + std::vector fProducerThreads; // thread pool handles messageqcpp::ByteStream fFilterString; uint32_t fFilterCount; execplan::CalpontSystemCatalog::ColType fColType; @@ -1097,8 +1095,8 @@ private: uint64_t fMsgBytesOut; // total byte count for outcoming messages uint64_t fBlockTouched; // total blocks touched uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File - boost::shared_ptr cThread; //consumer thread - boost::shared_ptr pThread; //producer thread + // uint64_t cThread; //consumer thread. thread handle from thread pool + uint64_t pThread; //producer thread. thread handle from thread pool boost::mutex tplMutex; boost::mutex dlMutex; boost::mutex cpMutex; @@ -1251,7 +1249,7 @@ private: execplan::CalpontSystemCatalog::OID fTableOID; execplan::CalpontSystemCatalog::ColType fColType; int8_t fBOP; - boost::shared_ptr runner; // @bug 686 + // int64_t runner; // thread handle from thread pool // @bug 687 Add data and friend declarations for concurrent filter steps. std::vector fSortedA; // used to internally sort input data @@ -1333,7 +1331,7 @@ private: bool isDictColumn; bool isEM; - boost::thread* fPTThd; +// boost::thread* fPTThd; // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. // Running with this one will swallow rows at projection. diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 8d31ba1c3..42ee6e569 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -60,7 +60,8 @@ namespace joblist const uint64_t defaultHUATotalMem = 8 * 1024 * 1024 * 1024ULL; const uint32_t defaultTupleDLMaxSize = 64 * 1024; - const uint32_t defaultTupleMaxBuckets = 256; + + const uint32_t defaultJLThreadPoolSize = 100; //pcolscan.cpp const uint32_t defaultScanLbidReqLimit = 10000; @@ -160,7 +161,7 @@ namespace joblist unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); } uint64_t getHjMaxElems() const { return getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems); } uint32_t getHjFifoSizeLargeSide() const { return getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide); } - uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); } + uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); } uint64_t getPMJoinMemLimit() const { return pmJoinMemLimit; } uint32_t getJLFlushInterval() const { return getUintVal(fJobListStr, "FlushInterval", defaultFlushInterval); } @@ -168,6 +169,10 @@ namespace joblist uint32_t getJlScanLbidReqLimit() const { return getUintVal(fJobListStr, "ScanLbidReqLimit",defaultScanLbidReqLimit); } uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr,"ScanLbidReqThreshold", defaultScanLbidReqThreshold); } + // @MCOL-513 - Added threadpool to JobSteps + uint32_t getJLThreadPoolSize() const { return getUintVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); } + std::string getJlThreadPoolDebug() 
const { return getStringVal(fJobListStr, "ThreadPoolDebug", "N"); } + // @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request. uint32_t getJlLogicalBlocksPerScan() const { return getUintVal(fJobListStr,"LogicalBlocksPerScan", defaultLogicalBlocksPerScan); } uint32_t getJlProjectBlockReqLimit() const { return getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit ); } @@ -180,9 +185,9 @@ namespace joblist uint32_t getJlMaxOutstandingRequests() const { return getUintVal(fJobListStr,"MaxOutstandingRequests", defaultMaxOutstandingRequests);} uint32_t getJlJoinerChunkSize() const { return getUintVal(fJobListStr,"JoinerChunkSize", defaultJoinerChunkSize);} - int getPsCount() const { return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount ); } + int getPsCount() const { return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount ); } int getPsConnectionsPerPrimProc() const { return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc); } - uint32_t getPsLBID_Shift() const { return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift ); } + uint32_t getPsLBID_Shift() const { return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift ); } std::string getScTempDiskPath() const { return getStringVal(fSystemConfigStr, "TempDiskPath", defaultTempDiskPath ); } uint64_t getScTempSaveSize() const { return getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize); } diff --git a/dbcon/joblist/subquerystep.cpp b/dbcon/joblist/subquerystep.cpp index fa3c6bfc1..3d227d05d 100644 --- a/dbcon/joblist/subquerystep.cpp +++ b/dbcon/joblist/subquerystep.cpp @@ -150,6 +150,7 @@ SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo) , fEndOfResult(false) , fInputIterator(0) , fOutputIterator(0) + , fRunner(0) { fAlias = s->alias(); fView = s->view(); @@ -191,14 +192,14 @@ void SubAdapterStep::run() if (fDelivery) fOutputIterator = fOutputDL->getIterator(); - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void SubAdapterStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/subquerystep.h b/dbcon/joblist/subquerystep.h index 4455e5915..f97cd501a 100644 --- a/dbcon/joblist/subquerystep.h +++ b/dbcon/joblist/subquerystep.h @@ -230,7 +230,7 @@ protected: SubAdapterStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle boost::scoped_ptr fExpression; }; diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 166842595..ad5e90b82 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -180,14 +180,13 @@ void TupleBPS::initializeConfigParms() else fMaxNumThreads = 1; - fProducerThread.reset(new SPTHD[fMaxNumThreads]); - // Make maxnum thread objects even if they don't get used to make join() safe. - for (uint32_t i = 0; i < fMaxNumThreads; i++) - fProducerThread[i].reset(new thread()); + // Reserve the max number of thread space. A bit of an optimization. 
+ fProducerThreads.clear(); + fProducerThreads.reserve(fMaxNumThreads); } TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) : - BatchPrimitive(jobInfo), fRm(jobInfo.rm) + BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm) { fInputJobStepAssociation = rhs.inputAssociation(); fOutputJobStepAssociation = rhs.outputAssociation(); @@ -800,7 +799,7 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts) void TupleBPS::startPrimitiveThread() { - pThread.reset(new boost::thread(TupleBPSPrimitive(this))); + pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this)); } void TupleBPS::startAggregationThread() @@ -809,13 +808,13 @@ void TupleBPS::startAggregationThread() // fMaxNumThreads = 1; // fNumThreads = fMaxNumThreads; // for (uint32_t i = 0; i < fMaxNumThreads; i++) -// fProducerThread[i].reset(new boost::thread(TupleBPSAggregators(this, i))); +// fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i))); // This block of code starts one thread at a time if (fNumThreads >= fMaxNumThreads) return; fNumThreads++; - fProducerThread[fNumThreads-1].reset(new boost::thread(TupleBPSAggregators(this, fNumThreads-1))); + fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads-1))); } //#include "boost/date_time/posix_time/posix_time.hpp" @@ -1117,6 +1116,8 @@ void TupleBPS::run() serializeJoiner(); prepCasualPartitioning(); startPrimitiveThread(); + fProducerThreads.clear(); + fProducerThreads.reserve(fMaxNumThreads); startAggregationThread(); } catch (const std::exception& e) @@ -1153,10 +1154,10 @@ void TupleBPS::join() } if (pThread) - pThread->join(); + jobstepThreadPool.join(pThread); + + jobstepThreadPool.join(fProducerThreads); - for (uint32_t i = 0; i < fMaxNumThreads; i++) - fProducerThread[i]->join(); if (BPPIsAllocated) { ByteStream bs; fDec->removeDECEventListener(this); diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 937de0ab2..6b355c59b 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -184,6 +184,7 @@ TupleAggregateStep::TupleAggregateStep( fAggregator(agg), fRowGroupOut(rgOut), fRowGroupIn(rgIn), + fRunner(0), fUmOnly(false), fRm(jobInfo.rm), fBucketNum(0), @@ -254,7 +255,7 @@ void TupleAggregateStep::run() { if (fDelivery == false) { - fRunner.reset(new thread(Aggregator(this))); + fRunner = jobstepThreadPool.invoke(Aggregator(this)); } } @@ -262,7 +263,7 @@ void TupleAggregateStep::run() void TupleAggregateStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } @@ -4210,13 +4211,14 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) // and if there is more data to read, the // first thread will start another thread until the // maximum number is reached. 
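// (Editor's note, not part of the patch: the hunk below disables this
// incremental spawn. With the pool-handle API, doThreadedAggregate() now
// starts every first-phase runner up front and waits on the whole set with a
// single vector join, roughly as sketched here; fFirstPhaseRunners is the
// std::vector<uint64_t> of thread-pool handles this patch introduces.)
//
//   fFirstPhaseRunners.reserve(fNumOfThreads);
//   for (uint32_t i = 0; i < fNumOfThreads; i++)
//       fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
//   ...
//   jobstepThreadPool.join(fFirstPhaseRunners);  // blocks until every handle has finished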
+#if 0 if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads && - dlIn->more(fInputIter)) { - fFirstPhaseRunners[fFirstPhaseThreadCount].reset - (new boost::thread(ThreadedAggregator(this, fFirstPhaseThreadCount))); + dlIn->more(fInputIter)) + { + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount))); fFirstPhaseThreadCount++; } - +#endif fRowGroupIns[threadID].setData(&rgData); fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings(); if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit)) @@ -4479,29 +4481,29 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp if (!fDoneAggregate) { initializeMultiThread(); -/* -// This block of code starts all threads at the start + +// This block of code starts all threads at the start fFirstPhaseThreadCount = fNumOfThreads; - boost::shared_ptr runner; + fFirstPhaseRunners.clear(); + fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use for (i = 0; i < fNumOfThreads; i++) - { - runner.reset(new boost::thread(ThreadedAggregator(this, i))); - fFirstPhaseRunners.push_back(runner); + { + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))); } -*/ +#if 0 // This block of code starts one thread, relies on doThreadedAggregation() +// For reasons unknown, this doesn't work right with threadpool // to start more as needed - fFirstPhaseRunners.resize(fNumOfThreads); // to prevent a resize during use + fFirstPhaseRunners.clear(); + fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use fFirstPhaseThreadCount = 1; - for (i = 1; i < fNumOfThreads; i++) - // fill with valid thread objects to make joining work - fFirstPhaseRunners[i].reset(new boost::thread()); - fFirstPhaseRunners[0].reset(new boost::thread(ThreadedAggregator(this, 0))); + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0))); +#endif - for (i = 0; i < fNumOfThreads; i++) - fFirstPhaseRunners[i]->join(); - fFirstPhaseRunners.clear(); + // Now wait for that thread plus all the threads it may have spawned + jobstepThreadPool.join(fFirstPhaseRunners); + fFirstPhaseRunners.clear(); } if (dynamic_cast(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0) @@ -4511,8 +4513,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp { if (!fDoneAggregate) { - vector > runners; - boost::shared_ptr runner; + vector runners; // thread pool handles fRowGroupsDeliveredData.resize(fNumOfBuckets); uint32_t bucketsPerThread = fNumOfBuckets/fNumOfThreads; @@ -4521,13 +4522,12 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp //uint32_t bucketsPerThread = 1; //uint32_t numThreads = fNumOfBuckets; + runners.reserve(numThreads); for (i = 0; i < numThreads; i++) { - runner.reset(new boost::thread(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread))); - runners.push_back(runner); + runners.push_back(jobstepThreadPool.invoke(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread))); } - for (i = 0; i < numThreads; i++) - runners[i]->join(); + jobstepThreadPool.join(runners); } fDoneAggregate = true; diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index c586b4646..0839e8263 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -166,7 +166,7 @@ private: uint32_t bucketCount; }; - boost::scoped_ptr fRunner; + uint64_t 
fRunner; // thread pool handle bool fUmOnly; ResourceManager *fRm; @@ -186,7 +186,7 @@ private: bool fIsMultiThread; int fInputIter; // iterator boost::scoped_array fMemUsage; - vector > fFirstPhaseRunners; + std::vector fFirstPhaseRunners; // thread pool handles uint32_t fFirstPhaseThreadCount; boost::shared_ptr fSessionMemLimit; diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index ea889a14f..7eead22a8 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -106,6 +106,7 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) : fOutputDL(NULL), fInputIterator(0), fOutputIterator(0), + fRunner(0), fRowsProcessed(0), fRowsReturned(0), fLimitStart(0), @@ -205,14 +206,14 @@ void TupleAnnexStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void TupleAnnexStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 69a16734c..2f548f179 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -111,7 +111,7 @@ protected: TupleAnnexStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle uint64_t fRowsProcessed; uint64_t fRowsReturned; diff --git a/dbcon/joblist/tupleconstantstep.cpp b/dbcon/joblist/tupleconstantstep.cpp index 327fd5e77..1b96cd83f 100644 --- a/dbcon/joblist/tupleconstantstep.cpp +++ b/dbcon/joblist/tupleconstantstep.cpp @@ -81,6 +81,7 @@ TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo) : fInputDL(NULL), fOutputDL(NULL), fInputIterator(0), + fRunner(0), fEndOfResult(false) { fExtendedInfo = "TCS: "; @@ -290,7 +291,7 @@ void TupleConstantStep::run() if (fOutputDL == NULL) throw logic_error("Output is not a RowGroup data list."); - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } } @@ -298,7 +299,7 @@ void TupleConstantStep::run() void TupleConstantStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/tupleconstantstep.h b/dbcon/joblist/tupleconstantstep.h index 994e63b05..85472aeca 100644 --- a/dbcon/joblist/tupleconstantstep.h +++ b/dbcon/joblist/tupleconstantstep.h @@ -101,7 +101,7 @@ protected: TupleConstantStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle bool fEndOfResult; }; diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index ef8bb388b..b1b9f8153 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -166,7 +166,7 @@ void TupleHashJoinStep::run() } joiners.resize(smallDLs.size()); - mainRunner.reset(new boost::thread(HJRunner(this))); + mainRunner = jobstepThreadPool.invoke(HJRunner(this)); } void TupleHashJoinStep::join() @@ -175,12 +175,12 @@ void TupleHashJoinStep::join() if (joinRan) return; joinRan = true; - mainRunner->join(); + jobstepThreadPool.join(mainRunner); if (djs) { for (int i = 0; i < (int) djsJoiners.size(); i++) djs[i].join(); - djsReader.join(); - djsRelay.join(); + jobstepThreadPool.join(djsReader); + jobstepThreadPool.join(djsRelay); //cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl; } } @@ -544,9 +544,10 @@ void TupleHashJoinStep::hjRunner() } } + smallRunners.clear(); + smallRunners.reserve(smallDLs.size()); for (i = 0; i < smallDLs.size(); i++) - 
smallRunners.push_back(boost::shared_ptr - (new boost::thread(SmallRunner(this, i)))); + smallRunners.push_back(jobstepThreadPool.invoke(SmallRunner(this, i))); } catch (thread_resource_error&) { string emsg = "TupleHashJoin caught a thread resource error, aborting...\n"; @@ -557,8 +558,7 @@ void TupleHashJoinStep::hjRunner() deliverMutex.unlock(); } - for (i = 0; i < smallRunners.size(); i++) - smallRunners[i]->join(); + jobstepThreadPool.join(smallRunners); smallRunners.clear(); for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++) @@ -629,9 +629,9 @@ void TupleHashJoinStep::hjRunner() /* If an error happened loading the existing data, these threads are necessary to finish the abort */ try { - djsRelay = boost::thread(DJSRelay(this)); + djsRelay = jobstepThreadPool.invoke(DJSRelay(this)); relay = true; - djsReader = boost::thread(DJSReader(this, smallSideCount)); + djsReader = jobstepThreadPool.invoke(DJSReader(this, smallSideCount)); reader = true; for (i = 0; i < smallSideCount; i++) djs[i].run(); @@ -1091,7 +1091,7 @@ void TupleHashJoinStep::startJoinThreads() bool more = true; RGData oneRG; - if (joinRunners) + if (joinRunners.size() > 0) return; //@bug4836, in error case, stop process, and unblock the next step. @@ -1142,13 +1142,11 @@ void TupleHashJoinStep::startJoinThreads() makeDupList(fe2 ? fe2Output : outputRG); /* Start join runners */ - joinRunners.reset(new boost::shared_ptr[joinThreadCount]); + joinRunners.reserve(joinThreadCount); for (i = 0; i < joinThreadCount; i++) - joinRunners[i].reset(new boost::thread(JoinRunner(this, i))); - + joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i))); /* Join them and call endOfInput */ - for (i = 0; i < joinThreadCount; i++) - joinRunners[i]->join(); + jobstepThreadPool.join(joinRunners); if (lastSmallOuterJoiner != (uint32_t) -1) finishSmallOuterJoin(); diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index d56ad8441..833751396 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -276,8 +276,8 @@ private: uint32_t index; }; - boost::shared_ptr mainRunner; - std::vector > smallRunners; + int64_t mainRunner; // thread handle from thread pool + std::vector smallRunners; // thread handles from thread pool // for notify TupleAggregateStep PM hashjoin // Ideally, hashjoin and delivery communicate with RowGroupDL, @@ -347,7 +347,7 @@ private: void processDupList(uint32_t threadID, rowgroup::RowGroup &ingrp, std::vector *rowData); - boost::scoped_array > joinRunners; + std::vector joinRunners; // thread handles from thread pool boost::mutex inputDLLock, outputDLLock; boost::shared_array > columnMappings, fergMappings; boost::shared_array fe2Mapping; @@ -375,7 +375,7 @@ private: boost::scoped_array djs; boost::scoped_array > fifos; void djsReaderFcn(int index); - boost::thread djsReader; + uint64_t djsReader; // thread handle from thread pool struct DJSReader { DJSReader(TupleHashJoinStep *hj, uint32_t i) : HJ(hj), index(i) { } void operator()() { HJ->djsReaderFcn(index); } @@ -383,7 +383,7 @@ private: uint32_t index; }; - boost::thread djsRelay; + uint64_t djsRelay; // thread handle from thread pool void djsRelayFcn(); struct DJSRelay { DJSRelay(TupleHashJoinStep *hj) : HJ(hj) { } diff --git a/dbcon/joblist/tuplehavingstep.cpp b/dbcon/joblist/tuplehavingstep.cpp index d7196ef85..51e9cf88d 100644 --- a/dbcon/joblist/tuplehavingstep.cpp +++ b/dbcon/joblist/tuplehavingstep.cpp @@ -63,6 +63,7 @@ TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo) : fInputDL(NULL), 
fOutputDL(NULL), fInputIterator(0), + fRunner(0), fRowsReturned(0), fEndOfResult(false), fFeInstance(funcexp::FuncExp::instance()) @@ -151,7 +152,7 @@ void TupleHavingStep::run() if (fOutputDL == NULL) throw logic_error("Output is not a RowGroup data list."); - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } } @@ -159,7 +160,7 @@ void TupleHavingStep::run() void TupleHavingStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } diff --git a/dbcon/joblist/tuplehavingstep.h b/dbcon/joblist/tuplehavingstep.h index 1e3467b61..e76ca7898 100644 --- a/dbcon/joblist/tuplehavingstep.h +++ b/dbcon/joblist/tuplehavingstep.h @@ -97,7 +97,7 @@ protected: TupleHavingStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle uint64_t fRowsReturned; bool fEndOfResult; diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 370f09d6e..bc83455ce 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -767,25 +767,22 @@ void TupleUnion::run() } } + runners.reserve(inputs.size()); for (i = 0; i < inputs.size(); i++) { - boost::shared_ptr th(new boost::thread(Runner(this, i))); - runners.push_back(th); + runners.push_back(jobstepThreadPool.invoke(Runner(this, i))); } } void TupleUnion::join() { - uint32_t i; mutex::scoped_lock lk(jlLock); - Uniquer_t::iterator it; if (joinRan) return; joinRan = true; lk.unlock(); - for (i = 0; i < runners.size(); i++) - runners[i]->join(); + jobstepThreadPool.join(runners); runners.clear(); uniquer->clear(); rowMemory.clear(); diff --git a/dbcon/joblist/tupleunion.h b/dbcon/joblist/tupleunion.h index 3f4440d11..17498a305 100644 --- a/dbcon/joblist/tupleunion.h +++ b/dbcon/joblist/tupleunion.h @@ -118,7 +118,7 @@ private: Runner(TupleUnion *t, uint32_t in) : tu(t), index(in) { } void operator()() { tu->readInput(index); } }; - std::vector > runners; + std::vector runners; //thread pool handles struct Hasher { TupleUnion *ts; diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp index ec37573d8..7f3a8c29d 100644 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -140,6 +140,7 @@ namespace joblist WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) : JobStep(jobInfo), + fRunner(0), fCatalog(jobInfo.csc), fRowsReturned(0), fEndOfResult(false), @@ -192,14 +193,14 @@ void WindowFunctionStep::run() fOutputIterator = fOutputDL->getIterator(); } - fRunner.reset(new boost::thread(Runner(this))); + fRunner = jobstepThreadPool.invoke(Runner(this)); } void WindowFunctionStep::join() { if (fRunner) - fRunner->join(); + jobstepThreadPool.join(fRunner); } @@ -855,13 +856,13 @@ void WindowFunctionStep::execute() if (fTotalThreads > fFunctionCount) fTotalThreads = fFunctionCount; + fFunctionThreads.clear(); + fFunctionThreads.reserve(fTotalThreads); for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++) - fFunctionThreads.push_back( - boost::shared_ptr(new boost::thread(WFunction(this)))); + fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this))); - // If cancelled, not all thread is started. - for (uint64_t i = 0; i < fFunctionThreads.size(); i++) - fFunctionThreads[i]->join(); + // If cancelled, not all threads are started. 
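// (Editor's note, not part of the patch: ThreadPool::join(std::vector<uint64_t>&)
// scans the pool's work list for any of the supplied handles and waits until
// none remain, so the vector join below is safe even when cancellation stopped
// the loop above before all fTotalThreads runners were invoked; handles whose
// functors have already completed are no longer in the list and are simply
// skipped. A minimal illustration of the call pattern, where "handles" stands
// in for fFunctionThreads:
//
//   std::vector<uint64_t> handles;
//   handles.push_back(jobstepThreadPool.invoke(WFunction(this)));
//   jobstepThreadPool.join(handles);  // returns once none of these handles is queued or running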
+ jobstepThreadPool.join(fFunctionThreads); } if (!(cancelled())) diff --git a/dbcon/joblist/windowfunctionstep.h b/dbcon/joblist/windowfunctionstep.h index 027b6eae3..73d47bacd 100644 --- a/dbcon/joblist/windowfunctionstep.h +++ b/dbcon/joblist/windowfunctionstep.h @@ -153,7 +153,7 @@ private: WindowFunctionStep* fStep; }; - boost::scoped_ptr fRunner; + uint64_t fRunner; // thread pool handle boost::shared_ptr fCatalog; uint64_t fRowsReturned; @@ -188,7 +188,7 @@ private: WindowFunctionStep* fStep; }; - std::vector > fFunctionThreads; + std::vector fFunctionThreads; std::vector fRows; std::vector > fFunctions; diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 22aa4742b..27ee355ff 100755 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -6355,7 +6355,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // select * from derived table case if (gwi.selectCols.empty()) sel_cols_in_create = " * "; - create_query = "create temporary table " + vtb.str() + " as select " + sel_cols_in_create + " from "; + create_query = "create temporary table " + vtb.str() + " engine = aria as select " + sel_cols_in_create + " from "; TABLE_LIST* table_ptr = select_lex.get_table_list(); bool firstTb = true; diff --git a/ddlproc/ddlprocessor.cpp b/ddlproc/ddlprocessor.cpp index 28d234b1a..c7f2f4502 100644 --- a/ddlproc/ddlprocessor.cpp +++ b/ddlproc/ddlprocessor.cpp @@ -340,6 +340,7 @@ DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize ) { fDdlPackagepool.setMaxThreads(fPackageMaxThreads); fDdlPackagepool.setQueueSize(fPackageWorkQueueSize); + fDdlPackagepool.setName("DdlPackagepool"); csc = CalpontSystemCatalog::makeCalpontSystemCatalog(); csc->identity(CalpontSystemCatalog::EC); string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host")); diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index ced77ec82..c1575ff27 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -558,8 +558,23 @@ int main(int argc, char* argv[]) } DMLServer dmlserver(serverThreads, serverQueueSize,&dbrm); + ResourceManager *rm = ResourceManager::instance(); - //set ACTIVE state + // jobstepThreadPool is used by other processes. We can't call + // resourcemanaager (rm) functions during the static creation of threadpool + // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton). + // From the pools perspective, it has no idea if it is ExeMgr doing the + // creation, so it has no idea which way to set the flag. So we set the max here. + JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize()); + JobStep::jobstepThreadPool.setName("DMLProcJobList"); + +// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y") +// { +// JobStep::jobstepThreadPool.setDebug(true); +// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); +// } + + //set ACTIVE state try { oam.processInitComplete("DMLProc", ACTIVE); @@ -567,7 +582,6 @@ int main(int argc, char* argv[]) catch (...) 
{ } - ResourceManager *rm = ResourceManager::instance(); Dec = DistributedEngineComm::instance(rm); #ifndef _MSC_VER diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index b0f36305b..2205d1712 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1130,6 +1130,7 @@ DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm fDmlPackagepool.setMaxThreads(fPackageMaxThreads); fDmlPackagepool.setQueueSize(fPackageWorkQueueSize); + fDmlPackagepool.setName("DmlPackagepool"); } void DMLServer::start() diff --git a/exemgr/femsghandler.cpp b/exemgr/femsghandler.cpp index 776a7cc83..f1b36204e 100644 --- a/exemgr/femsghandler.cpp +++ b/exemgr/femsghandler.cpp @@ -24,6 +24,8 @@ using namespace std; using namespace joblist; using namespace messageqcpp; +threadpool::ThreadPool FEMsgHandler::threadPool(50,100); + namespace { class Runner @@ -50,14 +52,14 @@ FEMsgHandler::FEMsgHandler(boost::shared_ptr j, IOSocket *s) : FEMsgHandler::~FEMsgHandler() { stop(); - thr.join(); + threadPool.join(thr); } void FEMsgHandler::start() { if (!running) { running = true; - thr = boost::thread(Runner(this)); + thr = threadPool.invoke(Runner(this)); } } diff --git a/exemgr/femsghandler.h b/exemgr/femsghandler.h index b7bac18f0..7e7ce438d 100644 --- a/exemgr/femsghandler.h +++ b/exemgr/femsghandler.h @@ -20,6 +20,7 @@ #include "joblist.h" #include "inetstreamsocket.h" +#include "threadpool.h" class FEMsgHandler { @@ -36,12 +37,14 @@ public: void threadFcn(); + static threadpool::ThreadPool threadPool; + private: bool die, running, sawData; messageqcpp::IOSocket *sock; boost::shared_ptr jl; - boost::thread thr; boost::mutex mutex; + uint64_t thr; }; #endif /* FEMSGHANDLER_H_ */ diff --git a/exemgr/main.cpp b/exemgr/main.cpp index a552e65a7..9f67d24ab 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -97,6 +97,8 @@ using namespace querytele; #include "utils_utf8.h" #include "boost/filesystem.hpp" +#include "threadpool.h" + namespace { //If any flags other than the table mode flags are set, produce output to screeen @@ -513,7 +515,7 @@ public: SJLP jl; bool incSessionThreadCnt = true; - bool selfJoin = false; + bool selfJoin = false; bool tryTuples = false; bool usingTuples = false; bool stmtCounted = false; @@ -1410,13 +1412,30 @@ int main(int argc, char* argv[]) } } + // class jobstepThreadPool is used by other processes. We can't call + // resourcemanaager (rm) functions during the static creation of threadpool + // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton). + // From the pools perspective, it has no idea if it is ExeMgr doing the + // creation, so it has no idea which way to set the flag. So we set the max here. 
+ JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize()); + JobStep::jobstepThreadPool.setName("ExeMgrJobList"); +// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y") +// { +// JobStep::jobstepThreadPool.setDebug(true); +// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); +// } + int serverThreads = rm->getEmServerThreads(); int serverQueueSize = rm->getEmServerQueueSize(); int maxPct = rm->getEmMaxPct(); int pauseSeconds = rm->getEmSecondsBetweenMemChecks(); int priority = rm->getEmPriority(); - if (maxPct > 0) + FEMsgHandler::threadPool.setMaxThreads(serverThreads); + FEMsgHandler::threadPool.setQueueSize(serverQueueSize); + FEMsgHandler::threadPool.setName("FEMsgHandler"); + + if (maxPct > 0) startRssMon(maxPct, pauseSeconds); #ifndef _MSC_VER @@ -1450,12 +1469,15 @@ int main(int argc, char* argv[]) } } + threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize); + exeMgrThreadPool.setName("ExeMgrServer"); for (;;) { IOSocket ios; ios = mqs->accept(); - boost::thread thd(SessionThread(ios, ec, rm)); + exeMgrThreadPool.invoke(SessionThread(ios, ec, rm)); } + exeMgrThreadPool.wait(); return 0; } diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index f9ecbc116..1c68ca5cf 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -503,6 +503,7 @@ as many threads are available across all PMs. --> 20 + 100 1M @@ -515,9 +516,9 @@ - unassigned + 127.0.0.1 3306 - unassigned + root diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver index 5324cb892..ff466c398 100644 --- a/oam/etc/Columnstore.xml.singleserver +++ b/oam/etc/Columnstore.xml.singleserver @@ -495,8 +495,9 @@ is 20 extents worth of work for the PMs to process at any given time. ProcessorThreadsPerScan * MaxOutstandingRequests should be at least as many threads are available across all PMs. --> - - 20 + + 20 + 100 1M @@ -509,9 +510,9 @@ - unassigned + 127.0.0.1 3306 - unassigned + root diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index deaae576e..1f6145733 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -2032,6 +2032,7 @@ PrimitiveServer::PrimitiveServer(int serverThreads, fCacheCount=cacheCount; fServerpool.setMaxThreads(fServerThreads); fServerpool.setQueueSize(fServerQueueSize); + fServerpool.setName("PrimitiveServer"); fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, medPriorityThreads, lowPriorityThreads, 0)); diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index 57c863aea..bfbb291b3 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -436,15 +436,15 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO uint8_t* msglenp = reinterpret_cast(&msglen); size_t mlread = 0; - bool myIsTimeOut = false; - if (readToMagic(msecs, &myIsTimeOut, stats) == false) //indicates a timeout or EOF + if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF { - if (!myIsTimeOut) - logIoError("InetStreamSocket::read: EOF during readToMagic", 0); - if (isTimeOut) - { - *isTimeOut = myIsTimeOut; - } + // MCOL-480 The connector calls with timeout in a loop so that + // it can check a killed flag. This means that for a long running query, + // the following fills the warning log. 
+// if (isTimeOut && *isTimeOut) +// { +// logIoError("InetStreamSocket::read: timeout during readToMagic", 0); +// } return SBS(new ByteStream(0)); } diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index f25367090..1281e0b5f 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -20,7 +20,6 @@ * * ***********************************************************************/ - #include using namespace std; @@ -28,39 +27,37 @@ using namespace std; #include "messagelog.h" using namespace logging; -#define THREADPOOL_DLLEXPORT #include "threadpool.h" -#undef THREADPOOL_DLLEXPORT +#include +#include +#include "boost/date_time/posix_time/posix_time_types.hpp" namespace threadpool { ThreadPool::ThreadPool() - :fMaxThreads( 0 ), fQueueSize( 0 ) +:fMaxThreads( 0 ), fQueueSize( 0 ) { init(); } ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize ) - :fMaxThreads( maxThreads ), fQueueSize( queueSize ) + :fMaxThreads( maxThreads ), fQueueSize( queueSize ) { init(); - - if (fQueueSize == 0) - fQueueSize = fMaxThreads*2; } ThreadPool::~ThreadPool() throw() { -// delete fThreadCreated; try { stop(); } - catch(...) - {} + catch (...) + { + } } void ThreadPool::init() @@ -68,11 +65,12 @@ void ThreadPool::init() fThreadCount = 0; fGeneralErrors = 0; fFunctorErrors = 0; - waitingFunctorsSize = 0; - issued = 0; + waitingFunctorsSize = 0; + fIssued = 0; + fDebug = false; fStop = false; -// fThreadCreated = new NoOp(); fNextFunctor = fWaitingFunctors.end(); + fNextHandle=1; } void ThreadPool::setQueueSize(size_t queueSize) @@ -88,11 +86,6 @@ void ThreadPool::setMaxThreads(size_t maxThreads) fMaxThreads = maxThreads; } -void ThreadPool::setThreadCreatedListener(const Functor_T &f) -{ -// fThreadCreated = f; -} - void ThreadPool::stop() { boost::mutex::scoped_lock lock1(fMutex); @@ -111,44 +104,120 @@ void ThreadPool::wait() while (waitingFunctorsSize > 0) { fThreadAvailable.wait(lock1); - //cerr << "woke!" << endl; + //cerr << "woke!" << endl; } } -void ThreadPool::invoke(const Functor_T &threadfunc) +void ThreadPool::join(uint64_t thrHandle) { boost::mutex::scoped_lock lock1(fMutex); - for(;;) + while (waitingFunctorsSize > 0) { + Container_T::iterator iter; + Container_T::iterator end = fWaitingFunctors.end(); + bool foundit = false; + for (iter = fWaitingFunctors.begin(); iter != end; ++iter) + { + foundit = false; + if (iter->hndl == thrHandle) + { + foundit = true; + break; + } + } + if (!foundit) + { + break; + } + fThreadAvailable.wait(lock1); + } +} +void ThreadPool::join(std::vector& thrHandle) +{ + boost::mutex::scoped_lock lock1(fMutex); + + while (waitingFunctorsSize > 0) + { + Container_T::iterator iter; + Container_T::iterator end = fWaitingFunctors.end(); + bool foundit = false; + for (iter = fWaitingFunctors.begin(); iter != end; ++iter) + { + foundit = false; + std::vector::iterator thrIter; + std::vector::iterator thrEnd = thrHandle.end(); + for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter) + { + if (iter->hndl == *thrIter) + { + foundit = true; + break; + } + } + if (foundit == true) + { + break; + } + } + // If we didn't find any of the handles, then all are complete + if (!foundit) + { + break; + } + fThreadAvailable.wait(lock1); + } +} + +uint64_t ThreadPool::invoke(const Functor_T &threadfunc) +{ + boost::mutex::scoped_lock lock1(fMutex); + uint64_t thrHandle=0; + for (;;) + { try { - if ( waitingFunctorsSize < fThreadCount) + if (waitingFunctorsSize < fThreadCount) { // Don't create a thread unless it's needed. 
There // is a thread available to service this request. - addFunctor(threadfunc); + thrHandle = addFunctor(threadfunc); lock1.unlock(); break; } bool bAdded = false; - if ( waitingFunctorsSize < fQueueSize) + if (waitingFunctorsSize < fQueueSize || fQueueSize == 0) { // Don't create a thread unless you have to - addFunctor(threadfunc); + thrHandle = addFunctor(threadfunc); bAdded = true; } - if ( fThreadCount < fMaxThreads) + // fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run. + if (fThreadCount < fMaxThreads || fQueueSize == 0) { ++fThreadCount; lock1.unlock(); fThreads.create_thread(beginThreadFunc(*this)); + if (fDebug) + { + ostringstream oss; + oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads + << " queue " << fQueueSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } + if (bAdded) break; @@ -165,9 +234,22 @@ void ThreadPool::invoke(const Functor_T &threadfunc) break; } + if (fDebug) + { + logging::Message::Args args; + logging::Message message(5); + args.add("invoke: Blocked waiting for thread. Count "); + args.add(fThreadCount); + args.add("max "); + args.add(fMaxThreads); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } fThreadAvailable.wait(lock1); } - catch(...) + catch (...) { ++fGeneralErrors; throw; @@ -175,17 +257,16 @@ void ThreadPool::invoke(const Functor_T &threadfunc) } fNeedThread.notify_one(); + return thrHandle; } void ThreadPool::beginThread() throw() { try { -// fThreadCreated(); - boost::mutex::scoped_lock lock1(fMutex); - - for(;;) + boost::system_time timeout = boost::get_system_time()+boost::posix_time::minutes(10); + for (;;) { if (fStop) break; @@ -193,51 +274,60 @@ void ThreadPool::beginThread() throw() if (fNextFunctor == fWaitingFunctors.end()) { // Wait until someone needs a thread - fNeedThread.wait(lock1); + // Add the timed wait for queueSize == 0 so we can idle away threads + // over fMaxThreads + if (fQueueSize > 0) + { + fNeedThread.wait(lock1); + } + else + { + // Wait no more than 10 minutes + if (!fNeedThread.timed_wait(lock1, timeout)) // false means it timed out + { + if (fThreadCount > fMaxThreads) + { + --fThreadCount; + return; + } + timeout = boost::get_system_time()+boost::posix_time::minutes(10); + } + } } else { - /* Need to tune these magic #s */ + // If there's anything waiting, run it + if (waitingFunctorsSize - fIssued > 0) + { + Container_T::iterator todo = fNextFunctor++; + ++fIssued; + lock1.unlock(); + try + { + todo->functor(); + } + catch (exception &e) + { + ++fFunctorErrors; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("ThreadPool: Caught exception during execution: "); + args.add(e.what()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logErrorMessage( message ); +#endif + } + lock1.lock(); + --fIssued; + --waitingFunctorsSize; + fWaitingFunctors.erase(todo); + } - vector todoList; - int i, num; - Container_T::const_iterator iter; - - /* Use this to control how many jobs are issued to a single thread */ - num = (waitingFunctorsSize - issued >= 1 ? 1 : 0); - - for (i = 0; i < num; i++) - todoList.push_back(fNextFunctor++); - - issued += num; -// cerr << "got " << num << " jobs." 
<< endl; -// cerr << "got " << num << " jobs. waitingFunctorsSize=" << -// waitingFunctorsSize << " issued=" << issued << " fThreadCount=" << -// fThreadCount << endl; - lock1.unlock(); - - for (i = 0; i < num; i++) { - try { - (*todoList[i])(); - } - catch(exception &e) { - ++fFunctorErrors; - cerr << e.what() << endl; - } - } - lock1.lock(); - - issued -= num; - waitingFunctorsSize -= num; - for (i = 0; i < num; i++) - fWaitingFunctors.erase(todoList[i]); -/* - if (waitingFunctorsSize != fWaitingFunctors.size()) - cerr << "size mismatch! fake size=" << waitingFunctorsSize << - " real size=" << fWaitingFunctors.size() << endl; -*/ + timeout = boost::get_system_time()+boost::posix_time::minutes(10); fThreadAvailable.notify_all(); - } } } @@ -249,6 +339,7 @@ void ThreadPool::beginThread() throw() // Log the exception and exit this thread try { +#ifndef NOLOGGING logging::Message::Args args; logging::Message message(5); args.add("beginThread: Caught exception: "); @@ -260,14 +351,14 @@ void ThreadPool::beginThread() throw() logging::MessageLog ml(lid); ml.logErrorMessage( message ); - +#endif } - catch(...) + catch (...) { } } - catch(...) + catch (...) { ++fGeneralErrors; @@ -275,6 +366,7 @@ void ThreadPool::beginThread() throw() // Log the exception and exit this thread try { +#ifndef NOLOGGING logging::Message::Args args; logging::Message message(6); args.add("beginThread: Caught unknown exception!"); @@ -285,29 +377,31 @@ void ThreadPool::beginThread() throw() logging::MessageLog ml(lid); ml.logErrorMessage( message ); - +#endif } - catch(...) + catch (...) { } - } - } -void ThreadPool::addFunctor(const Functor_T &func) +uint64_t ThreadPool::addFunctor(const Functor_T &func) { bool bAtEnd = false; if (fNextFunctor == fWaitingFunctors.end()) bAtEnd = true; - fWaitingFunctors.push_back(func); - waitingFunctorsSize++; + PoolFunction_T poolFunction; + poolFunction.hndl = fNextHandle; + poolFunction.functor = func; + fWaitingFunctors.push_back(poolFunction); + waitingFunctorsSize++; if (bAtEnd) { --fNextFunctor; } + return fNextHandle++; } void ThreadPool::dump() @@ -317,4 +411,51 @@ void ThreadPool::dump() std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl; } + +void ThreadPoolMonitor::operator()() +{ + ostringstream filename; + filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log"; + fLog = new ofstream(filename.str().c_str()); + for (;;) + { + if (!fLog || !fLog->is_open()) + { + ostringstream oss; + oss << "ThreadPoolMonitor " << fPool->name() << " has no file "; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + return; + } + // Get a timestamp for output. + struct tm tm; + struct timeval tv; + + gettimeofday(&tv, 0); + localtime_r(&tv.tv_sec, &tm); + + (*fLog) << setfill('0') + << setw(2) << tm.tm_hour << ':' + << setw(2) << tm.tm_min << ':' + << setw(2) << tm.tm_sec + << '.' 
+ << setw(4) << tv.tv_usec/100 + << " Name " << fPool->fName + << " Active " << fPool->waitingFunctorsSize + << " Most " << fPool->fThreadCount + << " Max " << fPool->fMaxThreads + << " Q " << fPool->fQueueSize + << endl; + +// struct timespec req = { 0, 1000 * 100 }; //100 usec +// nanosleep(&req, 0); + sleep(2); + } +} + } // namespace threadpool diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index 23ba5df2e..f11bb4b2b 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -31,7 +31,7 @@ #define THREADPOOL_H #include -#include +#include #include #include #include @@ -55,7 +55,6 @@ namespace threadpool * executing tasks. It is responsible for creating threads and tracking which threads are "busy" * and which are idle. Idle threads are utilized as "work" is added to the system. */ - class ThreadPool { public: @@ -75,7 +74,10 @@ public: * @param maxThreads the maximum number of threads in this pool. This is the maximum number * of simultaneuous operations that can go on. * @param queueSize the maximum number of work tasks in the queue. This is the maximum - * number of jobs that can queue up in the work list before invoke() blocks. + * number of jobs that can queue up in the work list before invoke() blocks. + * If 0, then threads never block and total threads may + * exceed maxThreads. Nothing waits. Thread count will + * idle down to maxThreads when less work is required. */ EXPORT explicit ThreadPool( size_t maxThreads, size_t queueSize ); @@ -109,11 +111,6 @@ public: */ inline size_t getMaxThreads() const { return fMaxThreads; } - /** @brief register a functor to be called when a new thread - * is created - */ - EXPORT void setThreadCreatedListener(const Functor_T &f) ; - /** @brief queue size accessor * */ @@ -132,7 +129,7 @@ public: * queueSize tasks already waiting, invoke() will block until a slot in the * queue comes free. 
*/ - EXPORT void invoke(const Functor_T &threadfunc); + EXPORT uint64_t invoke(const Functor_T &threadfunc); /** @brief stop the threads */ @@ -142,20 +139,45 @@ public: */ EXPORT void wait(); + /** @brief Wait for a specific thread + */ + EXPORT void join(uint64_t thrHandle); + + /** @brief Wait for a specific thread + */ + EXPORT void join(std::vector& thrHandle); + /** @brief for use in debugging */ EXPORT void dump(); + EXPORT std::string& name() {return fName;} + + EXPORT void setName(std::string name) {fName = name;} + EXPORT void setName(const char* name) {fName = name;} + + EXPORT bool debug() {return fDebug;} + + EXPORT void setDebug(bool d) {fDebug = d;} + + friend class ThreadPoolMonitor; protected: private: + // Used internally to keep a handle associated with each functor for join() + struct PoolFunction_T + { + uint64_t hndl; + Functor_T functor; + }; + /** @brief initialize data memebers */ void init(); /** @brief add a functor to the list */ - void addFunctor(const Functor_T &func); + uint64_t addFunctor(const Functor_T &func); /** @brief thread entry point */ @@ -184,19 +206,18 @@ private: struct NoOp { void operator () () const - {}} - ; + {} + }; size_t fThreadCount; size_t fMaxThreads; size_t fQueueSize; - typedef std::list Container_T; + typedef std::list Container_T; Container_T fWaitingFunctors; Container_T::iterator fNextFunctor; -// Functor_T * fThreadCreated; - uint32_t issued; + uint32_t fIssued; boost::mutex fMutex; boost::condition fThreadAvailable; // triggered when a thread is available boost::condition fNeedThread; // triggered when a thread is needed @@ -206,7 +227,36 @@ private: long fGeneralErrors; long fFunctorErrors; uint32_t waitingFunctorsSize; + uint64_t fNextHandle; + std::string fName; // Optional to add a name to the pool for debugging. + bool fDebug; +}; + +// This class, if instantiated, will continuously log details about the indicated threadpool +// The log will end up in /var/log/mariadb/columnstore/trace/threadpool_.log +class ThreadPoolMonitor +{ +public: + ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL) + { + } + + ~ThreadPoolMonitor() + { + if (fLog) + { + delete fLog; + } + } + + void operator()(); +private: + //defaults okay + //ThreadPoolMonitor(const ThreadPoolMonitor& rhs); + //ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs); + ThreadPool* fPool; + std::ofstream* fLog; }; } // namespace threadpool diff --git a/utils/threadpool/threadpool.vpj b/utils/threadpool/threadpool.vpj index e9270a137..63c370751 100644 --- a/utils/threadpool/threadpool.vpj +++ b/utils/threadpool/threadpool.vpj @@ -1,222 +1,224 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + Version="10.0" + VendorName="SlickEdit" + TemplateName="GNU C/C++" + WorkingDir="."> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/utils/threadpool/tp.cpp b/utils/threadpool/tp.cpp new file mode 100644 index 000000000..ec128a28e --- /dev/null +++ b/utils/threadpool/tp.cpp @@ -0,0 +1,130 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + + +#include +#include +#include +#include +using namespace std; +#include +#include +#include +#include +#include +#include +#include +#include "threadpool.h" + +int64_t thecount = 0; +boost::mutex mutex; + +const string timeNow() +{ + time_t outputTime = time(0); + struct tm ltm; + char buf[32]; //ctime(3) says at least 26 + size_t len = 0; +#ifdef _MSC_VER + asctime_s(buf, 32, localtime_r(&outputTime, <m)); +#else + asctime_r(localtime_r(&outputTime, <m), buf); +#endif + len = strlen(buf); + if (len > 0) --len; + if (buf[len] == '\n') buf[len] = 0; + return buf; +} + +// Functor class +struct foo +{ + int64_t fData; + int64_t fThd; + string start; + bool running; + + void operator ()() + { + start = timeNow(); + + std::cout << "foo thd = " << fThd << " start " << start << std::endl; + for (int64_t i = 0; i < 1024*1024*(fThd+0)*128; i++) + // simulate some work + fData++; + + boost::mutex::scoped_lock lock(mutex); + std::cout << "foo thd = " << fThd << " start " << start << " fin " << timeNow() << std::endl; + } + + foo(int64_t i) : fThd(i), fData(i), running(true) {start=timeNow();} + + foo(const foo& copy) : fData(copy.fData), fThd(copy.fThd), start(copy.start), running(copy.running) {std::cout << "new foo " << fThd << endl;} + + ~foo() {running=false;} +}; + + + +int main( int argc, char **argv) +{ + threadpool::ThreadPool pool( 20, 10 ); + std::vector hndl; + hndl.reserve(10); + int t1 = hndl.capacity(); + uint64_t testHndl; + uint64_t thdhndl=999; + int64_t thd = 1; + boost::function0 foofunc; + boost::function0 foofunc2; + for (int64_t y = 0; y < 1; y++) + { + foo bar(y); +// foofunc = bar; +// foofunc2 = foofunc; + std::cout << "Done with assign" << std::endl; + for (int64_t i = 0; i < 1; ++i) + { + bar.fThd=thd++; + thdhndl = pool.invoke(bar); + if (y<10) + { + hndl.push_back(thdhndl); + } + if (y == 0) + { + testHndl = thdhndl; + } + } + + boost::mutex::scoped_lock lock(mutex); + } + // Wait until all of the queued up and in-progress work has finished + std::cout << "Threads for join " << hndl.size() << std::endl; + pool.dump(); + std::cout << "*** JOIN 1 ***" << std::endl; + pool.join(testHndl); + pool.dump(); + std::cout << "*** JOIN 10 ***" << std::endl; + pool.join(hndl); + pool.dump(); + std::cout << "*** WAIT ***" << std::endl; + pool.wait(); + pool.dump(); + sleep(2); + return 0; +} diff --git a/utils/threadpool/tp.vpj b/utils/threadpool/tp.vpj new file mode 100644 index 000000000..a8f1497cb --- /dev/null +++ b/utils/threadpool/tp.vpj @@ -0,0 +1,240 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/utils/threadpool/tp.vpw b/utils/threadpool/tp.vpw new file mode 100644 index 000000000..c15af49c7 --- /dev/null +++ b/utils/threadpool/tp.vpw @@ -0,0 +1,6 @@ + + + + + +
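A minimal sketch of how the new ThreadPoolMonitor and the JobList ThreadPoolDebug setting are intended to be wired together, reconstructed from the commented-out fragments in dmlproc.cpp and exemgr/main.cpp above; the function name and its placement are illustrative, not part of the patch:

    #include <string>
    #include "jobstep.h"
    #include "resourcemanager.h"
    #include "threadpool.h"
    using namespace joblist;
    using namespace threadpool;

    // Called once at process startup, after the ResourceManager singleton exists.
    // (The pool itself is created statically, so its size cannot be read from
    // ResourceManager at construction time; the patch sets it here instead.)
    void configureJobstepPool(const char* poolName)
    {
        ResourceManager* rm = ResourceManager::instance();
        JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
        JobStep::jobstepThreadPool.setName(poolName);

        // ThreadPoolDebug=Y in the JobList config section enables tracing; the
        // monitor functor writes pool statistics to
        // /var/log/mariadb/columnstore/trace/ThreadPool_<name>.log every two
        // seconds. Note it loops forever, so it permanently occupies one slot
        // in the pool it observes.
        std::string dbg = rm->getJlThreadPoolDebug();
        if (dbg == "Y" || dbg == "y")
        {
            JobStep::jobstepThreadPool.setDebug(true);
            JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
        }
    }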