1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-513 clean up and test thread pool for ExeMgr

This commit is contained in:
David Hall
2017-02-09 17:54:18 -06:00
parent 29785cf202
commit c2344accc9
10 changed files with 121 additions and 30 deletions

View File

@ -81,7 +81,7 @@ JobList::~JobList()
{ {
JobStepVector::iterator iter; JobStepVector::iterator iter;
JobStepVector::iterator end; JobStepVector::iterator end;
#if 0
iter = fQuery.begin(); iter = fQuery.begin();
end = fQuery.end(); end = fQuery.end();
@ -108,7 +108,7 @@ JobList::~JobList()
joiners[i]->join(); joiners[i]->join();
delete joiners[i]; delete joiners[i];
} }
#if 0 #endif
// Stop all the query steps // Stop all the query steps
end = fQuery.end(); end = fQuery.end();
for (iter = fQuery.begin(); iter != end; ++iter) for (iter = fQuery.begin(); iter != end; ++iter)
@ -136,7 +136,6 @@ JobList::~JobList()
{ {
(*iter)->join(); (*iter)->join();
} }
#endif
} }
} }
catch (exception& ex) catch (exception& ex)

View File

@ -20,6 +20,7 @@
#include <string> #include <string>
using namespace std; using namespace std;
#include <stdlib.h>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_generators.hpp>
@ -102,7 +103,6 @@ JobStep::JobStep(const JobInfo& j) :
fQtc.serverParms(tsp); fQtc.serverParms(tsp);
//fStepUuid = bu::random_generator()(); //fStepUuid = bu::random_generator()();
fStepUuid = QueryTeleClient::genUUID(); fStepUuid = QueryTeleClient::genUUID();
jobstepThreadPool.setDebug(true);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@ -234,8 +234,8 @@ public:
bool onClauseFilter() const { return fOnClauseFilter; } bool onClauseFilter() const { return fOnClauseFilter; }
void onClauseFilter(bool b) { fOnClauseFilter = b; } void onClauseFilter(bool b) { fOnClauseFilter = b; }
protected:
static ThreadPool jobstepThreadPool; static ThreadPool jobstepThreadPool;
protected:
//@bug6088, for telemetry posting //@bug6088, for telemetry posting
static const int64_t STEP_TELE_INTERVAL = 5000; // now, this is the browser refresh rate static const int64_t STEP_TELE_INTERVAL = 5000; // now, this is the browser refresh rate

View File

@ -105,6 +105,8 @@ pDictionaryStep::pDictionaryStep(
recvWaiting(false), recvWaiting(false),
ridCount(0), ridCount(0),
fColType(ct), fColType(ct),
pThread(0),
cThread(0),
fFilterCount(0), fFilterCount(0),
requestList(0), requestList(0),
fInterval(jobInfo.flushInterval), fInterval(jobInfo.flushInterval),

View File

@ -636,8 +636,8 @@ private:
uint32_t recvWaiting; uint32_t recvWaiting;
int64_t ridCount; int64_t ridCount;
execplan::CalpontSystemCatalog::ColType fColType; execplan::CalpontSystemCatalog::ColType fColType;
boost::shared_ptr<boost::thread> pThread; //producer thread uint64_t pThread; //producer thread
boost::shared_ptr<boost::thread> cThread; //producer thread uint64_t cThread; //producer thread
messageqcpp::ByteStream fFilterString; messageqcpp::ByteStream fFilterString;
uint32_t fFilterCount; uint32_t fFilterCount;
@ -1331,7 +1331,7 @@ private:
bool isDictColumn; bool isDictColumn;
bool isEM; bool isEM;
boost::thread* fPTThd; // boost::thread* fPTThd;
// @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4.
// Running with this one will swallow rows at projection. // Running with this one will swallow rows at projection.

View File

@ -4211,12 +4211,14 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
// and if there is more data to read, the // and if there is more data to read, the
// first thread will start another thread until the // first thread will start another thread until the
// maximum number is reached. // maximum number is reached.
#if 0
if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads && if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads &&
dlIn->more(fInputIter)) { dlIn->more(fInputIter))
{
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount))); fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount)));
fFirstPhaseThreadCount++; fFirstPhaseThreadCount++;
} }
#endif
fRowGroupIns[threadID].setData(&rgData); fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings(); fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit)) if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
@ -4479,22 +4481,25 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
if (!fDoneAggregate) if (!fDoneAggregate)
{ {
initializeMultiThread(); initializeMultiThread();
/*
// This block of code starts all threads at the start // This block of code starts all threads at the start
fFirstPhaseThreadCount = fNumOfThreads; fFirstPhaseThreadCount = fNumOfThreads;
boost::shared_ptr<boost::thread> runner; fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
for (i = 0; i < fNumOfThreads; i++) for (i = 0; i < fNumOfThreads; i++)
{ {
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))) fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
} }
*/
#if 0
// This block of code starts one thread, relies on doThreadedAggregation() // This block of code starts one thread, relies on doThreadedAggregation()
// For reasons unknown, this doesn't work right with threadpool
// to start more as needed // to start more as needed
fFirstPhaseRunners.clear(); fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
fFirstPhaseThreadCount = 1; fFirstPhaseThreadCount = 1;
fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0))); fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0)));
#endif
// Now wait for that thread plus all the threads it may have spawned // Now wait for that thread plus all the threads it may have spawned
jobstepThreadPool.join(fFirstPhaseRunners); jobstepThreadPool.join(fFirstPhaseRunners);

