1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

clang format apply

This commit is contained in:
Leonid Fedorov
2022-02-11 12:24:40 +00:00
parent 509f005be7
commit 7c808317dc
1367 changed files with 394342 additions and 413129 deletions

View File

@ -16,10 +16,10 @@
MA 02110-1301, USA. */
/***********************************************************************
* $Id: threadpool.cpp 553 2008-02-27 17:51:16Z rdempsey $
*
*
***********************************************************************/
* $Id: threadpool.cpp 553 2008-02-27 17:51:16Z rdempsey $
*
*
***********************************************************************/
#include <stdexcept>
#include <unistd.h>
@ -38,295 +38,288 @@ using namespace boost;
namespace threadpool
{
PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads,
uint midThreads, uint lowThreads, uint ID) :
_stop(false), weightPerRun(targetWeightPerRun), id(ID),
blockedThreads(0), extraThreads(0), stopExtra(true)
PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads,
uint lowThreads, uint ID)
: _stop(false), weightPerRun(targetWeightPerRun), id(ID), blockedThreads(0), extraThreads(0), stopExtra(true)
{
boost::thread* newThread;
for (uint32_t i = 0; i < highThreads; i++)
{
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
}
for (uint32_t i = 0; i < midThreads; i++)
{
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
}
for (uint32_t i = 0; i < lowThreads; i++)
{
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
}
cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads
<< " low.\n";
defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads;
defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads;
defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads;
boost::thread* newThread;
for (uint32_t i = 0; i < highThreads; i++)
{
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
}
for (uint32_t i = 0; i < midThreads; i++)
{
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
}
for (uint32_t i = 0; i < lowThreads; i++)
{
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
}
cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n";
defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads;
defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads;
defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads;
}
PriorityThreadPool::~PriorityThreadPool()
{
stop();
stop();
}
void PriorityThreadPool::addJob(const Job& job, bool useLock)
{
boost::thread* newThread;
boost::mutex::scoped_lock lk(mutex, boost::defer_lock_t());
boost::thread* newThread;
boost::mutex::scoped_lock lk(mutex, boost::defer_lock_t());
if (useLock)
lk.lock();
if (useLock)
lk.lock();
// Create any missing threads
if (defaultThreadCounts[HIGH] != threadCounts[HIGH])
{
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
threadCounts[HIGH]++;
}
// Create any missing threads
if (defaultThreadCounts[HIGH] != threadCounts[HIGH])
{
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
threadCounts[HIGH]++;
}
if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM])
{
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
threadCounts[MEDIUM]++;
}
if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM])
{
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
threadCounts[MEDIUM]++;
}
if (defaultThreadCounts[LOW] != threadCounts[LOW])
{
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
threadCounts[LOW]++;
}
// If some threads have blocked (because of output queue full)
// Temporarily add some extra worker threads to make up for the blocked threads.
if (blockedThreads > extraThreads)
{
stopExtra = false;
newThread = threads.create_thread(ThreadHelper(this, EXTRA));
newThread->detach();
extraThreads++;
}
else if (blockedThreads == 0)
{
// Release the temporary threads -- some threads have become unblocked.
stopExtra = true;
}
if (job.priority > 66)
jobQueues[HIGH].push_back(job);
else if (job.priority > 33)
jobQueues[MEDIUM].push_back(job);
else
jobQueues[LOW].push_back(job);
if (defaultThreadCounts[LOW] != threadCounts[LOW])
{
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
threadCounts[LOW]++;
}
if (useLock)
newJob.notify_one();
// If some threads have blocked (because of output queue full)
// Temporarily add some extra worker threads to make up for the blocked threads.
if (blockedThreads > extraThreads)
{
stopExtra = false;
newThread = threads.create_thread(ThreadHelper(this, EXTRA));
newThread->detach();
extraThreads++;
}
else if (blockedThreads == 0)
{
// Release the temporary threads -- some threads have become unblocked.
stopExtra = true;
}
if (job.priority > 66)
jobQueues[HIGH].push_back(job);
else if (job.priority > 33)
jobQueues[MEDIUM].push_back(job);
else
jobQueues[LOW].push_back(job);
if (useLock)
newJob.notify_one();
}
void PriorityThreadPool::removeJobs(uint32_t id)
{
list<Job>::iterator it;
list<Job>::iterator it;
boost::mutex::scoped_lock lk(mutex);
boost::mutex::scoped_lock lk(mutex);
for (uint32_t i = 0; i < _COUNT; i++)
for (it = jobQueues[i].begin(); it != jobQueues[i].end();)
if (it->id == id)
it = jobQueues[i].erase(it);
else
++it;
for (uint32_t i = 0; i < _COUNT; i++)
for (it = jobQueues[i].begin(); it != jobQueues[i].end();)
if (it->id == id)
it = jobQueues[i].erase(it);
else
++it;
}
PriorityThreadPool::Priority PriorityThreadPool::pickAQueue(Priority preference)
{
if (preference != EXTRA && !jobQueues[preference].empty())
return preference;
else if (!jobQueues[HIGH].empty())
return HIGH;
else if (!jobQueues[MEDIUM].empty())
return MEDIUM;
else
return LOW;
if (preference != EXTRA && !jobQueues[preference].empty())
return preference;
else if (!jobQueues[HIGH].empty())
return HIGH;
else if (!jobQueues[MEDIUM].empty())
return MEDIUM;
else
return LOW;
}
void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
{
if (preferredQueue == EXTRA)
utils::setThreadName("Extra");
else
utils::setThreadName("Idle");
Priority queue = LOW;
uint32_t weight, i = 0;
vector<Job> runList;
vector<bool> reschedule;
uint32_t rescheduleCount;
uint32_t queueSize;
bool running = false;
if (preferredQueue == EXTRA)
utils::setThreadName("Extra");
else
utils::setThreadName("Idle");
Priority queue = LOW;
uint32_t weight, i = 0;
vector<Job> runList;
vector<bool> reschedule;
uint32_t rescheduleCount;
uint32_t queueSize;
bool running = false;
try
{
while (!_stop)
{
boost::mutex::scoped_lock lk(mutex);
queue = pickAQueue(preferredQueue);
if (jobQueues[queue].empty())
{
// If this is an EXTRA thread due toother threads blocking, and all blockers are unblocked,
// we don't want this one any more.
if (preferredQueue == EXTRA && stopExtra)
{
extraThreads--;
return;
}
newJob.wait(lk);
continue;
}
queueSize = jobQueues[queue].size();
weight = 0;
// 3 conditions stop this thread from grabbing all jobs in the queue
//
// 1: The weight limit has been exceeded
// 2: The queue is empty
// 3: It has grabbed more than half of the jobs available &
// should leave some to the other threads
while ((weight < weightPerRun) && (!jobQueues[queue].empty()) && (runList.size() <= queueSize / 2))
{
runList.push_back(jobQueues[queue].front());
jobQueues[queue].pop_front();
weight += runList.back().weight;
}
lk.unlock();
reschedule.resize(runList.size());
rescheduleCount = 0;
for (i = 0; i < runList.size() && !_stop; i++)
{
reschedule[i] = false;
running = true;
reschedule[i] = (*(runList[i].functor))();
running = false;
if (reschedule[i])
rescheduleCount++;
}
if (preferredQueue == EXTRA)
utils::setThreadName("Extra (used)");
else
utils::setThreadName("Idle");
// no real work was done, prevent intensive busy waiting
if (rescheduleCount == runList.size())
usleep(1000);
if (rescheduleCount > 0)
{
lk.lock();
for (i = 0; i < runList.size(); i++)
if (reschedule[i])
addJob(runList[i], false);
if (rescheduleCount > 1)
newJob.notify_all();
else
newJob.notify_one();
lk.unlock();
}
runList.clear();
}
}
catch (std::exception& ex)
{
// Log the exception and exit this thread
try
{
while (!_stop)
{
boost::mutex::scoped_lock lk(mutex);
queue = pickAQueue(preferredQueue);
if (jobQueues[queue].empty())
{
// If this is an EXTRA thread due toother threads blocking, and all blockers are unblocked,
// we don't want this one any more.
if (preferredQueue == EXTRA && stopExtra)
{
extraThreads--;
return;
}
newJob.wait(lk);
continue;
}
queueSize = jobQueues[queue].size();
weight = 0;
// 3 conditions stop this thread from grabbing all jobs in the queue
//
// 1: The weight limit has been exceeded
// 2: The queue is empty
// 3: It has grabbed more than half of the jobs available &
// should leave some to the other threads
while ((weight < weightPerRun) && (!jobQueues[queue].empty())
&& (runList.size() <= queueSize / 2))
{
runList.push_back(jobQueues[queue].front());
jobQueues[queue].pop_front();
weight += runList.back().weight;
}
lk.unlock();
reschedule.resize(runList.size());
rescheduleCount = 0;
for (i = 0; i < runList.size() && !_stop; i++)
{
reschedule[i] = false;
running = true;
reschedule[i] = (*(runList[i].functor))();
running = false;
if (reschedule[i])
rescheduleCount++;
}
if (preferredQueue == EXTRA)
utils::setThreadName("Extra (used)");
else
utils::setThreadName("Idle");
// no real work was done, prevent intensive busy waiting
if (rescheduleCount == runList.size())
usleep(1000);
if (rescheduleCount > 0)
{
lk.lock();
for (i = 0; i < runList.size(); i++)
if (reschedule[i])
addJob(runList[i], false);
if (rescheduleCount > 1)
newJob.notify_all();
else
newJob.notify_one();
lk.unlock();
}
runList.clear();
}
}
catch (std::exception& ex)
{
// Log the exception and exit this thread
try
{
threadCounts[queue]--;
threadCounts[queue]--;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("threadFcn: Caught exception: ");
args.add(ex.what());
logging::Message::Args args;
logging::Message message(5);
args.add("threadFcn: Caught exception: ");
args.add(ex.what());
message.format( args );
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
ml.logErrorMessage(message);
#endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
}
catch (...)
{
}
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
}
catch (...)
{
// Log the exception and exit this thread
try
{
threadCounts[queue]--;
}
}
catch (...)
{
// Log the exception and exit this thread
try
{
threadCounts[queue]--;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(6);
args.add("threadFcn: Caught unknown exception!");
logging::Message::Args args;
logging::Message message(6);
args.add("threadFcn: Caught unknown exception!");
message.format( args );
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
ml.logErrorMessage(message);
#endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
}
catch (...)
{
}
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
}
catch (...)
{
}
}
}
void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0,0,0,0,0,0};
ISMPacketHeader ism;
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*) &ism, sizeof(ism));
msg.append((uint8_t*) &ph, sizeof(ph));
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
sock->write(msg);
sock->write(msg);
}
void PriorityThreadPool::stop()
{
_stop = true;
_stop = true;
}
} // namespace threadpool
} // namespace threadpool
// vim:ts=4 sw=4:

