diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp
index 286c733bf..6e9607209 100644
--- a/dbcon/joblist/joblist.cpp
+++ b/dbcon/joblist/joblist.cpp
@@ -74,7 +74,7 @@ JobList::JobList(bool isEM) :
 JobList::~JobList()
 {
     vector joiners;
-    boost::thread *tmp;
+//  boost::thread *tmp;
     try
     {
         if (fIsRunning)
diff --git a/dbcon/joblist/jobstep.cpp b/dbcon/joblist/jobstep.cpp
index 825512219..94229871a 100644
--- a/dbcon/joblist/jobstep.cpp
+++ b/dbcon/joblist/jobstep.cpp
@@ -56,7 +56,7 @@ namespace joblist
 {
 boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
 
-ThreadPool JobStep::jobstepThreadPool(100,200);
+ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0);
 
 ostream& operator<<(ostream& os, const JobStep* rhs)
 {
diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h
index 8d31ba1c3..42ee6e569 100644
--- a/dbcon/joblist/resourcemanager.h
+++ b/dbcon/joblist/resourcemanager.h
@@ -60,7 +60,8 @@ namespace joblist
   const uint64_t defaultHUATotalMem = 8 * 1024 * 1024 * 1024ULL;
 
   const uint32_t defaultTupleDLMaxSize = 64 * 1024;
-  const uint32_t defaultTupleMaxBuckets = 256;
+
+  const uint32_t defaultJLThreadPoolSize = 100;
 
   //pcolscan.cpp
   const uint32_t defaultScanLbidReqLimit = 10000;
@@ -160,7 +161,7 @@ namespace joblist
     unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); }
     uint64_t getHjMaxElems() const { return getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems); }
     uint32_t getHjFifoSizeLargeSide() const { return getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide); }
-    uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); }
+    uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); }
     uint64_t getPMJoinMemLimit() const { return pmJoinMemLimit; }
 
     uint32_t getJLFlushInterval() const { return getUintVal(fJobListStr, "FlushInterval", defaultFlushInterval); }
@@ -168,6 +169,10 @@ namespace joblist
     uint32_t getJlScanLbidReqLimit() const { return getUintVal(fJobListStr, "ScanLbidReqLimit",defaultScanLbidReqLimit); }
     uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr,"ScanLbidReqThreshold", defaultScanLbidReqThreshold); }
 
+    // @MCOL-513 - Added threadpool to JobSteps
+    uint32_t getJLThreadPoolSize() const { return getUintVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); }
+    std::string getJlThreadPoolDebug() const { return getStringVal(fJobListStr, "ThreadPoolDebug", "N"); }
+
     // @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request.
     uint32_t getJlLogicalBlocksPerScan() const { return getUintVal(fJobListStr,"LogicalBlocksPerScan", defaultLogicalBlocksPerScan); }
     uint32_t getJlProjectBlockReqLimit() const { return getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit ); }
@@ -180,9 +185,9 @@ namespace joblist
     uint32_t getJlMaxOutstandingRequests() const { return getUintVal(fJobListStr,"MaxOutstandingRequests", defaultMaxOutstandingRequests);}
     uint32_t getJlJoinerChunkSize() const { return getUintVal(fJobListStr,"JoinerChunkSize", defaultJoinerChunkSize);}
 
-    int getPsCount() const { return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount ); }
+    int getPsCount() const { return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount ); }
     int getPsConnectionsPerPrimProc() const { return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc); }
-    uint32_t getPsLBID_Shift() const { return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift ); }
+    uint32_t getPsLBID_Shift() const { return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift ); }
 
     std::string getScTempDiskPath() const { return getStringVal(fSystemConfigStr, "TempDiskPath", defaultTempDiskPath ); }
     uint64_t getScTempSaveSize() const { return getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize); }
diff --git a/exemgr/main.cpp b/exemgr/main.cpp
index b2862e875..0d97394a7 100644
--- a/exemgr/main.cpp
+++ b/exemgr/main.cpp
@@ -1393,6 +1393,20 @@ int main(int argc, char* argv[])
         }
     }
 
+    // It's possible that PM modules use this threadpool. Only ExeMgr creates
+    // massive numbers of threads and needs the size to be settable. Other processes
+    // on this UM module may also use this threadpool; in that case they share it.
+    // We can't call rm functions during static creation because rm has an isExeMgr
+    // flag that is set upon first creation. Having the pool, which has no idea whether
+    // it is in ExeMgr, create the singleton rm would be wrong no matter which way we set the flag.
+    JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
+    JobStep::jobstepThreadPool.setName("ExeMgr");
+    if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
+    {
+        JobStep::jobstepThreadPool.setDebug(true);
+        JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
+    }
+
     int serverThreads = rm->getEmServerThreads();
     int serverQueueSize = rm->getEmServerQueueSize();
     int maxPct = rm->getEmMaxPct();
@@ -1433,13 +1447,6 @@ int main(int argc, char* argv[])
         }
     }
 
-//  if (!JobStep::jobstepThreadPool.debug())
-//  {
-//      JobStep::jobstepThreadPool.setName("ExeMgr");
-//      JobStep::jobstepThreadPool.setDebug(true);
-//      JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
-//  }
-
     threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize);
     for (;;)
     {
diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml
index f9ecbc116..1c68ca5cf 100644
--- a/oam/etc/Columnstore.xml
+++ b/oam/etc/Columnstore.xml
@@ -503,6 +503,7 @@
            as many threads are available across all PMs. -->
         20
+        <ThreadPoolSize>100</ThreadPoolSize>
         1M
@@ -515,9 +516,9 @@
-        unassigned
+        127.0.0.1
         3306
-        unassigned
+        root
diff --git a/oam/etc/Columnstore.xml.singleserver b/oam/etc/Columnstore.xml.singleserver
index 5324cb892..ff466c398 100644
--- a/oam/etc/Columnstore.xml.singleserver
+++ b/oam/etc/Columnstore.xml.singleserver
@@ -495,8 +495,9 @@
            is 20 extents worth of work for the PMs to process at any given time.
            ProcessorThreadsPerScan * MaxOutstandingRequests should be at least
            as many threads are available across all PMs. -->
-
-        20
+
+        20
+        <ThreadPoolSize>100</ThreadPoolSize>
         1M
@@ -509,9 +510,9 @@
-        unassigned
+        127.0.0.1
         3306
-        unassigned
+        root
diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp
index d180dc4cb..41ab57065 100644
--- a/utils/threadpool/threadpool.cpp
+++ b/utils/threadpool/threadpool.cpp
@@ -30,35 +30,34 @@ using namespace logging;
 #include "threadpool.h"
 #include
 #include
+#include "boost/date_time/posix_time/posix_time_types.hpp"
+
 
 namespace threadpool
 {
 
 ThreadPool::ThreadPool()
-    :fMaxThreads( 0 ), fQueueSize( 0 )
+:fMaxThreads( 0 ), fQueueSize( 0 )
 {
     init();
 }
 
 ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize )
-    :fMaxThreads( maxThreads ), fQueueSize( queueSize )
+        :fMaxThreads( maxThreads ), fQueueSize( queueSize )
 {
     init();
-
-    if (fQueueSize == 0)
-        fQueueSize = fMaxThreads*2;
 }
 
 ThreadPool::~ThreadPool() throw()
 {
-//  delete fThreadCreated;
     try
     {
         stop();
    }
-    catch(...)
-    {}
+    catch (...)
+    {
+    }
 }
 
 void ThreadPool::init()
@@ -66,13 +65,12 @@ void ThreadPool::init()
     fThreadCount = 0;
     fGeneralErrors = 0;
     fFunctorErrors = 0;
-    waitingFunctorsSize = 0;
-    issued = 0;
-    fDebug = false;
+    waitingFunctorsSize = 0;
+    fIssued = 0;
+    fDebug = false;
     fStop = false;
-//  fThreadCreated = new NoOp();
     fNextFunctor = fWaitingFunctors.end();
-    fNextHandle=1;
+    fNextHandle=1;
 }
 
 void ThreadPool::setQueueSize(size_t queueSize)
@@ -88,11 +86,6 @@ void ThreadPool::setMaxThreads(size_t maxThreads)
     fMaxThreads = maxThreads;
 }
 
-void ThreadPool::setThreadCreatedListener(const Functor_T &f)
-{
-//  fThreadCreated = f;
-}
-
 void ThreadPool::stop()
 {
     boost::mutex::scoped_lock lock1(fMutex);
@@ -111,7 +104,7 @@ void ThreadPool::wait()
     while (waitingFunctorsSize > 0)
     {
         fThreadAvailable.wait(lock1);
-        //cerr << "woke!" << endl;
+        //cerr << "woke!" << endl;
     }
 }
 
@@ -121,22 +114,22 @@
     while (waitingFunctorsSize > 0)
     {
-        Container_T::iterator iter;
-        Container_T::iterator end = fWaitingFunctors.end();
-        bool foundit = false;
-        for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
-        {
-            foundit = false;
-            if (iter->hndl == thrHandle)
-            {
-                foundit = true;
-                break;
-            }
-        }
-        if (!foundit)
-        {
-            break;
-        }
+        Container_T::iterator iter;
+        Container_T::iterator end = fWaitingFunctors.end();
+        bool foundit = false;
+        for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
+        {
+            foundit = false;
+            if (iter->hndl == thrHandle)
+            {
+                foundit = true;
+                break;
+            }
+        }
+        if (!foundit)
+        {
+            break;
+        }
         fThreadAvailable.wait(lock1);
     }
 }
@@ -147,32 +140,32 @@
     while (waitingFunctorsSize > 0)
     {
-        Container_T::iterator iter;
-        Container_T::iterator end = fWaitingFunctors.end();
-        bool foundit = false;
-        for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
-        {
-            foundit = false;
-            std::vector::iterator thrIter;
-            std::vector::iterator thrEnd = thrHandle.end();
-            for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
-            {
-                if (iter->hndl == *thrIter)
-                {
-                    foundit = true;
-                    break;
-                }
-            }
-            if (foundit == true)
-            {
-                break;
-            }
-        }
-        // If we didn't find any of the handles, then all are complete
-        if (!foundit)
-        {
-            break;
-        }
+        Container_T::iterator iter;
+        Container_T::iterator end = fWaitingFunctors.end();
+        bool foundit = false;
+        for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
+        {
+            foundit = false;
+            std::vector::iterator thrIter;
+            std::vector::iterator thrEnd = thrHandle.end();
+            for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
+            {
+                if (iter->hndl == *thrIter)
+                {
+                    foundit = true;
+                    break;
+                }
+            }
+            if (foundit == true)
+            {
+                break;
+            }
+        }
+        // If we didn't find any of the handles, then all are complete
+        if (!foundit)
+        {
+            break;
+        }
         fThreadAvailable.wait(lock1);
     }
 }
@@ -180,13 +173,12 @@ void ThreadPool::join(std::vector thrHandle)
 uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
 {
     boost::mutex::scoped_lock lock1(fMutex);
-    uint64_t thrHandle=0;
-    for(;;)
+    uint64_t thrHandle=0;
+    for (;;)
     {
-
         try {
-            if ( waitingFunctorsSize < fThreadCount)
+            if (waitingFunctorsSize < fThreadCount)
             {
                 // Don't create a thread unless it's needed.  There
                 // is a thread available to service this request.
@@ -197,33 +189,34 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
             bool bAdded = false;
 
-            if ( waitingFunctorsSize < fQueueSize)
+            if (waitingFunctorsSize < fQueueSize || fQueueSize == 0)
             {
                 // Don't create a thread unless you have to
                 thrHandle = addFunctor(threadfunc);
                 bAdded = true;
             }
 
-            if ( fThreadCount < fMaxThreads)
+            // fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
+            if (fThreadCount < fMaxThreads || fQueueSize == 0)
             {
                 ++fThreadCount;
                 lock1.unlock();
                 fThreads.create_thread(beginThreadFunc(*this));
-
-                if (fDebug)
-                {
-                    ostringstream oss;
-                    oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
-                        << " queue " << fQueueSize;
-                    logging::Message::Args args;
-                    logging::Message message(0);
-                    args.add(oss.str());
-                    message.format( args );
-                    logging::LoggingID lid(22);
-                    logging::MessageLog ml(lid);
-                    ml.logWarningMessage( message );
-                }
+
+                if (fDebug)
+                {
+                    ostringstream oss;
+                    oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
+                        << " queue " << fQueueSize;
+                    logging::Message::Args args;
+                    logging::Message message(0);
+                    args.add(oss.str());
+                    message.format( args );
+                    logging::LoggingID lid(22);
+                    logging::MessageLog ml(lid);
+                    ml.logWarningMessage( message );
+                }
                 if (bAdded)
                     break;
@@ -241,22 +234,22 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
                 break;
             }
 
-            if (fDebug)
-            {
-                logging::Message::Args args;
-                logging::Message message(5);
-                args.add("invoke: Blocked waiting for thread. Count ");
-                args.add(fThreadCount);
-                args.add("max ");
-                args.add(fMaxThreads);
-                message.format( args );
-                logging::LoggingID lid(22);
-                logging::MessageLog ml(lid);
-                ml.logWarningMessage( message );
-            }
-            fThreadAvailable.wait(lock1);
+            if (fDebug)
+            {
+                logging::Message::Args args;
+                logging::Message message(5);
+                args.add("invoke: Blocked waiting for thread. Count ");
+                args.add(fThreadCount);
+                args.add("max ");
+                args.add(fMaxThreads);
+                message.format( args );
+                logging::LoggingID lid(22);
+                logging::MessageLog ml(lid);
+                ml.logWarningMessage( message );
+            }
+            fThreadAvailable.wait(lock1);
         }
-        catch(...)
+        catch (...)
         {
             ++fGeneralErrors;
             throw;
@@ -264,18 +257,16 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
     }
 
     fNeedThread.notify_one();
-    return thrHandle;
+    return thrHandle;
 }
 
 void ThreadPool::beginThread() throw()
 {
     try
     {
-//      fThreadCreated();
-
         boost::mutex::scoped_lock lock1(fMutex);
-
-        for(;;)
+        boost::system_time timeout = boost::get_system_time()+boost::posix_time::minutes(10);
+        for (;;)
         {
             if (fStop)
                 break;
@@ -283,51 +274,80 @@ void ThreadPool::beginThread() throw()
             if (fNextFunctor == fWaitingFunctors.end())
             {
                 // Wait until someone needs a thread
-                fNeedThread.wait(lock1);
+                // Add the timed wait for queueSize == 0 so we can idle away threads
+                // over fMaxThreads
+                if (fQueueSize > 0)
+                {
+                    fNeedThread.wait(lock1);
+                }
+                else
+                {
+                    // Wait no more than 10 minutes
+                    if (fNeedThread.timed_wait(lock1, timeout) == boost::cv_status::timeout)
+                    {
+                        if (fThreadCount > fMaxThreads)
+                        {
+                            --fThreadCount;
+                            return;
+                        }
+                    }
+                }
             }
             else
             {
-                /* Need to tune these magic #s */
+                /* Need to tune these magic #s */
+                vector todoList;
+                int i, num;
+                Container_T::const_iterator iter;
 
-                vector todoList;
-                int i, num;
-                Container_T::const_iterator iter;
+                /* Use num to control how many jobs are issued to a single thread
+                   should you want to batch more than one */
+                num = (waitingFunctorsSize - fIssued >= 1 ? 1 : 0);
 
-                /* Use this to control how many jobs are issued to a single thread */
-                num = (waitingFunctorsSize - issued >= 1 ? 1 : 0);
+                for (i = 0; i < num; i++)
+                    todoList.push_back(fNextFunctor++);
 
-                for (i = 0; i < num; i++)
-                    todoList.push_back(fNextFunctor++);
-
-                issued += num;
+                fIssued += num;
 
 //                cerr << "got " << num << " jobs." << endl;
 //                cerr << "got " << num << " jobs.  waitingFunctorsSize=" <<
-//                    waitingFunctorsSize << " issued=" << issued << " fThreadCount=" <<
+//                    waitingFunctorsSize << " fIssued=" << fIssued << " fThreadCount=" <<
 //                    fThreadCount << endl;
                 lock1.unlock();
 
-                for (i = 0; i < num; i++) {
-                    try {
-                        (*todoList[i]).functor();
-                    }
-                    catch(exception &e) {
-                        ++fFunctorErrors;
-                        cerr << e.what() << endl;
-                    }
-                }
-                lock1.lock();
+                for (i = 0; i < num; i++)
+                {
+                    try
+                    {
+                        (*todoList[i]).functor();
+                    }
+                    catch (exception &e)
+                    {
+                        ++fFunctorErrors;
+#ifndef NOLOGGING
+                        logging::Message::Args args;
+                        logging::Message message(5);
+                        args.add("ThreadPool: Caught exception during execution: ");
+                        args.add(e.what());
+                        message.format( args );
+                        logging::LoggingID lid(22);
+                        logging::MessageLog ml(lid);
+                        ml.logErrorMessage( message );
+#endif
+                    }
+                }
+                lock1.lock();
 
-                issued -= num;
-                waitingFunctorsSize -= num;
-                for (i = 0; i < num; i++)
-                    fWaitingFunctors.erase(todoList[i]);
+                fIssued -= num;
+                waitingFunctorsSize -= num;
+                for (i = 0; i < num; i++)
+                    fWaitingFunctors.erase(todoList[i]);
 /*
-                if (waitingFunctorsSize != fWaitingFunctors.size())
-                    cerr << "size mismatch!  fake size=" << waitingFunctorsSize <<
-                        " real size=" << fWaitingFunctors.size() << endl;
+                if (waitingFunctorsSize != fWaitingFunctors.size())
+                    cerr << "size mismatch!  fake size=" << waitingFunctorsSize <<
+                        " real size=" << fWaitingFunctors.size() << endl;
 */
+                timeout = boost::get_system_time()+boost::posix_time::minutes(10);
                 fThreadAvailable.notify_all();
-
            }
         }
     }
@@ -353,12 +373,12 @@ void ThreadPool::beginThread() throw()
             ml.logErrorMessage( message );
 #endif
         }
-        catch(...)
+        catch (...)
         {
         }
     }
-    catch(...)
+    catch (...)
     {
         ++fGeneralErrors;
@@ -379,7 +399,7 @@ void ThreadPool::beginThread() throw()
             ml.logErrorMessage( message );
 #endif
         }
-        catch(...)
+        catch (...)
         {
         }
     }
@@ -393,16 +413,16 @@ uint64_t ThreadPool::addFunctor(const Functor_T &func)
         bAtEnd = true;
 
 //  PoolFunction_T poolFunction(fNextHandle, func);
-    PoolFunction_T poolFunction;
-    poolFunction.hndl = fNextHandle;
-    poolFunction.functor = func;
-    fWaitingFunctors.push_back(poolFunction);
-    waitingFunctorsSize++;
+    PoolFunction_T poolFunction;
+    poolFunction.hndl = fNextHandle;
+    poolFunction.functor = func;
+    fWaitingFunctors.push_back(poolFunction);
+    waitingFunctorsSize++;
     if (bAtEnd)
     {
         --fNextFunctor;
     }
-    return fNextHandle++;
+    return fNextHandle++;
 }
 
 void ThreadPool::dump()
@@ -415,47 +435,48 @@ void ThreadPool::dump()
 
 void ThreadPoolMonitor::operator()()
 {
-    ostringstream filename;
-    filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log";
-    fLog = new ofstream(filename.str().c_str());
-    for (;;)
-    {
-        if (!fLog || !fLog->is_open())
-        {
-            ostringstream oss;
-            oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
-            logging::Message::Args args;
-            logging::Message message(0);
-            args.add(oss.str());
-            message.format( args );
-            logging::LoggingID lid(22);
-            logging::MessageLog ml(lid);
-            ml.logWarningMessage( message );
-            return;
-        }
-        // Get a timestamp for output.
-        struct tm tm;
-        struct timeval tv;
+    ostringstream filename;
+    filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log";
+    fLog = new ofstream(filename.str().c_str());
+    for (;;)
+    {
+        if (!fLog || !fLog->is_open())
+        {
+            ostringstream oss;
+            oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
+            logging::Message::Args args;
+            logging::Message message(0);
+            args.add(oss.str());
+            message.format( args );
+            logging::LoggingID lid(22);
+            logging::MessageLog ml(lid);
+            ml.logWarningMessage( message );
+            return;
+        }
+        // Get a timestamp for output.
+        struct tm tm;
+        struct timeval tv;
 
-        gettimeofday(&tv, 0);
-        localtime_r(&tv.tv_sec, &tm);
+        gettimeofday(&tv, 0);
+        localtime_r(&tv.tv_sec, &tm);
 
-        (*fLog) << setfill('0')
-            << setw(2) << tm.tm_hour << ':'
-            << setw(2) << tm.tm_min << ':'
-            << setw(2) << tm.tm_sec
-            << '.'
-            << setw(4) << tv.tv_usec/100
-            << " Name " << fPool->fName
-            << " Active " << fPool->waitingFunctorsSize
-            << " Most " << fPool->fThreadCount
-            << " Max " << fPool->fMaxThreads
-            << " Q " << fPool->fQueueSize
-            << endl;
+        (*fLog) << setfill('0')
+                << setw(2) << tm.tm_hour << ':'
+                << setw(2) << tm.tm_min << ':'
+                << setw(2) << tm.tm_sec
+                << '.'
+                << setw(4) << tv.tv_usec/100
+                << " Name " << fPool->fName
+                << " Active " << fPool->waitingFunctorsSize
+                << " Most " << fPool->fThreadCount
+                << " Max " << fPool->fMaxThreads
+                << " Q " << fPool->fQueueSize
+                << endl;
 
 //        struct timespec req = { 0, 1000 * 100 }; //100 usec
 //        nanosleep(&req, 0);
-        sleep(2);
-    }
+        sleep(2);
+    }
 }
+
 } // namespace threadpool
diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h
index 22c138e0b..a000f060b 100644
--- a/utils/threadpool/threadpool.h
+++ b/utils/threadpool/threadpool.h
@@ -74,7 +74,10 @@ public:
      * @param maxThreads the maximum number of threads in this pool.  This is the maximum number
      *        of simultaneuous operations that can go on.
      * @param queueSize  the maximum number of work tasks in the queue.  This is the maximum
-     *        number of jobs that can queue up in the work list before invoke() blocks.
+     *        number of jobs that can queue up in the work list before invoke() blocks.
+     *        If 0, the queue is unbounded: invoke() never blocks and the total thread
+     *        count may exceed maxThreads. The thread count will idle back down to
+     *        maxThreads when less work is required.
      */
     EXPORT explicit ThreadPool( size_t maxThreads, size_t queueSize );
 
@@ -108,11 +111,6 @@ public:
      */
     inline size_t getMaxThreads() const { return fMaxThreads; }
 
-    /** @brief register a functor to be called when a new thread
-     *  is created
-     */
-    EXPORT void setThreadCreatedListener(const Functor_T &f) ;
-
     /** @brief queue size accessor
      *
      */
@@ -218,9 +216,8 @@ private:
     typedef std::list Container_T;
     Container_T fWaitingFunctors;
     Container_T::iterator fNextFunctor;
-//  Functor_T * fThreadCreated;
 
-    uint32_t issued;
+    uint32_t fIssued;
     boost::mutex fMutex;
     boost::condition fThreadAvailable;  // triggered when a thread is available
     boost::condition fNeedThread;       // triggered when a thread is needed
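
Usage note (not part of the patch): the sketch below illustrates the queueSize == 0 mode this change introduces, using only the ThreadPool calls that appear above (the constructor, setName, invoke, join, wait). It assumes Functor_T is a void() callable in the boost::function0<void> style and that join() takes uint64_t handles; PrintTask and main are illustrative names only, not part of ColumnStore.

// Hypothetical example built against utils/threadpool/threadpool.h as patched.
#include <iostream>
#include <vector>
#include "threadpool.h"

struct PrintTask
{
    int id;
    void operator()() { std::cout << "task " << id << " done" << std::endl; }
};

int main()
{
    // queueSize == 0: invoke() never blocks and the pool may temporarily run more
    // than maxThreads threads; surplus idle threads exit after the 10-minute
    // timed wait added to beginThread().
    threadpool::ThreadPool pool(8, 0);
    pool.setName("example");

    std::vector<uint64_t> handles;
    for (int i = 0; i < 32; ++i)
    {
        PrintTask t = { i };
        handles.push_back(pool.invoke(t));   // returns a handle for later join()
    }

    pool.join(handles);   // wait only for these handles
    pool.wait();          // or block until the whole work list drains
    return 0;
}

This mirrors how ExeMgr now uses JobStep::jobstepThreadPool: the pool is created statically with queueSize 0 and its maxThreads is raised later via setMaxThreads(rm->getJLThreadPoolSize()) once the ResourceManager exists.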