1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-513 use a single funcor per thread. ThreadPool was doing one at a time anyway, but in a convpluted way that made it easier to add more if wanted. But it was expensive. Cleanup and polish.

This commit is contained in:
David Hall
2017-02-17 09:44:32 -06:00
parent 776add46dc
commit 87a679e6eb
13 changed files with 305 additions and 296 deletions

View File

@ -28,7 +28,6 @@
using namespace std;
#include "joblist.h"
#include "calpontsystemcatalog.h"
using namespace execplan;
@ -73,23 +72,26 @@ JobList::JobList(bool isEM) :
JobList::~JobList()
{
vector<boost::thread *> joiners;
// boost::thread *tmp;
try
{
if (fIsRunning)
{
#if 0
// This logic creates a set of threads to wind down the query
vector<uint64_t> joiners;
joiners.reserve(20);
NullStep nullStep; // For access to the static jobstepThreadPool.
JobStepVector::iterator iter;
JobStepVector::iterator end;
#if 0
iter = fQuery.begin();
end = fQuery.end();
// Wait for all the query steps to finish
while (iter != end)
{
tmp = new boost::thread(JSJoiner(iter->get()));
joiners.push_back(tmp);
joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get())));
++iter;
}
@ -99,17 +101,15 @@ JobList::~JobList()
// wait for the projection steps
while (iter != end)
{
tmp = new boost::thread(JSJoiner(iter->get()));
joiners.push_back(tmp);
joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get())));
++iter;
}
for (uint32_t i = 0; i < joiners.size(); i++) {
joiners[i]->join();
delete joiners[i];
}
nullStep.jobstepThreadPool.join(joiners);
#endif
// Stop all the query steps
// This logic stops the query steps one at a time
JobStepVector::iterator iter;
JobStepVector::iterator end;
end = fQuery.end();
for (iter = fQuery.begin(); iter != end; ++iter)
{

View File

@ -330,6 +330,20 @@ public:
virtual bool deliverStringTableRowGroup() const = 0;
};
class NullStep : public JobStep
{
public:
/** @brief virtual void Run method
*/
virtual void run(){}
/** @brief virtual void join method
*/
virtual void join(){}
/** @brief virtual string toString method
*/
virtual const std::string toString() const {return "NullStep";}
};
// calls rhs->toString()
std::ostream& operator<<(std::ostream& os, const JobStep* rhs);

View File

@ -134,9 +134,9 @@ pDictionaryScan::pDictionaryScan(
sendWaiting(false),
ridCount(0),
ridList(0),
colType(ct),
pThread(0),
cThread(0),
colType(ct),
fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()),
fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()),
fStopSending(false),

View File

@ -775,9 +775,7 @@ void TupleUnion::run()
void TupleUnion::join()
{
uint32_t i;
mutex::scoped_lock lk(jlLock);
Uniquer_t::iterator it;
if (joinRan)
return;

View File

@ -340,6 +340,7 @@ DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize )
{
fDdlPackagepool.setMaxThreads(fPackageMaxThreads);
fDdlPackagepool.setQueueSize(fPackageWorkQueueSize);
fDdlPackagepool.setName("DdlPackagepool");
csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
csc->identity(CalpontSystemCatalog::EC);
string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));

View File

@ -558,6 +558,21 @@ int main(int argc, char* argv[])
}
DMLServer dmlserver(serverThreads, serverQueueSize,&dbrm);
ResourceManager *rm = ResourceManager::instance();
// jobstepThreadPool is used by other processes. We can't call
// resourcemanaager (rm) functions during the static creation of threadpool
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("DMLProcJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
//set ACTIVE state
try
@ -567,7 +582,6 @@ int main(int argc, char* argv[])
catch (...)
{
}
ResourceManager *rm = ResourceManager::instance();
Dec = DistributedEngineComm::instance(rm);
#ifndef _MSC_VER

View File

@ -1130,6 +1130,7 @@ DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm
fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
fDmlPackagepool.setName("DmlPackagepool");
}
void DMLServer::start()

View File