View File

@ -16,10 +16,10 @@
MA 02110-1301, USA. */
/***********************************************************************
* $Id: $
*
*
***********************************************************************/
* $Id: $
*
*
***********************************************************************/
/** @file */
#ifndef PRIORITYTHREADPOOL_H
@ -35,121 +35,123 @@
#include <boost/thread/condition.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <atomic>
#include <atomic>
#include "../winport/winport.h"
#include "primitives/primproc/umsocketselector.h"
#include "atomicops.h"
namespace threadpool
{
class PriorityThreadPool
{
public:
public:
class Functor
{
public:
virtual ~Functor(){};
// as of 12/3/13, all implementors return 0 and -1. -1 will cause
// this thread pool to reschedule the job, 0 will throw it away on return.
virtual int operator()() = 0;
};
class Functor
// typedef boost::function0<int> Functor;
struct Job
{
Job() : weight(1), priority(0), id(0)
{
public:
virtual ~Functor() { };
// as of 12/3/13, all implementors return 0 and -1. -1 will cause
// this thread pool to reschedule the job, 0 will throw it away on return.
virtual int operator()() = 0;
};
//typedef boost::function0<int> Functor;
struct Job
{
Job() : weight(1), priority(0), id(0) { }
boost::shared_ptr<Functor> functor;
uint32_t weight;
uint32_t priority;
uint32_t id;
uint32_t uniqueID;
uint32_t stepID;
primitiveprocessor::SP_UM_IOSOCK sock;
};
enum Priority
{
LOW,
MEDIUM,
HIGH,
_COUNT,
EXTRA // After _COUNT because _COUNT is for jobQueue size and EXTRA isn't a jobQueue. But we need EXTRA in places where Priority is used.
};
/*********************************************
* ctor/dtor
*
*********************************************/
/** @brief ctor
*/
PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads,
uint lowThreads, uint id = 0);
virtual ~PriorityThreadPool();
void removeJobs(uint32_t id);
void addJob(const Job& job, bool useLock = true);
void stop();
/** @brief for use in debugging
*/
void dump();
// If a job is blocked, we want to temporarily increase the number of threads managed by the pool
// A problem can occur if all threads are running long or blocked for a single query. Other
// queries won't get serviced, even though there are cpu cycles available.
// These calls are currently protected by respondLock in sendThread(). If you call from other
// places, you need to consider atomicity.
void incBlockedThreads()
{
blockedThreads++;
}
void decBlockedThreads()
boost::shared_ptr<Functor> functor;
uint32_t weight;
uint32_t priority;
uint32_t id;
uint32_t uniqueID;
uint32_t stepID;
primitiveprocessor::SP_UM_IOSOCK sock;
};
enum Priority
{
LOW,
MEDIUM,
HIGH,
_COUNT,
EXTRA // After _COUNT because _COUNT is for jobQueue size and EXTRA isn't a jobQueue. But we need EXTRA
// in places where Priority is used.
};
/*********************************************
* ctor/dtor
*
*********************************************/
/** @brief ctor
*/
PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, uint lowThreads,
uint id = 0);
virtual ~PriorityThreadPool();
void removeJobs(uint32_t id);
void addJob(const Job& job, bool useLock = true);
void stop();
/** @brief for use in debugging
*/
void dump();
// If a job is blocked, we want to temporarily increase the number of threads managed by the pool
// A problem can occur if all threads are running long or blocked for a single query. Other
// queries won't get serviced, even though there are cpu cycles available.
// These calls are currently protected by respondLock in sendThread(). If you call from other
// places, you need to consider atomicity.
void incBlockedThreads()
{
blockedThreads++;
}
void decBlockedThreads()
{
blockedThreads--;
}
protected:
private:
struct ThreadHelper
{
ThreadHelper(PriorityThreadPool* impl, Priority queue) : ptp(impl), preferredQueue(queue)
{
blockedThreads--;
}
protected:
private:
struct ThreadHelper
void operator()()
{
ThreadHelper(PriorityThreadPool* impl, Priority queue) : ptp(impl), preferredQueue(queue) { }
void operator()()
{
ptp->threadFcn(preferredQueue);
}
PriorityThreadPool* ptp;
Priority preferredQueue;
};
ptp->threadFcn(preferredQueue);
}
PriorityThreadPool* ptp;
Priority preferredQueue;
};
explicit PriorityThreadPool();
explicit PriorityThreadPool(const PriorityThreadPool&);
PriorityThreadPool& operator=(const PriorityThreadPool&);
explicit PriorityThreadPool();
explicit PriorityThreadPool(const PriorityThreadPool&);
PriorityThreadPool& operator=(const PriorityThreadPool&);
Priority pickAQueue(Priority preference);
void threadFcn(const Priority preferredQueue) throw();
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
Priority pickAQueue(Priority preference);
void threadFcn(const Priority preferredQueue) throw();
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
std::list<Job> jobQueues[3]; // higher indexes = higher priority
uint32_t threadCounts[3];
uint32_t defaultThreadCounts[3];
boost::mutex mutex;
boost::condition newJob;
boost::thread_group threads;
bool _stop;
uint32_t weightPerRun;
volatile uint id; // prevent it from being optimized out
std::atomic<uint32_t> blockedThreads;
std::atomic<uint32_t> extraThreads;
bool stopExtra;
std::list<Job> jobQueues[3]; // higher indexes = higher priority
uint32_t threadCounts[3];
uint32_t defaultThreadCounts[3];
boost::mutex mutex;
boost::condition newJob;
boost::thread_group threads;
bool _stop;
uint32_t weightPerRun;
volatile uint id; // prevent it from being optimized out
std::atomic<uint32_t> blockedThreads;
std::atomic<uint32_t> extraThreads;
bool stopExtra;
};
} // namespace threadpool
} // namespace threadpool
#endif //PRIORITYTHREADPOOL_H
#endif // PRIORITYTHREADPOOL_H

View File

@ -36,92 +36,81 @@ using namespace std;
int thecount = 0;
boost::mutex mutex;
class ThreadPoolTestSuite : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(ThreadPoolTestSuite);
CPPUNIT_TEST_SUITE( ThreadPoolTestSuite );
CPPUNIT_TEST(test_1);
CPPUNIT_TEST( test_1 );
CPPUNIT_TEST_SUITE_END();
CPPUNIT_TEST_SUITE_END();
private:
// Functor class
struct foo
private:
// Functor class
struct foo
{
void operator()()
{
void operator ()()
{
for (int i = 0; i < 500; i++)
{
// simulate some work
fData++;
}
boost::mutex::scoped_lock lock(mutex);
std::cout << "count = " << ++thecount << ' ' << fData << std::endl;
}
foo(int i):
fData(i)
{}
foo(const foo& copy)
: fData(copy.fData)
{}
int fData;
};
public:
void setUp()
{}
void tearDown()
{}
void test_1()
{
threadpool::ThreadPool pool( 5, 10 );
for (int y = 0; y < 10; y++)
{
foo bar(y);
for (int i = 0; i < 10; ++i)
{
pool.invoke(bar);
}
// Wait until all of the queued up and in-progress work has finished
pool.wait();
pool.dump();
}
for (int i = 0; i < 500; i++)
{
// simulate some work
fData++;
}
boost::mutex::scoped_lock lock(mutex);
std::cout << "count = " << ++thecount << ' ' << fData << std::endl;
}
foo(int i) : fData(i)
{
}
foo(const foo& copy) : fData(copy.fData)
{
}
int fData;
};
public:
void setUp()
{
}
void tearDown()
{
}
void test_1()
{
threadpool::ThreadPool pool(5, 10);
for (int y = 0; y < 10; y++)
{
foo bar(y);
for (int i = 0; i < 10; ++i)
{
pool.invoke(bar);
}
// Wait until all of the queued up and in-progress work has finished
pool.wait();
pool.dump();
}
}
};
CPPUNIT_TEST_SUITE_REGISTRATION( ThreadPoolTestSuite );
CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTestSuite);
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
int main( int argc, char** argv)
int main(int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest( registry.makeTest() );
bool wasSuccessful = runner.run( "", false );
return (wasSuccessful ? 0 : 1);
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest(registry.makeTest());
bool wasSuccessful = runner.run("", false);
return (wasSuccessful ? 0 : 1);
}

View File

