1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1165 use the threadpool's idle down feature

This commit is contained in:
David Hall
2018-01-16 13:26:44 -06:00
parent 4f1684c609
commit 9749d825b5
5 changed files with 37 additions and 22 deletions

View File

@ -56,7 +56,7 @@ namespace joblist
{
boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
ThreadPool JobStep::jobstepThreadPool(0, 0);
ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0);
ostream& operator<<(ostream& os, const JobStep* rhs)
{

View File

@ -93,6 +93,7 @@ namespace joblist
const uint64_t defaultNumBuckets = 128;
const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024;
const int defaultEMServerThreads = 50;
const int defaultEMSecondsBetweenMemChecks = 1;
const int defaultEMMaxPct = 95;
const int defaultEMPriority = 21; // @Bug 3385
@ -147,6 +148,10 @@ namespace joblist
typedef std::map <uint32_t, uint64_t> MemMap;
// @MCOL-513 - Added threadpool to ExeMgr
int getEmServerThreads() const { return getIntVal(fExeMgrStr, "ThreadPoolSize", defaultEMServerThreads); }
std::string getExeMgrThreadPoolDebug() const { return getStringVal(fExeMgrStr, "ThreadPoolDebug", "N"); }
int getEmSecondsBetweenMemChecks() const { return getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks); }
int getEmMaxPct() const { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); }
EXPORT int getEmPriority() const;
@ -165,8 +170,9 @@ namespace joblist
uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr,"ScanLbidReqThreshold", defaultScanLbidReqThreshold); }
// @MCOL-513 - Added threadpool to JobSteps
uint32_t getJLThreadPoolSize() const { return getUintVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); }
int getJLThreadPoolSize() const { return getIntVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); }
std::string getJlThreadPoolDebug() const { return getStringVal(fJobListStr, "ThreadPoolDebug", "N"); }
std::string getDMLJlThreadPoolDebug() const { return getStringVal(fJobListStr, "DMLThreadPoolDebug", "N"); }
// @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request.
uint32_t getJlLogicalBlocksPerScan() const { return getUintVal(fJobListStr,"LogicalBlocksPerScan", defaultLogicalBlocksPerScan); }

View File

@ -565,14 +565,14 @@ int main(int argc, char* argv[])
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
// JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("DMLProcJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
if (rm->getDMLJlThreadPoolDebug() == "Y" || rm->getDMLJlThreadPoolDebug() == "y")
{
JobStep::jobstepThreadPool.setDebug(true);
JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
}
//set ACTIVE state
try

View File

@ -1436,18 +1436,20 @@ int main(int argc, char* argv[])
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
// JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("ExeMgrJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
{
JobStep::jobstepThreadPool.setDebug(true);
JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
}
int serverThreads = rm->getEmServerThreads();
int maxPct = rm->getEmMaxPct();
int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
int priority = rm->getEmPriority();
FEMsgHandler::threadPool.setMaxThreads(serverThreads);
FEMsgHandler::threadPool.setName("FEMsgHandler");
if (maxPct > 0)
@ -1468,7 +1470,8 @@ int main(int argc, char* argv[])
}
}
cout << "Starting ExeMgr: qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
cout << "Starting ExeMgr: st = " << serverThreads <<
", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
rm->getConfig()->configFile() << endl;
//set ACTIVE state
@ -1483,9 +1486,15 @@ int main(int argc, char* argv[])
}
}
threadpool::ThreadPool exeMgrThreadPool;
threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
exeMgrThreadPool.setName("ExeMgrServer");
if (rm->getExeMgrThreadPoolDebug() == "Y" || rm->getExeMgrThreadPoolDebug() == "y")
{
exeMgrThreadPool.setDebug(true);
exeMgrThreadPool.invoke(ThreadPoolMonitor(&exeMgrThreadPool));
}
for (;;)
{
IOSocket ios;

View File

@ -470,7 +470,7 @@ void ThreadPoolMonitor::operator()()
<< setw(4) << tv.tv_usec/100
<< " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize
<< " Most " << fPool->fThreadCount
<< " ThdCnt " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize
<< endl;