From 94dfacfe2590e2a4524959e9b309402bf592b66b Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Fri, 28 Sep 2018 07:21:49 +0100 Subject: [PATCH] MCOL-1750 Fix threadpool stack leaks When a thread has been idle for 10 minutes and we have too many threads in the threadpool the thread will be pruned. This is done by the thread's main function just returning. Unfortunately this does not free up the memory, the thread either needs to be joined or detatched. We cannot use detached threads since there are mutexes and conditional variables between the main thread and the threadpool threads. If the main thread finishes before the threadpool threads (as would happen in cpimport) then crashes occur. The parent needs to wait on the child threads which is the whole point in joining. So this fix spawns a new thread which every minute will check the list of threads to be joined due to timeout and join them. We have had to use an adapted version of boost::thread_group so that we can join a single thread based off its thread ID. In addition with have modified PriorityThreadPool to use detached threads since this does not need to signal the child threads at the end. --- utils/threadpool/prioritythreadpool.cpp | 27 ++++-- utils/threadpool/threadpool.cpp | 42 ++++++++- utils/threadpool/threadpool.h | 108 +++++++++++++++++++++++- 3 files changed, 168 insertions(+), 9 deletions(-) diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 4d19df91e..a5c713eab 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -42,12 +42,22 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads uint midThreads, uint lowThreads, uint ID) : _stop(false), weightPerRun(targetWeightPerRun), id(ID) { + boost::thread* newThread; for (uint32_t i = 0; i < highThreads; i++) - threads.create_thread(ThreadHelper(this, HIGH)); + { + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); + } for (uint32_t i = 0; i < midThreads; i++) - threads.create_thread(ThreadHelper(this, MEDIUM)); + { + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); + } for (uint32_t i = 0; i < lowThreads; i++) - threads.create_thread(ThreadHelper(this, LOW)); + { + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); + } cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; @@ -62,6 +72,7 @@ PriorityThreadPool::~PriorityThreadPool() void PriorityThreadPool::addJob(const Job &job, bool useLock) { + boost::thread* newThread; mutex::scoped_lock lk(mutex, defer_lock_t()); if (useLock) @@ -70,17 +81,20 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock) // Create any missing threads if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) { - threads.create_thread(ThreadHelper(this, HIGH)); + newThread = threads.create_thread(ThreadHelper(this, HIGH)); + newThread->detach(); threadCounts[HIGH]++; } if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) { - threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread = threads.create_thread(ThreadHelper(this, MEDIUM)); + newThread->detach(); threadCounts[MEDIUM]++; } if (defaultThreadCounts[LOW] != threadCounts[LOW]) { - threads.create_thread(ThreadHelper(this, LOW)); + newThread = threads.create_thread(ThreadHelper(this, LOW)); + newThread->detach(); threadCounts[LOW]++; } @@ -261,7 +275,6 @@ void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveproce void PriorityThreadPool::stop() { _stop = true; - threads.join_all(); } } // namespace threadpool diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index d903f9892..0b1546e98 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -43,7 +43,8 @@ ThreadPool::ThreadPool() } ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize ) - :fMaxThreads( maxThreads ), fQueueSize( queueSize ) + :fMaxThreads( maxThreads ), fQueueSize( queueSize ), + fPruneThread( NULL ) { init(); } @@ -72,6 +73,7 @@ void ThreadPool::init() fStop = false; fNextFunctor = fWaitingFunctors.end(); fNextHandle=1; + fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this)); } void ThreadPool::setQueueSize(size_t queueSize) @@ -80,6 +82,39 @@ void ThreadPool::setQueueSize(size_t queueSize) fQueueSize = queueSize; } +void ThreadPool::pruneThread() +{ + boost::mutex::scoped_lock lock2(fPruneMutex); + + while(true) + { + boost::system_time timeout = boost::get_system_time() + boost::posix_time::minutes(1); + if (!fPruneThreadEnd.timed_wait(fPruneMutex, timeout)) + { + while(!fPruneThreads.empty()) + { + if (fDebug) + { + ostringstream oss; + oss << "pruning thread " << fPruneThreads.top(); + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } + fThreads.join_one(fPruneThreads.top()); + fPruneThreads.pop(); + } + } + else + { + break; + } + } +} void ThreadPool::setMaxThreads(size_t maxThreads) { @@ -93,6 +128,9 @@ void ThreadPool::stop() fStop = true; lock1.unlock(); + fPruneThreadEnd.notify_all(); + fPruneThread->join(); + delete fPruneThread; fNeedThread.notify_all(); fThreads.join_all(); } @@ -293,6 +331,8 @@ void ThreadPool::beginThread() throw() { if (fThreadCount > fMaxThreads) { + boost::mutex::scoped_lock lock2(fPruneMutex); + fPruneThreads.push(boost::this_thread::get_id()); --fThreadCount; return; } diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index f11bb4b2b..95f7d4c7e 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,106 @@ namespace threadpool { + +// Taken from boost::thread_group and adapted +class ThreadPoolGroup +{ +private: + ThreadPoolGroup(ThreadPoolGroup const&); + ThreadPoolGroup& operator=(ThreadPoolGroup const&); +public: + ThreadPoolGroup() {} + ~ThreadPoolGroup() + { + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + delete *it; + } + } + + template + boost::thread* create_thread(F threadfunc) + { + boost::lock_guard guard(m); + std::unique_ptr new_thread(new boost::thread(threadfunc)); + threads.push_back(new_thread.get()); + return new_thread.release(); + } + + void add_thread(boost::thread* thrd) + { + if(thrd) + { + boost::lock_guard guard(m); + threads.push_back(thrd); + } + } + + void remove_thread(boost::thread* thrd) + { + boost::lock_guard guard(m); + std::list::iterator const it=std::find(threads.begin(),threads.end(),thrd); + if(it!=threads.end()) + { + threads.erase(it); + } + } + + void join_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->join(); + } + } + + void interrupt_all() + { + boost::shared_lock guard(m); + + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + (*it)->interrupt(); + } + } + + size_t size() const + { + boost::shared_lock guard(m); + return threads.size(); + } + + void join_one(boost::thread::id id) + { + boost::shared_lock guard(m); + for(std::list::iterator it=threads.begin(),end=threads.end(); + it!=end; + ++it) + { + if ((*it)->get_id() == id) + { + (*it)->join(); + threads.erase(it); + return; + } + } + + } + +private: + std::list threads; + mutable boost::shared_mutex m; +}; + + /** @brief ThreadPool is a component for working with pools of threads and asynchronously * executing tasks. It is responsible for creating threads and tracking which threads are "busy" * and which are idle. Idle threads are utilized as "work" is added to the system. @@ -183,6 +284,7 @@ private: */ void beginThread() throw(); + void pruneThread(); ThreadPool(const ThreadPool&); ThreadPool& operator = (const ThreadPool&); @@ -221,7 +323,7 @@ private: boost::mutex fMutex; boost::condition fThreadAvailable; // triggered when a thread is available boost::condition fNeedThread; // triggered when a thread is needed - boost::thread_group fThreads; + ThreadPoolGroup fThreads; bool fStop; long fGeneralErrors; @@ -231,6 +333,10 @@ private: std::string fName; // Optional to add a name to the pool for debugging. bool fDebug; + boost::mutex fPruneMutex; + boost::condition fPruneThreadEnd; + boost::thread* fPruneThread; + std::stack fPruneThreads; // A list of stale thread IDs to be joined }; // This class, if instantiated, will continuously log details about the indicated threadpool