@ -16,10 +16,10 @@
MA 02110-1301, USA. */
/***********************************************************************
* $Id: threadpool.cpp 3495 2013-01-21 14:09:51Z rdempsey $
*
*
***********************************************************************/
* $Id: threadpool.cpp 3495 2013-01-21 14:09:51Z rdempsey $
*
*
***********************************************************************/
#include <stdexcept>
#include <iostream>
using namespace std;
@ -37,539 +37,508 @@ using namespace logging;
namespace threadpool
{
ThreadPool::ThreadPool()
: fMaxThreads( 0 ), fQueueSize( 0 )
ThreadPool::ThreadPool() : fMaxThreads(0), fQueueSize(0)
{
init();
init();
}
ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize )
:fMaxThreads( maxThreads ), fQueueSize( queueSize ),
fPruneThread( NULL )
ThreadPool::ThreadPool(size_t maxThreads, size_t queueSize)
: fMaxThreads(maxThreads), fQueueSize(queueSize), fPruneThread(NULL)
{
init();
init();
}
ThreadPool::~ThreadPool() throw()
{
try
{
boost::mutex::scoped_lock initLock(fInitMutex);
stop();
}
catch (...)
{
}
try
{
boost::mutex::scoped_lock initLock(fInitMutex);
stop();
}
catch (...)
{
}
}
void ThreadPool::init()
{
boost::mutex::scoped_lock initLock(fInitMutex);
fThreadCount = 0;
fGeneralErrors = 0;
fFunctorErrors = 0;
waitingFunctorsSize = 0;
fIssued = 0;
fDebug = false;
fStop = false;
fNextFunctor = fWaitingFunctors.end();
fNextHandle = 1;
fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this));
boost::mutex::scoped_lock initLock(fInitMutex);
fThreadCount = 0;
fGeneralErrors = 0;
fFunctorErrors = 0;
waitingFunctorsSize = 0;
fIssued = 0;
fDebug = false;
fStop = false;
fNextFunctor = fWaitingFunctors.end();
fNextHandle = 1;
fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this));
}
void ThreadPool::setQueueSize(size_t queueSize)
{
boost::mutex::scoped_lock lock1(fMutex);
fQueueSize = queueSize;
boost::mutex::scoped_lock lock1(fMutex);
fQueueSize = queueSize;
}
void ThreadPool::pruneThread()
{
utils::setThreadName("pruneThread");
boost::unique_lock<boost::mutex> lock2(fPruneMutex);
utils::setThreadName("pruneThread");
boost::unique_lock<boost::mutex> lock2(fPruneMutex);
while(true)
while (true)
{
if (fStop)
return;
if (fPruneThreadEnd.wait_for(lock2, boost::chrono::minutes{1}) == boost::cv_status::timeout)
{
if (fStop)
return;
if (fPruneThreadEnd.wait_for(lock2, boost::chrono::minutes{1}) ==
boost::cv_status::timeout)
while (!fPruneThreads.empty())
{
if (fDebug)
{
while(!fPruneThreads.empty())
{
if (fDebug)
{
ostringstream oss;
oss << "pruning thread " << fPruneThreads.top();
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
fThreads.join_one(fPruneThreads.top());
fPruneThreads.pop();
}
}
else
{
break;
ostringstream oss;
oss << "pruning thread " << fPruneThreads.top();
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage(message);
}
fThreads.join_one(fPruneThreads.top());
fPruneThreads.pop();
}
}
else
{
break;
}
}
}
void ThreadPool::setMaxThreads(size_t maxThreads)
{
boost::mutex::scoped_lock lock1(fMutex);
fMaxThreads = maxThreads;
boost::mutex::scoped_lock lock1(fMutex);
fMaxThreads = maxThreads;
}
void ThreadPool::stop()
{
boost::mutex::scoped_lock lock1(fMutex);
if (fStop)
return; // Was stopped earlier
fStop = true;
lock1.unlock();
boost::mutex::scoped_lock lock1(fMutex);
if (fStop)
return; // Was stopped earlier
fStop = true;
lock1.unlock();
fPruneThreadEnd.notify_all();
fPruneThread->join();
delete fPruneThread;
fNeedThread.notify_all();
fThreads.join_all();
fPruneThreadEnd.notify_all();
fPruneThread->join();
delete fPruneThread;
fNeedThread.notify_all();
fThreads.join_all();
}
void ThreadPool::wait()
{
boost::mutex::scoped_lock lock1(fMutex);
boost::mutex::scoped_lock lock1(fMutex);
while (waitingFunctorsSize > 0)
{
fThreadAvailable.wait(lock1);
//cerr << "woke!" << endl;
}
while (waitingFunctorsSize > 0)
{
fThreadAvailable.wait(lock1);
// cerr << "woke!" << endl;
}
}
void ThreadPool::join(uint64_t thrHandle)
{
boost::mutex::scoped_lock lock1(fMutex);
boost::mutex::scoped_lock lock1(fMutex);
while (waitingFunctorsSize > 0)
while (waitingFunctorsSize > 0)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
if (iter->hndl == thrHandle)
{
foundit = true;
break;
}
}
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
if (iter->hndl == thrHandle)
{
foundit = true;
break;
}
}
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
void ThreadPool::join(std::vector<uint64_t>& thrHandle)
{
boost::mutex::scoped_lock lock1(fMutex);
boost::mutex::scoped_lock lock1(fMutex);
while (waitingFunctorsSize > 0)
while (waitingFunctorsSize > 0)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
foundit = false;
std::vector<uint64_t>::iterator thrIter;
std::vector<uint64_t>::iterator thrEnd = thrHandle.end();
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
{
if (iter->hndl == *thrIter)
{
foundit = false;
std::vector<uint64_t>::iterator thrIter;
std::vector<uint64_t>::iterator thrEnd = thrHandle.end();
for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
{
if (iter->hndl == *thrIter)
{
foundit = true;
break;
}
}
if (foundit == true)
{
break;
}
foundit = true;
break;
}
}
// If we didn't find any of the handles, then all are complete
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
if (foundit == true)
{
break;
}
}
// If we didn't find any of the handles, then all are complete
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
uint64_t ThreadPool::invoke(const Functor_T& threadfunc)
{
boost::mutex::scoped_lock lock1(fMutex);
uint64_t thrHandle = 0;
boost::mutex::scoped_lock lock1(fMutex);
uint64_t thrHandle = 0;
for (;;)
for (;;)
{
try
{
try
{
if (waitingFunctorsSize < fThreadCount)
{
// Don't create a thread unless it's needed. There
// is a thread available to service this request.
thrHandle = addFunctor(threadfunc);
lock1.unlock();
break;
}
if (waitingFunctorsSize < fThreadCount)
{
// Don't create a thread unless it's needed. There
// is a thread available to service this request.
thrHandle = addFunctor(threadfunc);
lock1.unlock();
break;
}
bool bAdded = false;
bool bAdded = false;
if (waitingFunctorsSize < fQueueSize || fQueueSize == 0)
{
// Don't create a thread unless you have to
thrHandle = addFunctor(threadfunc);
bAdded = true;
}
if (waitingFunctorsSize < fQueueSize || fQueueSize == 0)
{
// Don't create a thread unless you have to
thrHandle = addFunctor(threadfunc);
bAdded = true;
}
if (fDebug)
{
ostringstream oss;
oss << "invoke thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
if (fDebug)
{
ostringstream oss;
oss << "invoke thread "
<< " on " << fName << " max " << fMaxThreads << " queue " << fQueueSize << " threads "
<< fThreadCount << " running " << fIssued << " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage(message);
}
// fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
if (fThreadCount < fMaxThreads || fQueueSize == 0)
{
++fThreadCount;
// fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
if (fThreadCount < fMaxThreads || fQueueSize == 0)
{
++fThreadCount;
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
if (bAdded)
break;
if (bAdded)
break;
// If the mutex is unlocked before creating the thread
// this allows fThreadAvailable to be triggered
// before the wait below runs. So run the loop again.
lock1.lock();
continue;
}
// If the mutex is unlocked before creating the thread
// this allows fThreadAvailable to be triggered
// before the wait below runs. So run the loop again.
lock1.lock();
continue;
}
if (bAdded)
{
lock1.unlock();
break;
}
if (bAdded)
{
lock1.unlock();
break;
}
if (fDebug)
{
logging::Message::Args args;
logging::Message message(5);
args.add("invoke: Blocked waiting for thread. Count ");
args.add(static_cast<uint64_t>(fThreadCount));
args.add("max ");
args.add(static_cast<uint64_t>(fMaxThreads));
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
if (fDebug)
{
logging::Message::Args args;
logging::Message message(5);
args.add("invoke: Blocked waiting for thread. Count ");
args.add(static_cast<uint64_t>(fThreadCount));
args.add("max ");
args.add(static_cast<uint64_t>(fMaxThreads));
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage(message);
}
fThreadAvailable.wait(lock1);
}
catch (...)
{
++fGeneralErrors;
throw;
}
fThreadAvailable.wait(lock1);
}
catch (...)
{
++fGeneralErrors;
throw;
}
}
fNeedThread.notify_one();
return thrHandle;
fNeedThread.notify_one();
return thrHandle;
}
void ThreadPool::beginThread() throw()
{
utils::setThreadName("Idle");
try
utils::setThreadName("Idle");
try
{
boost::unique_lock<boost::mutex> lock1(fMutex);
for (;;)
{
boost::unique_lock<boost::mutex> lock1(fMutex);
if (fStop)
break;
for (;;)
if (fNextFunctor == fWaitingFunctors.end())
{
// Wait until someone needs a thread
// Add the timed wait for queueSize == 0 so we can idle away threads
// over fMaxThreads
if (fQueueSize > 0)
{
if (fStop)
break;
if (fNextFunctor == fWaitingFunctors.end())
{
// Wait until someone needs a thread
// Add the timed wait for queueSize == 0 so we can idle away threads
// over fMaxThreads
if (fQueueSize > 0)
{
fNeedThread.wait(lock1);
}
else
{
// Wait no more than 10 minutes
if (fNeedThread.wait_for(lock1, boost::chrono::minutes{10}) ==
boost::cv_status::timeout)
{
if (fThreadCount > fMaxThreads)
{
boost::mutex::scoped_lock lock2(fPruneMutex);
fPruneThreads.push(boost::this_thread::get_id());
--fThreadCount;
return;
}
}
}
}
else
{
// If there's anything waiting, run it
if (waitingFunctorsSize - fIssued > 0)
{
Container_T::iterator todo = fNextFunctor++;
++fIssued;
if (fDebug)
{
ostringstream oss;
oss << "starting thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
lock1.unlock();
utils::setThreadName("Unspecified");
try
{
todo->functor();
}
catch (exception& e)
{
++fFunctorErrors;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("ThreadPool: Caught exception during execution: ");
args.add(e.what());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
}
utils::setThreadName("Idle");
lock1.lock();
--fIssued;
--waitingFunctorsSize;
fWaitingFunctors.erase(todo);
if (fDebug)
{
ostringstream oss;
oss << "Ending thread " << " on " << fName
<< " max " << fMaxThreads
<< " queue " << fQueueSize
<< " threads " << fThreadCount
<< " running " << fIssued
<< " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
}
fThreadAvailable.notify_all();
}
fNeedThread.wait(lock1);
}
}
catch (exception& ex)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
else
{
// Wait no more than 10 minutes
if (fNeedThread.wait_for(lock1, boost::chrono::minutes{10}) == boost::cv_status::timeout)
{
if (fThreadCount > fMaxThreads)
{
boost::mutex::scoped_lock lock2(fPruneMutex);
fPruneThreads.push(boost::this_thread::get_id());
--fThreadCount;
return;
}
}
}
}
else
{
// If there's anything waiting, run it
if (waitingFunctorsSize - fIssued > 0)
{
Container_T::iterator todo = fNextFunctor++;
++fIssued;
if (fDebug)
{
ostringstream oss;
oss << "starting thread "
<< " on " << fName << " max " << fMaxThreads << " queue " << fQueueSize << " threads "
<< fThreadCount << " running " << fIssued << " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage(message);
}
lock1.unlock();
utils::setThreadName("Unspecified");
try
{
todo->functor();
}
catch (exception& e)
{
++fFunctorErrors;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("beginThread: Caught exception: ");
args.add(ex.what());
message.format( args );
args.add("ThreadPool: Caught exception during execution: ");
args.add(e.what());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
ml.logErrorMessage(message);
#endif
}
catch (...)
{
}
}
utils::setThreadName("Idle");
lock1.lock();
--fIssued;
--waitingFunctorsSize;
fWaitingFunctors.erase(todo);
if (fDebug)
{
ostringstream oss;
oss << "Ending thread "
<< " on " << fName << " max " << fMaxThreads << " queue " << fQueueSize << " threads "
<< fThreadCount << " running " << fIssued << " waiting " << (waitingFunctorsSize - fIssued)
<< " total " << waitingFunctorsSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage(message);
}
}
fThreadAvailable.notify_all();
}
}
}
catch (exception& ex)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("beginThread: Caught exception: ");
args.add(ex.what());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
#endif
}
catch (...)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(6);
args.add("beginThread: Caught unknown exception!");
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
}
catch (...)
{
}
}
}
catch (...)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(6);
args.add("beginThread: Caught unknown exception!");
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
#endif
}
catch (...)
{
}
}
}
uint64_t ThreadPool::addFunctor(const Functor_T& func)
{
bool bAtEnd = false;
bool bAtEnd = false;
if (fNextFunctor == fWaitingFunctors.end())
bAtEnd = true;
if (fNextFunctor == fWaitingFunctors.end())
bAtEnd = true;
PoolFunction_T poolFunction;
poolFunction.hndl = fNextHandle;
poolFunction.functor = func;
fWaitingFunctors.push_back(poolFunction);
waitingFunctorsSize++;
PoolFunction_T poolFunction;
poolFunction.hndl = fNextHandle;
poolFunction.functor = func;
fWaitingFunctors.push_back(poolFunction);
waitingFunctorsSize++;
if (bAtEnd)
{
--fNextFunctor;
}
if (bAtEnd)
{
--fNextFunctor;
}
return fNextHandle++;
return fNextHandle++;
}
void ThreadPool::dump()
{
std::cout << "General Errors: " << fGeneralErrors << std::endl;
std::cout << "Functor Errors: " << fFunctorErrors << std::endl;
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
std::cout << "General Errors: " << fGeneralErrors << std::endl;
std::cout << "Functor Errors: " << fFunctorErrors << std::endl;
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
}
void ThreadPoolMonitor::operator()()
{
ostringstream filename;
filename << MCSLOGDIR << "/trace/ThreadPool_" << fPool->name() << ".log";
fLog = new ofstream(filename.str().c_str());
ostringstream filename;
filename << MCSLOGDIR << "/trace/ThreadPool_" << fPool->name() << ".log";
fLog = new ofstream(filename.str().c_str());
for (;;)
for (;;)
{
if (!fLog || !fLog->is_open())
{
if (!fLog || !fLog->is_open())
{
ostringstream oss;
oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
return;
}
// Get a timestamp for output.
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
(*fLog) << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec
<< '.'
<< setw(4) << tv.tv_usec / 100
<< " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize
<< " running " << fPool->fIssued
<< " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued)
<< " ThdCnt " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize
<< endl;
// struct timespec req = { 0, 1000 * 100 }; //100 usec
// nanosleep(&req, 0);
sleep(2);
ostringstream oss;
oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage(message);
return;
}
// Get a timestamp for output.
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
(*fLog) << setfill('0') << setw(2) << tm.tm_hour << ':' << setw(2) << tm.tm_min << ':' << setw(2)
<< tm.tm_sec << '.' << setw(4) << tv.tv_usec / 100 << " Name " << fPool->fName << " Active "
<< fPool->waitingFunctorsSize << " running " << fPool->fIssued << " waiting "
<< (fPool->waitingFunctorsSize - fPool->fIssued) << " ThdCnt " << fPool->fThreadCount << " Max "
<< fPool->fMaxThreads << " Q " << fPool->fQueueSize << endl;
// struct timespec req = { 0, 1000 * 100 }; //100 usec
// nanosleep(&req, 0);
sleep(2);
}
}
} // namespace threadpool
} // namespace threadpool

