diff --git a/dbcon/joblist/jobstep.cpp b/dbcon/joblist/jobstep.cpp index 94229871a..b24a0c0ea 100644 --- a/dbcon/joblist/jobstep.cpp +++ b/dbcon/joblist/jobstep.cpp @@ -56,7 +56,7 @@ namespace joblist { boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER; -ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0); +ThreadPool JobStep::jobstepThreadPool(0, 0); ostream& operator<<(ostream& os, const JobStep* rhs) { diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index 42ee6e569..ffc2de639 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -93,8 +93,6 @@ namespace joblist const uint64_t defaultNumBuckets = 128; const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024; - const int defaultEMServerThreads = 50; - const int defaultEMServerQueueSize = 100; const int defaultEMSecondsBetweenMemChecks = 1; const int defaultEMMaxPct = 95; const int defaultEMPriority = 21; // @Bug 3385 @@ -149,13 +147,10 @@ namespace joblist typedef std::map MemMap; - - int getEmServerThreads() const { return getUintVal(fExeMgrStr, "ServerThreads", defaultEMServerThreads); } - int getEmServerQueueSize() const { return getUintVal(fExeMgrStr, "ServerQueueSize", defaultEMServerQueueSize); } int getEmSecondsBetweenMemChecks() const { return getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks); } int getEmMaxPct() const { return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct); } EXPORT int getEmPriority() const; - int getEmExecQueueSize() const { return getUintVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); } + int getEmExecQueueSize() const { return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); } int getHjMaxBuckets() const { return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets); } unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); } diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index d4669c8d5..3b80a25bd 100755 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -3466,7 +3466,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0)) ci->useCpimport = 0; - if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))) //If autocommit on batch insert will use cpimport to load data + if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))) //If autocommit on batch insert will use cpimport to load data { //store table info to connection info CalpontSystemCatalog::TableName tableName; @@ -3923,7 +3923,7 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert ) { - //@Bug 2438. Only load dta infile calls last batch process + //@Bug 2438. Only load data infile calls last batch process /* if ( ci->isLoaddataInfile && ((thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) || (ci->useCpimport == 0))) { //@Bug 2829 Handle ctrl-C if ( thd->killed > 0 ) diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index c1575ff27..c8ad57b62 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -565,7 +565,7 @@ int main(int argc, char* argv[]) // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton). // From the pools perspective, it has no idea if it is ExeMgr doing the // creation, so it has no idea which way to set the flag. So we set the max here. - JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize()); +// JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize()); JobStep::jobstepThreadPool.setName("DMLProcJobList"); // if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y") diff --git a/exemgr/femsghandler.cpp b/exemgr/femsghandler.cpp index f1b36204e..b306e00ac 100644 --- a/exemgr/femsghandler.cpp +++ b/exemgr/femsghandler.cpp @@ -24,7 +24,7 @@ using namespace std; using namespace joblist; using namespace messageqcpp; -threadpool::ThreadPool FEMsgHandler::threadPool(50,100); +threadpool::ThreadPool FEMsgHandler::threadPool; namespace { diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 0b63388d3..690511198 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1436,7 +1436,7 @@ int main(int argc, char* argv[]) // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton). // From the pools perspective, it has no idea if it is ExeMgr doing the // creation, so it has no idea which way to set the flag. So we set the max here. - JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize()); +// JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize()); JobStep::jobstepThreadPool.setName("ExeMgrJobList"); // if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y") // { @@ -1444,14 +1444,10 @@ int main(int argc, char* argv[]) // JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); // } - int serverThreads = rm->getEmServerThreads(); - int serverQueueSize = rm->getEmServerQueueSize(); int maxPct = rm->getEmMaxPct(); int pauseSeconds = rm->getEmSecondsBetweenMemChecks(); int priority = rm->getEmPriority(); - FEMsgHandler::threadPool.setMaxThreads(serverThreads); - FEMsgHandler::threadPool.setQueueSize(serverQueueSize); FEMsgHandler::threadPool.setName("FEMsgHandler"); if (maxPct > 0) @@ -1472,8 +1468,7 @@ int main(int argc, char* argv[]) } } - cout << "Starting ExeMgr: st = " << serverThreads << ", sq = " << - serverQueueSize << ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " << + cout << "Starting ExeMgr: qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " << rm->getConfig()->configFile() << endl; //set ACTIVE state @@ -1488,8 +1483,9 @@ int main(int argc, char* argv[]) } } - threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize); + threadpool::ThreadPool exeMgrThreadPool; exeMgrThreadPool.setName("ExeMgrServer"); + exeMgrThreadPool.setDebug(true); for (;;) { IOSocket ios; diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 1281e0b5f..8b0c0a848 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -196,6 +196,25 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc) bAdded = true; } + if (fDebug) + { + ostringstream oss; + oss << "invoke thread " << " on " << fName + << " max " << fMaxThreads + << " queue " << fQueueSize + << " threads " << fThreadCount + << " running " << fIssued + << " waiting " << (waitingFunctorsSize - fIssued) + << " total " << waitingFunctorsSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } + // fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run. if (fThreadCount < fMaxThreads || fQueueSize == 0) { @@ -204,20 +223,6 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc) lock1.unlock(); fThreads.create_thread(beginThreadFunc(*this)); - if (fDebug) - { - ostringstream oss; - oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads - << " queue " << fQueueSize; - logging::Message::Args args; - logging::Message message(0); - args.add(oss.str()); - message.format( args ); - logging::LoggingID lid(22); - logging::MessageLog ml(lid); - ml.logWarningMessage( message ); - } - if (bAdded) break; @@ -301,6 +306,24 @@ void ThreadPool::beginThread() throw() { Container_T::iterator todo = fNextFunctor++; ++fIssued; + if (fDebug) + { + ostringstream oss; + oss << "starting thread " << " on " << fName + << " max " << fMaxThreads + << " queue " << fQueueSize + << " threads " << fThreadCount + << " running " << fIssued + << " waiting " << (waitingFunctorsSize - fIssued) + << " total " << waitingFunctorsSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } lock1.unlock(); try {