Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git

commit 55d006de1a
parent d50c7c7cab
Author: David Hall
Date:   2017-02-03 15:22:07 -06:00

    MCOL-513 use thread pool for jobsteps

30 changed files with 192 additions and 125 deletions
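
Every file below applies the same transformation: a job step that used to own its worker as a boost::thread (or a container of them) now stores only a uint64_t handle returned by the shared jobstepThreadPool, a static ThreadPool member added to JobStep, and joins through the pool. A minimal before/after sketch of the single-worker case, using the member and functor names that recur in the diff (Runner stands in for each step's worker functor; this is an illustration, not code from the commit):

    // before: the step owned the thread object outright
    //     boost::scoped_ptr<boost::thread> fRunner;
    //     fRunner.reset(new boost::thread(Runner(this)));    // in run()
    //     if (fRunner)
    //         fRunner->join();                               // in join()

    // after: the step keeps a handle into the shared pool
    //     uint64_t fRunner;                                  // 0 = not started
    //     fRunner = jobstepThreadPool.invoke(Runner(this));  // in run()
    //     if (fRunner)
    //         jobstepThreadPool.join(fRunner);               // in join()

Steps that fan out to several workers (DiskJoinStep, TupleBPS, TupleHashJoinStep, TupleAggregateStep, TupleUnion, WindowFunctionStep) collect the handles in a std::vector<uint64_t> and wait on them with the pool's vector overload of join().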

View File

@@ -154,6 +154,7 @@ CrossEngineStep::CrossEngineStep(
     fRowsPerGroup(256),
     fOutputDL(NULL),
     fOutputIterator(0),
+    fRunner(0),
     fEndOfResult(false),
     fSchema(schema),
     fTable(table),
@@ -439,14 +440,14 @@ void CrossEngineStep::run()
         fOutputIterator = fOutputDL->getIterator();
     }
-    fRunner.reset(new boost::thread(Runner(this)));
+    fRunner = jobstepThreadPool.invoke(Runner(this));
 }
 
 void CrossEngineStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }

View File

@@ -181,7 +181,7 @@ protected:
         CrossEngineStep* fStep;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     OIDVector fOIDVector;
    bool fEndOfResult;
     bool fRunExecuted;

View File

@@ -58,7 +58,7 @@ namespace joblist {
 DiskJoinStep::DiskJoinStep() { }
 
 DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bool lastOne) : JobStep(*t), thjs(t),
-    joinerIndex(joinIndex), closedOutput(false)
+    mainThread(0), joinerIndex(joinIndex), closedOutput(false)
 {
     /*
         grab all relevant vars from THJS
@@ -130,9 +130,10 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bo
 DiskJoinStep::~DiskJoinStep()
 {
     abort();
-    if (mainThread) {
-        mainThread->join();
-        mainThread.reset();
+    if (mainThread)
+    {
+        jobstepThreadPool.join(mainThread);
+        mainThread = 0;
     }
     if (jp)
         atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
@@ -151,13 +152,16 @@ void DiskJoinStep::loadExistingData(vector<RGData> &data)
 void DiskJoinStep::run()
 {
-    mainThread.reset(new boost::thread(Runner(this)));
+    mainThread = jobstepThreadPool.invoke(Runner(this));
 }
 
 void DiskJoinStep::join()
 {
     if (mainThread)
-        mainThread->join();
+    {
+        jobstepThreadPool.join(mainThread);
+        mainThread = 0;
+    }
     if (jp) {
         atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
         //int64_t memUsage;
@@ -479,12 +483,12 @@ void DiskJoinStep::mainRunner()
             loadFIFO.reset(new FIFO<boost::shared_ptr<LoaderOutput> >(1, 1));   // double buffering should be good enough
             buildFIFO.reset(new FIFO<boost::shared_ptr<BuilderOutput> >(1, 1));
-            loadThread.reset(new boost::thread(Loader(this)));
-            buildThread.reset(new boost::thread(Builder(this)));
-            joinThread.reset(new boost::thread(Joiner(this)));
-            loadThread->join();
-            buildThread->join();
-            joinThread->join();
+            std::vector<uint64_t> thrds;
+            thrds.reserve(3);
+            thrds.push_back(jobstepThreadPool.invoke(Loader(this)));
+            thrds.push_back(jobstepThreadPool.invoke(Builder(this)));
+            thrds.push_back(jobstepThreadPool.invoke(Joiner(this)));
+            jobstepThreadPool.join(thrds);
         }
     }
     CATCH_AND_LOG;

View File

@@ -49,7 +49,6 @@ class DiskJoinStep : public JobStep
     boost::shared_array<int> LOMapping, SOMapping, SjoinFEMapping, LjoinFEMapping;
 
     TupleHashJoinStep *thjs;
-    boost::shared_ptr<boost::thread> runner;
     boost::shared_ptr<funcexp::FuncExpWrapper> fe;
     bool typeless;
     JoinType joinType;
@@ -69,7 +68,7 @@ class DiskJoinStep : public JobStep
     bool lastLargeIteration;
     uint32_t largeIterationCount;
 
-    boost::shared_ptr<boost::thread> mainThread;
+    uint64_t mainThread;   // thread handle from thread pool
 
     /* Loader structs */
     struct LoaderOutput {
@@ -86,8 +85,6 @@ class DiskJoinStep : public JobStep
     };
 
     void loadFcn();
-    boost::shared_ptr<boost::thread> loadThread;
-
     /* Builder structs */
     struct BuilderOutput {
         boost::shared_ptr<joiner::TupleJoiner> tupleJoiner;
@@ -104,7 +101,6 @@ class DiskJoinStep : public JobStep
         DiskJoinStep *djs;
     };
 
     void buildFcn();
-    boost::shared_ptr<boost::thread> buildThread;
 
     /* Joining structs */
     struct Joiner {
@@ -113,7 +109,6 @@ class DiskJoinStep : public JobStep
         DiskJoinStep *djs;
     };
 
     void joinFcn();
-    boost::shared_ptr<boost::thread> joinThread;
 
     // limits & usage
     boost::shared_ptr<int64_t> smallUsage;

View File

@@ -108,6 +108,35 @@ JobList::~JobList()
                 joiners[i]->join();
                 delete joiners[i];
             }
+#if 0
+            // Stop all the query steps
+            end = fQuery.end();
+            for (iter = fQuery.begin(); iter != end; ++iter)
+            {
+                (*iter)->abort();
+            }
+
+            // Stop all the projection steps
+            end = fProject.end();
+            for (iter = fProject.begin(); iter != end; ++iter)
+            {
+                (*iter)->abort();
+            }
+
+            // Wait for all the query steps to end
+            end = fQuery.end();
+            for (iter = fQuery.begin(); iter != end; ++iter)
+            {
+                (*iter)->join();
+            }
+
+            // Wait for all the projection steps to end
+            end = fProject.end();
+            for (iter = fProject.begin(); iter != end; ++iter)
+            {
+                (*iter)->join();
+            }
+#endif
         }
     }
     catch (exception& ex)

View File

@@ -248,6 +248,7 @@
             <F N="tupleunion.cpp"/>
             <F N="unique32generator.cpp"/>
             <F N="virtualtable.cpp"/>
+            <F N="windowfunctionstep.cpp"/>
         </Folder>
         <Folder
             Name="Header Files"
@@ -305,6 +306,7 @@
             <F N="tupleunion.h"/>
             <F N="unique32generator.h"/>
             <F N="virtualtable.h"/>
+            <F N="windowfunctionstep.h"/>
         </Folder>
         <Folder
             Name="Resource Files"

View File

@@ -55,6 +55,8 @@ namespace joblist
 {
 boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
 
+ThreadPool JobStep::jobstepThreadPool(100,200);
+
 ostream& operator<<(ostream& os, const JobStep* rhs)
 {
     os << rhs->toString();
@@ -100,6 +102,7 @@ JobStep::JobStep(const JobInfo& j) :
     fQtc.serverParms(tsp);
     //fStepUuid = bu::random_generator()();
     fStepUuid = QueryTeleClient::genUUID();
+    jobstepThreadPool.setDebug(true);
 }
 
 //------------------------------------------------------------------------------

View File

@@ -42,7 +42,7 @@
 #include "timestamp.h"
 #include "rowgroup.h"
 #include "querytele.h"
+#include "threadpool.h"
 
 #include "atomicops.h"
 #include "branchpred.h"
@@ -53,6 +53,7 @@
 # endif
 #endif
 
+using namespace threadpool;
+
 namespace joblist
 {
@@ -234,6 +235,7 @@ public:
     void onClauseFilter(bool b) { fOnClauseFilter = b; }
 
 protected:
+    static ThreadPool jobstepThreadPool;
 
     //@bug6088, for telemetry posting
     static const int64_t STEP_TELE_INTERVAL = 5000;  // now, this is the browser refresh rate

View File

@@ -134,6 +134,8 @@ pDictionaryScan::pDictionaryScan(
     sendWaiting(false),
     ridCount(0),
     ridList(0),
+    pThread(0),
+    cThread(0),
     colType(ct),
     fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()),
     fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()),
@@ -214,12 +216,12 @@ void pDictionaryScan::initializeConfigParms()
 void pDictionaryScan::startPrimitiveThread()
 {
-    pThread.reset(new boost::thread(pDictionaryScanPrimitive(this)));
+    pThread = jobstepThreadPool.invoke(pDictionaryScanPrimitive(this));
 }
 
 void pDictionaryScan::startAggregationThread()
 {
-    cThread.reset(new boost::thread(pDictionaryScanAggregator(this)));
+    cThread = jobstepThreadPool.invoke(pDictionaryScanAggregator(this));
 }
 
 void pDictionaryScan::run()
@@ -243,8 +245,8 @@ void pDictionaryScan::run()
 void pDictionaryScan::join()
 {
-    pThread->join();
-    cThread->join();
+    jobstepThreadPool.join(pThread);
+    jobstepThreadPool.join(cThread);
     if (isEquality && fDec) {
         destroyEqualityFilter();
         isEquality = false;

View File

@@ -321,8 +321,8 @@ private:
     BRM::DBRM dbrm;
 
-    boost::shared_ptr<boost::thread> cThread;  //consumer thread
-    boost::shared_ptr<boost::thread> pThread;  //producer thread
+    // boost::shared_ptr<boost::thread> cThread;  //consumer thread
+    // boost::shared_ptr<boost::thread> pThread;  //producer thread
     boost::mutex mutex;
     boost::condition condvar;
     boost::condition flushed;
@@ -772,8 +772,8 @@ private:
     DataList<ElementType> *ridList;
     messageqcpp::ByteStream fFilterString;
     execplan::CalpontSystemCatalog::ColType colType;
-    boost::shared_ptr<boost::thread> pThread;  //producer thread
-    boost::shared_ptr<boost::thread> cThread;  //producer thread
+    uint64_t pThread;  //producer thread. thread pool handle
+    uint64_t cThread;  //consumer thread. thread pool handle
     DataList_t* requestList;
     //StringDataList* stringList;
     boost::mutex mutex;
@@ -1036,8 +1036,6 @@ protected:
 private:
     void formatMiniStats();
 
-    typedef boost::shared_ptr<boost::thread> SPTHD;
-    typedef boost::shared_array<SPTHD> SATHD;
     void startPrimitiveThread();
     void startAggregationThread();
     void initializeConfigParms();
@@ -1060,7 +1058,7 @@ private:
     uint32_t fNumThreads;
     PrimitiveStepType ffirstStepType;
     bool isFilterFeeder;
-    SATHD fProducerThread;
+    std::vector<uint64_t> fProducerThreads;   // thread pool handles
     messageqcpp::ByteStream fFilterString;
     uint32_t fFilterCount;
     execplan::CalpontSystemCatalog::ColType fColType;
@@ -1097,8 +1095,8 @@ private:
     uint64_t fMsgBytesOut;       // total byte count for outcoming messages
     uint64_t fBlockTouched;      // total blocks touched
     uint32_t fExtentsPerSegFile; //config num of Extents Per Segment File
-    boost::shared_ptr<boost::thread> cThread;  //consumer thread
-    boost::shared_ptr<boost::thread> pThread;  //producer thread
+    // uint64_t cThread;  //consumer thread. thread handle from thread pool
+    uint64_t pThread;  //producer thread. thread handle from thread pool
     boost::mutex tplMutex;
     boost::mutex dlMutex;
     boost::mutex cpMutex;
@@ -1251,7 +1249,7 @@ private:
     execplan::CalpontSystemCatalog::OID fTableOID;
     execplan::CalpontSystemCatalog::ColType fColType;
     int8_t fBOP;
-    boost::shared_ptr<boost::thread> runner;   // @bug 686
+    // int64_t runner;  // thread handle from thread pool
 
     // @bug 687 Add data and friend declarations for concurrent filter steps.
     std::vector<ElementType> fSortedA;  // used to internally sort input data

View File

@@ -150,6 +150,7 @@ SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
     , fEndOfResult(false)
     , fInputIterator(0)
     , fOutputIterator(0)
+    , fRunner(0)
 {
     fAlias = s->alias();
     fView = s->view();
@@ -191,14 +192,14 @@ void SubAdapterStep::run()
     if (fDelivery)
         fOutputIterator = fOutputDL->getIterator();
 
-    fRunner.reset(new boost::thread(Runner(this)));
+    fRunner = jobstepThreadPool.invoke(Runner(this));
 }
 
 void SubAdapterStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }

View File

@@ -230,7 +230,7 @@ protected:
         SubAdapterStep* fStep;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     boost::scoped_ptr<funcexp::FuncExpWrapper> fExpression;
 };

View File

@@ -180,14 +180,13 @@ void TupleBPS::initializeConfigParms()
     else
         fMaxNumThreads = 1;
 
-    fProducerThread.reset(new SPTHD[fMaxNumThreads]);
-    // Make maxnum thread objects even if they don't get used to make join() safe.
-    for (uint32_t i = 0; i < fMaxNumThreads; i++)
-        fProducerThread[i].reset(new thread());
+    // Reserve the max number of thread space. A bit of an optimization.
+    fProducerThreads.clear();
+    fProducerThreads.reserve(fMaxNumThreads);
 }
 
 TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
-    BatchPrimitive(jobInfo), fRm(jobInfo.rm)
+    BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm)
 {
     fInputJobStepAssociation = rhs.inputAssociation();
     fOutputJobStepAssociation = rhs.outputAssociation();
@@ -800,7 +799,7 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
 void TupleBPS::startPrimitiveThread()
 {
-    pThread.reset(new boost::thread(TupleBPSPrimitive(this)));
+    pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this));
 }
 
 void TupleBPS::startAggregationThread()
@@ -809,13 +808,13 @@ void TupleBPS::startAggregationThread()
 //    fMaxNumThreads = 1;
 //    fNumThreads = fMaxNumThreads;
 //    for (uint32_t i = 0; i < fMaxNumThreads; i++)
-//        fProducerThread[i].reset(new boost::thread(TupleBPSAggregators(this, i)));
+//        fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i)));
 
     // This block of code starts one thread at a time
     if (fNumThreads >= fMaxNumThreads)
         return;
     fNumThreads++;
-    fProducerThread[fNumThreads-1].reset(new boost::thread(TupleBPSAggregators(this, fNumThreads-1)));
+    fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads-1)));
 }
 
 //#include "boost/date_time/posix_time/posix_time.hpp"
@@ -1117,6 +1116,8 @@ void TupleBPS::run()
             serializeJoiner();
         prepCasualPartitioning();
         startPrimitiveThread();
+        fProducerThreads.clear();
+        fProducerThreads.reserve(fMaxNumThreads);
         startAggregationThread();
     }
     catch (const std::exception& e)
@@ -1153,10 +1154,10 @@ void TupleBPS::join()
     }
 
     if (pThread)
-        pThread->join();
-
-    for (uint32_t i = 0; i < fMaxNumThreads; i++)
-        fProducerThread[i]->join();
+        jobstepThreadPool.join(pThread);
+    jobstepThreadPool.join(fProducerThreads);
 
     if (BPPIsAllocated) {
         ByteStream bs;
         fDec->removeDECEventListener(this);

View File

@@ -182,6 +182,7 @@ TupleAggregateStep::TupleAggregateStep(
     fAggregator(agg),
     fRowGroupOut(rgOut),
     fRowGroupIn(rgIn),
+    fRunner(0),
     fUmOnly(false),
     fRm(jobInfo.rm),
     fBucketNum(0),
@@ -252,7 +253,7 @@ void TupleAggregateStep::run()
 {
     if (fDelivery == false)
     {
-        fRunner.reset(new thread(Aggregator(this)));
+        fRunner = jobstepThreadPool.invoke(Aggregator(this));
     }
 }
@@ -260,7 +261,7 @@ void TupleAggregateStep::run()
 void TupleAggregateStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }
@@ -4210,8 +4211,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
             // maximum number is reached.
             if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads &&
                 dlIn->more(fInputIter)) {
-                fFirstPhaseRunners[fFirstPhaseThreadCount].reset
-                    (new boost::thread(ThreadedAggregator(this, fFirstPhaseThreadCount)));
+                fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount)));
                 fFirstPhaseThreadCount++;
             }
@@ -4482,24 +4482,21 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
         fFirstPhaseThreadCount = fNumOfThreads;
-        boost::shared_ptr<boost::thread> runner;
         for (i = 0; i < fNumOfThreads; i++)
         {
-            runner.reset(new boost::thread(ThreadedAggregator(this, i)));
-            fFirstPhaseRunners.push_back(runner);
+            fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)))
         }
 */
         // This block of code starts one thread, relies on doThreadedAggregation()
         // to start more as needed
-        fFirstPhaseRunners.resize(fNumOfThreads); // to prevent a resize during use
+        fFirstPhaseRunners.clear();
+        fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
         fFirstPhaseThreadCount = 1;
-        for (i = 1; i < fNumOfThreads; i++)
-            // fill with valid thread objects to make joining work
-            fFirstPhaseRunners[i].reset(new boost::thread());
-        fFirstPhaseRunners[0].reset(new boost::thread(ThreadedAggregator(this, 0)));
+        fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0)));
 
-        for (i = 0; i < fNumOfThreads; i++)
-            fFirstPhaseRunners[i]->join();
+        // Now wait for that thread plus all the threads it may have spawned
+        jobstepThreadPool.join(fFirstPhaseRunners);
         fFirstPhaseRunners.clear();
     }
 
     if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
@@ -4509,8 +4506,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
     {
         if (!fDoneAggregate)
         {
-            vector<boost::shared_ptr<thread> > runners;
-            boost::shared_ptr<thread> runner;
+            vector<uint64_t> runners;   // thread pool handles
             fRowGroupsDeliveredData.resize(fNumOfBuckets);
 
             uint32_t bucketsPerThread = fNumOfBuckets/fNumOfThreads;
             //uint32_t bucketsPerThread = 1;
             //uint32_t numThreads = fNumOfBuckets;
+            runners.reserve(numThreads);
             for (i = 0; i < numThreads; i++)
             {
-                runner.reset(new boost::thread(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread)));
-                runners.push_back(runner);
+                runners.push_back(jobstepThreadPool.invoke(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread)));
             }
-            for (i = 0; i < numThreads; i++)
-                runners[i]->join();
+            jobstepThreadPool.join(runners);
         }
 
         fDoneAggregate = true;

View File

@@ -166,7 +166,7 @@ private:
         uint32_t bucketCount;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     bool fUmOnly;
     ResourceManager *fRm;
@@ -186,7 +186,7 @@ private:
     bool fIsMultiThread;
     int fInputIter;  // iterator
     boost::scoped_array<uint64_t> fMemUsage;
-    vector<boost::shared_ptr<boost::thread> > fFirstPhaseRunners;
+    std::vector<uint64_t> fFirstPhaseRunners;   // thread pool handles
     uint32_t fFirstPhaseThreadCount;
 
     boost::shared_ptr<int64_t> fSessionMemLimit;

View File

@@ -106,6 +106,7 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) :
     fOutputDL(NULL),
     fInputIterator(0),
     fOutputIterator(0),
+    fRunner(0),
     fRowsProcessed(0),
     fRowsReturned(0),
     fLimitStart(0),
@@ -205,14 +206,14 @@ void TupleAnnexStep::run()
         fOutputIterator = fOutputDL->getIterator();
     }
 
-    fRunner.reset(new boost::thread(Runner(this)));
+    fRunner = jobstepThreadPool.invoke(Runner(this));
 }
 
 void TupleAnnexStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }

View File

@@ -111,7 +111,7 @@ protected:
         TupleAnnexStep* fStep;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     uint64_t fRowsProcessed;
     uint64_t fRowsReturned;

View File

@@ -81,6 +81,7 @@ TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo) :
     fInputDL(NULL),
     fOutputDL(NULL),
     fInputIterator(0),
+    fRunner(0),
     fEndOfResult(false)
 {
     fExtendedInfo = "TCS: ";
@@ -290,7 +291,7 @@ void TupleConstantStep::run()
         if (fOutputDL == NULL)
             throw logic_error("Output is not a RowGroup data list.");
 
-        fRunner.reset(new boost::thread(Runner(this)));
+        fRunner = jobstepThreadPool.invoke(Runner(this));
     }
 }
@@ -298,7 +299,7 @@ void TupleConstantStep::run()
 void TupleConstantStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }

View File

@@ -101,7 +101,7 @@ protected:
         TupleConstantStep* fStep;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     bool fEndOfResult;
 };

View File

@@ -166,7 +166,7 @@ void TupleHashJoinStep::run()
     }
     joiners.resize(smallDLs.size());
-    mainRunner.reset(new boost::thread(HJRunner(this)));
+    mainRunner = jobstepThreadPool.invoke(HJRunner(this));
 }
 
 void TupleHashJoinStep::join()
@@ -175,12 +175,12 @@ void TupleHashJoinStep::join()
     if (joinRan)
         return;
     joinRan = true;
-    mainRunner->join();
+    jobstepThreadPool.join(mainRunner);
     if (djs) {
         for (int i = 0; i < (int) djsJoiners.size(); i++)
             djs[i].join();
-        djsReader.join();
-        djsRelay.join();
+        jobstepThreadPool.join(djsReader);
+        jobstepThreadPool.join(djsRelay);
         //cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl;
     }
 }
@@ -544,9 +544,10 @@ void TupleHashJoinStep::hjRunner()
         }
     }
 
+    smallRunners.clear();
+    smallRunners.reserve(smallDLs.size());
         for (i = 0; i < smallDLs.size(); i++)
-            smallRunners.push_back(boost::shared_ptr<boost::thread>
-                (new boost::thread(SmallRunner(this, i))));
+            smallRunners.push_back(jobstepThreadPool.invoke(SmallRunner(this, i)));
     }
     catch (thread_resource_error&) {
         string emsg = "TupleHashJoin caught a thread resource error, aborting...\n";
@@ -557,8 +558,7 @@ void TupleHashJoinStep::hjRunner()
         deliverMutex.unlock();
     }
 
-    for (i = 0; i < smallRunners.size(); i++)
-        smallRunners[i]->join();
+    jobstepThreadPool.join(smallRunners);
     smallRunners.clear();
 
     for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++)
@@ -629,9 +629,9 @@ void TupleHashJoinStep::hjRunner()
         /* If an error happened loading the existing data, these threads are necessary
            to finish the abort */
         try {
-            djsRelay = boost::thread(DJSRelay(this));
+            djsRelay = jobstepThreadPool.invoke(DJSRelay(this));
             relay = true;
-            djsReader = boost::thread(DJSReader(this, smallSideCount));
+            djsReader = jobstepThreadPool.invoke(DJSReader(this, smallSideCount));
             reader = true;
             for (i = 0; i < smallSideCount; i++)
                 djs[i].run();
@@ -1091,7 +1091,7 @@ void TupleHashJoinStep::startJoinThreads()
     bool more = true;
     RGData oneRG;
 
-    if (joinRunners)
+    if (joinRunners.size() > 0)
         return;
 
     //@bug4836, in error case, stop process, and unblock the next step.
@@ -1142,13 +1142,11 @@ void TupleHashJoinStep::startJoinThreads()
     makeDupList(fe2 ? fe2Output : outputRG);
 
     /* Start join runners */
-    joinRunners.reset(new boost::shared_ptr<boost::thread>[joinThreadCount]);
+    joinRunners.reserve(joinThreadCount);
     for (i = 0; i < joinThreadCount; i++)
-        joinRunners[i].reset(new boost::thread(JoinRunner(this, i)));
+        joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i)));
 
     /* Join them and call endOfInput */
-    for (i = 0; i < joinThreadCount; i++)
-        joinRunners[i]->join();
+    jobstepThreadPool.join(joinRunners);
 
     if (lastSmallOuterJoiner != (uint32_t) -1)
         finishSmallOuterJoin();

View File

@@ -276,8 +276,8 @@ private:
         uint32_t index;
     };
 
-    boost::shared_ptr<boost::thread> mainRunner;
-    std::vector<boost::shared_ptr<boost::thread> > smallRunners;
+    int64_t mainRunner;   // thread handle from thread pool
+    std::vector<uint64_t> smallRunners;   // thread handles from thread pool
 
     // for notify TupleAggregateStep PM hashjoin
     // Ideally, hashjoin and delivery communicate with RowGroupDL,
@@ -347,7 +347,7 @@ private:
     void processDupList(uint32_t threadID, rowgroup::RowGroup &ingrp,
                         std::vector<rowgroup::RGData> *rowData);
-    boost::scoped_array<boost::shared_ptr<boost::thread> > joinRunners;
+    std::vector<uint64_t> joinRunners;   // thread handles from thread pool
     boost::mutex inputDLLock, outputDLLock;
     boost::shared_array<boost::shared_array<int> > columnMappings, fergMappings;
     boost::shared_array<int> fe2Mapping;
@@ -375,7 +375,7 @@ private:
     boost::scoped_array<DiskJoinStep> djs;
     boost::scoped_array<boost::shared_ptr<RowGroupDL> > fifos;
     void djsReaderFcn(int index);
-    boost::thread djsReader;
+    uint64_t djsReader;   // thread handle from thread pool
     struct DJSReader {
         DJSReader(TupleHashJoinStep *hj, uint32_t i) : HJ(hj), index(i) { }
         void operator()() { HJ->djsReaderFcn(index); }
@@ -383,7 +383,7 @@ private:
         uint32_t index;
     };
 
-    boost::thread djsRelay;
+    uint64_t djsRelay;   // thread handle from thread pool
     void djsRelayFcn();
     struct DJSRelay {
         DJSRelay(TupleHashJoinStep *hj) : HJ(hj) { }

View File

@@ -63,6 +63,7 @@ TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo) :
     fInputDL(NULL),
     fOutputDL(NULL),
     fInputIterator(0),
+    fRunner(0),
     fRowsReturned(0),
     fEndOfResult(false),
     fFeInstance(funcexp::FuncExp::instance())
@@ -151,7 +152,7 @@ void TupleHavingStep::run()
         if (fOutputDL == NULL)
             throw logic_error("Output is not a RowGroup data list.");
 
-        fRunner.reset(new boost::thread(Runner(this)));
+        fRunner = jobstepThreadPool.invoke(Runner(this));
     }
 }
@@ -159,7 +160,7 @@ void TupleHavingStep::run()
 void TupleHavingStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }

View File

@@ -97,7 +97,7 @@ protected:
         TupleHavingStep* fStep;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     uint64_t fRowsReturned;
     bool fEndOfResult;

View File

@@ -767,9 +767,9 @@ void TupleUnion::run()
         }
     }
 
+    runners.reserve(inputs.size());
     for (i = 0; i < inputs.size(); i++) {
-        boost::shared_ptr<boost::thread> th(new boost::thread(Runner(this, i)));
-        runners.push_back(th);
+        runners.push_back(jobstepThreadPool.invoke(Runner(this, i)));
     }
 }
@@ -784,8 +784,7 @@ void TupleUnion::join()
     joinRan = true;
     lk.unlock();
 
-    for (i = 0; i < runners.size(); i++)
-        runners[i]->join();
+    jobstepThreadPool.join(runners);
     runners.clear();
     uniquer->clear();
     rowMemory.clear();

View File

@@ -118,7 +118,7 @@ private:
         Runner(TupleUnion *t, uint32_t in) : tu(t), index(in) { }
         void operator()() { tu->readInput(index); }
     };
-    std::vector<boost::shared_ptr<boost::thread> > runners;
+    std::vector<uint64_t> runners;   //thread pool handles
 
     struct Hasher {
         TupleUnion *ts;

View File

@@ -140,6 +140,7 @@ namespace joblist
 WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) :
     JobStep(jobInfo),
+    fRunner(0),
     fCatalog(jobInfo.csc),
     fRowsReturned(0),
     fEndOfResult(false),
@@ -192,14 +193,14 @@ void WindowFunctionStep::run()
         fOutputIterator = fOutputDL->getIterator();
     }
 
-    fRunner.reset(new boost::thread(Runner(this)));
+    fRunner = jobstepThreadPool.invoke(Runner(this));
 }
 
 void WindowFunctionStep::join()
 {
     if (fRunner)
-        fRunner->join();
+        jobstepThreadPool.join(fRunner);
 }
@@ -855,13 +856,13 @@ void WindowFunctionStep::execute()
         if (fTotalThreads > fFunctionCount)
             fTotalThreads = fFunctionCount;
 
+        fFunctionThreads.clear();
+        fFunctionThreads.reserve(fTotalThreads);
         for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
-            fFunctionThreads.push_back(
-                boost::shared_ptr<boost::thread>(new boost::thread(WFunction(this))));
+            fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
 
-        // If cancelled, not all thread is started.
-        for (uint64_t i = 0; i < fFunctionThreads.size(); i++)
-            fFunctionThreads[i]->join();
+        // If cancelled, not all threads are started.
+        jobstepThreadPool.join(fFunctionThreads);
     }
 
     if (!(cancelled()))

View File

@@ -153,7 +153,7 @@ private:
         WindowFunctionStep* fStep;
     };
 
-    boost::scoped_ptr<boost::thread> fRunner;
+    uint64_t fRunner;   // thread pool handle
     boost::shared_ptr<execplan::CalpontSystemCatalog> fCatalog;
     uint64_t fRowsReturned;
@@ -188,7 +188,7 @@ private:
         WindowFunctionStep* fStep;
     };
 
-    std::vector<boost::shared_ptr<boost::thread> > fFunctionThreads;
+    std::vector<uint64_t> fFunctionThreads;
     std::vector<RowPosition> fRows;
     std::vector<boost::shared_ptr<windowfunction::WindowFunction> > fFunctions;

View File

@@ -1438,7 +1438,8 @@ int main(int argc, char* argv[])
     {
         IOSocket ios;
         ios = mqs->accept();
-        exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
+        boost::thread thd(SessionThread(ios, ec, rm));
+        //exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
     }
 
     exeMgrThreadPool.wait();

View File

@@ -177,10 +177,10 @@ void ThreadPool::join(std::vector<uint64_t> thrHandle)
     }
 }
 
-int64_t ThreadPool::invoke(const Functor_T &threadfunc)
+uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
 {
     boost::mutex::scoped_lock lock1(fMutex);
-    int64_t thrHandle=0;
+    uint64_t thrHandle=0;
 
     for(;;)
     {
@@ -210,6 +210,22 @@ int64_t ThreadPool::invoke(const Functor_T &threadfunc)
                 lock1.unlock();
                 fThreads.create_thread(beginThreadFunc(*this));
+                if (fDebug)
+                {
+                    logging::Message::Args args;
+                    logging::Message message(5);
+                    args.add("invoke: Starting thread ");
+                    args.add(fThreadCount);
+                    args.add(" max ");
+                    args.add(fMaxThreads);
+                    args.add(" queue ");
+                    args.add(fQueueSize);
+                    message.format( args );
+                    logging::LoggingID lid(22);
+                    logging::MessageLog ml(lid);
+                    ml.logWarningMessage( message );
+                }
 
                 if (bAdded)
                     break;
@@ -227,7 +243,20 @@ int64_t ThreadPool::invoke(const Functor_T &threadfunc)
                 break;
             }
 
-            fThreadAvailable.wait(lock1);
+            if (fDebug)
+            {
+                logging::Message::Args args;
+                logging::Message message(5);
+                args.add("invoke: Blocked waiting for thread. Count ");
+                args.add(fThreadCount);
+                args.add("max ");
+                args.add(fMaxThreads);
+                message.format( args );
+                logging::LoggingID lid(22);
+                logging::MessageLog ml(lid);
+                ml.logWarningMessage( message );
+                fThreadAvailable.wait(lock1);
+            }
         }
         catch(...)
         {
@@ -358,7 +387,7 @@ void ThreadPool::beginThread() throw()
     }
 }
 
-int64_t ThreadPool::addFunctor(const Functor_T &func)
+uint64_t ThreadPool::addFunctor(const Functor_T &func)
 {
     bool bAtEnd = false;

View File

@@ -131,7 +131,7 @@ public:
      * queueSize tasks already waiting, invoke() will block until a slot in the
      * queue comes free.
      */
-    EXPORT int64_t invoke(const Functor_T &threadfunc);
+    EXPORT uint64_t invoke(const Functor_T &threadfunc);
 
     /** @brief stop the threads
      */
@@ -153,13 +153,15 @@ public:
      */
    EXPORT void dump();
 
+    EXPORT void setDebug(bool d) {fDebug = d;}
+
 protected:
 
 private:
     // Used internally to keep a handle associated with each functor for join()
     struct PoolFunction_T
     {
-        int64_t hndl;
+        uint64_t hndl;
         Functor_T functor;
     };
@@ -169,7 +171,7 @@ private:
     /** @brief add a functor to the list
      */
-    int64_t addFunctor(const Functor_T &func);
+    uint64_t addFunctor(const Functor_T &func);
 
     /** @brief thread entry point
      */
@@ -222,6 +224,7 @@ private:
     uint32_t waitingFunctorsSize;
     uint64_t fNextHandle;
+    bool fDebug;
 };
 
 } // namespace threadpool
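
For reference, a self-contained sketch of how the revised ThreadPool interface above is meant to be driven. It assumes Functor_T accepts any copyable void() functor; the Work struct is hypothetical, and the pool sizes mirror the jobstepThreadPool definition in jobstep.cpp:

    #include <vector>
    #include "threadpool.h"

    // Hypothetical functor standing in for a job step's worker.
    struct Work
    {
        void operator()() { /* worker logic */ }
    };

    int main()
    {
        threadpool::ThreadPool pool(100, 200);  // maxThreads, queueSize
        pool.setDebug(true);                    // new: log thread starts and blocked invokes

        // Single worker: invoke() queues the functor and returns a uint64_t handle.
        uint64_t h = pool.invoke(Work());
        pool.join(h);                           // wait for that functor alone

        // Fan-out: collect handles and join them as a batch.
        std::vector<uint64_t> handles;
        handles.reserve(4);
        for (int i = 0; i < 4; i++)
            handles.push_back(pool.invoke(Work()));
        pool.join(handles);
        return 0;
    }

Joining by handle rather than by thread object is what lets the steps above drop their boost::thread members: the pool owns the threads, and a handle only identifies a queued functor.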