View File

@ -16,14 +16,14 @@
MA 02110-1301, USA. */
/***********************************************************************
*
* Work dervied from Devguy.com's Open Source C++ thread pool implementation
* released under public domain:
* http://web.archive.org/liveweb/http://dgpctk.cvs.sourceforge.net/viewvc/dgpctk/dgc%2B%2B/include/dg/thread/threadpool.h?revision=1.22&content-type=text%2Fplain
*
* http://web.archive.org/web/20100104101109/http://devguy.com/bb/viewtopic.php?t=460
*
***********************************************************************/
*
* Work dervied from Devguy.com's Open Source C++ thread pool implementation
* released under public domain:
* http://web.archive.org/liveweb/http://dgpctk.cvs.sourceforge.net/viewvc/dgpctk/dgc%2B%2B/include/dg/thread/threadpool.h?revision=1.22&content-type=text%2Fplain
*
* http://web.archive.org/web/20100104101109/http://devguy.com/bb/viewtopic.php?t=460
*
***********************************************************************/
/** @file */
@ -59,351 +59,346 @@ namespace threadpool
// Taken from boost::thread_group and adapted
class ThreadPoolGroup
{
private:
ThreadPoolGroup(ThreadPoolGroup const&);
ThreadPoolGroup& operator=(ThreadPoolGroup const&);
public:
ThreadPoolGroup() {}
~ThreadPoolGroup()
{
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
delete *it;
}
}
private:
ThreadPoolGroup(ThreadPoolGroup const&);
ThreadPoolGroup& operator=(ThreadPoolGroup const&);
template<typename F>
boost::thread* create_thread(F threadfunc)
public:
ThreadPoolGroup()
{
}
~ThreadPoolGroup()
{
for (std::list<boost::thread*>::iterator it = threads.begin(), end = threads.end(); it != end; ++it)
{
boost::lock_guard<boost::shared_mutex> guard(m);
delete *it;
}
}
template <typename F>
boost::thread* create_thread(F threadfunc)
{
boost::lock_guard<boost::shared_mutex> guard(m);
#if __cplusplus >= 201103L
std::unique_ptr<boost::thread> new_thread(new boost::thread(threadfunc));
std::unique_ptr<boost::thread> new_thread(new boost::thread(threadfunc));
#else
std::auto_ptr<boost::thread> new_thread(new boost::thread(threadfunc));
std::auto_ptr<boost::thread> new_thread(new boost::thread(threadfunc));
#endif
threads.push_back(new_thread.get());
return new_thread.release();
}
threads.push_back(new_thread.get());
return new_thread.release();
}
void add_thread(boost::thread* thrd)
void add_thread(boost::thread* thrd)
{
if (thrd)
{
if(thrd)
{
boost::lock_guard<boost::shared_mutex> guard(m);
threads.push_back(thrd);
}
boost::lock_guard<boost::shared_mutex> guard(m);
threads.push_back(thrd);
}
}
void remove_thread(boost::thread* thrd)
void remove_thread(boost::thread* thrd)
{
boost::lock_guard<boost::shared_mutex> guard(m);
std::list<boost::thread*>::iterator const it = std::find(threads.begin(), threads.end(), thrd);
if (it != threads.end())
{
boost::lock_guard<boost::shared_mutex> guard(m);
std::list<boost::thread*>::iterator const it=std::find(threads.begin(),threads.end(),thrd);
if(it!=threads.end())
{
threads.erase(it);
}
threads.erase(it);
}
}
void join_all()
void join_all()
{
boost::shared_lock<boost::shared_mutex> guard(m);
for (std::list<boost::thread*>::iterator it = threads.begin(), end = threads.end(); it != end; ++it)
{
boost::shared_lock<boost::shared_mutex> guard(m);
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
(*it)->join();
}
(*it)->join();
}
}
void interrupt_all()
void interrupt_all()
{
boost::shared_lock<boost::shared_mutex> guard(m);
for (std::list<boost::thread*>::iterator it = threads.begin(), end = threads.end(); it != end; ++it)
{
boost::shared_lock<boost::shared_mutex> guard(m);
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
(*it)->interrupt();
}
(*it)->interrupt();
}
}
size_t size() const
size_t size() const
{
boost::shared_lock<boost::shared_mutex> guard(m);
return threads.size();
}
void join_one(boost::thread::id id)
{
boost::shared_lock<boost::shared_mutex> guard(m);
for (std::list<boost::thread*>::iterator it = threads.begin(), end = threads.end(); it != end; ++it)
{
boost::shared_lock<boost::shared_mutex> guard(m);
return threads.size();
if ((*it)->get_id() == id)
{
(*it)->join();
threads.erase(it);
return;
}
}
}
void join_one(boost::thread::id id)
{
boost::shared_lock<boost::shared_mutex> guard(m);
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
if ((*it)->get_id() == id)
{
(*it)->join();
threads.erase(it);
return;
}
}
}
private:
std::list<boost::thread*> threads;
mutable boost::shared_mutex m;
private:
std::list<boost::thread*> threads;
mutable boost::shared_mutex m;
};
/** @brief ThreadPool is a component for working with pools of threads and asynchronously
* executing tasks. It is responsible for creating threads and tracking which threads are "busy"
* and which are idle. Idle threads are utilized as "work" is added to the system.
*/
* executing tasks. It is responsible for creating threads and tracking which threads are "busy"
* and which are idle. Idle threads are utilized as "work" is added to the system.
*/
class ThreadPool
{
public:
typedef boost::function0<void> Functor_T;
public:
typedef boost::function0<void> Functor_T;
/*********************************************
* ctor/dtor
*
*********************************************/
/*********************************************
* ctor/dtor
*
*********************************************/
/** @brief ctor
*/
EXPORT ThreadPool();
/** @brief ctor
*/
EXPORT ThreadPool();
/** @brief ctor
*
* @param maxThreads the maximum number of threads in this pool. This is the maximum number
* of simultaneuous operations that can go on.
* @param queueSize the maximum number of work tasks in the queue. This is the maximum
* number of jobs that can queue up in the work list before invoke() blocks.
* If 0, then threads never block and total threads may
* exceed maxThreads. Nothing waits. Thread count will
* idle down to maxThreads when less work is required.
*/
EXPORT explicit ThreadPool( size_t maxThreads, size_t queueSize );
/** @brief ctor
*
* @param maxThreads the maximum number of threads in this pool. This is the maximum number
* of simultaneuous operations that can go on.
* @param queueSize the maximum number of work tasks in the queue. This is the maximum
* number of jobs that can queue up in the work list before invoke() blocks.
* If 0, then threads never block and total threads may
* exceed maxThreads. Nothing waits. Thread count will
* idle down to maxThreads when less work is required.
*/
EXPORT explicit ThreadPool(size_t maxThreads, size_t queueSize);
/** @brief dtor
*/
EXPORT ~ThreadPool() throw();
/** @brief dtor
*/
EXPORT ~ThreadPool() throw();
/*********************************************
* accessors/mutators
*
*********************************************/
/** @brief set the work queue size
*
* @param queueSize the size of the work queue
*/
EXPORT void setQueueSize(size_t queueSize);
/*********************************************
* accessors/mutators
*
*********************************************/
/** @brief set the work queue size
*
* @param queueSize the size of the work queue
*/
EXPORT void setQueueSize( size_t queueSize );
/** @brief fet the work queue size
*/
inline size_t getQueueSize() const
{
return fQueueSize;
}
/** @brief fet the work queue size
*/
inline size_t getQueueSize() const
/** @brief set the maximum number of threads to be used to process
* the work queue
*
* @param maxThreads the maximum number of threads
*/
EXPORT void setMaxThreads(size_t maxThreads);
/** @brief get the maximum number of threads
*/
inline size_t getMaxThreads() const
{
return fMaxThreads;
}
/** @brief get the issued number of threads
*/
inline size_t getIssuedThreads()
{
return fIssued;
}
/** @brief queue size accessor
*
*/
inline uint32_t getWaiting() const
{
return waitingFunctorsSize;
}
/*********************************************
* operations
*
*********************************************/
/** @brief invoke a functor in a separate thread managed by the pool
*
* If all maxThreads are busy, threadfunc will be added to a work list and
* will run when a thread comes free. If all threads are busy and there are
* queueSize tasks already waiting, invoke() will block until a slot in the
* queue comes free.
*/
EXPORT uint64_t invoke(const Functor_T& threadfunc);
/** @brief stop the threads
*/
EXPORT void stop();
/** @brief wait on all the threads to complete
*/
EXPORT void wait();
/** @brief Wait for a specific thread
*/
EXPORT void join(uint64_t thrHandle);
/** @brief Wait for a specific thread
*/
EXPORT void join(std::vector<uint64_t>& thrHandle);
/** @brief for use in debugging
*/
EXPORT void dump();
EXPORT std::string& name()
{
return fName;
}
EXPORT void setName(std::string name)
{
fName = name;
}
EXPORT void setName(const char* name)
{
fName = name;
}
EXPORT bool debug()
{
return fDebug;
}
EXPORT void setDebug(bool d)
{
fDebug = d;
}
friend class ThreadPoolMonitor;
protected:
private:
// Used internally to keep a handle associated with each functor for join()
struct PoolFunction_T
{
uint64_t hndl;
Functor_T functor;
};
/** @brief initialize data memebers
*/
void init();
/** @brief add a functor to the list
*/
uint64_t addFunctor(const Functor_T& func);
/** @brief thread entry point
*/
void beginThread() throw();
void pruneThread();
ThreadPool(const ThreadPool&);
ThreadPool& operator=(const ThreadPool&);
friend struct beginThreadFunc;
struct beginThreadFunc
{
beginThreadFunc(ThreadPool& impl) : fImpl(impl)
{
return fQueueSize;
}
/** @brief set the maximum number of threads to be used to process
* the work queue
*
* @param maxThreads the maximum number of threads
*/
EXPORT void setMaxThreads( size_t maxThreads );
/** @brief get the maximum number of threads
*/
inline size_t getMaxThreads() const
void operator()()
{
return fMaxThreads;
fImpl.beginThread();
}
/** @brief get the issued number of threads
*/
inline size_t getIssuedThreads() { return fIssued; }
ThreadPool& fImpl;
};
/** @brief queue size accessor
*
*/
inline uint32_t getWaiting() const
struct NoOp
{
void operator()() const
{
return waitingFunctorsSize;
}
};
size_t fThreadCount;
size_t fMaxThreads;
size_t fQueueSize;
/*********************************************
* operations
*
*********************************************/
typedef std::list<PoolFunction_T> Container_T;
Container_T fWaitingFunctors;
Container_T::iterator fNextFunctor;
/** @brief invoke a functor in a separate thread managed by the pool
*
* If all maxThreads are busy, threadfunc will be added to a work list and
* will run when a thread comes free. If all threads are busy and there are
* queueSize tasks already waiting, invoke() will block until a slot in the
* queue comes free.
*/
EXPORT uint64_t invoke(const Functor_T& threadfunc);
uint32_t fIssued;
boost::mutex fMutex;
boost::condition_variable fThreadAvailable; // triggered when a thread is available
boost::condition_variable fNeedThread; // triggered when a thread is needed
ThreadPoolGroup fThreads;
/** @brief stop the threads
*/
EXPORT void stop();
bool fStop;
long fGeneralErrors;
long fFunctorErrors;
uint32_t waitingFunctorsSize;
uint64_t fNextHandle;
/** @brief wait on all the threads to complete
*/
EXPORT void wait();
/** @brief Wait for a specific thread
*/
EXPORT void join(uint64_t thrHandle);
/** @brief Wait for a specific thread
*/
EXPORT void join(std::vector<uint64_t>& thrHandle);
/** @brief for use in debugging
*/
EXPORT void dump();
EXPORT std::string& name()
{
return fName;
}
EXPORT void setName(std::string name)
{
fName = name;
}
EXPORT void setName(const char* name)
{
fName = name;
}
EXPORT bool debug()
{
return fDebug;
}
EXPORT void setDebug(bool d)
{
fDebug = d;
}
friend class ThreadPoolMonitor;
protected:
private:
// Used internally to keep a handle associated with each functor for join()
struct PoolFunction_T
{
uint64_t hndl;
Functor_T functor;
};
/** @brief initialize data memebers
*/
void init();
/** @brief add a functor to the list
*/
uint64_t addFunctor(const Functor_T& func);
/** @brief thread entry point
*/
void beginThread() throw();
void pruneThread();
ThreadPool(const ThreadPool&);
ThreadPool& operator = (const ThreadPool&);
friend struct beginThreadFunc;
struct beginThreadFunc
{
beginThreadFunc(ThreadPool& impl)
: fImpl(impl)
{}
void operator() ()
{
fImpl.beginThread();
}
ThreadPool& fImpl;
};
struct NoOp
{
void operator () () const
{}
};
size_t fThreadCount;
size_t fMaxThreads;
size_t fQueueSize;
typedef std::list<PoolFunction_T> Container_T;
Container_T fWaitingFunctors;
Container_T::iterator fNextFunctor;
uint32_t fIssued;
boost::mutex fMutex;
boost::condition_variable fThreadAvailable; // triggered when a thread is available
boost::condition_variable fNeedThread; // triggered when a thread is needed
ThreadPoolGroup fThreads;
bool fStop;
long fGeneralErrors;
long fFunctorErrors;
uint32_t waitingFunctorsSize;
uint64_t fNextHandle;
std::string fName; // Optional to add a name to the pool for debugging.
bool fDebug;
boost::mutex fInitMutex;
boost::mutex fPruneMutex;
boost::condition_variable fPruneThreadEnd;
boost::thread* fPruneThread;
std::stack<boost::thread::id> fPruneThreads; // A list of stale thread IDs to be joined
std::string fName; // Optional to add a name to the pool for debugging.
bool fDebug;
boost::mutex fInitMutex;
boost::mutex fPruneMutex;
boost::condition_variable fPruneThreadEnd;
boost::thread* fPruneThread;
std::stack<boost::thread::id> fPruneThreads; // A list of stale thread IDs to be joined
};
// This class, if instantiated, will continuously log details about the indicated threadpool
// The log will end up in /var/log/mariadb/columnstore/trace/threadpool_<name>.log
class ThreadPoolMonitor
{
public:
ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL)
{
}
public:
ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL)
{
}
~ThreadPoolMonitor()
~ThreadPoolMonitor()
{
if (fLog)
{
if (fLog)
{
delete fLog;
}
delete fLog;
}
}
void operator()();
private:
//defaults okay
//ThreadPoolMonitor(const ThreadPoolMonitor& rhs);
//ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs);
ThreadPool* fPool;
std::ofstream* fLog;
void operator()();
private:
// defaults okay
// ThreadPoolMonitor(const ThreadPoolMonitor& rhs);
// ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs);
ThreadPool* fPool;
std::ofstream* fLog;
};
} // namespace threadpool
} // namespace threadpool
#undef EXPORT
#endif //THREADPOOL_H
#endif // THREADPOOL_H

