1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-513 Modify ThreadPool to have a join() method

This commit is contained in:
David Hall
2017-01-27 15:53:11 -06:00
parent e2d66222cd
commit c4742b8363
2 changed files with 44 additions and 12 deletions

View File

@ -73,6 +73,7 @@ void ThreadPool::init()
fStop = false; fStop = false;
// fThreadCreated = new NoOp(); // fThreadCreated = new NoOp();
fNextFunctor = fWaitingFunctors.end(); fNextFunctor = fWaitingFunctors.end();
fNextHandle=1;
} }
void ThreadPool::setQueueSize(size_t queueSize) void ThreadPool::setQueueSize(size_t queueSize)
@ -115,10 +116,35 @@ void ThreadPool::wait()
} }
} }
void ThreadPool::invoke(const Functor_T &threadfunc) void ThreadPool::join(uint64_t thrHandle)
{ {
boost::mutex::scoped_lock lock1(fMutex); boost::mutex::scoped_lock lock1(fMutex);
while (waitingFunctorsSize > 0)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
if (iter->first == thrHandle)
{
foundit = true;
break;
}
}
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
int64_t ThreadPool::invoke(const Functor_T &threadfunc)
{
boost::mutex::scoped_lock lock1(fMutex);
int64_t thrHandle=0;
for(;;) for(;;)
{ {
@ -128,7 +154,7 @@ void ThreadPool::invoke(const Functor_T &threadfunc)
{ {
// Don't create a thread unless it's needed. There // Don't create a thread unless it's needed. There
// is a thread available to service this request. // is a thread available to service this request.
addFunctor(threadfunc); thrHandle = addFunctor(threadfunc);
lock1.unlock(); lock1.unlock();
break; break;
} }
@ -138,7 +164,7 @@ void ThreadPool::invoke(const Functor_T &threadfunc)
if ( waitingFunctorsSize < fQueueSize) if ( waitingFunctorsSize < fQueueSize)
{ {
// Don't create a thread unless you have to // Don't create a thread unless you have to
addFunctor(threadfunc); thrHandle = addFunctor(threadfunc);
bAdded = true; bAdded = true;
} }
@ -175,6 +201,7 @@ void ThreadPool::invoke(const Functor_T &threadfunc)
} }
fNeedThread.notify_one(); fNeedThread.notify_one();
return thrHandle;
} }
void ThreadPool::beginThread() throw() void ThreadPool::beginThread() throw()
@ -218,7 +245,7 @@ void ThreadPool::beginThread() throw()
for (i = 0; i < num; i++) { for (i = 0; i < num; i++) {
try { try {
(*todoList[i])(); (*todoList[i]).second();
} }
catch(exception &e) { catch(exception &e) {
++fFunctorErrors; ++fFunctorErrors;
@ -290,24 +317,23 @@ void ThreadPool::beginThread() throw()
catch(...) catch(...)
{ {
} }
} }
} }
void ThreadPool::addFunctor(const Functor_T &func) int64_t ThreadPool::addFunctor(const Functor_T &func)
{ {
bool bAtEnd = false; bool bAtEnd = false;
if (fNextFunctor == fWaitingFunctors.end()) if (fNextFunctor == fWaitingFunctors.end())
bAtEnd = true; bAtEnd = true;
fWaitingFunctors.push_back(func); fWaitingFunctors.push_back(make_pair(fNextHandle, func));
waitingFunctorsSize++; waitingFunctorsSize++;
if (bAtEnd) if (bAtEnd)
{ {
--fNextFunctor; --fNextFunctor;
} }
return fNextHandle++;
} }
void ThreadPool::dump() void ThreadPool::dump()

View File

@ -60,6 +60,7 @@ class ThreadPool
{ {
public: public:
typedef boost::function0<void> Functor_T; typedef boost::function0<void> Functor_T;
typedef pair<int64_t, Functor_T> PoolFunction_T;
/********************************************* /*********************************************
* ctor/dtor * ctor/dtor
@ -132,7 +133,7 @@ public:
* queueSize tasks already waiting, invoke() will block until a slot in the * queueSize tasks already waiting, invoke() will block until a slot in the
* queue comes free. * queue comes free.
*/ */
EXPORT void invoke(const Functor_T &threadfunc); EXPORT int64_t invoke(const Functor_T &threadfunc);
/** @brief stop the threads /** @brief stop the threads
*/ */
@ -142,6 +143,10 @@ public:
*/ */
EXPORT void wait(); EXPORT void wait();
/** @brief Wait for a specific thread
*/
EXPORT void join(uint64_t thrHandle);
/** @brief for use in debugging /** @brief for use in debugging
*/ */
EXPORT void dump(); EXPORT void dump();
@ -155,7 +160,7 @@ private:
/** @brief add a functor to the list /** @brief add a functor to the list
*/ */
void addFunctor(const Functor_T &func); int64_t addFunctor(const Functor_T &func);
/** @brief thread entry point /** @brief thread entry point
*/ */
@ -191,7 +196,7 @@ private:
size_t fMaxThreads; size_t fMaxThreads;
size_t fQueueSize; size_t fQueueSize;
typedef std::list<Functor_T> Container_T; typedef std::list<PoolFunction_T> Container_T;
Container_T fWaitingFunctors; Container_T fWaitingFunctors;
Container_T::iterator fNextFunctor; Container_T::iterator fNextFunctor;
// Functor_T * fThreadCreated; // Functor_T * fThreadCreated;
@ -206,6 +211,7 @@ private:
long fGeneralErrors; long fGeneralErrors;
long fFunctorErrors; long fFunctorErrors;
uint32_t waitingFunctorsSize; uint32_t waitingFunctorsSize;
uint64_t fNextHandle;
}; };