1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1750 Fix threadpool stack leaks

When a thread has been idle for 10 minutes and we have too many threads
in the threadpool the thread will be pruned. This is done by the
thread's main function just returning. Unfortunately this does not free
up the memory, the thread either needs to be joined or detatched.

We cannot use detached threads since there are mutexes and conditional
variables between the main thread and the threadpool threads. If the
main thread finishes before the threadpool threads (as would happen in
cpimport) then crashes occur. The parent needs to wait on the child
threads which is the whole point in joining.

So this fix spawns a new thread which every minute will check the list
of threads to be joined due to timeout and join them.

We have had to use an adapted version of boost::thread_group so that we
can join a single thread based off its thread ID.

In addition with have modified PriorityThreadPool to use detached
threads since this does not need to signal the child threads at the end.
This commit is contained in:
Andrew Hutchings
2018-09-28 07:21:49 +01:00
parent f78c90cd3c
commit 94dfacfe25
3 changed files with 168 additions and 9 deletions

View File

@ -42,12 +42,22 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads
uint midThreads, uint lowThreads, uint ID) :
_stop(false), weightPerRun(targetWeightPerRun), id(ID)
{
boost::thread* newThread;
for (uint32_t i = 0; i < highThreads; i++)
threads.create_thread(ThreadHelper(this, HIGH));
{
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
}
for (uint32_t i = 0; i < midThreads; i++)
threads.create_thread(ThreadHelper(this, MEDIUM));
{
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
}
for (uint32_t i = 0; i < lowThreads; i++)
threads.create_thread(ThreadHelper(this, LOW));
{
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
}
cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads
<< " low.\n";
defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads;
@ -62,6 +72,7 @@ PriorityThreadPool::~PriorityThreadPool()
void PriorityThreadPool::addJob(const Job &job, bool useLock)
{
boost::thread* newThread;
mutex::scoped_lock lk(mutex, defer_lock_t());
if (useLock)
@ -70,17 +81,20 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock)
// Create any missing threads
if (defaultThreadCounts[HIGH] != threadCounts[HIGH])
{
threads.create_thread(ThreadHelper(this, HIGH));
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
threadCounts[HIGH]++;
}
if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM])
{
threads.create_thread(ThreadHelper(this, MEDIUM));
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
threadCounts[MEDIUM]++;
}
if (defaultThreadCounts[LOW] != threadCounts[LOW])
{
threads.create_thread(ThreadHelper(this, LOW));
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
threadCounts[LOW]++;
}
@ -261,7 +275,6 @@ void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveproce
void PriorityThreadPool::stop()
{
_stop = true;
threads.join_all();
}
} // namespace threadpool