From 9749d825b509750911b136f69f2c25b4b4597121 Mon Sep 17 00:00:00 2001
From: David Hall
Date: Tue, 16 Jan 2018 13:26:44 -0600
Subject: [PATCH] MCOL-1165 use the threadpool's idle down feature

---
 dbcon/joblist/jobstep.cpp       |  2 +-
 dbcon/joblist/resourcemanager.h | 16 +++++++++++-----
 dmlproc/dmlproc.cpp             | 12 ++++++------
 exemgr/main.cpp                 | 27 ++++++++++++++++++---------
 utils/threadpool/threadpool.cpp |  2 +-
 5 files changed, 37 insertions(+), 22 deletions(-)

diff --git a/dbcon/joblist/jobstep.cpp b/dbcon/joblist/jobstep.cpp
index b24a0c0ea..94229871a 100644
--- a/dbcon/joblist/jobstep.cpp
+++ b/dbcon/joblist/jobstep.cpp
@@ -56,7 +56,7 @@ namespace joblist
 {
 boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
 
-ThreadPool JobStep::jobstepThreadPool(0, 0);
+ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0);
 
 ostream& operator<<(ostream& os, const JobStep* rhs)
 {
diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h
index ffc2de639..73de7ce75 100644
--- a/dbcon/joblist/resourcemanager.h
+++ b/dbcon/joblist/resourcemanager.h
@@ -93,6 +93,7 @@ namespace joblist
     const uint64_t defaultNumBuckets = 128;
     const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024;
 
+    const int defaultEMServerThreads = 50;
     const int defaultEMSecondsBetweenMemChecks = 1;
     const int defaultEMMaxPct = 95;
     const int defaultEMPriority = 21; // @Bug 3385
@@ -147,10 +148,14 @@ namespace joblist
 
     typedef std::map MemMap;
 
-    int getEmSecondsBetweenMemChecks() const { return getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks); }
-    int getEmMaxPct() const { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); }
-    EXPORT int getEmPriority() const;
-    int getEmExecQueueSize() const { return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); }
+    // @MCOL-513 - Added threadpool to ExeMgr
+    int getEmServerThreads() const { return getIntVal(fExeMgrStr, "ThreadPoolSize", defaultEMServerThreads); }
+    std::string getExeMgrThreadPoolDebug() const { return getStringVal(fExeMgrStr, "ThreadPoolDebug", "N"); }
+
+    int getEmSecondsBetweenMemChecks() const { return getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks); }
+    int getEmMaxPct() const { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); }
+    EXPORT int getEmPriority() const;
+    int getEmExecQueueSize() const { return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); }
 
     int getHjMaxBuckets() const { return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets); }
     unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); }
@@ -165,8 +170,9 @@ namespace joblist
     uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr,"ScanLbidReqThreshold", defaultScanLbidReqThreshold); }
 
     // @MCOL-513 - Added threadpool to JobSteps
-    uint32_t getJLThreadPoolSize() const { return getUintVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); }
+    int getJLThreadPoolSize() const { return getIntVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); }
     std::string getJlThreadPoolDebug() const { return getStringVal(fJobListStr, "ThreadPoolDebug", "N"); }
+    std::string getDMLJlThreadPoolDebug() const { return getStringVal(fJobListStr, "DMLThreadPoolDebug", "N"); }
 
     // @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request.
     uint32_t getJlLogicalBlocksPerScan() const { return getUintVal(fJobListStr,"LogicalBlocksPerScan", defaultLogicalBlocksPerScan); }
diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp
index c8ad57b62..0db77f23a 100644
--- a/dmlproc/dmlproc.cpp
+++ b/dmlproc/dmlproc.cpp
@@ -565,14 +565,14 @@ int main(int argc, char* argv[])
     // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
     // From the pools perspective, it has no idea if it is ExeMgr doing the
     // creation, so it has no idea which way to set the flag. So we set the max here.
-//    JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
+    JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
     JobStep::jobstepThreadPool.setName("DMLProcJobList");
 
-//    if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
-//    {
-//        JobStep::jobstepThreadPool.setDebug(true);
-//        JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
-//    }
+    if (rm->getDMLJlThreadPoolDebug() == "Y" || rm->getDMLJlThreadPoolDebug() == "y")
+    {
+        JobStep::jobstepThreadPool.setDebug(true);
+        JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
+    }
 
     //set ACTIVE state
     try
diff --git a/exemgr/main.cpp b/exemgr/main.cpp
index 6967b2ca9..5abed0144 100644
--- a/exemgr/main.cpp
+++ b/exemgr/main.cpp
@@ -1436,18 +1436,20 @@ int main(int argc, char* argv[])
     // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
     // From the pools perspective, it has no idea if it is ExeMgr doing the
     // creation, so it has no idea which way to set the flag. So we set the max here.
-//    JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
+    JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
     JobStep::jobstepThreadPool.setName("ExeMgrJobList");
 
-//    if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
-//    {
-//        JobStep::jobstepThreadPool.setDebug(true);
-//        JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
-//    }
+    if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
+    {
+        JobStep::jobstepThreadPool.setDebug(true);
+        JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
+    }
 
+    int serverThreads = rm->getEmServerThreads();
     int maxPct = rm->getEmMaxPct();
     int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
     int priority = rm->getEmPriority();
+    FEMsgHandler::threadPool.setMaxThreads(serverThreads);
     FEMsgHandler::threadPool.setName("FEMsgHandler");
 
     if (maxPct > 0)
@@ -1468,7 +1470,8 @@ int main(int argc, char* argv[])
         }
     }
 
-    cout << "Starting ExeMgr: qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
+    cout << "Starting ExeMgr: st = " << serverThreads <<
+        ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
         rm->getConfig()->configFile() << endl;
 
     //set ACTIVE state
@@ -1483,10 +1486,16 @@ int main(int argc, char* argv[])
         }
     }
 
-    threadpool::ThreadPool exeMgrThreadPool;
+    threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
     exeMgrThreadPool.setName("ExeMgrServer");
 
-    for (;;)
+    if (rm->getExeMgrThreadPoolDebug() == "Y" || rm->getExeMgrThreadPoolDebug() == "y")
+    {
+        exeMgrThreadPool.setDebug(true);
+        exeMgrThreadPool.invoke(ThreadPoolMonitor(&exeMgrThreadPool));
+    }
+
+    for (;;)
     {
         IOSocket ios;
         ios = mqs->accept();
diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp
index 8b0c0a848..197bdedd9 100644
--- a/utils/threadpool/threadpool.cpp
+++ b/utils/threadpool/threadpool.cpp
@@ -470,7 +470,7 @@ void ThreadPoolMonitor::operator()()
             << setw(4) << tv.tv_usec/100
             << " Name " << fPool->fName
             << " Active " << fPool->waitingFunctorsSize
-            << " Most " << fPool->fThreadCount
+            << " ThdCnt " << fPool->fThreadCount
             << " Max " << fPool->fMaxThreads
             << " Q " << fPool->fQueueSize
             << endl;
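
The commit title refers to the thread pool's ability to shed idle workers once a pool is given a finite size: before this patch the JobStep pool was constructed as (0, 0) and the ExeMgr server pool was default-constructed, which, judging by the title, left that behaviour unused. The standalone program below is NOT the utils/threadpool code and every name in it (IdleDownPool, etc.) is hypothetical; it is only a minimal sketch of what "idle down" generally means: workers that see no work within a timeout exit on their own, so a pool sized for peak load (such as the 50-thread ExeMgr default added above) shrinks back when traffic stops.

// Illustrative sketch only -- not the ColumnStore threadpool implementation.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class IdleDownPool
{
public:
    explicit IdleDownPool(std::chrono::seconds idleTimeout) : fIdleTimeout(idleTimeout) {}

    ~IdleDownPool()
    {
        for (auto& t : fThreads)
            t.join();                       // timed-out workers have already returned
    }

    void invoke(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lk(fMutex);
        fQueue.push(std::move(task));
        if (fIdleThreads == 0)
            fThreads.emplace_back(&IdleDownPool::worker, this);  // grow on demand
        else
            fCond.notify_one();             // reuse an idle worker
    }

private:
    void worker()
    {
        std::unique_lock<std::mutex> lk(fMutex);
        for (;;)
        {
            ++fIdleThreads;
            // Wait for work; if none arrives within the timeout, let the thread die.
            bool gotWork = fCond.wait_for(lk, fIdleTimeout,
                                          [this] { return !fQueue.empty(); });
            --fIdleThreads;
            if (!gotWork)
                return;                     // idle down: give the thread back to the OS

            std::function<void()> task = std::move(fQueue.front());
            fQueue.pop();
            lk.unlock();
            task();                         // run the work item outside the lock
            lk.lock();
        }
    }

    std::chrono::seconds fIdleTimeout;
    std::mutex fMutex;
    std::condition_variable fCond;
    std::queue<std::function<void()> > fQueue;
    int fIdleThreads = 0;
    std::vector<std::thread> fThreads;
};

int main()
{
    IdleDownPool pool(std::chrono::seconds(2));
    for (int i = 0; i < 4; ++i)
        pool.invoke([i] { std::cout << "task " << i << " done\n"; });
    // After ~2 idle seconds every worker exits; the destructor just joins them.
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
}

Built with g++ -std=c++11 -pthread, this should print the four tasks and exit shortly after the sleep, once the idle workers have wound down; the patched pools above rely on the same idea, with the ThreadPoolMonitor debug output (the "ThdCnt" column changed in the last hunk) showing the thread count rise and fall.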