View File

@ -15,7 +15,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <string>
#include <stdexcept>
#include <iostream>
@ -35,114 +34,114 @@ boost::mutex mutex;
const string timeNow()
{
time_t outputTime = time(0);
struct tm ltm;
char buf[32]; //ctime(3) says at least 26
size_t len = 0;
time_t outputTime = time(0);
struct tm ltm;
char buf[32]; // ctime(3) says at least 26
size_t len = 0;
#ifdef _MSC_VER
asctime_s(buf, 32, localtime_r(&outputTime, &ltm));
asctime_s(buf, 32, localtime_r(&outputTime, &ltm));
#else
asctime_r(localtime_r(&outputTime, &ltm), buf);
asctime_r(localtime_r(&outputTime, &ltm), buf);
#endif
len = strlen(buf);
len = strlen(buf);
if (len > 0) --len;
if (len > 0)
--len;
if (buf[len] == '\n') buf[len] = 0;
if (buf[len] == '\n')
buf[len] = 0;
return buf;
return buf;
}
// Functor class
struct foo
{
int64_t fData;
int64_t fThd;
string start;
bool running;
int64_t fData;
int64_t fThd;
string start;
bool running;
void operator ()()
{
start = timeNow();
void operator()()
{
start = timeNow();
std::cout << "foo thd = " << fThd << " start " << start << std::endl;
std::cout << "foo thd = " << fThd << " start " << start << std::endl;
for (int64_t i = 0; i < 1024 * 1024 * (fThd + 0) * 128; i++)
// simulate some work
fData++;
for (int64_t i = 0; i < 1024 * 1024 * (fThd + 0) * 128; i++)
// simulate some work
fData++;
boost::mutex::scoped_lock lock(mutex);
std::cout << "foo thd = " << fThd << " start " << start << " fin " << timeNow() << std::endl;
}
boost::mutex::scoped_lock lock(mutex);
std::cout << "foo thd = " << fThd << " start " << start << " fin " << timeNow() << std::endl;
}
foo(int64_t i) : fThd(i), fData(i), running(true)
{
start = timeNow();
}
foo(int64_t i) : fThd(i), fData(i), running(true)
{
start = timeNow();
}
foo(const foo& copy) : fData(copy.fData), fThd(copy.fThd), start(copy.start), running(copy.running)
{
std::cout << "new foo " << fThd << endl;
}
foo(const foo& copy) : fData(copy.fData), fThd(copy.fThd), start(copy.start), running(copy.running)
{
std::cout << "new foo " << fThd << endl;
}
~foo()
{
running = false;
}
~foo()
{
running = false;
}
};
int main( int argc, char** argv)
int main(int argc, char** argv)
{
threadpool::ThreadPool pool( 20, 10 );
std::vector<uint64_t> hndl;
hndl.reserve(10);
int t1 = hndl.capacity();
uint64_t testHndl;
uint64_t thdhndl = 999;
int64_t thd = 1;
boost::function0<void> foofunc;
boost::function0<void> foofunc2;
threadpool::ThreadPool pool(20, 10);
std::vector<uint64_t> hndl;
hndl.reserve(10);
int t1 = hndl.capacity();
uint64_t testHndl;
uint64_t thdhndl = 999;
int64_t thd = 1;
boost::function0<void> foofunc;
boost::function0<void> foofunc2;
for (int64_t y = 0; y < 1; y++)
for (int64_t y = 0; y < 1; y++)
{
foo bar(y);
// foofunc = bar;
// foofunc2 = foofunc;
std::cout << "Done with assign" << std::endl;
for (int64_t i = 0; i < 1; ++i)
{
foo bar(y);
// foofunc = bar;
// foofunc2 = foofunc;
std::cout << "Done with assign" << std::endl;
bar.fThd = thd++;
thdhndl = pool.invoke(bar);
for (int64_t i = 0; i < 1; ++i)
{
bar.fThd = thd++;
thdhndl = pool.invoke(bar);
if (y < 10)
{
hndl.push_back(thdhndl);
}
if (y < 10)
{
hndl.push_back(thdhndl);
}
if (y == 0)
{
testHndl = thdhndl;
}
}
boost::mutex::scoped_lock lock(mutex);
if (y == 0)
{
testHndl = thdhndl;
}
}
// Wait until all of the queued up and in-progress work has finished
std::cout << "Threads for join " << hndl.size() << std::endl;
pool.dump();
std::cout << "*** JOIN 1 ***" << std::endl;
pool.join(testHndl);
pool.dump();
std::cout << "*** JOIN 10 ***" << std::endl;
pool.join(hndl);
pool.dump();
std::cout << "*** WAIT ***" << std::endl;
pool.wait();
pool.dump();
sleep(2);
return 0;
boost::mutex::scoped_lock lock(mutex);
}
// Wait until all of the queued up and in-progress work has finished
std::cout << "Threads for join " << hndl.size() << std::endl;
pool.dump();
std::cout << "*** JOIN 1 ***" << std::endl;
pool.join(testHndl);
pool.dump();
std::cout << "*** JOIN 10 ***" << std::endl;
pool.join(hndl);
pool.dump();
std::cout << "*** WAIT ***" << std::endl;
pool.wait();
pool.dump();
sleep(2);
return 0;
}

