1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2022-01-21 16:43:49 +00:00

237 lines
5.8 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: $
*
*
***********************************************************************/
/** @file */
#pragma once
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <list>
#include <sstream>
#include <stdexcept>
#include <string>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
namespace threadpool
{
/** @brief WeightedThreadPool is a component for working with pools of threads and asynchronously
 * executing tasks. It is responsible for creating threads and tracking which threads are "busy"
 * and which are idle. Idle threads are utilized as "work" is added to the system.
 *
 * Every task submitted through invoke() carries a weight and an id in addition to the functor.
 * The pool is not copyable.
 */
class WeightedThreadPool
{
 public:
  /** @brief the task type: a callable taking no arguments and returning int
   */
  typedef boost::function0<int> Functor_T;

  /*********************************************
   *  ctor/dtor
   *
   *********************************************/

  /** @brief ctor
   */
  WeightedThreadPool();

  /** @brief ctor
   *
   * @param maxThreadWeight the maximum processing weight of a thread to be submitted for
   *                        execution from the jobs scheduled in the work queue.
   * @param maxThreads the maximum number of threads in this pool. This is the maximum number
   *                   of simultaneous operations that can go on.
   * @param queueSize  the maximum number of work tasks in the queue. This is the maximum
   *                   number of jobs that can queue up in the work list before invoke() blocks.
   */
  explicit WeightedThreadPool(size_t maxThreadWeight, size_t maxThreads, size_t queueSize);

  /** @brief dtor (declared non-throwing)
   */
  ~WeightedThreadPool() noexcept;

  /*********************************************
   *  accessors/mutators
   *
   *********************************************/

  /** @brief set the work queue size
   *
   * @param queueSize the size of the work queue
   */
  void setQueueSize(size_t queueSize);

  /** @brief get the work queue size
   */
  size_t getQueueSize() const
  {
    return fQueueSize;
  }

  /** @brief set the maximum number of threads to be used to process
   * the work queue
   *
   * @param maxThreads the maximum number of threads
   */
  void setMaxThreads(size_t maxThreads);

  /** @brief get the maximum number of threads
   */
  size_t getMaxThreads() const
  {
    return fMaxThreads;
  }

  /** @brief set the maximum processing weight of a thread to be
   * submitted for execution from the existing jobs
   * scheduled in the work queue
   *
   * @param maxWeight for execution
   */
  void setMaxThreadWeight(size_t maxWeight);

  /** @brief get the maximum processing weight of a thread
   *
   * The value is stored as size_t but returned as uint32_t, so a configured weight that does
   * not fit in 32 bits is truncated here. The return type is kept for existing callers.
   */
  uint32_t getMaxThreadWeight() const
  {
    return static_cast<uint32_t>(fMaxThreadWeight);
  }

  /** @brief register a functor to be called when a new thread
   * is created
   */
  void setThreadCreatedListener(const Functor_T& f);

  /** @brief queue size accessor: the value of the waiting-functor counter
   */
  uint32_t getWaiting() const
  {
    return fWaitingFunctorsSize;
  }

  /** @brief the value of the waiting-functor weight counter
   */
  uint32_t getWeight() const
  {
    return fWaitingFunctorsWeight;
  }

  /** @brief remove jobs by id
   *
   * NOTE(review): presumably drops queued jobs that were submitted to invoke() with the same
   * id -- confirm against the implementation.
   *
   * @param id the job id
   */
  void removeJobs(uint32_t id);

  /*********************************************
   *  operations
   *
   *********************************************/

  /** @brief invoke a functor in a separate thread managed by the pool
   *
   * If all maxThreads are busy, threadfunc will be added to a work list and
   * will run when a thread comes free. If all threads are busy and there are
   * queueSize tasks already waiting, invoke() will block until a slot in the
   * queue comes free.
   *
   * @param threadfunc     the functor to run
   * @param functor_weight the weight associated with this functor
   * @param id             the id associated with this functor (see removeJobs())
   */
  void invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id);

  /** @brief stop the threads
   */
  void stop();

  /** @brief wait on all the threads to complete
   */
  void wait();

  /** @brief for use in debugging
   */
  void dump();

 private:
  /** @brief initialize data members
   */
  void init();

  /** @brief add a functor to the list
   */
  void addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id);

  /** @brief thread entry point (declared non-throwing)
   */
  void beginThread() noexcept;

  // The pool is not copyable.
  WeightedThreadPool(const WeightedThreadPool&) = delete;
  WeightedThreadPool& operator=(const WeightedThreadPool&) = delete;

  // Retained for compatibility; a nested class already has access to the private members of
  // its enclosing class under C++11 rules.
  friend struct beginThreadFunc;

  /** @brief callable handed to a new thread; forwards to WeightedThreadPool::beginThread()
   */
  struct beginThreadFunc
  {
    beginThreadFunc(WeightedThreadPool& impl) : fImpl(impl)
    {
    }

    void operator()()
    {
      fImpl.beginThread();
    }

    WeightedThreadPool& fImpl;
  };

  /** @brief callable that does nothing
   */
  struct NoOp
  {
    void operator()() const
    {
    }
  };

  size_t fThreadCount;
  size_t fMaxThreadWeight;
  size_t fMaxThreads;
  size_t fQueueSize;

  /** @brief one entry of the work list: the functor plus its weight and id
   */
  struct FunctorListItemStruct
  {
    Functor_T functor;
    uint32_t functorWeight;
    uint32_t id;
  };

  typedef FunctorListItemStruct FunctorListItem;
  typedef std::list<FunctorListItem> Container_T;

  Container_T fWaitingFunctors;
  Container_T::iterator fNextFunctor;

  uint32_t issued;
  boost::mutex fMutex;
  boost::condition fThreadAvailable;  // triggered when a thread is available
  boost::condition fNeedThread;       // triggered when a thread is needed
  boost::thread_group fThreads;

  bool fStop;
  long fGeneralErrors;
  long fFunctorErrors;

  // 32-bit counters, matching the return type of getWaiting()/getWeight() and the uint32_t
  // weights accepted by invoke(); a 16-bit counter would wrap once more than 65535 jobs or
  // units of weight are pending.
  uint32_t fWaitingFunctorsSize;
  uint32_t fWaitingFunctorsWeight;
};
} // namespace threadpool