diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index f25367090..14aa074f6 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -73,6 +73,7 @@ void ThreadPool::init() fStop = false; // fThreadCreated = new NoOp(); fNextFunctor = fWaitingFunctors.end(); + fNextHandle=1; } void ThreadPool::setQueueSize(size_t queueSize) @@ -115,10 +116,35 @@ void ThreadPool::wait() } } -void ThreadPool::invoke(const Functor_T &threadfunc) +void ThreadPool::join(uint64_t thrHandle) { boost::mutex::scoped_lock lock1(fMutex); + while (waitingFunctorsSize > 0) + { + Container_T::iterator iter; + Container_T::iterator end = fWaitingFunctors.end(); + bool foundit = false; + for (iter = fWaitingFunctors.begin(); iter != end; ++iter) + { + if (iter->first == thrHandle) + { + foundit = true; + break; + } + } + if (!foundit) + { + break; + } + fThreadAvailable.wait(lock1); + } +} + +int64_t ThreadPool::invoke(const Functor_T &threadfunc) +{ + boost::mutex::scoped_lock lock1(fMutex); + int64_t thrHandle=0; for(;;) { @@ -128,7 +154,7 @@ void ThreadPool::invoke(const Functor_T &threadfunc) { // Don't create a thread unless it's needed. There // is a thread available to service this request. - addFunctor(threadfunc); + thrHandle = addFunctor(threadfunc); lock1.unlock(); break; } @@ -138,7 +164,7 @@ void ThreadPool::invoke(const Functor_T &threadfunc) if ( waitingFunctorsSize < fQueueSize) { // Don't create a thread unless you have to - addFunctor(threadfunc); + thrHandle = addFunctor(threadfunc); bAdded = true; } @@ -175,6 +201,7 @@ void ThreadPool::invoke(const Functor_T &threadfunc) } fNeedThread.notify_one(); + return thrHandle; } void ThreadPool::beginThread() throw() @@ -218,7 +245,7 @@ void ThreadPool::beginThread() throw() for (i = 0; i < num; i++) { try { - (*todoList[i])(); + (*todoList[i]).second(); } catch(exception &e) { ++fFunctorErrors; @@ -290,24 +317,23 @@ void ThreadPool::beginThread() throw() catch(...) { } - } - } -void ThreadPool::addFunctor(const Functor_T &func) +int64_t ThreadPool::addFunctor(const Functor_T &func) { bool bAtEnd = false; if (fNextFunctor == fWaitingFunctors.end()) bAtEnd = true; - fWaitingFunctors.push_back(func); + fWaitingFunctors.push_back(make_pair(fNextHandle, func)); waitingFunctorsSize++; if (bAtEnd) { --fNextFunctor; } + return fNextHandle++; } void ThreadPool::dump() diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index 23ba5df2e..a21e19c0e 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -60,6 +60,7 @@ class ThreadPool { public: typedef boost::function0 Functor_T; + typedef pair PoolFunction_T; /********************************************* * ctor/dtor @@ -132,7 +133,7 @@ public: * queueSize tasks already waiting, invoke() will block until a slot in the * queue comes free. */ - EXPORT void invoke(const Functor_T &threadfunc); + EXPORT int64_t invoke(const Functor_T &threadfunc); /** @brief stop the threads */ @@ -142,7 +143,11 @@ public: */ EXPORT void wait(); - /** @brief for use in debugging + /** @brief Wait for a specific thread + */ + EXPORT void join(uint64_t thrHandle); + + /** @brief for use in debugging */ EXPORT void dump(); @@ -155,7 +160,7 @@ private: /** @brief add a functor to the list */ - void addFunctor(const Functor_T &func); + int64_t addFunctor(const Functor_T &func); /** @brief thread entry point */ @@ -191,7 +196,7 @@ private: size_t fMaxThreads; size_t fQueueSize; - typedef std::list Container_T; + typedef std::list Container_T; Container_T fWaitingFunctors; Container_T::iterator fNextFunctor; // Functor_T * fThreadCreated; @@ -206,6 +211,7 @@ private: long fGeneralErrors; long fFunctorErrors; uint32_t waitingFunctorsSize; + uint64_t fNextHandle; };