View File

@ -6355,7 +6355,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
// select * from derived table case // select * from derived table case
if (gwi.selectCols.empty()) if (gwi.selectCols.empty())
sel_cols_in_create = " * "; sel_cols_in_create = " * ";
create_query = "create temporary table " + vtb.str() + " as select " + sel_cols_in_create + " from "; create_query = "create temporary table " + vtb.str() + " engine = aria as select " + sel_cols_in_create + " from ";
TABLE_LIST* table_ptr = select_lex.get_table_list(); TABLE_LIST* table_ptr = select_lex.get_table_list();
bool firstTb = true; bool firstTb = true;

View File

@ -1433,13 +1433,19 @@ int main(int argc, char* argv[])
} }
} }
// if (!JobStep::jobstepThreadPool.debug())
// {
// JobStep::jobstepThreadPool.setName("ExeMgr");
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize); threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize);
for (;;) for (;;)
{ {
IOSocket ios; IOSocket ios;
ios = mqs->accept(); ios = mqs->accept();
boost::thread thd(SessionThread(ios, ec, rm)); exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
//exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
} }
exeMgrThreadPool.wait(); exeMgrThreadPool.wait();

View File

@ -27,10 +27,9 @@ using namespace std;
#include "messagelog.h" #include "messagelog.h"
using namespace logging; using namespace logging;
#define THREADPOOL_DLLEXPORT
#include "threadpool.h" #include "threadpool.h"
#undef THREADPOOL_DLLEXPORT #include <iomanip>
#include <sstream>
namespace threadpool namespace threadpool
{ {
@ -69,6 +68,7 @@ void ThreadPool::init()
fFunctorErrors = 0; fFunctorErrors = 0;
waitingFunctorsSize = 0; waitingFunctorsSize = 0;
issued = 0; issued = 0;
fDebug = false;
fStop = false; fStop = false;
// fThreadCreated = new NoOp(); // fThreadCreated = new NoOp();
fNextFunctor = fWaitingFunctors.end(); fNextFunctor = fWaitingFunctors.end();
@ -213,14 +213,12 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
if (fDebug) if (fDebug)
{ {
ostringstream oss;
oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
<< " queue " << fQueueSize;
logging::Message::Args args; logging::Message::Args args;
logging::Message message(5); logging::Message message(0);
args.add("invoke: Starting thread "); args.add(oss.str());
args.add(fThreadCount);
args.add(" max ");
args.add(fMaxThreads);
args.add(" queue ");
args.add(fQueueSize);
message.format( args ); message.format( args );
logging::LoggingID lid(22); logging::LoggingID lid(22);
logging::MessageLog ml(lid); logging::MessageLog ml(lid);
@ -255,8 +253,8 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
logging::LoggingID lid(22); logging::LoggingID lid(22);
logging::MessageLog ml(lid); logging::MessageLog ml(lid);
ml.logWarningMessage( message ); ml.logWarningMessage( message );
fThreadAvailable.wait(lock1);
} }
fThreadAvailable.wait(lock1);
} }
catch(...) catch(...)
{ {
@ -414,4 +412,50 @@ void ThreadPool::dump()
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl; std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
} }
void ThreadPoolMonitor::operator()()
{
ostringstream filename;
filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log";
fLog = new ofstream(filename.str().c_str());
for (;;)
{
if (!fLog || !fLog->is_open())
{
ostringstream oss;
oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
return;
}
// Get a timestamp for output.
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
(*fLog) << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec
<< '.'
<< setw(4) << tv.tv_usec/100
<< " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize
<< " Most " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize
<< endl;
// struct timespec req = { 0, 1000 * 100 }; //100 usec
// nanosleep(&req, 0);
sleep(2);
}
}
} // namespace threadpool } // namespace threadpool

View File

@ -31,7 +31,7 @@
#define THREADPOOL_H #define THREADPOOL_H
#include <string> #include <string>
#include <iostream> #include <fstream>
#include <cstdlib> #include <cstdlib>
#include <sstream> #include <sstream>
#include <stdexcept> #include <stdexcept>
@ -153,8 +153,16 @@ public:
*/ */
EXPORT void dump(); EXPORT void dump();
EXPORT std::string& name() {return fName;}
EXPORT void setName(std::string name) {fName = name;}
EXPORT void setName(const char* name) {fName = name;}
EXPORT bool debug() {return fDebug;}
EXPORT void setDebug(bool d) {fDebug = d;} EXPORT void setDebug(bool d) {fDebug = d;}
friend class ThreadPoolMonitor;
protected: protected:
private: private:
@ -224,9 +232,36 @@ private:
uint32_t waitingFunctorsSize; uint32_t waitingFunctorsSize;
uint64_t fNextHandle; uint64_t fNextHandle;
std::string fName; // Optional to add a name to the pool for debugging.
bool fDebug; bool fDebug;
}; };
// This class, if instantiated, will continuously log details about the indicated threadpool
// The log will end up in /var/log/mariadb/columnstore/trace/threadpool_<name>.log
class ThreadPoolMonitor
{
public:
ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL)
{
}
~ThreadPoolMonitor()
{
if (fLog)
{
delete fLog;
}
}
void operator()();
private:
//defaults okay
//ThreadPoolMonitor(const ThreadPoolMonitor& rhs);
//ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs);
ThreadPool* fPool;
std::ofstream* fLog;
};
} // namespace threadpool } // namespace threadpool
#undef EXPORT #undef EXPORT