View File

@ -16,10 +16,10 @@
MA 02110-1301, USA. */
/***********************************************************************
* $Id: threadpool.cpp 553 2008-02-27 17:51:16Z rdempsey $
*
*
***********************************************************************/
* $Id: threadpool.cpp 553 2008-02-27 17:51:16Z rdempsey $
*
*
***********************************************************************/
#include <stdexcept>
#include <unistd.h>
@ -33,378 +33,362 @@ using namespace logging;
namespace threadpool
{
WeightedThreadPool::WeightedThreadPool()
: fMaxThreadWeight(0), fMaxThreads( 0 ), fQueueSize( 0 )
WeightedThreadPool::WeightedThreadPool() : fMaxThreadWeight(0), fMaxThreads(0), fQueueSize(0)
{
init();
init();
}
WeightedThreadPool::WeightedThreadPool( size_t maxThreadWeight, size_t maxThreads, size_t queueSize )
: fMaxThreadWeight(maxThreadWeight), fMaxThreads( maxThreads ), fQueueSize( queueSize )
WeightedThreadPool::WeightedThreadPool(size_t maxThreadWeight, size_t maxThreads, size_t queueSize)
: fMaxThreadWeight(maxThreadWeight), fMaxThreads(maxThreads), fQueueSize(queueSize)
{
init();
init();
if (fQueueSize == 0)
fQueueSize = fMaxThreads * 2;
if (fQueueSize == 0)
fQueueSize = fMaxThreads * 2;
}
WeightedThreadPool::~WeightedThreadPool() throw()
{
// delete fThreadCreated;
try
{
stop();
}
catch (...)
{}
// delete fThreadCreated;
try
{
stop();
}
catch (...)
{
}
}
void WeightedThreadPool::init()
{
fThreadCount = 0;
fGeneralErrors = 0;
fFunctorErrors = 0;
fWaitingFunctorsSize = 0;
fWaitingFunctorsWeight = 0;
issued = 0;
fStop = false;
// fThreadCreated = new NoOp();
fNextFunctor = fWaitingFunctors.end();
fThreadCount = 0;
fGeneralErrors = 0;
fFunctorErrors = 0;
fWaitingFunctorsSize = 0;
fWaitingFunctorsWeight = 0;
issued = 0;
fStop = false;
// fThreadCreated = new NoOp();
fNextFunctor = fWaitingFunctors.end();
}
void WeightedThreadPool::setQueueSize(size_t queueSize)
{
boost::mutex::scoped_lock lock1(fMutex);
fQueueSize = queueSize;
boost::mutex::scoped_lock lock1(fMutex);
fQueueSize = queueSize;
}
void WeightedThreadPool::setMaxThreads(size_t maxThreads)
{
boost::mutex::scoped_lock lock1(fMutex);
fMaxThreads = maxThreads;
boost::mutex::scoped_lock lock1(fMutex);
fMaxThreads = maxThreads;
}
void WeightedThreadPool::setMaxThreadWeight(size_t maxWeight)
{
boost::mutex::scoped_lock lock1(fMutex);
fMaxThreadWeight = maxWeight;
boost::mutex::scoped_lock lock1(fMutex);
fMaxThreadWeight = maxWeight;
}
void WeightedThreadPool::setThreadCreatedListener(const Functor_T& f)
{
// fThreadCreated = f;
// fThreadCreated = f;
}
void WeightedThreadPool::stop()
{
boost::mutex::scoped_lock lock1(fMutex);
fStop = true;
lock1.unlock();
boost::mutex::scoped_lock lock1(fMutex);
fStop = true;
lock1.unlock();
fNeedThread.notify_all();
fThreads.join_all();
fNeedThread.notify_all();
fThreads.join_all();
}
void WeightedThreadPool::wait()
{
boost::mutex::scoped_lock lock1(fMutex);
boost::mutex::scoped_lock lock1(fMutex);
while (fWaitingFunctorsSize > 0)
{
//cout << "waiting ..." << endl;
fThreadAvailable.wait(lock1);
//cerr << "woke!" << endl;
}
while (fWaitingFunctorsSize > 0)
{
// cout << "waiting ..." << endl;
fThreadAvailable.wait(lock1);
// cerr << "woke!" << endl;
}
}
void WeightedThreadPool::removeJobs(uint32_t id)
{
boost::mutex::scoped_lock lock1(fMutex);
Container_T::iterator it;
boost::mutex::scoped_lock lock1(fMutex);
Container_T::iterator it;
it = fNextFunctor;
it = fNextFunctor;
while (it != fWaitingFunctors.end())
while (it != fWaitingFunctors.end())
{
if (it->id == id)
{
if (it->id == id)
{
fWaitingFunctorsWeight -= it->functorWeight;
fWaitingFunctorsSize--;
fWaitingFunctorsWeight -= it->functorWeight;
fWaitingFunctorsSize--;
if (it == fNextFunctor)
{
fWaitingFunctors.erase(fNextFunctor++);
it = fNextFunctor;
}
else
fWaitingFunctors.erase(it++);
}
else
++it;
if (it == fNextFunctor)
{
fWaitingFunctors.erase(fNextFunctor++);
it = fNextFunctor;
}
else
fWaitingFunctors.erase(it++);
}
else
++it;
}
}
void WeightedThreadPool::invoke(const Functor_T& threadfunc, uint32_t functor_weight,
uint32_t id)
void WeightedThreadPool::invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id)
{
boost::mutex::scoped_lock lock1(fMutex);
boost::mutex::scoped_lock lock1(fMutex);
for (;;)
for (;;)
{
try
{
try
{
if ( fWaitingFunctorsSize < fThreadCount)
{
// Don't create a thread unless it's needed. There
// is a thread available to service this request.
addFunctor(threadfunc, functor_weight, id);
lock1.unlock();
break;
}
if (fWaitingFunctorsSize < fThreadCount)
{
// Don't create a thread unless it's needed. There
// is a thread available to service this request.
addFunctor(threadfunc, functor_weight, id);
lock1.unlock();
break;
}
bool bAdded = false;
bool bAdded = false;
if ( fWaitingFunctorsSize < fQueueSize)
{
// Don't create a thread unless you have to
addFunctor(threadfunc, functor_weight, id);
bAdded = true;
}
if (fWaitingFunctorsSize < fQueueSize)
{
// Don't create a thread unless you have to
addFunctor(threadfunc, functor_weight, id);
bAdded = true;
}
// add a thread is necessary
if ( fThreadCount < fMaxThreads)
{
++fThreadCount;
//cout << "\t++invoke() tcnt=" << fThreadCount << endl;
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
// add a thread is necessary
if (fThreadCount < fMaxThreads)
{
++fThreadCount;
// cout << "\t++invoke() tcnt=" << fThreadCount << endl;
lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this));
if (bAdded)
break;
if (bAdded)
break;
// If the mutex is unlocked before creating the thread
// this allows fThreadAvailable to be triggered
// before the wait below runs. So run the loop again.
lock1.lock();
continue;
}
// If the mutex is unlocked before creating the thread
// this allows fThreadAvailable to be triggered
// before the wait below runs. So run the loop again.
lock1.lock();
continue;
}
//else
// cout << "invoke() no thread created c=" << fThreadCount << " m=" << fMaxThreads << endl;
// else
// cout << "invoke() no thread created c=" << fThreadCount << " m=" << fMaxThreads << endl;
if (bAdded)
{
lock1.unlock();
break;
}
if (bAdded)
{
lock1.unlock();
break;
}
fThreadAvailable.wait(lock1);
}
catch (...)
{
++fGeneralErrors;
throw;
}
fThreadAvailable.wait(lock1);
}
catch (...)
{
++fGeneralErrors;
throw;
}
}
fNeedThread.notify_one();
fNeedThread.notify_one();
}
void WeightedThreadPool::beginThread() throw()
{
vector<bool> reschedule;
vector<bool> reschedule;
try
{
// fThreadCreated();
boost::mutex::scoped_lock lock1(fMutex);
for (;;)
{
if (fStop)
break;
if (fNextFunctor == fWaitingFunctors.end())
{
// Wait until someone needs a thread
fNeedThread.wait(lock1);
}
else
{
vector<Container_T::iterator> todoList;
int i, num = (fWaitingFunctorsSize - issued);
Container_T::const_iterator iter;
uint32_t weight = 0;
for (i = 0; i < num && weight < fMaxThreadWeight; i++)
{
weight += (*fNextFunctor).functorWeight;
todoList.push_back(fNextFunctor++);
}
issued += i;
num = i;
lock1.unlock();
// cerr << "beginThread() " << num
// << " jobs - fWaitingFunctorsSize=" << fWaitingFunctorsSize
// << " fWaitingFunctorsWeight=" << fWaitingFunctorsWeight
// << " weight=" << weight
// << " issued=" << issued << " todo=" << todoList.size()
// << " fThreadCount=" << fThreadCount << endl;
i = 0;
reschedule.resize(num);
bool allWereRescheduled = true, someWereRescheduled = false;
while (i < num)
{
try
{
for (; i < num; i++)
{
reschedule[i] = false; // in case of exception in the next line
reschedule[i] = ((*todoList[i]).functor)();
allWereRescheduled &= reschedule[i];
someWereRescheduled |= reschedule[i];
}
}
catch (exception& e)
{
i++;
++fFunctorErrors;
cerr << e.what() << endl;
}
}
// no real work was done, prevent intensive busy waiting
if (allWereRescheduled)
usleep(1000);
// cout << "running " << i << "/" << num << " functor" <<endl;
lock1.lock();
if (someWereRescheduled)
{
for (i = 0; i < num; i++)
if (reschedule[i])
addFunctor((*todoList[i]).functor, (*todoList[i]).functorWeight, (*todoList[i]).id);
if (num > 1)
fNeedThread.notify_all();
else
fNeedThread.notify_one();
}
issued -= num;
for (i = 0; i < num; i++)
{
fWaitingFunctorsWeight -= (*todoList[i]).functorWeight;
fWaitingFunctors.erase(todoList[i]);
}
fWaitingFunctorsSize -= num;
// if (fWaitingFunctorsSize != fWaitingFunctors.size()) ;
// cerr << "num=" << num << " cleaned=" << i << " size="
// << fWaitingFunctorsSize << " list size="
// << fWaitingFunctors.size()
// << " w="<<fWaitingFunctorsWeight << endl;
fThreadAvailable.notify_all();
}
}
}
catch (exception& ex)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
// fThreadCreated();
boost::mutex::scoped_lock lock1(fMutex);
logging::Message::Args args;
logging::Message message(5);
args.add("beginThread: Caught exception: ");
args.add(ex.what());
for (;;)
{
if (fStop)
break;
message.format(args);
if (fNextFunctor == fWaitingFunctors.end())
{
// Wait until someone needs a thread
fNeedThread.wait(lock1);
}
else
{
vector<Container_T::iterator> todoList;
int i, num = (fWaitingFunctorsSize - issued);
Container_T::const_iterator iter;
uint32_t weight = 0;
for (i = 0; i < num && weight < fMaxThreadWeight; i++)
{
weight += (*fNextFunctor).functorWeight;
todoList.push_back(fNextFunctor++);
}
issued += i;
num = i;
lock1.unlock();
//cerr << "beginThread() " << num
// << " jobs - fWaitingFunctorsSize=" << fWaitingFunctorsSize
// << " fWaitingFunctorsWeight=" << fWaitingFunctorsWeight
// << " weight=" << weight
// << " issued=" << issued << " todo=" << todoList.size()
// << " fThreadCount=" << fThreadCount << endl;
i = 0;
reschedule.resize(num);
bool allWereRescheduled = true, someWereRescheduled = false;
while (i < num)
{
try
{
for (; i < num; i++)
{
reschedule[i] = false; // in case of exception in the next line
reschedule[i] = ((*todoList[i]).functor)();
allWereRescheduled &= reschedule[i];
someWereRescheduled |= reschedule[i];
}
}
catch (exception& e)
{
i++;
++fFunctorErrors;
cerr << e.what() << endl;
}
}
// no real work was done, prevent intensive busy waiting
if (allWereRescheduled)
usleep(1000);
//cout << "running " << i << "/" << num << " functor" <<endl;
lock1.lock();
if (someWereRescheduled)
{
for (i = 0; i < num; i++)
if (reschedule[i])
addFunctor((*todoList[i]).functor, (*todoList[i]).functorWeight,
(*todoList[i]).id);
if (num > 1)
fNeedThread.notify_all();
else
fNeedThread.notify_one();
}
issued -= num;
for (i = 0; i < num; i++)
{
fWaitingFunctorsWeight -= (*todoList[i]).functorWeight;
fWaitingFunctors.erase(todoList[i]);
}
fWaitingFunctorsSize -= num;
//if (fWaitingFunctorsSize != fWaitingFunctors.size()) ;
// cerr << "num=" << num << " cleaned=" << i << " size="
// << fWaitingFunctorsSize << " list size="
// << fWaitingFunctors.size()
// << " w="<<fWaitingFunctorsWeight << endl;
fThreadAvailable.notify_all();
}
}
}
catch (exception& ex)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
logging::Message::Args args;
logging::Message message(5);
args.add("beginThread: Caught exception: ");
args.add(ex.what());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
}
catch (...)
{
}
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
}
catch (...)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
logging::Message::Args args;
logging::Message message(6);
args.add("beginThread: Caught unknown exception!");
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
}
catch (...)
{
}
}
}
catch (...)
{
++fGeneralErrors;
// Log the exception and exit this thread
try
{
logging::Message::Args args;
logging::Message message(6);
args.add("beginThread: Caught unknown exception!");
message.format(args);
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage(message);
}
catch (...)
{
}
}
}
void WeightedThreadPool::addFunctor(const Functor_T& func, uint32_t functor_weight,
uint32_t id)
void WeightedThreadPool::addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id)
{
bool bAtEnd = false;
bool bAtEnd = false;
if (fNextFunctor == fWaitingFunctors.end())
bAtEnd = true;
if (fNextFunctor == fWaitingFunctors.end())
bAtEnd = true;
//cout << "addFunctor() w=" << fWaitingFunctorsWeight
// << " s=" << fWaitingFunctorsSize << " i=" << id << endl;
// cout << "addFunctor() w=" << fWaitingFunctorsWeight
// << " s=" << fWaitingFunctorsSize << " i=" << id << endl;
FunctorListItem fl = {func, functor_weight, id};
fWaitingFunctors.push_back(fl);
fWaitingFunctorsSize++;
fWaitingFunctorsWeight += functor_weight;
FunctorListItem fl = {func, functor_weight, id};
fWaitingFunctors.push_back(fl);
fWaitingFunctorsSize++;
fWaitingFunctorsWeight += functor_weight;
if (bAtEnd)
{
--fNextFunctor;
}
if (bAtEnd)
{
--fNextFunctor;
}
}
void WeightedThreadPool::dump()
{
std::cout << "General Errors: " << fGeneralErrors << std::endl;
std::cout << "Functor Errors: " << fFunctorErrors << std::endl;
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
std::cout << "Waiting functors weight : " << fWaitingFunctorsWeight << std::endl;
std::cout << "General Errors: " << fGeneralErrors << std::endl;
std::cout << "Functor Errors: " << fFunctorErrors << std::endl;
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
std::cout << "Waiting functors weight : " << fWaitingFunctorsWeight << std::endl;
}
} // namespace threadpool
} // namespace threadpool

View File

@ -16,10 +16,10 @@
MA 02110-1301, USA. */
/***********************************************************************
* $Id: $
*
*
***********************************************************************/
* $Id: $
*
*
***********************************************************************/
/** @file */
#ifndef WEIGHTEDTHREADPOOL_H
@ -40,203 +40,200 @@
namespace threadpool
{
/** @brief ThreadPool is a component for working with pools of threads and asynchronously
* executing tasks. It is responsible for creating threads and tracking which threads are "busy"
* and which are idle. Idle threads are utilized as "work" is added to the system.
*/
* executing tasks. It is responsible for creating threads and tracking which threads are "busy"
* and which are idle. Idle threads are utilized as "work" is added to the system.
*/
class WeightedThreadPool
{
public:
typedef boost::function0<int> Functor_T;
public:
typedef boost::function0<int> Functor_T;
/*********************************************
* ctor/dtor
*
*********************************************/
/*********************************************
* ctor/dtor
*
*********************************************/
/** @brief ctor
*/
WeightedThreadPool();
/** @brief ctor
*/
WeightedThreadPool();
/** @brief ctor
*
* @param maxThreads the maximum number of threads in this pool. This is the maximum number
* of simultaneuous operations that can go on.
* @param queueSize the maximum number of work tasks in the queue. This is the maximum
* number of jobs that can queue up in the work list before invoke() blocks.
*/
explicit WeightedThreadPool( size_t maxThreadWeight, size_t maxThreads, size_t queueSize );
/** @brief ctor
*
* @param maxThreads the maximum number of threads in this pool. This is the maximum number
* of simultaneuous operations that can go on.
* @param queueSize the maximum number of work tasks in the queue. This is the maximum
* number of jobs that can queue up in the work list before invoke() blocks.
*/
explicit WeightedThreadPool(size_t maxThreadWeight, size_t maxThreads, size_t queueSize);
/** @brief dtor
*/
~WeightedThreadPool() throw();
/** @brief dtor
*/
~WeightedThreadPool() throw();
/*********************************************
* accessors/mutators
*
*********************************************/
/** @brief set the work queue size
*
* @param queueSize the size of the work queue
*/
void setQueueSize(size_t queueSize);
/*********************************************
* accessors/mutators
*
*********************************************/
/** @brief set the work queue size
*
* @param queueSize the size of the work queue
*/
void setQueueSize( size_t queueSize );
/** @brief fet the work queue size
*/
inline size_t getQueueSize() const
{
return fQueueSize;
}
/** @brief fet the work queue size
*/
inline size_t getQueueSize() const
/** @brief set the maximum number of threads to be used to process
* the work queue
*
* @param maxThreads the maximum number of threads
*/
void setMaxThreads(size_t maxThreads);
/** @brief get the maximum number of threads
*/
inline size_t getMaxThreads() const
{
return fMaxThreads;
}
/** @brief set the maximum processing weight of a thread to be
* submitted for execution from the existing jobs
* scheduled in the work queue
*
* @param maxWeight for execution
*/
void setMaxThreadWeight(size_t maxWeight);
/** @brief get the maximum number of threads
*/
inline uint32_t getMaxThreadWeight() const
{
return fMaxThreadWeight;
}
/** @brief register a functor to be called when a new thread
* is created
*/
void setThreadCreatedListener(const Functor_T& f);
/** @brief queue size accessor
*
*/
inline uint32_t getWaiting() const
{
return fWaitingFunctorsSize;
}
inline uint32_t getWeight() const
{
return fWaitingFunctorsWeight;
}
void removeJobs(uint32_t id);
/*********************************************
* operations
*
*********************************************/
/** @brief invoke a functor in a separate thread managed by the pool
*
* If all maxThreads are busy, threadfunc will be added to a work list and
* will run when a thread comes free. If all threads are busy and there are
* queueSize tasks already waiting, invoke() will block until a slot in the
* queue comes free.
*/
void invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id);
/** @brief stop the threads
*/
void stop();
/** @brief wait on all the threads to complete
*/
void wait();
/** @brief for use in debugging
*/
void dump();
protected:
private:
/** @brief initialize data memebers
*/
void init();
/** @brief add a functor to the list
*/
void addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id);
/** @brief thread entry point
*/
void beginThread() throw();
WeightedThreadPool(const WeightedThreadPool&);
WeightedThreadPool& operator=(const WeightedThreadPool&);
friend struct beginThreadFunc;
struct beginThreadFunc
{
beginThreadFunc(WeightedThreadPool& impl) : fImpl(impl)
{
return fQueueSize;
}
/** @brief set the maximum number of threads to be used to process
* the work queue
*
* @param maxThreads the maximum number of threads
*/
void setMaxThreads( size_t maxThreads );
/** @brief get the maximum number of threads
*/
inline size_t getMaxThreads() const
void operator()()
{
return fMaxThreads;
fImpl.beginThread();
}
/** @brief set the maximum processing weight of a thread to be
* submitted for execution from the existing jobs
* scheduled in the work queue
*
* @param maxWeight for execution
*/
void setMaxThreadWeight( size_t maxWeight );
WeightedThreadPool& fImpl;
};
/** @brief get the maximum number of threads
*/
inline uint32_t getMaxThreadWeight() const
struct NoOp
{
void operator()() const
{
return fMaxThreadWeight;
}
};
/** @brief register a functor to be called when a new thread
* is created
*/
void setThreadCreatedListener(const Functor_T& f) ;
size_t fThreadCount;
size_t fMaxThreadWeight;
size_t fMaxThreads;
size_t fQueueSize;
/** @brief queue size accessor
*
*/
inline uint32_t getWaiting() const
{
return fWaitingFunctorsSize;
}
// typedef std::list<Functor_T> Container_T;
struct FunctorListItemStruct
{
Functor_T functor;
uint32_t functorWeight;
uint32_t id;
};
inline uint32_t getWeight() const
{
return fWaitingFunctorsWeight;
}
typedef FunctorListItemStruct FunctorListItem;
typedef std::list<FunctorListItem> Container_T;
Container_T fWaitingFunctors;
Container_T::iterator fNextFunctor;
void removeJobs(uint32_t id);
/*********************************************
* operations
*
*********************************************/
/** @brief invoke a functor in a separate thread managed by the pool
*
* If all maxThreads are busy, threadfunc will be added to a work list and
* will run when a thread comes free. If all threads are busy and there are
* queueSize tasks already waiting, invoke() will block until a slot in the
* queue comes free.
*/
void invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id);
/** @brief stop the threads
*/
void stop();
/** @brief wait on all the threads to complete
*/
void wait();
/** @brief for use in debugging
*/
void dump();
protected:
private:
/** @brief initialize data memebers
*/
void init();
/** @brief add a functor to the list
*/
void addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id);
/** @brief thread entry point
*/
void beginThread() throw();
WeightedThreadPool(const WeightedThreadPool&);
WeightedThreadPool& operator = (const WeightedThreadPool&);
friend struct beginThreadFunc;
struct beginThreadFunc
{
beginThreadFunc(WeightedThreadPool& impl)
: fImpl(impl)
{}
void operator() ()
{
fImpl.beginThread();
}
WeightedThreadPool& fImpl;
};
struct NoOp
{
void operator () () const
{}
};
size_t fThreadCount;
size_t fMaxThreadWeight;
size_t fMaxThreads;
size_t fQueueSize;
//typedef std::list<Functor_T> Container_T;
struct FunctorListItemStruct
{
Functor_T functor;
uint32_t functorWeight;
uint32_t id;
};
typedef FunctorListItemStruct FunctorListItem;
typedef std::list<FunctorListItem> Container_T;
Container_T fWaitingFunctors;
Container_T::iterator fNextFunctor;
uint32_t issued;
boost::mutex fMutex;
boost::condition fThreadAvailable; // triggered when a thread is available
boost::condition fNeedThread; // triggered when a thread is needed
boost::thread_group fThreads;
bool fStop;
long fGeneralErrors;
long fFunctorErrors;
uint16_t fWaitingFunctorsSize;
uint16_t fWaitingFunctorsWeight;
uint32_t issued;
boost::mutex fMutex;
boost::condition fThreadAvailable; // triggered when a thread is available
boost::condition fNeedThread; // triggered when a thread is needed
boost::thread_group fThreads;
bool fStop;
long fGeneralErrors;
long fFunctorErrors;
uint16_t fWaitingFunctorsSize;
uint16_t fWaitingFunctorsWeight;
};
} // namespace threadpool
} // namespace threadpool
#endif //WEIGHTEDTHREADPOOL_H
#endif // WEIGHTEDTHREADPOOL_H

