1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge branch 'develop-1.1' into 1.1-merge-up-2018-10-05

This commit is contained in:
Andrew Hutchings
2018-10-05 18:40:07 +01:00
22 changed files with 625 additions and 246 deletions

View File

@ -42,15 +42,22 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads
uint midThreads, uint lowThreads, uint ID) :
_stop(false), weightPerRun(targetWeightPerRun), id(ID)
{
boost::thread* newThread;
for (uint32_t i = 0; i < highThreads; i++)
threads.create_thread(ThreadHelper(this, HIGH));
{
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
}
for (uint32_t i = 0; i < midThreads; i++)
threads.create_thread(ThreadHelper(this, MEDIUM));
{
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
}
for (uint32_t i = 0; i < lowThreads; i++)
threads.create_thread(ThreadHelper(this, LOW));
{
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
}
cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads
<< " low.\n";
defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads;
@ -65,6 +72,7 @@ PriorityThreadPool::~PriorityThreadPool()
void PriorityThreadPool::addJob(const Job& job, bool useLock)
{
boost::thread* newThread;
mutex::scoped_lock lk(mutex, defer_lock_t());
if (useLock)
@ -73,19 +81,22 @@ void PriorityThreadPool::addJob(const Job& job, bool useLock)
// Create any missing threads
if (defaultThreadCounts[HIGH] != threadCounts[HIGH])
{
threads.create_thread(ThreadHelper(this, HIGH));
newThread = threads.create_thread(ThreadHelper(this, HIGH));
newThread->detach();
threadCounts[HIGH]++;
}
if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM])
{
threads.create_thread(ThreadHelper(this, MEDIUM));
newThread = threads.create_thread(ThreadHelper(this, MEDIUM));
newThread->detach();
threadCounts[MEDIUM]++;
}
if (defaultThreadCounts[LOW] != threadCounts[LOW])
{
threads.create_thread(ThreadHelper(this, LOW));
newThread = threads.create_thread(ThreadHelper(this, LOW));
newThread->detach();
threadCounts[LOW]++;
}
@ -281,7 +292,6 @@ void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveproce
void PriorityThreadPool::stop()
{
_stop = true;
threads.join_all();
}
} // namespace threadpool

View File

@ -43,7 +43,8 @@ ThreadPool::ThreadPool()
}
ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize )
: fMaxThreads( maxThreads ), fQueueSize( queueSize )
:fMaxThreads( maxThreads ), fQueueSize( queueSize ),
fPruneThread( NULL )
{
init();
}
@ -72,6 +73,7 @@ void ThreadPool::init()
fStop = false;
fNextFunctor = fWaitingFunctors.end();
fNextHandle = 1;
fPruneThread = new boost::thread(boost::bind(&ThreadPool::pruneThread, this));
}
void ThreadPool::setQueueSize(size_t queueSize)
@ -80,6 +82,39 @@ void ThreadPool::setQueueSize(size_t queueSize)
fQueueSize = queueSize;
}
void ThreadPool::pruneThread()
{
boost::mutex::scoped_lock lock2(fPruneMutex);
while(true)
{
boost::system_time timeout = boost::get_system_time() + boost::posix_time::minutes(1);
if (!fPruneThreadEnd.timed_wait(fPruneMutex, timeout))
{
while(!fPruneThreads.empty())
{
if (fDebug)
{
ostringstream oss;
oss << "pruning thread " << fPruneThreads.top();
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
fThreads.join_one(fPruneThreads.top());
fPruneThreads.pop();
}
}
else
{
break;
}
}
}
void ThreadPool::setMaxThreads(size_t maxThreads)
{
@ -93,6 +128,9 @@ void ThreadPool::stop()
fStop = true;
lock1.unlock();
fPruneThreadEnd.notify_all();
fPruneThread->join();
delete fPruneThread;
fNeedThread.notify_all();
fThreads.join_all();
}
@ -305,6 +343,8 @@ void ThreadPool::beginThread() throw()
{
if (fThreadCount > fMaxThreads)
{
boost::mutex::scoped_lock lock2(fPruneMutex);
fPruneThreads.push(boost::this_thread::get_id());
--fThreadCount;
return;
}

View File

@ -35,6 +35,7 @@
#include <cstdlib>
#include <sstream>
#include <stdexcept>
#include <stack>
#include <stdint.h>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
@ -51,6 +52,106 @@
namespace threadpool
{
// Taken from boost::thread_group and adapted
class ThreadPoolGroup
{
private:
ThreadPoolGroup(ThreadPoolGroup const&);
ThreadPoolGroup& operator=(ThreadPoolGroup const&);
public:
ThreadPoolGroup() {}
~ThreadPoolGroup()
{
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
delete *it;
}
}
template<typename F>
boost::thread* create_thread(F threadfunc)
{
boost::lock_guard<boost::shared_mutex> guard(m);
std::auto_ptr<boost::thread> new_thread(new boost::thread(threadfunc));
threads.push_back(new_thread.get());
return new_thread.release();
}
void add_thread(boost::thread* thrd)
{
if(thrd)
{
boost::lock_guard<boost::shared_mutex> guard(m);
threads.push_back(thrd);
}
}
void remove_thread(boost::thread* thrd)
{
boost::lock_guard<boost::shared_mutex> guard(m);
std::list<boost::thread*>::iterator const it=std::find(threads.begin(),threads.end(),thrd);
if(it!=threads.end())
{
threads.erase(it);
}
}
void join_all()
{
boost::shared_lock<boost::shared_mutex> guard(m);
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
(*it)->join();
}
}
void interrupt_all()
{
boost::shared_lock<boost::shared_mutex> guard(m);
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
(*it)->interrupt();
}
}
size_t size() const
{
boost::shared_lock<boost::shared_mutex> guard(m);
return threads.size();
}
void join_one(boost::thread::id id)
{
boost::shared_lock<boost::shared_mutex> guard(m);
for(std::list<boost::thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
if ((*it)->get_id() == id)
{
(*it)->join();
threads.erase(it);
return;
}
}
}
private:
std::list<boost::thread*> threads;
mutable boost::shared_mutex m;
};
/** @brief ThreadPool is a component for working with pools of threads and asynchronously
* executing tasks. It is responsible for creating threads and tracking which threads are "busy"
* and which are idle. Idle threads are utilized as "work" is added to the system.
@ -207,6 +308,7 @@ private:
*/
void beginThread() throw();
void pruneThread();
ThreadPool(const ThreadPool&);
ThreadPool& operator = (const ThreadPool&);
@ -245,7 +347,7 @@ private:
boost::mutex fMutex;
boost::condition fThreadAvailable; // triggered when a thread is available
boost::condition fNeedThread; // triggered when a thread is needed
boost::thread_group fThreads;
ThreadPoolGroup fThreads;
bool fStop;
long fGeneralErrors;
@ -255,6 +357,10 @@ private:
std::string fName; // Optional to add a name to the pool for debugging.
bool fDebug;
boost::mutex fPruneMutex;
boost::condition fPruneThreadEnd;
boost::thread* fPruneThread;
std::stack<boost::thread::id> fPruneThreads; // A list of stale thread IDs to be joined
};
// This class, if instantiated, will continuously log details about the indicated threadpool