diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 4d19df91e..a5c713eab 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -42,12 +42,22 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads uint midThreads, uint lowThreads, uint ID) : _stop(false), weightPerRun(targetWeightPerRun), id(ID) { + boost::thread* newThread; for (uint32_t i = 0; i < highThreads; i++) - threads.create_thread(ThreadHelper(this, HIGH)); + { + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); + } for (uint32_t i = 0; i < midThreads; i++) - threads.create_thread(ThreadHelper(this, MEDIUM)); + { + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); + } for (uint32_t i = 0; i < lowThreads; i++) - threads.create_thread(ThreadHelper(this, LOW)); + { + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); + } cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; @@ -62,6 +72,7 @@ PriorityThreadPool::~PriorityThreadPool() void PriorityThreadPool::addJob(const Job &job, bool useLock) { + boost::thread* newThread; mutex::scoped_lock lk(mutex, defer_lock_t()); if (useLock) @@ -70,17 +81,20 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock) // Create any missing threads if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) { - threads.create_thread(ThreadHelper(this, HIGH)); + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); threadCounts[HIGH]++; } if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) { - threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); threadCounts[MEDIUM]++; } if (defaultThreadCounts[LOW] != threadCounts[LOW]) { - threads.create_thread(ThreadHelper(this, LOW)); + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); threadCounts[LOW]++; } @@ -261,7 +275,6 @@ void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveproce void PriorityThreadPool::stop() { _stop = true; - threads.join_all(); } } // namespace threadpool diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index d903f9892..0b1546e98 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -43,7 +43,8 @@ ThreadPool::ThreadPool() } ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize ) - :fMaxThreads( maxThreads ), fQueueSize( queueSize ) + :fMaxThreads( maxThreads ), fQueueSize( queueSize ), + fPruneThread( NULL ) { init(); } @@ -72,6 +73,7 @@ void ThreadPool::init() fStop = false; fNextFunctor = fWaitingFunctors.end(); fNextHandle=1; + fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this)); } void ThreadPool::setQueueSize(size_t queueSize) @@ -80,6 +82,39 @@ void ThreadPool::setQueueSize(size_t queueSize) fQueueSize = queueSize; } +void ThreadPool::pruneThread() +{ + boost::mutex::scoped_lock lock2(fPruneMutex); + + while(true) + { + boost::system_time timeout = boost::get_system_time() + boost::posix_time::minutes(1); + if (!fPruneThreadEnd.timed_wait(fPruneMutex, timeout)) + { + while(!fPruneThreads.empty()) + { + if (fDebug) + { + ostringstream oss; + oss << "pruning thread " << fPruneThreads.top(); + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } + fThreads.join_one(fPruneThreads.top()); + fPruneThreads.pop(); + } + } + else + { + break; + } + } +} void ThreadPool::setMaxThreads(size_t maxThreads) { @@ -93,6 +128,9 @@ void ThreadPool::stop() fStop = true; lock1.unlock(); + fPruneThreadEnd.notify_all(); + fPruneThread->join(); + delete fPruneThread; fNeedThread.notify_all(); fThreads.join_all(); } @@ -293,6 +331,8 @@ void ThreadPool::beginThread() throw() { if (fThreadCount > fMaxThreads) { + boost::mutex::scoped_lock lock2(fPruneMutex); + fPruneThreads.push(boost::this_thread::get_id()); --fThreadCount; return; } diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index f11bb4b2b..95f7d4c7e 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,106 @@ namespace threadpool { + +// Taken from boost::thread_group and adapted +class ThreadPoolGroup +{ +private: + ThreadPoolGroup(ThreadPoolGroup const&); + ThreadPoolGroup& operator=(ThreadPoolGroup const&); +public: + ThreadPoolGroup() {} + ~ThreadPoolGroup() + { + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + delete *it; + } + } + + template + boost::thread* create_thread(F threadfunc) + { + boost::lock_guard guard(m); + std::unique_ptr new_thread(new boost::thread(threadfunc)); + threads.push_back(new_thread.get()); + return new_thread.release(); + } + + void add_thread(boost::thread* thrd) + { + if(thrd) + { + boost::lock_guard guard(m); + threads.push_back(thrd); + } + } + + void remove_thread(boost::thread* thrd) + { + boost::lock_guard guard(m); + std::list::iterator const it=std::find(threads.begin(),threads.end(),thrd); + if(it!=threads.end()) + { + threads.erase(it); + } + } + + void join_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->join(); + } + } + + void interrupt_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->interrupt(); + } + } + + size_t size() const + { + boost::shared_lock guard(m); + return threads.size(); + } + + void join_one(boost::thread::id id) + { + boost::shared_lock guard(m); + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + if ((*it)->get_id() == id) + { + (*it)->join(); + threads.erase(it); + return; + } + } + + } + +private: + std::list threads; + mutable boost::shared_mutex m; +}; + + /** @brief ThreadPool is a component for working with pools of threads and asynchronously * executing tasks. It is responsible for creating threads and tracking which threads are "busy" * and which are idle. Idle threads are utilized as "work" is added to the system. @@ -183,6 +284,7 @@ private: */ void beginThread() throw(); + void pruneThread(); ThreadPool(const ThreadPool&); ThreadPool& operator = (const ThreadPool&); @@ -221,7 +323,7 @@ private: boost::mutex fMutex; boost::condition fThreadAvailable; // triggered when a thread is available boost::condition fNeedThread; // triggered when a thread is needed - boost::thread_group fThreads; + ThreadPoolGroup fThreads; bool fStop; long fGeneralErrors; @@ -231,6 +333,10 @@ private: std::string fName; // Optional to add a name to the pool for debugging. bool fDebug; + boost::mutex fPruneMutex; + boost::condition fPruneThreadEnd; + boost::thread* fPruneThread; + std::stack fPruneThreads; // A list of stale thread IDs to be joined }; // This class, if instantiated, will continuously log details about the indicated threadpool