View File

@ -15,7 +15,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <string>
#include <stdexcept>
#include <iostream>
@ -37,49 +36,45 @@ boost::mutex mutex;
// Functor class
struct foo
{
void operator ()()
{
for (int i = 0; i < 1024 * 1024 * 10; i++)
// simulate some work
fData++;
void operator()()
{
for (int i = 0; i < 1024 * 1024 * 10; i++)
// simulate some work
fData++;
//boost::mutex::scoped_lock lock(mutex);
//std::cout << "foo count = " << ++thecount << " " << fData << std::endl;
}
// boost::mutex::scoped_lock lock(mutex);
// std::cout << "foo count = " << ++thecount << " " << fData << std::endl;
}
foo(int i):
fData(i)
{}
foo(int i) : fData(i)
{
}
foo(const foo& copy)
: fData(copy.fData)
{}
int fData;
foo(const foo& copy) : fData(copy.fData)
{
}
int fData;
};
int main( int argc, char** argv)
int main(int argc, char** argv)
{
threadpool::WeightedThreadPool pool( 100, 10, 5 );
threadpool::WeightedThreadPool pool(100, 10, 5);
for (int y = 0; y < 10; y++)
for (int y = 0; y < 10; y++)
{
foo bar(y);
for (int i = 0; i < 10; ++i)
{
foo bar(y);
for (int i = 0; i < 10; ++i)
{
pool.invoke(bar, 25);
}
boost::mutex::scoped_lock lock(mutex);
std::cout << "count = " << ++thecount << std::endl;
// Wait until all of the queued up and in-progress work has finished
pool.wait();
pool.dump();
pool.invoke(bar, 25);
}
boost::mutex::scoped_lock lock(mutex);
std::cout << "count = " << ++thecount << std::endl;
// Wait until all of the queued up and in-progress work has finished
pool.wait();
pool.dump();
}
}