From c2344accc95ce84b9a489b0b4b33d3644346c5db Mon Sep 17 00:00:00 2001 From: David Hall Date: Thu, 9 Feb 2017 17:54:18 -0600 Subject: [PATCH] MCOL-513 clean up and test thread pool for ExeMgr --- dbcon/joblist/joblist.cpp | 5 +-- dbcon/joblist/jobstep.cpp | 2 +- dbcon/joblist/jobstep.h | 2 +- dbcon/joblist/pdictionary.cpp | 2 + dbcon/joblist/primitivestep.h | 6 +-- dbcon/joblist/tupleaggregatestep.cpp | 19 +++++--- dbcon/mysql/ha_calpont_execplan.cpp | 2 +- exemgr/main.cpp | 10 ++++- utils/threadpool/threadpool.cpp | 66 +++++++++++++++++++++++----- utils/threadpool/threadpool.h | 37 +++++++++++++++- 10 files changed, 121 insertions(+), 30 deletions(-) diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index 833c4447d..286c733bf 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -81,7 +81,7 @@ JobList::~JobList() { JobStepVector::iterator iter; JobStepVector::iterator end; - +#if 0 iter = fQuery.begin(); end = fQuery.end(); @@ -108,7 +108,7 @@ JobList::~JobList() joiners[i]->join(); delete joiners[i]; } -#if 0 +#endif // Stop all the query steps end = fQuery.end(); for (iter = fQuery.begin(); iter != end; ++iter) @@ -136,7 +136,6 @@ JobList::~JobList() { (*iter)->join(); } -#endif } } catch (exception& ex) diff --git a/dbcon/joblist/jobstep.cpp b/dbcon/joblist/jobstep.cpp index a84d665f8..825512219 100644 --- a/dbcon/joblist/jobstep.cpp +++ b/dbcon/joblist/jobstep.cpp @@ -20,6 +20,7 @@ #include using namespace std; +#include #include #include #include @@ -102,7 +103,6 @@ JobStep::JobStep(const JobInfo& j) : fQtc.serverParms(tsp); //fStepUuid = bu::random_generator()(); fStepUuid = QueryTeleClient::genUUID(); - jobstepThreadPool.setDebug(true); } //------------------------------------------------------------------------------ diff --git a/dbcon/joblist/jobstep.h b/dbcon/joblist/jobstep.h index 899a131d7..5763ce782 100644 --- a/dbcon/joblist/jobstep.h +++ b/dbcon/joblist/jobstep.h @@ -234,8 +234,8 @@ public: bool onClauseFilter() const { return fOnClauseFilter; } void onClauseFilter(bool b) { fOnClauseFilter = b; } -protected: static ThreadPool jobstepThreadPool; +protected: //@bug6088, for telemetry posting static const int64_t STEP_TELE_INTERVAL = 5000; // now, this is the browser refresh rate diff --git a/dbcon/joblist/pdictionary.cpp b/dbcon/joblist/pdictionary.cpp index 5397974c0..c85240064 100644 --- a/dbcon/joblist/pdictionary.cpp +++ b/dbcon/joblist/pdictionary.cpp @@ -105,6 +105,8 @@ pDictionaryStep::pDictionaryStep( recvWaiting(false), ridCount(0), fColType(ct), + pThread(0), + cThread(0), fFilterCount(0), requestList(0), fInterval(jobInfo.flushInterval), diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 5e1006e2c..a0f48c3c4 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -636,8 +636,8 @@ private: uint32_t recvWaiting; int64_t ridCount; execplan::CalpontSystemCatalog::ColType fColType; - boost::shared_ptr pThread; //producer thread - boost::shared_ptr cThread; //producer thread + uint64_t pThread; //producer thread + uint64_t cThread; //producer thread messageqcpp::ByteStream fFilterString; uint32_t fFilterCount; @@ -1331,7 +1331,7 @@ private: bool isDictColumn; bool isEM; - boost::thread* fPTThd; +// boost::thread* fPTThd; // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. // Running with this one will swallow rows at projection. diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index ced92856d..6b355c59b 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -4211,12 +4211,14 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) // and if there is more data to read, the // first thread will start another thread until the // maximum number is reached. +#if 0 if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads && - dlIn->more(fInputIter)) { + dlIn->more(fInputIter)) + { fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount))); fFirstPhaseThreadCount++; } - +#endif fRowGroupIns[threadID].setData(&rgData); fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings(); if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit)) @@ -4479,22 +4481,25 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp if (!fDoneAggregate) { initializeMultiThread(); -/* -// This block of code starts all threads at the start + +// This block of code starts all threads at the start fFirstPhaseThreadCount = fNumOfThreads; - boost::shared_ptr runner; + fFirstPhaseRunners.clear(); + fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use for (i = 0; i < fNumOfThreads; i++) { - fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))) + fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))); } -*/ +#if 0 // This block of code starts one thread, relies on doThreadedAggregation() +// For reasons unknown, this doesn't work right with threadpool // to start more as needed fFirstPhaseRunners.clear(); fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use fFirstPhaseThreadCount = 1; fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0))); +#endif // Now wait for that thread plus all the threads it may have spawned jobstepThreadPool.join(fFirstPhaseRunners); diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 22aa4742b..27ee355ff 100755 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -6355,7 +6355,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // select * from derived table case if (gwi.selectCols.empty()) sel_cols_in_create = " * "; - create_query = "create temporary table " + vtb.str() + " as select " + sel_cols_in_create + " from "; + create_query = "create temporary table " + vtb.str() + " engine = aria as select " + sel_cols_in_create + " from "; TABLE_LIST* table_ptr = select_lex.get_table_list(); bool firstTb = true; diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 2f6a6d431..b2862e875 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -1433,13 +1433,19 @@ int main(int argc, char* argv[]) } } +// if (!JobStep::jobstepThreadPool.debug()) +// { +// JobStep::jobstepThreadPool.setName("ExeMgr"); +// JobStep::jobstepThreadPool.setDebug(true); +// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool)); +// } + threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize); for (;;) { IOSocket ios; ios = mqs->accept(); - boost::thread thd(SessionThread(ios, ec, rm)); - //exeMgrThreadPool.invoke(SessionThread(ios, ec, rm)); + exeMgrThreadPool.invoke(SessionThread(ios, ec, rm)); } exeMgrThreadPool.wait(); diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 644e9d098..d180dc4cb 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -27,10 +27,9 @@ using namespace std; #include "messagelog.h" using namespace logging; -#define THREADPOOL_DLLEXPORT #include "threadpool.h" -#undef THREADPOOL_DLLEXPORT - +#include +#include namespace threadpool { @@ -69,6 +68,7 @@ void ThreadPool::init() fFunctorErrors = 0; waitingFunctorsSize = 0; issued = 0; + fDebug = false; fStop = false; // fThreadCreated = new NoOp(); fNextFunctor = fWaitingFunctors.end(); @@ -213,14 +213,12 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc) if (fDebug) { + ostringstream oss; + oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads + << " queue " << fQueueSize; logging::Message::Args args; - logging::Message message(5); - args.add("invoke: Starting thread "); - args.add(fThreadCount); - args.add(" max "); - args.add(fMaxThreads); - args.add(" queue "); - args.add(fQueueSize); + logging::Message message(0); + args.add(oss.str()); message.format( args ); logging::LoggingID lid(22); logging::MessageLog ml(lid); @@ -255,8 +253,8 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc) logging::LoggingID lid(22); logging::MessageLog ml(lid); ml.logWarningMessage( message ); - fThreadAvailable.wait(lock1); } + fThreadAvailable.wait(lock1); } catch(...) { @@ -414,4 +412,50 @@ void ThreadPool::dump() std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl; } + +void ThreadPoolMonitor::operator()() +{ + ostringstream filename; + filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log"; + fLog = new ofstream(filename.str().c_str()); + for (;;) + { + if (!fLog || !fLog->is_open()) + { + ostringstream oss; + oss << "ThreadPoolMonitor " << fPool->name() << " has no file "; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + return; + } + // Get a timestamp for output. + struct tm tm; + struct timeval tv; + + gettimeofday(&tv, 0); + localtime_r(&tv.tv_sec, &tm); + + (*fLog) << setfill('0') + << setw(2) << tm.tm_hour << ':' + << setw(2) << tm.tm_min << ':' + << setw(2) << tm.tm_sec + << '.' + << setw(4) << tv.tv_usec/100 + << " Name " << fPool->fName + << " Active " << fPool->waitingFunctorsSize + << " Most " << fPool->fThreadCount + << " Max " << fPool->fMaxThreads + << " Q " << fPool->fQueueSize + << endl; + +// struct timespec req = { 0, 1000 * 100 }; //100 usec +// nanosleep(&req, 0); + sleep(2); + } +} } // namespace threadpool diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index 7616090fe..22c138e0b 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -31,7 +31,7 @@ #define THREADPOOL_H #include -#include +#include #include #include #include @@ -153,8 +153,16 @@ public: */ EXPORT void dump(); + EXPORT std::string& name() {return fName;} + + EXPORT void setName(std::string name) {fName = name;} + EXPORT void setName(const char* name) {fName = name;} + + EXPORT bool debug() {return fDebug;} + EXPORT void setDebug(bool d) {fDebug = d;} + friend class ThreadPoolMonitor; protected: private: @@ -224,9 +232,36 @@ private: uint32_t waitingFunctorsSize; uint64_t fNextHandle; + std::string fName; // Optional to add a name to the pool for debugging. bool fDebug; }; +// This class, if instantiated, will continuously log details about the indicated threadpool +// The log will end up in /var/log/mariadb/columnstore/trace/threadpool_.log +class ThreadPoolMonitor +{ +public: + ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL) + { + } + + ~ThreadPoolMonitor() + { + if (fLog) + { + delete fLog; + } + } + + void operator()(); +private: + //defaults okay + //ThreadPoolMonitor(const ThreadPoolMonitor& rhs); + //ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs); + ThreadPool* fPool; + std::ofstream* fLog; +}; + } // namespace threadpool #undef EXPORT