@ -24,7 +24,7 @@ using namespace std;
using namespace joblist;
using namespace messageqcpp;
threadpool::ThreadPool FEMsgHandler::threadPool(100,200);
threadpool::ThreadPool FEMsgHandler::threadPool(50,100);
namespace {
@ -52,15 +52,14 @@ FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket *s) :
FEMsgHandler::~FEMsgHandler()
{
stop();
// thr.join();
boost::unique_lock<boost::mutex> lk(joinMutex);
threadPool.join(thr);
}
void FEMsgHandler::start()
{
if (!running) {
running = true;
threadPool.invoke(Runner(this));
thr = threadPool.invoke(Runner(this));
}
}
@ -109,7 +108,6 @@ bool FEMsgHandler::aborted()
void FEMsgHandler::threadFcn()
{
int err = 0;
boost::unique_lock<boost::mutex> lk(joinMutex);
int connectionNum = sock->getConnectionNum();
/* This waits for the next readable event on sock. An abort is signaled

View File

@ -37,18 +37,14 @@ public:
void threadFcn();
static threadpool::ThreadPool threadPool;
private:
bool die, running, sawData;
messageqcpp::IOSocket *sock;
boost::shared_ptr<joblist::JobList> jl;
boost::mutex mutex;
// boost::thread thr;
static threadpool::ThreadPool threadPool;
// Because we can't join() a thread from a thread pool, threadFcn will
// unlock when it exits and the destructor can block until the thread is done.
boost::mutex joinMutex;
uint64_t thr;
};
#endif /* FEMSGHANDLER_H_ */

View File

@ -1393,19 +1393,18 @@ int main(int argc, char* argv[])
}
}
// It's possible that PM modules use this threadpool. Only ExeMgr creates
// massive amounts of threads and needs to be settable. It's also possible that
// other process on this UM module use this threadpool. In this case, they share.
// We can't call rm functions during the static creation because rm has a isExeMgr
// flag that is set upon first creation. For the pool, who has no idea if it is ExeMgr,
// to create the singleton rm would be wrong, no matter which way we set the flag.
// jobstepThreadPool is used by other processes. We can't call
// resourcemanaager (rm) functions during the static creation of threadpool
// because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pools perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("ExeMgr");
if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
{
JobStep::jobstepThreadPool.setDebug(true);
JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
}
JobStep::jobstepThreadPool.setName("ExeMgrJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
int serverThreads = rm->getEmServerThreads();
int serverQueueSize = rm->getEmServerQueueSize();
@ -1413,6 +1412,10 @@ int main(int argc, char* argv[])
int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
int priority = rm->getEmPriority();
FEMsgHandler::threadPool.setMaxThreads(serverThreads);
FEMsgHandler::threadPool.setQueueSize(serverQueueSize);
FEMsgHandler::threadPool.setName("FEMsgHandler");
if (maxPct > 0)
startRssMon(maxPct, pauseSeconds);
@ -1448,6 +1451,7 @@ int main(int argc, char* argv[])
}
threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize);
exeMgrThreadPool.setName("ExeMgrServer");
for (;;)
{
IOSocket ios;

View File

@ -2032,6 +2032,7 @@ PrimitiveServer::PrimitiveServer(int serverThreads,
fCacheCount=cacheCount;
fServerpool.setMaxThreads(fServerThreads);
fServerpool.setQueueSize(fServerQueueSize);
fServerpool.setName("PrimitiveServer");
fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0));

View File

@ -283,42 +283,28 @@ void ThreadPool::beginThread() throw()
else
{
// Wait no more than 10 minutes
if (fNeedThread.timed_wait(lock1, timeout) == boost::cv_status::timeout)
if (!fNeedThread.timed_wait(lock1, timeout)) // false means it timed out
{
if (fThreadCount > fMaxThreads)
{
--fThreadCount;
return;
}
timeout = boost::get_system_time()+boost::posix_time::minutes(10);
}
}
}
else
{
/* Need to tune these magic #s */
vector<Container_T::iterator> todoList;
int i, num;
Container_T::const_iterator iter;
/* Use num to control how many jobs are issued to a single thread
should you want to batch more than one */
num = (waitingFunctorsSize - fIssued >= 1 ? 1 : 0);
for (i = 0; i < num; i++)
todoList.push_back(fNextFunctor++);
fIssued += num;
// cerr << "got " << num << " jobs." << endl;
// cerr << "got " << num << " jobs. waitingFunctorsSize=" <<
// waitingFunctorsSize << " fIssued=" << fIssued << " fThreadCount=" <<
// fThreadCount << endl;
lock1.unlock();
for (i = 0; i < num; i++)
// If there's anything waiting, run it
if (waitingFunctorsSize - fIssued > 0)
{
Container_T::iterator todo = fNextFunctor++;
++fIssued;
lock1.unlock();
try
{
(*todoList[i]).functor();
todo->functor();
}
catch (exception &e)
{
@ -334,18 +320,12 @@ void ThreadPool::beginThread() throw()
ml.logErrorMessage( message );
#endif
}
}
lock1.lock();
--fIssued;
--waitingFunctorsSize;
fWaitingFunctors.erase(todo);
}
fIssued -= num;
waitingFunctorsSize -= num;
for (i = 0; i < num; i++)
fWaitingFunctors.erase(todoList[i]);
/*
if (waitingFunctorsSize != fWaitingFunctors.size())
cerr << "size mismatch! fake size=" << waitingFunctorsSize <<
" real size=" << fWaitingFunctors.size() << endl;
*/
timeout = boost::get_system_time()+boost::posix_time::minutes(10);
fThreadAvailable.notify_all();
}

View File

@ -194,6 +194,7 @@
<Folder
Name="Source Files"
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
<F N="prioritythreadpool.cpp"/>
<F N="tdriver.cpp"/>
<F N="threadpool.cpp"/>
<F N="weightedthreadpool.cpp"/>
@ -202,6 +203,7 @@
<Folder
Name="Header Files"
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
<F N="prioritythreadpool.h"/>
<F N="threadpool.h"/>
<F N="weightedthreadpool.h"/>
</Folder>