/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * * Work dervied from Devguy.com's Open Source C++ thread pool implementation * released under public domain: * http://web.archive.org/liveweb/http://dgpctk.cvs.sourceforge.net/viewvc/dgpctk/dgc%2B%2B/include/dg/thread/threadpool.h?revision=1.22&content-type=text%2Fplain * * http://web.archive.org/web/20100104101109/http://devguy.com/bb/viewtopic.php?t=460 * ***********************************************************************/ /** @file */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define EXPORT namespace threadpool { // Taken from boost::thread_group and adapted class ThreadPoolGroup { private: ThreadPoolGroup(ThreadPoolGroup const&); ThreadPoolGroup& operator=(ThreadPoolGroup const&); public: ThreadPoolGroup() { } ~ThreadPoolGroup() { for (std::list::iterator it = threads.begin(), end = threads.end(); it != end; ++it) { delete *it; } } template boost::thread* create_thread(F threadfunc) { boost::lock_guard guard(m); #if __cplusplus >= 201103L std::unique_ptr new_thread(new boost::thread(threadfunc)); #else std::auto_ptr new_thread(new boost::thread(threadfunc)); #endif threads.push_back(new_thread.get()); return new_thread.release(); } void add_thread(boost::thread* thrd) { if (thrd) { boost::lock_guard guard(m); threads.push_back(thrd); } } void remove_thread(boost::thread* thrd) { boost::lock_guard guard(m); std::list::iterator const it = std::find(threads.begin(), threads.end(), thrd); if (it != threads.end()) { threads.erase(it); } } void join_all() { boost::shared_lock guard(m); for (std::list::iterator it = threads.begin(), end = threads.end(); it != end; ++it) { (*it)->join(); } } void interrupt_all() { boost::shared_lock guard(m); for (std::list::iterator it = threads.begin(), end = threads.end(); it != end; ++it) { (*it)->interrupt(); } } size_t size() const { boost::shared_lock guard(m); return threads.size(); } void join_one(boost::thread::id id) { boost::shared_lock guard(m); for (std::list::iterator it = threads.begin(), end = threads.end(); it != end; ++it) { if ((*it)->get_id() == id) { (*it)->join(); threads.erase(it); return; } } } private: std::list threads; mutable boost::shared_mutex m; }; /** @brief ThreadPool is a component for working with pools of threads and asynchronously * executing tasks. It is responsible for creating threads and tracking which threads are "busy" * and which are idle. Idle threads are utilized as "work" is added to the system. */ class ThreadPool { public: typedef boost::function0 Functor_T; /********************************************* * ctor/dtor * *********************************************/ /** @brief ctor */ EXPORT ThreadPool(); /** @brief ctor * * @param maxThreads the maximum number of threads in this pool. This is the maximum number * of simultaneuous operations that can go on. * @param queueSize the maximum number of work tasks in the queue. This is the maximum * number of jobs that can queue up in the work list before invoke() blocks. * If 0, then threads never block and total threads may * exceed maxThreads. Nothing waits. Thread count will * idle down to maxThreads when less work is required. */ EXPORT explicit ThreadPool(size_t maxThreads, size_t queueSize); /** @brief dtor */ EXPORT ~ThreadPool() throw(); /********************************************* * accessors/mutators * *********************************************/ /** @brief set the work queue size * * @param queueSize the size of the work queue */ EXPORT void setQueueSize(size_t queueSize); /** @brief fet the work queue size */ inline size_t getQueueSize() const { return fQueueSize; } /** @brief set the maximum number of threads to be used to process * the work queue * * @param maxThreads the maximum number of threads */ EXPORT void setMaxThreads(size_t maxThreads); /** @brief get the maximum number of threads */ inline size_t getMaxThreads() const { return fMaxThreads; } /** @brief get the issued number of threads */ inline size_t getIssuedThreads() { return fIssued; } /** @brief queue size accessor * */ inline uint32_t getWaiting() const { return waitingFunctorsSize; } /********************************************* * operations * *********************************************/ /** @brief invoke a functor in a separate thread managed by the pool * * If all maxThreads are busy, threadfunc will be added to a work list and * will run when a thread comes free. If all threads are busy and there are * queueSize tasks already waiting, invoke() will block until a slot in the * queue comes free. */ EXPORT uint64_t invoke(const Functor_T& threadfunc); /** @brief stop the threads */ EXPORT void stop(); /** @brief wait on all the threads to complete */ EXPORT void wait(); /** @brief Wait for a specific thread */ EXPORT void join(uint64_t thrHandle); /** @brief Wait for a specific thread */ EXPORT void join(std::vector& thrHandle); /** @brief for use in debugging */ EXPORT void dump(); EXPORT std::string& name() { return fName; } EXPORT void setName(std::string name) { fName = name; } EXPORT void setName(const char* name) { fName = name; } EXPORT bool debug() { return fDebug; } EXPORT void setDebug(bool d) { fDebug = d; } friend class ThreadPoolMonitor; protected: private: // Used internally to keep a handle associated with each functor for join() struct PoolFunction_T { uint64_t hndl; Functor_T functor; }; /** @brief initialize data memebers */ void init(); /** @brief add a functor to the list */ uint64_t addFunctor(const Functor_T& func); /** @brief thread entry point */ void beginThread() throw(); void pruneThread(); ThreadPool(const ThreadPool&); ThreadPool& operator=(const ThreadPool&); friend struct beginThreadFunc; struct beginThreadFunc { beginThreadFunc(ThreadPool& impl) : fImpl(impl) { } void operator()() { fImpl.beginThread(); } ThreadPool& fImpl; }; struct NoOp { void operator()() const { } }; size_t fThreadCount; size_t fMaxThreads; size_t fQueueSize; typedef std::list Container_T; Container_T fWaitingFunctors; Container_T::iterator fNextFunctor; uint32_t fIssued; boost::mutex fMutex; boost::condition_variable fThreadAvailable; // triggered when a thread is available boost::condition_variable fNeedThread; // triggered when a thread is needed ThreadPoolGroup fThreads; std::atomic fStop = false; long fGeneralErrors; long fFunctorErrors; uint32_t waitingFunctorsSize; uint64_t fNextHandle; std::string fName; // Optional to add a name to the pool for debugging. bool fDebug; boost::mutex fInitMutex; boost::mutex fPruneMutex; boost::condition_variable fPruneThreadEnd; boost::thread* fPruneThread; std::stack fPruneThreads; // A list of stale thread IDs to be joined }; // This class, if instantiated, will continuously log details about the indicated threadpool // The log will end up in /var/log/mariadb/columnstore/trace/threadpool_.log class ThreadPoolMonitor { public: ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL) { } ~ThreadPoolMonitor() { if (fLog) { delete fLog; } } void operator()(); private: // defaults okay // ThreadPoolMonitor(const ThreadPoolMonitor& rhs); // ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs); ThreadPool* fPool; std::ofstream* fLog; }; } // namespace threadpool #undef EXPORT