mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
395 lines
9.2 KiB
C++
395 lines
9.2 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: threadpool.cpp 553 2008-02-27 17:51:16Z rdempsey $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
|
|
#include <stdexcept>
|
|
#include <unistd.h>
|
|
using namespace std;
|
|
|
|
#include "messageobj.h"
|
|
#include "messagelog.h"
|
|
using namespace logging;
|
|
|
|
#include "weightedthreadpool.h"
|
|
|
|
namespace threadpool
|
|
{
|
|
WeightedThreadPool::WeightedThreadPool() : fMaxThreadWeight(0), fMaxThreads(0), fQueueSize(0)
|
|
{
|
|
init();
|
|
}
|
|
|
|
WeightedThreadPool::WeightedThreadPool(size_t maxThreadWeight, size_t maxThreads, size_t queueSize)
|
|
: fMaxThreadWeight(maxThreadWeight), fMaxThreads(maxThreads), fQueueSize(queueSize)
|
|
{
|
|
init();
|
|
|
|
if (fQueueSize == 0)
|
|
fQueueSize = fMaxThreads * 2;
|
|
}
|
|
|
|
WeightedThreadPool::~WeightedThreadPool() throw()
|
|
{
|
|
// delete fThreadCreated;
|
|
try
|
|
{
|
|
stop();
|
|
}
|
|
catch (...)
|
|
{
|
|
}
|
|
}
|
|
|
|
void WeightedThreadPool::init()
|
|
{
|
|
fThreadCount = 0;
|
|
fGeneralErrors = 0;
|
|
fFunctorErrors = 0;
|
|
fWaitingFunctorsSize = 0;
|
|
fWaitingFunctorsWeight = 0;
|
|
issued = 0;
|
|
fStop = false;
|
|
// fThreadCreated = new NoOp();
|
|
fNextFunctor = fWaitingFunctors.end();
|
|
}
|
|
|
|
void WeightedThreadPool::setQueueSize(size_t queueSize)
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
fQueueSize = queueSize;
|
|
}
|
|
|
|
void WeightedThreadPool::setMaxThreads(size_t maxThreads)
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
fMaxThreads = maxThreads;
|
|
}
|
|
|
|
void WeightedThreadPool::setMaxThreadWeight(size_t maxWeight)
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
fMaxThreadWeight = maxWeight;
|
|
}
|
|
|
|
void WeightedThreadPool::setThreadCreatedListener(const Functor_T& f)
|
|
{
|
|
// fThreadCreated = f;
|
|
}
|
|
|
|
void WeightedThreadPool::stop()
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
fStop = true;
|
|
lock1.unlock();
|
|
|
|
fNeedThread.notify_all();
|
|
fThreads.join_all();
|
|
}
|
|
|
|
void WeightedThreadPool::wait()
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
|
|
while (fWaitingFunctorsSize > 0)
|
|
{
|
|
// cout << "waiting ..." << endl;
|
|
fThreadAvailable.wait(lock1);
|
|
// cerr << "woke!" << endl;
|
|
}
|
|
}
|
|
|
|
void WeightedThreadPool::removeJobs(uint32_t id)
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
Container_T::iterator it;
|
|
|
|
it = fNextFunctor;
|
|
|
|
while (it != fWaitingFunctors.end())
|
|
{
|
|
if (it->id == id)
|
|
{
|
|
fWaitingFunctorsWeight -= it->functorWeight;
|
|
fWaitingFunctorsSize--;
|
|
|
|
if (it == fNextFunctor)
|
|
{
|
|
fWaitingFunctors.erase(fNextFunctor++);
|
|
it = fNextFunctor;
|
|
}
|
|
else
|
|
fWaitingFunctors.erase(it++);
|
|
}
|
|
else
|
|
++it;
|
|
}
|
|
}
|
|
|
|
void WeightedThreadPool::invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id)
|
|
{
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
|
|
for (;;)
|
|
{
|
|
try
|
|
{
|
|
if (fWaitingFunctorsSize < fThreadCount)
|
|
{
|
|
// Don't create a thread unless it's needed. There
|
|
// is a thread available to service this request.
|
|
addFunctor(threadfunc, functor_weight, id);
|
|
lock1.unlock();
|
|
break;
|
|
}
|
|
|
|
bool bAdded = false;
|
|
|
|
if (fWaitingFunctorsSize < fQueueSize)
|
|
{
|
|
// Don't create a thread unless you have to
|
|
addFunctor(threadfunc, functor_weight, id);
|
|
bAdded = true;
|
|
}
|
|
|
|
// add a thread is necessary
|
|
if (fThreadCount < fMaxThreads)
|
|
{
|
|
++fThreadCount;
|
|
// cout << "\t++invoke() tcnt=" << fThreadCount << endl;
|
|
lock1.unlock();
|
|
fThreads.create_thread(beginThreadFunc(*this));
|
|
|
|
if (bAdded)
|
|
break;
|
|
|
|
// If the mutex is unlocked before creating the thread
|
|
// this allows fThreadAvailable to be triggered
|
|
// before the wait below runs. So run the loop again.
|
|
lock1.lock();
|
|
continue;
|
|
}
|
|
|
|
// else
|
|
// cout << "invoke() no thread created c=" << fThreadCount << " m=" << fMaxThreads << endl;
|
|
|
|
if (bAdded)
|
|
{
|
|
lock1.unlock();
|
|
break;
|
|
}
|
|
|
|
fThreadAvailable.wait(lock1);
|
|
}
|
|
catch (...)
|
|
{
|
|
++fGeneralErrors;
|
|
throw;
|
|
}
|
|
}
|
|
|
|
fNeedThread.notify_one();
|
|
}
|
|
|
|
void WeightedThreadPool::beginThread() throw()
|
|
{
|
|
vector<bool> reschedule;
|
|
|
|
try
|
|
{
|
|
// fThreadCreated();
|
|
boost::mutex::scoped_lock lock1(fMutex);
|
|
|
|
for (;;)
|
|
{
|
|
if (fStop)
|
|
break;
|
|
|
|
if (fNextFunctor == fWaitingFunctors.end())
|
|
{
|
|
// Wait until someone needs a thread
|
|
fNeedThread.wait(lock1);
|
|
}
|
|
else
|
|
{
|
|
vector<Container_T::iterator> todoList;
|
|
int i, num = (fWaitingFunctorsSize - issued);
|
|
Container_T::const_iterator iter;
|
|
uint32_t weight = 0;
|
|
|
|
for (i = 0; i < num && weight < fMaxThreadWeight; i++)
|
|
{
|
|
weight += (*fNextFunctor).functorWeight;
|
|
todoList.push_back(fNextFunctor++);
|
|
}
|
|
|
|
issued += i;
|
|
num = i;
|
|
lock1.unlock();
|
|
|
|
// cerr << "beginThread() " << num
|
|
// << " jobs - fWaitingFunctorsSize=" << fWaitingFunctorsSize
|
|
// << " fWaitingFunctorsWeight=" << fWaitingFunctorsWeight
|
|
// << " weight=" << weight
|
|
// << " issued=" << issued << " todo=" << todoList.size()
|
|
// << " fThreadCount=" << fThreadCount << endl;
|
|
|
|
i = 0;
|
|
reschedule.resize(num);
|
|
bool allWereRescheduled = true, someWereRescheduled = false;
|
|
|
|
while (i < num)
|
|
{
|
|
try
|
|
{
|
|
for (; i < num; i++)
|
|
{
|
|
reschedule[i] = false; // in case of exception in the next line
|
|
reschedule[i] = ((*todoList[i]).functor)();
|
|
allWereRescheduled &= reschedule[i];
|
|
someWereRescheduled |= reschedule[i];
|
|
}
|
|
}
|
|
catch (exception& e)
|
|
{
|
|
i++;
|
|
++fFunctorErrors;
|
|
cerr << e.what() << endl;
|
|
}
|
|
}
|
|
|
|
// no real work was done, prevent intensive busy waiting
|
|
if (allWereRescheduled)
|
|
usleep(1000);
|
|
|
|
// cout << "running " << i << "/" << num << " functor" <<endl;
|
|
lock1.lock();
|
|
|
|
if (someWereRescheduled)
|
|
{
|
|
for (i = 0; i < num; i++)
|
|
if (reschedule[i])
|
|
addFunctor((*todoList[i]).functor, (*todoList[i]).functorWeight, (*todoList[i]).id);
|
|
|
|
if (num > 1)
|
|
fNeedThread.notify_all();
|
|
else
|
|
fNeedThread.notify_one();
|
|
}
|
|
|
|
issued -= num;
|
|
|
|
for (i = 0; i < num; i++)
|
|
{
|
|
fWaitingFunctorsWeight -= (*todoList[i]).functorWeight;
|
|
fWaitingFunctors.erase(todoList[i]);
|
|
}
|
|
|
|
fWaitingFunctorsSize -= num;
|
|
|
|
// if (fWaitingFunctorsSize != fWaitingFunctors.size()) ;
|
|
// cerr << "num=" << num << " cleaned=" << i << " size="
|
|
// << fWaitingFunctorsSize << " list size="
|
|
// << fWaitingFunctors.size()
|
|
// << " w="<<fWaitingFunctorsWeight << endl;
|
|
|
|
fThreadAvailable.notify_all();
|
|
}
|
|
}
|
|
}
|
|
catch (exception& ex)
|
|
{
|
|
++fGeneralErrors;
|
|
|
|
// Log the exception and exit this thread
|
|
try
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(5);
|
|
args.add("beginThread: Caught exception: ");
|
|
args.add(ex.what());
|
|
|
|
message.format(args);
|
|
|
|
logging::LoggingID lid(22);
|
|
logging::MessageLog ml(lid);
|
|
|
|
ml.logErrorMessage(message);
|
|
}
|
|
catch (...)
|
|
{
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
++fGeneralErrors;
|
|
|
|
// Log the exception and exit this thread
|
|
try
|
|
{
|
|
logging::Message::Args args;
|
|
logging::Message message(6);
|
|
args.add("beginThread: Caught unknown exception!");
|
|
|
|
message.format(args);
|
|
|
|
logging::LoggingID lid(22);
|
|
logging::MessageLog ml(lid);
|
|
|
|
ml.logErrorMessage(message);
|
|
}
|
|
catch (...)
|
|
{
|
|
}
|
|
}
|
|
}
|
|
|
|
void WeightedThreadPool::addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id)
|
|
{
|
|
bool bAtEnd = false;
|
|
|
|
if (fNextFunctor == fWaitingFunctors.end())
|
|
bAtEnd = true;
|
|
|
|
// cout << "addFunctor() w=" << fWaitingFunctorsWeight
|
|
// << " s=" << fWaitingFunctorsSize << " i=" << id << endl;
|
|
|
|
FunctorListItem fl = {func, functor_weight, id};
|
|
fWaitingFunctors.push_back(fl);
|
|
fWaitingFunctorsSize++;
|
|
fWaitingFunctorsWeight += functor_weight;
|
|
|
|
if (bAtEnd)
|
|
{
|
|
--fNextFunctor;
|
|
}
|
|
}
|
|
|
|
void WeightedThreadPool::dump()
|
|
{
|
|
std::cout << "General Errors: " << fGeneralErrors << std::endl;
|
|
std::cout << "Functor Errors: " << fFunctorErrors << std::endl;
|
|
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
|
|
std::cout << "Waiting functors weight : " << fWaitingFunctorsWeight << std::endl;
|
|
}
|
|
|
|
} // namespace threadpool
|