From 87a679e6ebdbf6fd258625992e0a9bd1d3f6484e Mon Sep 17 00:00:00 2001
From: David Hall
Date: Fri, 17 Feb 2017 09:44:32 -0600
Subject: [PATCH] MCOL-513 use a single functor per thread.

ThreadPool was only ever running one functor at a time anyway, but in a
convoluted way that made it easier to add more per thread if ever wanted,
and that flexibility was expensive. Cleanup and polish.
---
 dbcon/joblist/joblist.cpp               |  28 +-
 dbcon/joblist/jobstep.h                 |  14 +
 dbcon/joblist/pdictionaryscan.cpp       |   2 +-
 dbcon/joblist/tupleunion.cpp            |   2 -
 ddlproc/ddlprocessor.cpp                |   1 +
 dmlproc/dmlproc.cpp                     |  18 +-
 dmlproc/dmlprocessor.cpp                |   1 +
 exemgr/femsghandler.cpp                 |   8 +-
 exemgr/femsghandler.h                   |  10 +-
 exemgr/main.cpp                         |  32 +-
 primitives/primproc/primitiveserver.cpp |   1 +
 utils/threadpool/threadpool.cpp         |  44 +--
 utils/threadpool/threadpool.vpj         | 440 ++++++++++++------------
 13 files changed, 305 insertions(+), 296 deletions(-)

diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp
index 6e9607209..e5eb86b26 100644
--- a/dbcon/joblist/joblist.cpp
+++ b/dbcon/joblist/joblist.cpp
@@ -28,7 +28,6 @@ using namespace std;
 #include "joblist.h"
-
 #include "calpontsystemcatalog.h"
 using namespace execplan;
@@ -73,23 +72,26 @@ JobList::JobList(bool isEM) :
 JobList::~JobList()
 {
-    vector joiners;
-//  boost::thread *tmp;
     try
     {
         if (fIsRunning)
         {
-            JobStepVector::iterator iter;
-            JobStepVector::iterator end;
 #if 0
+            // This logic creates a set of threads to wind down the query
+            vector joiners;
+            joiners.reserve(20);
+            NullStep nullStep;  // For access to the static jobstepThreadPool.
+
+            JobStepVector::iterator iter;
+            JobStepVector::iterator end;
+
             iter = fQuery.begin();
             end = fQuery.end();

             // Wait for all the query steps to finish
             while (iter != end)
             {
-                tmp = new boost::thread(JSJoiner(iter->get()));
-                joiners.push_back(tmp);
+                joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get())));
                 ++iter;
             }
@@ -99,17 +101,15 @@ JobList::~JobList()
             // wait for the projection steps
             while (iter != end)
             {
-                tmp = new boost::thread(JSJoiner(iter->get()));
-                joiners.push_back(tmp);
+                joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get())));
                 ++iter;
             }

-            for (uint32_t i = 0; i < joiners.size(); i++) {
-                joiners[i]->join();
-                delete joiners[i];
-            }
+            nullStep.jobstepThreadPool.join(joiners);
 #endif
-            // Stop all the query steps
+            // This logic stops the query steps one at a time
+            JobStepVector::iterator iter;
+            JobStepVector::iterator end;
             end = fQuery.end();
             for (iter = fQuery.begin(); iter != end; ++iter)
             {
diff --git a/dbcon/joblist/jobstep.h b/dbcon/joblist/jobstep.h
index 5763ce782..00bf243d7 100644
--- a/dbcon/joblist/jobstep.h
+++ b/dbcon/joblist/jobstep.h
@@ -330,6 +330,20 @@ public:
     virtual bool deliverStringTableRowGroup() const = 0;
 };

+class NullStep : public JobStep
+{
+public:
+    /** @brief virtual void Run method
+     */
+    virtual void run(){}
+    /** @brief virtual void join method
+     */
+    virtual void join(){}
+    /** @brief virtual string toString method
+     */
+    virtual const std::string toString() const {return "NullStep";}
+};
+
 // calls rhs->toString()
 std::ostream& operator<<(std::ostream& os, const JobStep* rhs);
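Note: the #if 0 block in ~JobList() above shows the pattern this change enables: instead of spawning a raw boost::thread per JSJoiner, each joiner is handed to the shared jobstepThreadPool through invoke(), which returns a handle, and the collected handles are joined in one call (NullStep exists only to reach the static pool). A minimal, self-contained sketch of that invoke/join-by-handle pattern follows; ToyPool, Handle and JSJoinerLike are illustrative stand-ins, not the ColumnStore ThreadPool API:

    // Minimal sketch (not the ColumnStore ThreadPool): invoke() hands a functor to a
    // worker and returns a handle; join(handles) blocks until those functors finish.
    #include <functional>
    #include <future>
    #include <iostream>
    #include <utility>
    #include <vector>

    class ToyPool                                    // stand-in for threadpool::ThreadPool
    {
    public:
        typedef std::future<void> Handle;            // stand-in for the handle type used in the patch
        Handle invoke(std::function<void()> f)
        { return std::async(std::launch::async, std::move(f)); }
        void join(std::vector<Handle>& handles)
        { for (auto& h : handles) h.wait(); }
    };

    struct JSJoinerLike                              // mimics JSJoiner: winds down one job step
    {
        int id;
        void operator()() const { std::cout << "step " << id << " wound down" << std::endl; }
    };

    int main()
    {
        ToyPool pool;
        std::vector<ToyPool::Handle> joiners;
        for (int i = 0; i < 4; i++)
            joiners.push_back(pool.invoke(JSJoinerLike{i}));  // like jobstepThreadPool.invoke(JSJoiner(...))
        pool.join(joiners);                          // like nullStep.jobstepThreadPool.join(joiners)
        return 0;
    }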
diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp
index 960458585..bfb0c27d2 100644
--- a/dbcon/joblist/pdictionaryscan.cpp
+++ b/dbcon/joblist/pdictionaryscan.cpp
@@ -134,9 +134,9 @@ pDictionaryScan::pDictionaryScan(
     sendWaiting(false),
     ridCount(0),
     ridList(0),
+    colType(ct),
     pThread(0),
     cThread(0),
-    colType(ct),
     fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()),
     fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()),
     fStopSending(false),
diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp
index 1a2e37d93..bc83455ce 100644
--- a/dbcon/joblist/tupleunion.cpp
+++ b/dbcon/joblist/tupleunion.cpp
@@ -775,9 +775,7 @@ void TupleUnion::run()
 void TupleUnion::join()
 {
-    uint32_t i;
     mutex::scoped_lock lk(jlLock);
-    Uniquer_t::iterator it;
     if (joinRan)
         return;
diff --git a/ddlproc/ddlprocessor.cpp b/ddlproc/ddlprocessor.cpp
index 28d234b1a..c7f2f4502 100644
--- a/ddlproc/ddlprocessor.cpp
+++ b/ddlproc/ddlprocessor.cpp
@@ -340,6 +340,7 @@ DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize )
 {
     fDdlPackagepool.setMaxThreads(fPackageMaxThreads);
     fDdlPackagepool.setQueueSize(fPackageWorkQueueSize);
+    fDdlPackagepool.setName("DdlPackagepool");
     csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
     csc->identity(CalpontSystemCatalog::EC);
     string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));
diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp
index ced77ec82..c1575ff27 100644
--- a/dmlproc/dmlproc.cpp
+++ b/dmlproc/dmlproc.cpp
@@ -558,8 +558,23 @@ int main(int argc, char* argv[])
     }

     DMLServer dmlserver(serverThreads, serverQueueSize,&dbrm);
+    ResourceManager *rm = ResourceManager::instance();

-    //set ACTIVE state
+    // jobstepThreadPool is used by other processes. We can't call
+    // resourcemanager (rm) functions during the static creation of threadpool
+    // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
+    // From the pool's perspective, it has no idea if it is ExeMgr doing the
+    // creation, so it has no idea which way to set the flag. So we set the max here.
+    JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
+    JobStep::jobstepThreadPool.setName("DMLProcJobList");
+
+//  if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
+//  {
+//      JobStep::jobstepThreadPool.setDebug(true);
+//      JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
+//  }
+
+    //set ACTIVE state
     try
     {
         oam.processInitComplete("DMLProc", ACTIVE);
@@ -567,7 +582,6 @@ int main(int argc, char* argv[])
     catch (...)
     {
     }
-    ResourceManager *rm = ResourceManager::instance();
     Dec = DistributedEngineComm::instance(rm);

 #ifndef _MSC_VER
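Note: DDLProc and DMLProc now size and name the static jobstepThreadPool from main() instead of at static-initialization time, because ResourceManager is a singleton whose isExeMgr identity is fixed when it is first created and is unknown while statics are being constructed. A small sketch of that deferred-configuration idea follows; Pool, JobStepLike and the thread counts are hypothetical, not the real classes or values:

    // Sketch of the initialization-order problem: the pool is a static object, so it
    // cannot ask the ResourceManager singleton for a size while statics are being
    // built; each daemon configures the pool explicitly once main() is running.
    #include <cstddef>
    #include <iostream>
    #include <string>

    class Pool                                        // illustrative stand-in
    {
    public:
        void setMaxThreads(std::size_t n) { maxThreads_ = n; }
        void setName(const std::string& n) { name_ = n; }
        void print() const { std::cout << name_ << ": " << maxThreads_ << " threads max" << std::endl; }
    private:
        std::size_t maxThreads_ = 0;                  // default-constructed, no ResourceManager yet
        std::string name_ = "unnamed";
    };

    struct JobStepLike { static Pool jobstepThreadPool; };   // mirrors JobStep::jobstepThreadPool
    Pool JobStepLike::jobstepThreadPool;                     // built before main(), with defaults

    // Hypothetical sizes; the real value comes from ResourceManager::getJLThreadPoolSize().
    std::size_t jlThreadPoolSizeFor(bool isExeMgr) { return isExeMgr ? 100 : 20; }

    int main()
    {
        const bool isExeMgr = false;                  // DMLProc/DDLProc are not ExeMgr
        JobStepLike::jobstepThreadPool.setMaxThreads(jlThreadPoolSizeFor(isExeMgr));
        JobStepLike::jobstepThreadPool.setName("DMLProcJobList");
        JobStepLike::jobstepThreadPool.print();
        return 0;
    }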
diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp
index b0f36305b..2205d1712 100644
--- a/dmlproc/dmlprocessor.cpp
+++ b/dmlproc/dmlprocessor.cpp
@@ -1130,6 +1130,7 @@ DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm
     fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
     fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
+    fDmlPackagepool.setName("DmlPackagepool");
 }

 void DMLServer::start()
diff --git a/exemgr/femsghandler.cpp b/exemgr/femsghandler.cpp
index c3df2ba92..f1b36204e 100644
--- a/exemgr/femsghandler.cpp
+++ b/exemgr/femsghandler.cpp
@@ -24,7 +24,7 @@ using namespace std;
 using namespace joblist;
 using namespace messageqcpp;

-threadpool::ThreadPool FEMsgHandler::threadPool(100,200);
+threadpool::ThreadPool FEMsgHandler::threadPool(50,100);

 namespace {
@@ -52,15 +52,14 @@ FEMsgHandler::FEMsgHandler(boost::shared_ptr j, IOSocket *s) :
 FEMsgHandler::~FEMsgHandler()
 {
     stop();
-//  thr.join();
-    boost::unique_lock lk(joinMutex);
+    threadPool.join(thr);
 }

 void FEMsgHandler::start()
 {
     if (!running) {
         running = true;
-        threadPool.invoke(Runner(this));
+        thr = threadPool.invoke(Runner(this));
     }
 }
@@ -109,7 +108,6 @@ bool FEMsgHandler::aborted()
 void FEMsgHandler::threadFcn()
 {
     int err = 0;
-    boost::unique_lock lk(joinMutex);
     int connectionNum = sock->getConnectionNum();

     /* This waits for the next readable event on sock.  An abort is signaled
diff --git a/exemgr/femsghandler.h b/exemgr/femsghandler.h
index 1d51e6d95..7e7ce438d 100644
--- a/exemgr/femsghandler.h
+++ b/exemgr/femsghandler.h
@@ -37,18 +37,14 @@ public:
     void threadFcn();

+    static threadpool::ThreadPool threadPool;
+
 private:
     bool die, running, sawData;
     messageqcpp::IOSocket *sock;
     boost::shared_ptr jl;
     boost::mutex mutex;
-//  boost::thread thr;
-    static threadpool::ThreadPool threadPool;
-
-    // Because we can't join() a thread from a thread pool, threadFcn will
-    // unlock when it exits and the destructor can block until the thread is done.
-    boost::mutex joinMutex;
-
+    uint64_t thr;
 };

 #endif /* FEMSGHANDLER_H_ */
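Note: FEMsgHandler previously held joinMutex for the lifetime of threadFcn() so the destructor could block until the pooled thread finished, since a pool thread could not be joined directly. With invoke() now returning a handle that join() accepts, start() stores the handle and the destructor joins it. A minimal sketch of that lifecycle follows; ToyPool, Handle and MsgHandlerLike are stand-ins for illustration only, not the ColumnStore API:

    // Sketch: start() keeps the handle returned by invoke(); the destructor stops the
    // worker and joins that one task instead of holding joinMutex for its lifetime.
    #include <atomic>
    #include <future>
    #include <iostream>
    #include <thread>
    #include <utility>

    class ToyPool                                        // stand-in for threadpool::ThreadPool
    {
    public:
        typedef std::future<void> Handle;                // stand-in for the uint64_t handle
        template <class F>
        Handle invoke(F f) { return std::async(std::launch::async, std::move(f)); }
        void join(Handle& h) { if (h.valid()) h.wait(); }
    };

    class MsgHandlerLike                                 // loosely mirrors FEMsgHandler
    {
    public:
        ~MsgHandlerLike() { stop(); pool.join(thr); }    // like ~FEMsgHandler: stop(); threadPool.join(thr);
        void start()
        {
            if (!running.exchange(true))
                thr = pool.invoke([this]() { threadFcn(); });  // like thr = threadPool.invoke(Runner(this))
        }
        void stop() { die = true; }
    private:
        void threadFcn()
        {
            while (!die)                                 // the real threadFcn waits on the socket
                std::this_thread::yield();
            std::cout << "worker exited" << std::endl;
        }
        ToyPool pool;
        ToyPool::Handle thr;
        std::atomic<bool> die{false};
        std::atomic<bool> running{false};
    };

    int main()
    {
        MsgHandlerLike handler;
        handler.start();
        return 0;                                        // destructor stops and joins the worker
    }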
diff --git a/exemgr/main.cpp b/exemgr/main.cpp
index 0d97394a7..2ff51bb2e 100644
--- a/exemgr/main.cpp
+++ b/exemgr/main.cpp
@@ -515,7 +515,7 @@ public:
         SJLP jl;
         bool incSessionThreadCnt = true;
-        bool selfJoin = false;
+        bool selfJoin = false;
         bool tryTuples = false;
         bool usingTuples = false;
         bool stmtCounted = false;
@@ -1393,19 +1393,18 @@ int main(int argc, char* argv[])
         }
     }

-    // It's possible that PM modules use this threadpool. Only ExeMgr creates
-    // massive amounts of threads and needs to be settable. It's also possible that
-    // other process on this UM module use this threadpool. In this case, they share.
-    // We can't call rm functions during the static creation because rm has a isExeMgr
-    // flag that is set upon first creation. For the pool, who has no idea if it is ExeMgr,
-    // to create the singleton rm would be wrong, no matter which way we set the flag.
+    // jobstepThreadPool is used by other processes. We can't call
+    // resourcemanager (rm) functions during the static creation of threadpool
+    // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
+    // From the pool's perspective, it has no idea if it is ExeMgr doing the
+    // creation, so it has no idea which way to set the flag. So we set the max here.
     JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
-    JobStep::jobstepThreadPool.setName("ExeMgr");
-    if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
-    {
-        JobStep::jobstepThreadPool.setDebug(true);
-        JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
-    }
+    JobStep::jobstepThreadPool.setName("ExeMgrJobList");
+//  if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
+//  {
+//      JobStep::jobstepThreadPool.setDebug(true);
+//      JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
+//  }

     int serverThreads = rm->getEmServerThreads();
     int serverQueueSize = rm->getEmServerQueueSize();
@@ -1413,7 +1412,11 @@ int main(int argc, char* argv[])
     int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
     int priority = rm->getEmPriority();

-    if (maxPct > 0)
+    FEMsgHandler::threadPool.setMaxThreads(serverThreads);
+    FEMsgHandler::threadPool.setQueueSize(serverQueueSize);
+    FEMsgHandler::threadPool.setName("FEMsgHandler");
+
+    if (maxPct > 0)
         startRssMon(maxPct, pauseSeconds);

 #ifndef _MSC_VER
@@ -1448,6 +1451,7 @@ int main(int argc, char* argv[])
     }

     threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize);
+    exeMgrThreadPool.setName("ExeMgrServer");
     for (;;)
     {
         IOSocket ios;
diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp
index deaae576e..1f6145733 100644
--- a/primitives/primproc/primitiveserver.cpp
+++ b/primitives/primproc/primitiveserver.cpp
@@ -2032,6 +2032,7 @@ PrimitiveServer::PrimitiveServer(int serverThreads,
     fCacheCount=cacheCount;
     fServerpool.setMaxThreads(fServerThreads);
     fServerpool.setQueueSize(fServerQueueSize);
+    fServerpool.setName("PrimitiveServer");

     fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
         medPriorityThreads, lowPriorityThreads, 0));
waitingFunctorsSize=" << -// waitingFunctorsSize << " fIssued=" << fIssued << " fThreadCount=" << -// fThreadCount << endl; - lock1.unlock(); - - for (i = 0; i < num; i++) + // If there's anything waiting, run it + if (waitingFunctorsSize - fIssued > 0) { + Container_T::iterator todo = fNextFunctor++; + ++fIssued; + lock1.unlock(); try { - (*todoList[i]).functor(); + todo->functor(); } catch (exception &e) { @@ -334,18 +320,12 @@ void ThreadPool::beginThread() throw() ml.logErrorMessage( message ); #endif } + lock1.lock(); + --fIssued; + --waitingFunctorsSize; + fWaitingFunctors.erase(todo); } - lock1.lock(); - fIssued -= num; - waitingFunctorsSize -= num; - for (i = 0; i < num; i++) - fWaitingFunctors.erase(todoList[i]); -/* - if (waitingFunctorsSize != fWaitingFunctors.size()) - cerr << "size mismatch! fake size=" << waitingFunctorsSize << - " real size=" << fWaitingFunctors.size() << endl; -*/ timeout = boost::get_system_time()+boost::posix_time::minutes(10); fThreadAvailable.notify_all(); } diff --git a/utils/threadpool/threadpool.vpj b/utils/threadpool/threadpool.vpj index e9270a137..63c370751 100644 --- a/utils/threadpool/threadpool.vpj +++ b/utils/threadpool/threadpool.vpj @@ -1,222 +1,224 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + Version="10.0" + VendorName="SlickEdit" + TemplateName="GNU C/C++" + WorkingDir="."> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +