1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-02 17:22:27 +03:00

MCOL-513 Threadpool to unlimited threads when queuesize = 0. Idle down after 10 minutes

This commit is contained in:
David Hall
2017-02-13 11:56:28 -06:00
parent c2344accc9
commit e09b7e10c5
8 changed files with 236 additions and 204 deletions

View File

@ -30,35 +30,34 @@ using namespace logging;
#include "threadpool.h"
#include <iomanip>
#include <sstream>
#include "boost/date_time/posix_time/posix_time_types.hpp"
namespace threadpool
{
ThreadPool::ThreadPool()
:fMaxThreads( 0 ), fQueueSize( 0 )
:fMaxThreads( 0 ), fQueueSize( 0 )
{
init();
}
ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize )
:fMaxThreads( maxThreads ), fQueueSize( queueSize )
:fMaxThreads( maxThreads ), fQueueSize( queueSize )
{
init();
if (fQueueSize == 0)
fQueueSize = fMaxThreads*2;
}
ThreadPool::~ThreadPool() throw()
{
// delete fThreadCreated;
try
{
stop();
}
catch(...)
{}
catch (...)
{
}
}
void ThreadPool::init()
@ -66,13 +65,12 @@ void ThreadPool::init()
fThreadCount = 0;
fGeneralErrors = 0;
fFunctorErrors = 0;
waitingFunctorsSize = 0;
issued = 0;
fDebug = false;
waitingFunctorsSize = 0;
fIssued = 0;
fDebug = false;
fStop = false;
// fThreadCreated = new NoOp();
fNextFunctor = fWaitingFunctors.end();
fNextHandle=1;
fNextHandle=1;
}
void ThreadPool::setQueueSize(size_t queueSize)
@ -88,11 +86,6 @@ void ThreadPool::setMaxThreads(size_t maxThreads)
fMaxThreads = maxThreads;
}
void ThreadPool::setThreadCreatedListener(const Functor_T &f)
{
// fThreadCreated = f;
}
void ThreadPool::stop()
{
boost::mutex::scoped_lock lock1(fMutex);
@ -111,7 +104,7 @@ void ThreadPool::wait()
while (waitingFunctorsSize > 0)
{
fThreadAvailable.wait(lock1);
//cerr << "woke!" << endl;
//cerr << "woke!" << endl;
}
}
@ -121,22 +114,22 @@ void ThreadPool::join(uint64_t thrHandle)
while (waitingFunctorsSize > 0)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
if (iter->hndl == thrHandle)
{
foundit = true;
break;
}
}
if (!foundit)
{
break;
}
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
if (iter->hndl == thrHandle)
{
foundit = true;
break;
}
}
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
@ -147,32 +140,32 @@ void ThreadPool::join(std::vector<uint64_t> thrHandle)
while (waitingFunctorsSize > 0)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
std::vector<uint64_t>::iterator thrIter;
std::vector<uint64_t>::iterator thrEnd = thrHandle.end();
for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
{
if (iter->hndl == *thrIter)
{
foundit = true;
break;
}
}
if (foundit == true)
{
break;
}
}
// If we didn't find any of the handles, then all are complete
if (!foundit)
{
break;
}
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
std::vector<uint64_t>::iterator thrIter;
std::vector<uint64_t>::iterator thrEnd = thrHandle.end();
for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
{
if (iter->hndl == *thrIter)
{
foundit = true;
break;
}
}
if (foundit == true)
{
break;
}
}
// If we didn't find any of the handles, then all are complete
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
@ -180,13 +173,12 @@ void ThreadPool::join(std::vector<uint64_t> thrHandle)
uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
{
boost::mutex::scoped_lock lock1(fMutex);
uint64_t thrHandle=0;
for(;;)
uint64_t thrHandle=0;
for (;;)
{
try
{
if ( waitingFunctorsSize < fThreadCount)
if (waitingFunctorsSize < fThreadCount)
{
// Don't create a thread unless it's needed. There
// is a thread available to service this request.
@ -197,33 +189,34 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
bool bAdded = false;
if ( waitingFunctorsSize < fQueueSize)
if (waitingFunctorsSize < fQueueSize || fQueueSize == 0)
{
// Don't create a thread unless you have to
thrHandle = addFunctor(threadfunc);
bAdded = true;
}
if ( fThreadCount < fMaxThreads)
// fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
if (fThreadCount < fMaxThreads || fQueueSize == 0)
{
++fThreadCount;
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
if (fDebug)
{
ostringstream oss;
oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
<< " queue " << fQueueSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
if (fDebug)
{
ostringstream oss;
oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
<< " queue " << fQueueSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
if (bAdded)
break;
@ -241,22 +234,22 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
break;
}
if (fDebug)
{
logging::Message::Args args;
logging::Message message(5);
args.add("invoke: Blocked waiting for thread. Count ");
args.add(fThreadCount);
args.add("max ");
args.add(fMaxThreads);
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
fThreadAvailable.wait(lock1);
if (fDebug)
{
logging::Message::Args args;
logging::Message message(5);
args.add("invoke: Blocked waiting for thread. Count ");
args.add(fThreadCount);
args.add("max ");
args.add(fMaxThreads);
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
fThreadAvailable.wait(lock1);
}
catch(...)
catch (...)
{
++fGeneralErrors;
throw;
@ -264,18 +257,16 @@ uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
}
fNeedThread.notify_one();
return thrHandle;
return thrHandle;
}
void ThreadPool::beginThread() throw()
{
try
{
// fThreadCreated();
boost::mutex::scoped_lock lock1(fMutex);
for(;;)
boost::system_time timeout = boost::get_system_time()+boost::posix_time::minutes(10);
for (;;)
{
if (fStop)
break;
@ -283,51 +274,80 @@ void ThreadPool::beginThread() throw()
if (fNextFunctor == fWaitingFunctors.end())
{
// Wait until someone needs a thread
fNeedThread.wait(lock1);
// Add the timed waait for queueSize == 0 so we can idle away threads
// over fMaxThreads
if (fQueueSize > 0)
{
fNeedThread.wait(lock1);
}
else
{
// Wait no more than 10 minutes
if (fNeedThread.timed_wait(lock1, timeout) == boost::cv_status::timeout)
{
if (fThreadCount > fMaxThreads)
{
--fThreadCount;
return;
}
}
}
}
else
{
/* Need to tune these magic #s */
/* Need to tune these magic #s */
vector<Container_T::iterator> todoList;
int i, num;
Container_T::const_iterator iter;
vector<Container_T::iterator> todoList;
int i, num;
Container_T::const_iterator iter;
/* Use num to control how many jobs are issued to a single thread
should you want to batch more than one */
num = (waitingFunctorsSize - fIssued >= 1 ? 1 : 0);
/* Use this to control how many jobs are issued to a single thread */
num = (waitingFunctorsSize - issued >= 1 ? 1 : 0);
for (i = 0; i < num; i++)
todoList.push_back(fNextFunctor++);
for (i = 0; i < num; i++)
todoList.push_back(fNextFunctor++);
issued += num;
fIssued += num;
// cerr << "got " << num << " jobs." << endl;
// cerr << "got " << num << " jobs. waitingFunctorsSize=" <<
// waitingFunctorsSize << " issued=" << issued << " fThreadCount=" <<
// waitingFunctorsSize << " fIssued=" << fIssued << " fThreadCount=" <<
// fThreadCount << endl;
lock1.unlock();
for (i = 0; i < num; i++) {
try {
(*todoList[i]).functor();
}
catch(exception &e) {
++fFunctorErrors;
cerr << e.what() << endl;
}
}
lock1.lock();
for (i = 0; i < num; i++)
{
try
{
(*todoList[i]).functor();
}
catch (exception &e)
{
++fFunctorErrors;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("ThreadPool: Caught exception during execution: ");
args.add(e.what());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
}
}
lock1.lock();
issued -= num;
waitingFunctorsSize -= num;
for (i = 0; i < num; i++)
fWaitingFunctors.erase(todoList[i]);
fIssued -= num;
waitingFunctorsSize -= num;
for (i = 0; i < num; i++)
fWaitingFunctors.erase(todoList[i]);
/*
if (waitingFunctorsSize != fWaitingFunctors.size())
cerr << "size mismatch! fake size=" << waitingFunctorsSize <<
" real size=" << fWaitingFunctors.size() << endl;
if (waitingFunctorsSize != fWaitingFunctors.size())
cerr << "size mismatch! fake size=" << waitingFunctorsSize <<
" real size=" << fWaitingFunctors.size() << endl;
*/
timeout = boost::get_system_time()+boost::posix_time::minutes(10);
fThreadAvailable.notify_all();
}
}
}
@ -353,12 +373,12 @@ void ThreadPool::beginThread() throw()
ml.logErrorMessage( message );
#endif
}
catch(...)
catch (...)
{
}
}
catch(...)
catch (...)
{
++fGeneralErrors;
@ -379,7 +399,7 @@ void ThreadPool::beginThread() throw()
ml.logErrorMessage( message );
#endif
}
catch(...)
catch (...)
{
}
}
@ -393,16 +413,16 @@ uint64_t ThreadPool::addFunctor(const Functor_T &func)
bAtEnd = true;
// PoolFunction_T poolFunction(fNextHandle, func);
PoolFunction_T poolFunction;
poolFunction.hndl = fNextHandle;
poolFunction.functor = func;
fWaitingFunctors.push_back(poolFunction);
waitingFunctorsSize++;
PoolFunction_T poolFunction;
poolFunction.hndl = fNextHandle;
poolFunction.functor = func;
fWaitingFunctors.push_back(poolFunction);
waitingFunctorsSize++;
if (bAtEnd)
{
--fNextFunctor;
}
return fNextHandle++;
return fNextHandle++;
}
void ThreadPool::dump()
@ -415,47 +435,48 @@ void ThreadPool::dump()
void ThreadPoolMonitor::operator()()
{
ostringstream filename;
filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log";
fLog = new ofstream(filename.str().c_str());
for (;;)
{
if (!fLog || !fLog->is_open())
{
ostringstream oss;
oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
return;
}
// Get a timestamp for output.
struct tm tm;
struct timeval tv;
ostringstream filename;
filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log";
fLog = new ofstream(filename.str().c_str());
for (;;)
{
if (!fLog || !fLog->is_open())
{
ostringstream oss;
oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
return;
}
// Get a timestamp for output.
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
(*fLog) << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec
<< '.'
<< setw(4) << tv.tv_usec/100
<< " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize
<< " Most " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize
<< endl;
(*fLog) << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec
<< '.'
<< setw(4) << tv.tv_usec/100
<< " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize
<< " Most " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize
<< endl;
// struct timespec req = { 0, 1000 * 100 }; //100 usec
// nanosleep(&req, 0);
sleep(2);
}
sleep(2);
}
}
} // namespace threadpool