mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #125 from mariadb-corporation/MCOL-513

Mcol 513

Andrew Hutchings committed 2017-02-20 17:47:36 +00:00 (via GitHub)
46 changed files with 1155 additions and 483 deletions

View File

@@ -137,6 +137,7 @@ CrossEngineStep::CrossEngineStep(
fRowsPerGroup(256), fRowsPerGroup(256),
fOutputDL(NULL), fOutputDL(NULL),
fOutputIterator(0), fOutputIterator(0),
fRunner(0),
fEndOfResult(false), fEndOfResult(false),
fSchema(schema), fSchema(schema),
fTable(table), fTable(table),
@@ -422,14 +423,14 @@ void CrossEngineStep::run()
fOutputIterator = fOutputDL->getIterator(); fOutputIterator = fOutputDL->getIterator();
} }
fRunner.reset(new boost::thread(Runner(this))); fRunner = jobstepThreadPool.invoke(Runner(this));
} }
void CrossEngineStep::join() void CrossEngineStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }

View File

@@ -182,7 +182,7 @@ protected:
CrossEngineStep* fStep; CrossEngineStep* fStep;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
OIDVector fOIDVector; OIDVector fOIDVector;
bool fEndOfResult; bool fEndOfResult;
bool fRunExecuted; bool fRunExecuted;

View File

@@ -58,7 +58,7 @@ namespace joblist {
DiskJoinStep::DiskJoinStep() { } DiskJoinStep::DiskJoinStep() { }
DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bool lastOne) : JobStep(*t), thjs(t), DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bool lastOne) : JobStep(*t), thjs(t),
joinerIndex(joinIndex), closedOutput(false) mainThread(0), joinerIndex(joinIndex), closedOutput(false)
{ {
/* /*
grab all relevant vars from THJS grab all relevant vars from THJS
@@ -130,9 +130,10 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep *t, int djsIndex, int joinIndex, bo
DiskJoinStep::~DiskJoinStep() DiskJoinStep::~DiskJoinStep()
{ {
abort(); abort();
if (mainThread) { if (mainThread)
mainThread->join(); {
mainThread.reset(); jobstepThreadPool.join(mainThread);
mainThread = 0;
} }
if (jp) if (jp)
atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage()); atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
@@ -151,13 +152,16 @@ void DiskJoinStep::loadExistingData(vector<RGData> &data)
void DiskJoinStep::run() void DiskJoinStep::run()
{ {
mainThread.reset(new boost::thread(Runner(this))); mainThread = jobstepThreadPool.invoke(Runner(this));
} }
void DiskJoinStep::join() void DiskJoinStep::join()
{ {
if (mainThread) if (mainThread)
mainThread->join(); {
jobstepThreadPool.join(mainThread);
mainThread = 0;
}
if (jp) { if (jp) {
atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage()); atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
//int64_t memUsage; //int64_t memUsage;
@@ -479,12 +483,12 @@ void DiskJoinStep::mainRunner()
loadFIFO.reset(new FIFO<boost::shared_ptr<LoaderOutput> >(1, 1)); // double buffering should be good enough loadFIFO.reset(new FIFO<boost::shared_ptr<LoaderOutput> >(1, 1)); // double buffering should be good enough
buildFIFO.reset(new FIFO<boost::shared_ptr<BuilderOutput> >(1, 1)); buildFIFO.reset(new FIFO<boost::shared_ptr<BuilderOutput> >(1, 1));
loadThread.reset(new boost::thread(Loader(this))); std::vector<uint64_t> thrds;
buildThread.reset(new boost::thread(Builder(this))); thrds.reserve(3);
joinThread.reset(new boost::thread(Joiner(this))); thrds.push_back(jobstepThreadPool.invoke(Loader(this)));
loadThread->join(); thrds.push_back(jobstepThreadPool.invoke(Builder(this)));
buildThread->join(); thrds.push_back(jobstepThreadPool.invoke(Joiner(this)));
joinThread->join(); jobstepThreadPool.join(thrds);
} }
} }
CATCH_AND_LOG; CATCH_AND_LOG;

View File

@@ -49,7 +49,6 @@ class DiskJoinStep : public JobStep
boost::shared_array<int> LOMapping, SOMapping, SjoinFEMapping, LjoinFEMapping; boost::shared_array<int> LOMapping, SOMapping, SjoinFEMapping, LjoinFEMapping;
TupleHashJoinStep *thjs; TupleHashJoinStep *thjs;
boost::shared_ptr<boost::thread> runner;
boost::shared_ptr<funcexp::FuncExpWrapper> fe; boost::shared_ptr<funcexp::FuncExpWrapper> fe;
bool typeless; bool typeless;
JoinType joinType; JoinType joinType;
@@ -69,7 +68,7 @@ class DiskJoinStep : public JobStep
bool lastLargeIteration; bool lastLargeIteration;
uint32_t largeIterationCount; uint32_t largeIterationCount;
boost::shared_ptr<boost::thread> mainThread; uint64_t mainThread; // thread handle from thread pool
/* Loader structs */ /* Loader structs */
struct LoaderOutput { struct LoaderOutput {
@@ -86,8 +85,6 @@ class DiskJoinStep : public JobStep
}; };
void loadFcn(); void loadFcn();
boost::shared_ptr<boost::thread> loadThread;
/* Builder structs */ /* Builder structs */
struct BuilderOutput { struct BuilderOutput {
boost::shared_ptr<joiner::TupleJoiner> tupleJoiner; boost::shared_ptr<joiner::TupleJoiner> tupleJoiner;
@@ -104,7 +101,6 @@ class DiskJoinStep : public JobStep
DiskJoinStep *djs; DiskJoinStep *djs;
}; };
void buildFcn(); void buildFcn();
boost::shared_ptr<boost::thread> buildThread;
/* Joining structs */ /* Joining structs */
struct Joiner { struct Joiner {
@@ -113,7 +109,6 @@ class DiskJoinStep : public JobStep
DiskJoinStep *djs; DiskJoinStep *djs;
}; };
void joinFcn(); void joinFcn();
boost::shared_ptr<boost::thread> joinThread;
// limits & usage // limits & usage
boost::shared_ptr<int64_t> smallUsage; boost::shared_ptr<int64_t> smallUsage;

View File

@@ -28,7 +28,6 @@
using namespace std; using namespace std;
#include "joblist.h" #include "joblist.h"
#include "calpontsystemcatalog.h" #include "calpontsystemcatalog.h"
using namespace execplan; using namespace execplan;
@@ -73,13 +72,17 @@ JobList::JobList(bool isEM) :
JobList::~JobList() JobList::~JobList()
{ {
vector<boost::thread *> joiners;
boost::thread *tmp;
try try
{ {
if (fIsRunning) if (fIsRunning)
{ {
JobStepVector::iterator iter; #if 0
// This logic creates a set of threads to wind down the query
vector<uint64_t> joiners;
joiners.reserve(20);
NullStep nullStep; // For access to the static jobstepThreadPool.
JobStepVector::iterator iter;
JobStepVector::iterator end; JobStepVector::iterator end;
iter = fQuery.begin(); iter = fQuery.begin();
@@ -88,8 +91,7 @@ JobList::~JobList()
// Wait for all the query steps to finish // Wait for all the query steps to finish
while (iter != end) while (iter != end)
{ {
tmp = new boost::thread(JSJoiner(iter->get())); joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get())));
joiners.push_back(tmp);
++iter; ++iter;
} }
@@ -99,14 +101,40 @@ JobList::~JobList()
// wait for the projection steps // wait for the projection steps
while (iter != end) while (iter != end)
{ {
tmp = new boost::thread(JSJoiner(iter->get())); joiners.push_back(nullStep.jobstepThreadPool.invoke(JSJoiner(iter->get())));
joiners.push_back(tmp);
++iter; ++iter;
} }
for (uint32_t i = 0; i < joiners.size(); i++) { nullStep.jobstepThreadPool.join(joiners);
joiners[i]->join(); #endif
delete joiners[i]; // This logic stops the query steps one at a time
JobStepVector::iterator iter;
JobStepVector::iterator end;
end = fQuery.end();
for (iter = fQuery.begin(); iter != end; ++iter)
{
(*iter)->abort();
}
// Stop all the projection steps
end = fProject.end();
for (iter = fProject.begin(); iter != end; ++iter)
{
(*iter)->abort();
}
// Wait for all the query steps to end
end = fQuery.end();
for (iter = fQuery.begin(); iter != end; ++iter)
{
(*iter)->join();
}
// Wait for all the projection steps to end
end = fProject.end();
for (iter = fProject.begin(); iter != end; ++iter)
{
(*iter)->join();
} }
} }
} }

View File

@@ -248,6 +248,7 @@
<F N="tupleunion.cpp"/> <F N="tupleunion.cpp"/>
<F N="unique32generator.cpp"/> <F N="unique32generator.cpp"/>
<F N="virtualtable.cpp"/> <F N="virtualtable.cpp"/>
<F N="windowfunctionstep.cpp"/>
</Folder> </Folder>
<Folder <Folder
Name="Header Files" Name="Header Files"
@@ -305,6 +306,7 @@
<F N="tupleunion.h"/> <F N="tupleunion.h"/>
<F N="unique32generator.h"/> <F N="unique32generator.h"/>
<F N="virtualtable.h"/> <F N="virtualtable.h"/>
<F N="windowfunctionstep.h"/>
</Folder> </Folder>
<Folder <Folder
Name="Resource Files" Name="Resource Files"

View File

@@ -20,6 +20,7 @@
#include <string> #include <string>
using namespace std; using namespace std;
#include <stdlib.h>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_generators.hpp>
@@ -55,6 +56,8 @@ namespace joblist
{ {
boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER; boost::mutex JobStep::fLogMutex; //=PTHREAD_MUTEX_INITIALIZER;
ThreadPool JobStep::jobstepThreadPool(defaultJLThreadPoolSize, 0);
ostream& operator<<(ostream& os, const JobStep* rhs) ostream& operator<<(ostream& os, const JobStep* rhs)
{ {
os << rhs->toString(); os << rhs->toString();

View File

@@ -42,7 +42,7 @@
#include "timestamp.h" #include "timestamp.h"
#include "rowgroup.h" #include "rowgroup.h"
#include "querytele.h" #include "querytele.h"
#include "threadpool.h"
#include "atomicops.h" #include "atomicops.h"
#include "branchpred.h" #include "branchpred.h"
@@ -53,6 +53,7 @@
# endif # endif
#endif #endif
using namespace threadpool;
namespace joblist namespace joblist
{ {
@@ -233,6 +234,7 @@ public:
bool onClauseFilter() const { return fOnClauseFilter; } bool onClauseFilter() const { return fOnClauseFilter; }
void onClauseFilter(bool b) { fOnClauseFilter = b; } void onClauseFilter(bool b) { fOnClauseFilter = b; }
static ThreadPool jobstepThreadPool;
protected: protected:
//@bug6088, for telemetry posting //@bug6088, for telemetry posting
@@ -328,6 +330,20 @@ public:
virtual bool deliverStringTableRowGroup() const = 0; virtual bool deliverStringTableRowGroup() const = 0;
}; };
class NullStep : public JobStep
{
public:
/** @brief virtual void Run method
*/
virtual void run(){}
/** @brief virtual void join method
*/
virtual void join(){}
/** @brief virtual string toString method
*/
virtual const std::string toString() const {return "NullStep";}
};
// calls rhs->toString() // calls rhs->toString()
std::ostream& operator<<(std::ostream& os, const JobStep* rhs); std::ostream& operator<<(std::ostream& os, const JobStep* rhs);
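The header above shows the shape of the whole change: JobStep gains a process-wide static threadpool::ThreadPool, invoke() returns a uint64_t handle instead of a boost::thread object, and join() accepts either a single handle or a vector of them. The stand-in below illustrates that handle-based lifecycle with std::thread; ToyPool and its one-thread-per-task bookkeeping are invented for illustration only and are not the ColumnStore ThreadPool implementation, which queues functors onto a bounded set of workers.

// ToyPool: a deliberately tiny stand-in for a handle-based pool.
// invoke() hands back a uint64_t handle; join(handle) blocks until that
// task has finished; join(vector<handle>) blocks until all of them have.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class ToyPool
{
public:
    uint64_t invoke(std::function<void()> fn)
    {
        std::lock_guard<std::mutex> g(mtx_);
        uint64_t handle = nextHandle_++;
        // Simplification: one thread per task; the real pool queues functors.
        tasks_.emplace(handle, std::thread(std::move(fn)));
        return handle;
    }

    void join(uint64_t handle)
    {
        std::thread t;
        {
            std::lock_guard<std::mutex> g(mtx_);
            std::map<uint64_t, std::thread>::iterator it = tasks_.find(handle);
            if (it == tasks_.end())
                return;                 // unknown or already-joined handle
            t = std::move(it->second);
            tasks_.erase(it);
        }
        t.join();                       // wait outside the lock
    }

    void join(const std::vector<uint64_t>& handles)
    {
        for (uint64_t h : handles)
            join(h);
    }

    ~ToyPool()                          // anything never joined is joined here
    {
        for (std::map<uint64_t, std::thread>::iterator it = tasks_.begin();
             it != tasks_.end(); ++it)
            it->second.join();
    }

private:
    std::mutex mtx_;
    std::map<uint64_t, std::thread> tasks_;
    uint64_t nextHandle_ = 1;
};

int main()
{
    ToyPool pool;

    // Single runner, as in SubAdapterStep::run()/join().
    uint64_t fRunner = pool.invoke([]() { std::cout << "runner finished\n"; });
    pool.join(fRunner);

    // A batch joined together, as DiskJoinStep::mainRunner() does with its
    // Loader/Builder/Joiner workers.
    std::vector<uint64_t> thrds;
    for (int i = 0; i < 3; i++)
        thrds.push_back(pool.invoke([i]() { std::cout << "worker " << i << "\n"; }));
    pool.join(thrds);
    return 0;
}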

View File

@@ -105,6 +105,8 @@ pDictionaryStep::pDictionaryStep(
recvWaiting(false), recvWaiting(false),
ridCount(0), ridCount(0),
fColType(ct), fColType(ct),
pThread(0),
cThread(0),
fFilterCount(0), fFilterCount(0),
requestList(0), requestList(0),
fInterval(jobInfo.flushInterval), fInterval(jobInfo.flushInterval),

View File

@@ -134,7 +134,9 @@ pDictionaryScan::pDictionaryScan(
sendWaiting(false), sendWaiting(false),
ridCount(0), ridCount(0),
ridList(0), ridList(0),
colType(ct), colType(ct),
pThread(0),
cThread(0),
fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()), fScanLbidReqLimit(jobInfo.rm->getJlScanLbidReqLimit()),
fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()), fScanLbidReqThreshold(jobInfo.rm->getJlScanLbidReqThreshold()),
fStopSending(false), fStopSending(false),
@@ -214,12 +216,12 @@ void pDictionaryScan::initializeConfigParms()
void pDictionaryScan::startPrimitiveThread() void pDictionaryScan::startPrimitiveThread()
{ {
pThread.reset(new boost::thread(pDictionaryScanPrimitive(this))); pThread = jobstepThreadPool.invoke(pDictionaryScanPrimitive(this));
} }
void pDictionaryScan::startAggregationThread() void pDictionaryScan::startAggregationThread()
{ {
cThread.reset(new boost::thread(pDictionaryScanAggregator(this))); cThread = jobstepThreadPool.invoke(pDictionaryScanAggregator(this));
} }
void pDictionaryScan::run() void pDictionaryScan::run()
@@ -243,8 +245,8 @@ void pDictionaryScan::run()
void pDictionaryScan::join() void pDictionaryScan::join()
{ {
pThread->join(); jobstepThreadPool.join(pThread);
cThread->join(); jobstepThreadPool.join(cThread);
if (isEquality && fDec) { if (isEquality && fDec) {
destroyEqualityFilter(); destroyEqualityFilter();
isEquality = false; isEquality = false;

View File

@@ -321,8 +321,8 @@ private:
BRM::DBRM dbrm; BRM::DBRM dbrm;
boost::shared_ptr<boost::thread> cThread; //consumer thread // boost::shared_ptr<boost::thread> cThread; //consumer thread
boost::shared_ptr<boost::thread> pThread; //producer thread // boost::shared_ptr<boost::thread> pThread; //producer thread
boost::mutex mutex; boost::mutex mutex;
boost::condition condvar; boost::condition condvar;
boost::condition flushed; boost::condition flushed;
@@ -636,8 +636,8 @@ private:
uint32_t recvWaiting; uint32_t recvWaiting;
int64_t ridCount; int64_t ridCount;
execplan::CalpontSystemCatalog::ColType fColType; execplan::CalpontSystemCatalog::ColType fColType;
boost::shared_ptr<boost::thread> pThread; //producer thread uint64_t pThread; //producer thread
boost::shared_ptr<boost::thread> cThread; //producer thread uint64_t cThread; //producer thread
messageqcpp::ByteStream fFilterString; messageqcpp::ByteStream fFilterString;
uint32_t fFilterCount; uint32_t fFilterCount;
@@ -772,8 +772,8 @@ private:
DataList<ElementType> *ridList; DataList<ElementType> *ridList;
messageqcpp::ByteStream fFilterString; messageqcpp::ByteStream fFilterString;
execplan::CalpontSystemCatalog::ColType colType; execplan::CalpontSystemCatalog::ColType colType;
boost::shared_ptr<boost::thread> pThread; //producer thread uint64_t pThread; //producer thread. thread pool handle
boost::shared_ptr<boost::thread> cThread; //producer thread uint64_t cThread; //consumer thread. thread pool handle
DataList_t* requestList; DataList_t* requestList;
//StringDataList* stringList; //StringDataList* stringList;
boost::mutex mutex; boost::mutex mutex;
@@ -1036,8 +1036,6 @@ protected:
private: private:
void formatMiniStats(); void formatMiniStats();
typedef boost::shared_ptr<boost::thread> SPTHD;
typedef boost::shared_array<SPTHD> SATHD;
void startPrimitiveThread(); void startPrimitiveThread();
void startAggregationThread(); void startAggregationThread();
void initializeConfigParms(); void initializeConfigParms();
@@ -1060,7 +1058,7 @@ private:
uint32_t fNumThreads; uint32_t fNumThreads;
PrimitiveStepType ffirstStepType; PrimitiveStepType ffirstStepType;
bool isFilterFeeder; bool isFilterFeeder;
SATHD fProducerThread; std::vector<uint64_t> fProducerThreads; // thread pool handles
messageqcpp::ByteStream fFilterString; messageqcpp::ByteStream fFilterString;
uint32_t fFilterCount; uint32_t fFilterCount;
execplan::CalpontSystemCatalog::ColType fColType; execplan::CalpontSystemCatalog::ColType fColType;
@@ -1097,8 +1095,8 @@ private:
uint64_t fMsgBytesOut; // total byte count for outcoming messages uint64_t fMsgBytesOut; // total byte count for outcoming messages
uint64_t fBlockTouched; // total blocks touched uint64_t fBlockTouched; // total blocks touched
uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File uint32_t fExtentsPerSegFile;//config num of Extents Per Segment File
boost::shared_ptr<boost::thread> cThread; //consumer thread // uint64_t cThread; //consumer thread. thread handle from thread pool
boost::shared_ptr<boost::thread> pThread; //producer thread uint64_t pThread; //producer thread. thread handle from thread pool
boost::mutex tplMutex; boost::mutex tplMutex;
boost::mutex dlMutex; boost::mutex dlMutex;
boost::mutex cpMutex; boost::mutex cpMutex;
@@ -1251,7 +1249,7 @@ private:
execplan::CalpontSystemCatalog::OID fTableOID; execplan::CalpontSystemCatalog::OID fTableOID;
execplan::CalpontSystemCatalog::ColType fColType; execplan::CalpontSystemCatalog::ColType fColType;
int8_t fBOP; int8_t fBOP;
boost::shared_ptr<boost::thread> runner; // @bug 686 // int64_t runner; // thread handle from thread pool
// @bug 687 Add data and friend declarations for concurrent filter steps. // @bug 687 Add data and friend declarations for concurrent filter steps.
std::vector<ElementType> fSortedA; // used to internally sort input data std::vector<ElementType> fSortedA; // used to internally sort input data
@@ -1333,7 +1331,7 @@ private:
bool isDictColumn; bool isDictColumn;
bool isEM; bool isEM;
boost::thread* fPTThd; // boost::thread* fPTThd;
// @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4. // @bug 663 - Added fSwallowRows for calpont.caltrace(16) which is TRACE_FLAGS::TRACE_NO_ROWS4.
// Running with this one will swallow rows at projection. // Running with this one will swallow rows at projection.

View File

@@ -60,7 +60,8 @@ namespace joblist
const uint64_t defaultHUATotalMem = 8 * 1024 * 1024 * 1024ULL; const uint64_t defaultHUATotalMem = 8 * 1024 * 1024 * 1024ULL;
const uint32_t defaultTupleDLMaxSize = 64 * 1024; const uint32_t defaultTupleDLMaxSize = 64 * 1024;
const uint32_t defaultTupleMaxBuckets = 256;
const uint32_t defaultJLThreadPoolSize = 100;
//pcolscan.cpp //pcolscan.cpp
const uint32_t defaultScanLbidReqLimit = 10000; const uint32_t defaultScanLbidReqLimit = 10000;
@@ -160,7 +161,7 @@ namespace joblist
unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); } unsigned getHjNumThreads() const { return fHjNumThreads; } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); }
uint64_t getHjMaxElems() const { return getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems); } uint64_t getHjMaxElems() const { return getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems); }
uint32_t getHjFifoSizeLargeSide() const { return getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide); } uint32_t getHjFifoSizeLargeSide() const { return getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide); }
uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); } uint32_t getHjCPUniqueLimit() const { return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit); }
uint64_t getPMJoinMemLimit() const { return pmJoinMemLimit; } uint64_t getPMJoinMemLimit() const { return pmJoinMemLimit; }
uint32_t getJLFlushInterval() const { return getUintVal(fJobListStr, "FlushInterval", defaultFlushInterval); } uint32_t getJLFlushInterval() const { return getUintVal(fJobListStr, "FlushInterval", defaultFlushInterval); }
@@ -168,6 +169,10 @@ namespace joblist
uint32_t getJlScanLbidReqLimit() const { return getUintVal(fJobListStr, "ScanLbidReqLimit",defaultScanLbidReqLimit); } uint32_t getJlScanLbidReqLimit() const { return getUintVal(fJobListStr, "ScanLbidReqLimit",defaultScanLbidReqLimit); }
uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr,"ScanLbidReqThreshold", defaultScanLbidReqThreshold); } uint32_t getJlScanLbidReqThreshold() const { return getUintVal(fJobListStr,"ScanLbidReqThreshold", defaultScanLbidReqThreshold); }
// @MCOL-513 - Added threadpool to JobSteps
uint32_t getJLThreadPoolSize() const { return getUintVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize); }
std::string getJlThreadPoolDebug() const { return getStringVal(fJobListStr, "ThreadPoolDebug", "N"); }
// @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request. // @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request.
uint32_t getJlLogicalBlocksPerScan() const { return getUintVal(fJobListStr,"LogicalBlocksPerScan", defaultLogicalBlocksPerScan); } uint32_t getJlLogicalBlocksPerScan() const { return getUintVal(fJobListStr,"LogicalBlocksPerScan", defaultLogicalBlocksPerScan); }
uint32_t getJlProjectBlockReqLimit() const { return getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit ); } uint32_t getJlProjectBlockReqLimit() const { return getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit ); }
@@ -180,9 +185,9 @@ namespace joblist
uint32_t getJlMaxOutstandingRequests() const { return getUintVal(fJobListStr,"MaxOutstandingRequests", defaultMaxOutstandingRequests);} uint32_t getJlMaxOutstandingRequests() const { return getUintVal(fJobListStr,"MaxOutstandingRequests", defaultMaxOutstandingRequests);}
uint32_t getJlJoinerChunkSize() const { return getUintVal(fJobListStr,"JoinerChunkSize", defaultJoinerChunkSize);} uint32_t getJlJoinerChunkSize() const { return getUintVal(fJobListStr,"JoinerChunkSize", defaultJoinerChunkSize);}
int getPsCount() const { return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount ); } int getPsCount() const { return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount ); }
int getPsConnectionsPerPrimProc() const { return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc); } int getPsConnectionsPerPrimProc() const { return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc); }
uint32_t getPsLBID_Shift() const { return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift ); } uint32_t getPsLBID_Shift() const { return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift ); }
std::string getScTempDiskPath() const { return getStringVal(fSystemConfigStr, "TempDiskPath", defaultTempDiskPath ); } std::string getScTempDiskPath() const { return getStringVal(fSystemConfigStr, "TempDiskPath", defaultTempDiskPath ); }
uint64_t getScTempSaveSize() const { return getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize); } uint64_t getScTempSaveSize() const { return getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize); }
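The two getters added here follow the same pattern as the rest of ResourceManager: look up a key in the JobList section of Columnstore.xml and fall back to a compiled-in default (defaultJLThreadPoolSize, i.e. 100, or "N" for ThreadPoolDebug) when the key is absent. The snippet below sketches that lookup-with-default behaviour against a plain std::map standing in for the real config::Config machinery; only the section name, key name, and default value come from this change.

// Stand-in for ResourceManager's getUintVal(section, key, default) pattern.
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <map>
#include <string>

const uint32_t defaultJLThreadPoolSize = 100;

typedef std::map<std::string, std::string> ConfigMap;   // "Section.Key" -> value

uint32_t getUintVal(const ConfigMap& cfg, const std::string& section,
                    const std::string& key, uint32_t defaultVal)
{
    ConfigMap::const_iterator it = cfg.find(section + "." + key);
    if (it == cfg.end() || it->second.empty())
        return defaultVal;                               // key missing: use default
    return static_cast<uint32_t>(std::strtoul(it->second.c_str(), NULL, 10));
}

int main()
{
    ConfigMap cfg;                                       // no <ThreadPoolSize> set
    std::cout << getUintVal(cfg, "JobList", "ThreadPoolSize",
                            defaultJLThreadPoolSize) << "\n";    // prints 100

    cfg["JobList.ThreadPoolSize"] = "250";               // <ThreadPoolSize>250</ThreadPoolSize>
    std::cout << getUintVal(cfg, "JobList", "ThreadPoolSize",
                            defaultJLThreadPoolSize) << "\n";    // prints 250
    return 0;
}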

View File

@@ -150,6 +150,7 @@ SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
, fEndOfResult(false) , fEndOfResult(false)
, fInputIterator(0) , fInputIterator(0)
, fOutputIterator(0) , fOutputIterator(0)
, fRunner(0)
{ {
fAlias = s->alias(); fAlias = s->alias();
fView = s->view(); fView = s->view();
@@ -191,14 +192,14 @@ void SubAdapterStep::run()
if (fDelivery) if (fDelivery)
fOutputIterator = fOutputDL->getIterator(); fOutputIterator = fOutputDL->getIterator();
fRunner.reset(new boost::thread(Runner(this))); fRunner = jobstepThreadPool.invoke(Runner(this));
} }
void SubAdapterStep::join() void SubAdapterStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }

View File

@@ -230,7 +230,7 @@ protected:
SubAdapterStep* fStep; SubAdapterStep* fStep;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
boost::scoped_ptr<funcexp::FuncExpWrapper> fExpression; boost::scoped_ptr<funcexp::FuncExpWrapper> fExpression;
}; };

View File

@@ -180,14 +180,13 @@ void TupleBPS::initializeConfigParms()
else else
fMaxNumThreads = 1; fMaxNumThreads = 1;
fProducerThread.reset(new SPTHD[fMaxNumThreads]); // Reserve the max number of thread space. A bit of an optimization.
// Make maxnum thread objects even if they don't get used to make join() safe. fProducerThreads.clear();
for (uint32_t i = 0; i < fMaxNumThreads; i++) fProducerThreads.reserve(fMaxNumThreads);
fProducerThread[i].reset(new thread());
} }
TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) : TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
BatchPrimitive(jobInfo), fRm(jobInfo.rm) BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm)
{ {
fInputJobStepAssociation = rhs.inputAssociation(); fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation(); fOutputJobStepAssociation = rhs.outputAssociation();
@@ -800,7 +799,7 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
void TupleBPS::startPrimitiveThread() void TupleBPS::startPrimitiveThread()
{ {
pThread.reset(new boost::thread(TupleBPSPrimitive(this))); pThread = jobstepThreadPool.invoke(TupleBPSPrimitive(this));
} }
void TupleBPS::startAggregationThread() void TupleBPS::startAggregationThread()
@@ -809,13 +808,13 @@ void TupleBPS::startAggregationThread()
// fMaxNumThreads = 1; // fMaxNumThreads = 1;
// fNumThreads = fMaxNumThreads; // fNumThreads = fMaxNumThreads;
// for (uint32_t i = 0; i < fMaxNumThreads; i++) // for (uint32_t i = 0; i < fMaxNumThreads; i++)
// fProducerThread[i].reset(new boost::thread(TupleBPSAggregators(this, i))); // fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, i)));
// This block of code starts one thread at a time // This block of code starts one thread at a time
if (fNumThreads >= fMaxNumThreads) if (fNumThreads >= fMaxNumThreads)
return; return;
fNumThreads++; fNumThreads++;
fProducerThread[fNumThreads-1].reset(new boost::thread(TupleBPSAggregators(this, fNumThreads-1))); fProducerThreads.push_back(jobstepThreadPool.invoke(TupleBPSAggregators(this, fNumThreads-1)));
} }
//#include "boost/date_time/posix_time/posix_time.hpp" //#include "boost/date_time/posix_time/posix_time.hpp"
@@ -1117,6 +1116,8 @@ void TupleBPS::run()
serializeJoiner(); serializeJoiner();
prepCasualPartitioning(); prepCasualPartitioning();
startPrimitiveThread(); startPrimitiveThread();
fProducerThreads.clear();
fProducerThreads.reserve(fMaxNumThreads);
startAggregationThread(); startAggregationThread();
} }
catch (const std::exception& e) catch (const std::exception& e)
@@ -1153,10 +1154,10 @@ void TupleBPS::join()
} }
if (pThread) if (pThread)
pThread->join(); jobstepThreadPool.join(pThread);
jobstepThreadPool.join(fProducerThreads);
for (uint32_t i = 0; i < fMaxNumThreads; i++)
fProducerThread[i]->join();
if (BPPIsAllocated) { if (BPPIsAllocated) {
ByteStream bs; ByteStream bs;
fDec->removeDECEventListener(this); fDec->removeDECEventListener(this);

View File

@@ -184,6 +184,7 @@ TupleAggregateStep::TupleAggregateStep(
fAggregator(agg), fAggregator(agg),
fRowGroupOut(rgOut), fRowGroupOut(rgOut),
fRowGroupIn(rgIn), fRowGroupIn(rgIn),
fRunner(0),
fUmOnly(false), fUmOnly(false),
fRm(jobInfo.rm), fRm(jobInfo.rm),
fBucketNum(0), fBucketNum(0),
@@ -254,7 +255,7 @@ void TupleAggregateStep::run()
{ {
if (fDelivery == false) if (fDelivery == false)
{ {
fRunner.reset(new thread(Aggregator(this))); fRunner = jobstepThreadPool.invoke(Aggregator(this));
} }
} }
@@ -262,7 +263,7 @@ void TupleAggregateStep::run()
void TupleAggregateStep::join() void TupleAggregateStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }
@@ -4210,13 +4211,14 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
// and if there is more data to read, the // and if there is more data to read, the
// first thread will start another thread until the // first thread will start another thread until the
// maximum number is reached. // maximum number is reached.
#if 0
if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads && if (threadID == 0 && fFirstPhaseThreadCount < fNumOfThreads &&
dlIn->more(fInputIter)) { dlIn->more(fInputIter))
fFirstPhaseRunners[fFirstPhaseThreadCount].reset {
(new boost::thread(ThreadedAggregator(this, fFirstPhaseThreadCount))); fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, fFirstPhaseThreadCount)));
fFirstPhaseThreadCount++; fFirstPhaseThreadCount++;
} }
#endif
fRowGroupIns[threadID].setData(&rgData); fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings(); fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit)) if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
@@ -4479,29 +4481,29 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
if (!fDoneAggregate) if (!fDoneAggregate)
{ {
initializeMultiThread(); initializeMultiThread();
/*
// This block of code starts all threads at the start // This block of code starts all threads at the start
fFirstPhaseThreadCount = fNumOfThreads; fFirstPhaseThreadCount = fNumOfThreads;
boost::shared_ptr<boost::thread> runner; fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
for (i = 0; i < fNumOfThreads; i++) for (i = 0; i < fNumOfThreads; i++)
{ {
runner.reset(new boost::thread(ThreadedAggregator(this, i))); fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
fFirstPhaseRunners.push_back(runner);
} }
*/
#if 0
// This block of code starts one thread, relies on doThreadedAggregation() // This block of code starts one thread, relies on doThreadedAggregation()
// For reasons unknown, this doesn't work right with threadpool
// to start more as needed // to start more as needed
fFirstPhaseRunners.resize(fNumOfThreads); // to prevent a resize during use fFirstPhaseRunners.clear();
fFirstPhaseRunners.reserve(fNumOfThreads); // to prevent a resize during use
fFirstPhaseThreadCount = 1; fFirstPhaseThreadCount = 1;
for (i = 1; i < fNumOfThreads; i++) fFirstPhaseRunners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, 0)));
// fill with valid thread objects to make joining work #endif
fFirstPhaseRunners[i].reset(new boost::thread());
fFirstPhaseRunners[0].reset(new boost::thread(ThreadedAggregator(this, 0)));
for (i = 0; i < fNumOfThreads; i++) // Now wait for that thread plus all the threads it may have spawned
fFirstPhaseRunners[i]->join(); jobstepThreadPool.join(fFirstPhaseRunners);
fFirstPhaseRunners.clear(); fFirstPhaseRunners.clear();
} }
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0) if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
@@ -4511,8 +4513,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
{ {
if (!fDoneAggregate) if (!fDoneAggregate)
{ {
vector<boost::shared_ptr<thread> > runners; vector<uint64_t> runners; // thread pool handles
boost::shared_ptr<thread> runner;
fRowGroupsDeliveredData.resize(fNumOfBuckets); fRowGroupsDeliveredData.resize(fNumOfBuckets);
uint32_t bucketsPerThread = fNumOfBuckets/fNumOfThreads; uint32_t bucketsPerThread = fNumOfBuckets/fNumOfThreads;
@@ -4521,13 +4522,12 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
//uint32_t bucketsPerThread = 1; //uint32_t bucketsPerThread = 1;
//uint32_t numThreads = fNumOfBuckets; //uint32_t numThreads = fNumOfBuckets;
runners.reserve(numThreads);
for (i = 0; i < numThreads; i++) for (i = 0; i < numThreads; i++)
{ {
runner.reset(new boost::thread(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread))); runners.push_back(jobstepThreadPool.invoke(ThreadedSecondPhaseAggregator(this, i*bucketsPerThread, bucketsPerThread)));
runners.push_back(runner);
} }
for (i = 0; i < numThreads; i++) jobstepThreadPool.join(runners);
runners[i]->join();
} }
fDoneAggregate = true; fDoneAggregate = true;

View File

@@ -166,7 +166,7 @@ private:
uint32_t bucketCount; uint32_t bucketCount;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
bool fUmOnly; bool fUmOnly;
ResourceManager *fRm; ResourceManager *fRm;
@@ -186,7 +186,7 @@ private:
bool fIsMultiThread; bool fIsMultiThread;
int fInputIter; // iterator int fInputIter; // iterator
boost::scoped_array<uint64_t> fMemUsage; boost::scoped_array<uint64_t> fMemUsage;
vector<boost::shared_ptr<boost::thread> > fFirstPhaseRunners; std::vector<uint64_t> fFirstPhaseRunners; // thread pool handles
uint32_t fFirstPhaseThreadCount; uint32_t fFirstPhaseThreadCount;
boost::shared_ptr<int64_t> fSessionMemLimit; boost::shared_ptr<int64_t> fSessionMemLimit;

View File

@@ -106,6 +106,7 @@ TupleAnnexStep::TupleAnnexStep(const JobInfo& jobInfo) :
fOutputDL(NULL), fOutputDL(NULL),
fInputIterator(0), fInputIterator(0),
fOutputIterator(0), fOutputIterator(0),
fRunner(0),
fRowsProcessed(0), fRowsProcessed(0),
fRowsReturned(0), fRowsReturned(0),
fLimitStart(0), fLimitStart(0),
@@ -205,14 +206,14 @@ void TupleAnnexStep::run()
fOutputIterator = fOutputDL->getIterator(); fOutputIterator = fOutputDL->getIterator();
} }
fRunner.reset(new boost::thread(Runner(this))); fRunner = jobstepThreadPool.invoke(Runner(this));
} }
void TupleAnnexStep::join() void TupleAnnexStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }

View File

@@ -111,7 +111,7 @@ protected:
TupleAnnexStep* fStep; TupleAnnexStep* fStep;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
uint64_t fRowsProcessed; uint64_t fRowsProcessed;
uint64_t fRowsReturned; uint64_t fRowsReturned;

View File

@@ -81,6 +81,7 @@ TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo) :
fInputDL(NULL), fInputDL(NULL),
fOutputDL(NULL), fOutputDL(NULL),
fInputIterator(0), fInputIterator(0),
fRunner(0),
fEndOfResult(false) fEndOfResult(false)
{ {
fExtendedInfo = "TCS: "; fExtendedInfo = "TCS: ";
@@ -290,7 +291,7 @@ void TupleConstantStep::run()
if (fOutputDL == NULL) if (fOutputDL == NULL)
throw logic_error("Output is not a RowGroup data list."); throw logic_error("Output is not a RowGroup data list.");
fRunner.reset(new boost::thread(Runner(this))); fRunner = jobstepThreadPool.invoke(Runner(this));
} }
} }
@@ -298,7 +299,7 @@ void TupleConstantStep::run()
void TupleConstantStep::join() void TupleConstantStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }

View File

@@ -101,7 +101,7 @@ protected:
TupleConstantStep* fStep; TupleConstantStep* fStep;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
bool fEndOfResult; bool fEndOfResult;
}; };

View File

@@ -166,7 +166,7 @@ void TupleHashJoinStep::run()
} }
joiners.resize(smallDLs.size()); joiners.resize(smallDLs.size());
mainRunner.reset(new boost::thread(HJRunner(this))); mainRunner = jobstepThreadPool.invoke(HJRunner(this));
} }
void TupleHashJoinStep::join() void TupleHashJoinStep::join()
@@ -175,12 +175,12 @@ void TupleHashJoinStep::join()
if (joinRan) if (joinRan)
return; return;
joinRan = true; joinRan = true;
mainRunner->join(); jobstepThreadPool.join(mainRunner);
if (djs) { if (djs) {
for (int i = 0; i < (int) djsJoiners.size(); i++) for (int i = 0; i < (int) djsJoiners.size(); i++)
djs[i].join(); djs[i].join();
djsReader.join(); jobstepThreadPool.join(djsReader);
djsRelay.join(); jobstepThreadPool.join(djsRelay);
//cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl; //cout << "THJS: joined all DJS threads, shared usage = " << *djsSmallUsage << endl;
} }
} }
@@ -544,9 +544,10 @@ void TupleHashJoinStep::hjRunner()
} }
} }
smallRunners.clear();
smallRunners.reserve(smallDLs.size());
for (i = 0; i < smallDLs.size(); i++) for (i = 0; i < smallDLs.size(); i++)
smallRunners.push_back(boost::shared_ptr<boost::thread> smallRunners.push_back(jobstepThreadPool.invoke(SmallRunner(this, i)));
(new boost::thread(SmallRunner(this, i))));
} }
catch (thread_resource_error&) { catch (thread_resource_error&) {
string emsg = "TupleHashJoin caught a thread resource error, aborting...\n"; string emsg = "TupleHashJoin caught a thread resource error, aborting...\n";
@@ -557,8 +558,7 @@ void TupleHashJoinStep::hjRunner()
deliverMutex.unlock(); deliverMutex.unlock();
} }
for (i = 0; i < smallRunners.size(); i++) jobstepThreadPool.join(smallRunners);
smallRunners[i]->join();
smallRunners.clear(); smallRunners.clear();
for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++) for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++)
@@ -629,9 +629,9 @@ void TupleHashJoinStep::hjRunner()
/* If an error happened loading the existing data, these threads are necessary /* If an error happened loading the existing data, these threads are necessary
to finish the abort */ to finish the abort */
try { try {
djsRelay = boost::thread(DJSRelay(this)); djsRelay = jobstepThreadPool.invoke(DJSRelay(this));
relay = true; relay = true;
djsReader = boost::thread(DJSReader(this, smallSideCount)); djsReader = jobstepThreadPool.invoke(DJSReader(this, smallSideCount));
reader = true; reader = true;
for (i = 0; i < smallSideCount; i++) for (i = 0; i < smallSideCount; i++)
djs[i].run(); djs[i].run();
@@ -1091,7 +1091,7 @@ void TupleHashJoinStep::startJoinThreads()
bool more = true; bool more = true;
RGData oneRG; RGData oneRG;
if (joinRunners) if (joinRunners.size() > 0)
return; return;
//@bug4836, in error case, stop process, and unblock the next step. //@bug4836, in error case, stop process, and unblock the next step.
@@ -1142,13 +1142,11 @@ void TupleHashJoinStep::startJoinThreads()
makeDupList(fe2 ? fe2Output : outputRG); makeDupList(fe2 ? fe2Output : outputRG);
/* Start join runners */ /* Start join runners */
joinRunners.reset(new boost::shared_ptr<boost::thread>[joinThreadCount]); joinRunners.reserve(joinThreadCount);
for (i = 0; i < joinThreadCount; i++) for (i = 0; i < joinThreadCount; i++)
joinRunners[i].reset(new boost::thread(JoinRunner(this, i))); joinRunners.push_back(jobstepThreadPool.invoke(JoinRunner(this, i)));
/* Join them and call endOfInput */ /* Join them and call endOfInput */
for (i = 0; i < joinThreadCount; i++) jobstepThreadPool.join(joinRunners);
joinRunners[i]->join();
if (lastSmallOuterJoiner != (uint32_t) -1) if (lastSmallOuterJoiner != (uint32_t) -1)
finishSmallOuterJoin(); finishSmallOuterJoin();

View File

@@ -276,8 +276,8 @@ private:
uint32_t index; uint32_t index;
}; };
boost::shared_ptr<boost::thread> mainRunner; int64_t mainRunner; // thread handle from thread pool
std::vector<boost::shared_ptr<boost::thread> > smallRunners; std::vector<uint64_t> smallRunners; // thread handles from thread pool
// for notify TupleAggregateStep PM hashjoin // for notify TupleAggregateStep PM hashjoin
// Ideally, hashjoin and delivery communicate with RowGroupDL, // Ideally, hashjoin and delivery communicate with RowGroupDL,
@@ -347,7 +347,7 @@ private:
void processDupList(uint32_t threadID, rowgroup::RowGroup &ingrp, void processDupList(uint32_t threadID, rowgroup::RowGroup &ingrp,
std::vector<rowgroup::RGData> *rowData); std::vector<rowgroup::RGData> *rowData);
boost::scoped_array<boost::shared_ptr<boost::thread> > joinRunners; std::vector<uint64_t> joinRunners; // thread handles from thread pool
boost::mutex inputDLLock, outputDLLock; boost::mutex inputDLLock, outputDLLock;
boost::shared_array<boost::shared_array<int> > columnMappings, fergMappings; boost::shared_array<boost::shared_array<int> > columnMappings, fergMappings;
boost::shared_array<int> fe2Mapping; boost::shared_array<int> fe2Mapping;
@@ -375,7 +375,7 @@ private:
boost::scoped_array<DiskJoinStep> djs; boost::scoped_array<DiskJoinStep> djs;
boost::scoped_array<boost::shared_ptr<RowGroupDL> > fifos; boost::scoped_array<boost::shared_ptr<RowGroupDL> > fifos;
void djsReaderFcn(int index); void djsReaderFcn(int index);
boost::thread djsReader; uint64_t djsReader; // thread handle from thread pool
struct DJSReader { struct DJSReader {
DJSReader(TupleHashJoinStep *hj, uint32_t i) : HJ(hj), index(i) { } DJSReader(TupleHashJoinStep *hj, uint32_t i) : HJ(hj), index(i) { }
void operator()() { HJ->djsReaderFcn(index); } void operator()() { HJ->djsReaderFcn(index); }
@@ -383,7 +383,7 @@ private:
uint32_t index; uint32_t index;
}; };
boost::thread djsRelay; uint64_t djsRelay; // thread handle from thread pool
void djsRelayFcn(); void djsRelayFcn();
struct DJSRelay { struct DJSRelay {
DJSRelay(TupleHashJoinStep *hj) : HJ(hj) { } DJSRelay(TupleHashJoinStep *hj) : HJ(hj) { }

View File

@@ -63,6 +63,7 @@ TupleHavingStep::TupleHavingStep(const JobInfo& jobInfo) :
fInputDL(NULL), fInputDL(NULL),
fOutputDL(NULL), fOutputDL(NULL),
fInputIterator(0), fInputIterator(0),
fRunner(0),
fRowsReturned(0), fRowsReturned(0),
fEndOfResult(false), fEndOfResult(false),
fFeInstance(funcexp::FuncExp::instance()) fFeInstance(funcexp::FuncExp::instance())
@@ -151,7 +152,7 @@ void TupleHavingStep::run()
if (fOutputDL == NULL) if (fOutputDL == NULL)
throw logic_error("Output is not a RowGroup data list."); throw logic_error("Output is not a RowGroup data list.");
fRunner.reset(new boost::thread(Runner(this))); fRunner = jobstepThreadPool.invoke(Runner(this));
} }
} }
@@ -159,7 +160,7 @@ void TupleHavingStep::run()
void TupleHavingStep::join() void TupleHavingStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }

View File

@@ -97,7 +97,7 @@ protected:
TupleHavingStep* fStep; TupleHavingStep* fStep;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
uint64_t fRowsReturned; uint64_t fRowsReturned;
bool fEndOfResult; bool fEndOfResult;

View File

@@ -767,25 +767,22 @@ void TupleUnion::run()
} }
} }
runners.reserve(inputs.size());
for (i = 0; i < inputs.size(); i++) { for (i = 0; i < inputs.size(); i++) {
boost::shared_ptr<boost::thread> th(new boost::thread(Runner(this, i))); runners.push_back(jobstepThreadPool.invoke(Runner(this, i)));
runners.push_back(th);
} }
} }
void TupleUnion::join() void TupleUnion::join()
{ {
uint32_t i;
mutex::scoped_lock lk(jlLock); mutex::scoped_lock lk(jlLock);
Uniquer_t::iterator it;
if (joinRan) if (joinRan)
return; return;
joinRan = true; joinRan = true;
lk.unlock(); lk.unlock();
for (i = 0; i < runners.size(); i++) jobstepThreadPool.join(runners);
runners[i]->join();
runners.clear(); runners.clear();
uniquer->clear(); uniquer->clear();
rowMemory.clear(); rowMemory.clear();

View File

@@ -118,7 +118,7 @@ private:
Runner(TupleUnion *t, uint32_t in) : tu(t), index(in) { } Runner(TupleUnion *t, uint32_t in) : tu(t), index(in) { }
void operator()() { tu->readInput(index); } void operator()() { tu->readInput(index); }
}; };
std::vector<boost::shared_ptr<boost::thread> > runners; std::vector<uint64_t> runners; //thread pool handles
struct Hasher { struct Hasher {
TupleUnion *ts; TupleUnion *ts;

View File

@@ -140,6 +140,7 @@ namespace joblist
WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) : WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) :
JobStep(jobInfo), JobStep(jobInfo),
fRunner(0),
fCatalog(jobInfo.csc), fCatalog(jobInfo.csc),
fRowsReturned(0), fRowsReturned(0),
fEndOfResult(false), fEndOfResult(false),
@@ -192,14 +193,14 @@ void WindowFunctionStep::run()
fOutputIterator = fOutputDL->getIterator(); fOutputIterator = fOutputDL->getIterator();
} }
fRunner.reset(new boost::thread(Runner(this))); fRunner = jobstepThreadPool.invoke(Runner(this));
} }
void WindowFunctionStep::join() void WindowFunctionStep::join()
{ {
if (fRunner) if (fRunner)
fRunner->join(); jobstepThreadPool.join(fRunner);
} }
@@ -855,13 +856,13 @@ void WindowFunctionStep::execute()
if (fTotalThreads > fFunctionCount) if (fTotalThreads > fFunctionCount)
fTotalThreads = fFunctionCount; fTotalThreads = fFunctionCount;
fFunctionThreads.clear();
fFunctionThreads.reserve(fTotalThreads);
for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++) for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
fFunctionThreads.push_back( fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
boost::shared_ptr<boost::thread>(new boost::thread(WFunction(this))));
// If cancelled, not all thread is started. // If cancelled, not all threads are started.
for (uint64_t i = 0; i < fFunctionThreads.size(); i++) jobstepThreadPool.join(fFunctionThreads);
fFunctionThreads[i]->join();
} }
if (!(cancelled())) if (!(cancelled()))

View File

@@ -153,7 +153,7 @@ private:
WindowFunctionStep* fStep; WindowFunctionStep* fStep;
}; };
boost::scoped_ptr<boost::thread> fRunner; uint64_t fRunner; // thread pool handle
boost::shared_ptr<execplan::CalpontSystemCatalog> fCatalog; boost::shared_ptr<execplan::CalpontSystemCatalog> fCatalog;
uint64_t fRowsReturned; uint64_t fRowsReturned;
@@ -188,7 +188,7 @@ private:
WindowFunctionStep* fStep; WindowFunctionStep* fStep;
}; };
std::vector<boost::shared_ptr<boost::thread> > fFunctionThreads; std::vector<uint64_t> fFunctionThreads;
std::vector<RowPosition> fRows; std::vector<RowPosition> fRows;
std::vector<boost::shared_ptr<windowfunction::WindowFunction> > fFunctions; std::vector<boost::shared_ptr<windowfunction::WindowFunction> > fFunctions;

View File

@@ -6355,7 +6355,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
// select * from derived table case // select * from derived table case
if (gwi.selectCols.empty()) if (gwi.selectCols.empty())
sel_cols_in_create = " * "; sel_cols_in_create = " * ";
create_query = "create temporary table " + vtb.str() + " as select " + sel_cols_in_create + " from "; create_query = "create temporary table " + vtb.str() + " engine = aria as select " + sel_cols_in_create + " from ";
TABLE_LIST* table_ptr = select_lex.get_table_list(); TABLE_LIST* table_ptr = select_lex.get_table_list();
bool firstTb = true; bool firstTb = true;

View File

@@ -340,6 +340,7 @@ DDLProcessor::DDLProcessor( int packageMaxThreads, int packageWorkQueueSize )
{ {
fDdlPackagepool.setMaxThreads(fPackageMaxThreads); fDdlPackagepool.setMaxThreads(fPackageMaxThreads);
fDdlPackagepool.setQueueSize(fPackageWorkQueueSize); fDdlPackagepool.setQueueSize(fPackageWorkQueueSize);
fDdlPackagepool.setName("DdlPackagepool");
csc = CalpontSystemCatalog::makeCalpontSystemCatalog(); csc = CalpontSystemCatalog::makeCalpontSystemCatalog();
csc->identity(CalpontSystemCatalog::EC); csc->identity(CalpontSystemCatalog::EC);
string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host")); string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));

View File

@@ -558,8 +558,23 @@ int main(int argc, char* argv[])
} }
DMLServer dmlserver(serverThreads, serverQueueSize,&dbrm); DMLServer dmlserver(serverThreads, serverQueueSize,&dbrm);
ResourceManager *rm = ResourceManager::instance();
//set ACTIVE state // jobstepThreadPool is used by other processes. We can't call
// resourcemanager (rm) functions during the static creation of threadpool
// because rm has an "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pool's perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("DMLProcJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
//set ACTIVE state
try try
{ {
oam.processInitComplete("DMLProc", ACTIVE); oam.processInitComplete("DMLProc", ACTIVE);
@@ -567,7 +582,6 @@ int main(int argc, char* argv[])
catch (...) catch (...)
{ {
} }
ResourceManager *rm = ResourceManager::instance();
Dec = DistributedEngineComm::instance(rm); Dec = DistributedEngineComm::instance(rm);
#ifndef _MSC_VER #ifndef _MSC_VER

View File

@@ -1130,6 +1130,7 @@ DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm
fDmlPackagepool.setMaxThreads(fPackageMaxThreads); fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
fDmlPackagepool.setQueueSize(fPackageWorkQueueSize); fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
fDmlPackagepool.setName("DmlPackagepool");
} }
void DMLServer::start() void DMLServer::start()

View File

@@ -24,6 +24,8 @@ using namespace std;
using namespace joblist; using namespace joblist;
using namespace messageqcpp; using namespace messageqcpp;
threadpool::ThreadPool FEMsgHandler::threadPool(50,100);
namespace { namespace {
class Runner class Runner
@@ -50,14 +52,14 @@ FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket *s) :
FEMsgHandler::~FEMsgHandler() FEMsgHandler::~FEMsgHandler()
{ {
stop(); stop();
thr.join(); threadPool.join(thr);
} }
void FEMsgHandler::start() void FEMsgHandler::start()
{ {
if (!running) { if (!running) {
running = true; running = true;
thr = boost::thread(Runner(this)); thr = threadPool.invoke(Runner(this));
} }
} }

View File

@@ -20,6 +20,7 @@
#include "joblist.h" #include "joblist.h"
#include "inetstreamsocket.h" #include "inetstreamsocket.h"
#include "threadpool.h"
class FEMsgHandler class FEMsgHandler
{ {
@@ -36,12 +37,14 @@ public:
void threadFcn(); void threadFcn();
static threadpool::ThreadPool threadPool;
private: private:
bool die, running, sawData; bool die, running, sawData;
messageqcpp::IOSocket *sock; messageqcpp::IOSocket *sock;
boost::shared_ptr<joblist::JobList> jl; boost::shared_ptr<joblist::JobList> jl;
boost::thread thr;
boost::mutex mutex; boost::mutex mutex;
uint64_t thr;
}; };
#endif /* FEMSGHANDLER_H_ */ #endif /* FEMSGHANDLER_H_ */

View File

@@ -97,6 +97,8 @@ using namespace querytele;
#include "utils_utf8.h" #include "utils_utf8.h"
#include "boost/filesystem.hpp" #include "boost/filesystem.hpp"
#include "threadpool.h"
namespace { namespace {
//If any flags other than the table mode flags are set, produce output to screeen //If any flags other than the table mode flags are set, produce output to screeen
@@ -513,7 +515,7 @@ public:
SJLP jl; SJLP jl;
bool incSessionThreadCnt = true; bool incSessionThreadCnt = true;
bool selfJoin = false; bool selfJoin = false;
bool tryTuples = false; bool tryTuples = false;
bool usingTuples = false; bool usingTuples = false;
bool stmtCounted = false; bool stmtCounted = false;
@@ -1410,13 +1412,30 @@ int main(int argc, char* argv[])
} }
} }
// class jobstepThreadPool is used by other processes. We can't call
// resourcemanager (rm) functions during the static creation of threadpool
// because rm has an "isExeMgr" flag that is set upon creation (rm is a singleton).
// From the pool's perspective, it has no idea if it is ExeMgr doing the
// creation, so it has no idea which way to set the flag. So we set the max here.
JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
JobStep::jobstepThreadPool.setName("ExeMgrJobList");
// if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
// {
// JobStep::jobstepThreadPool.setDebug(true);
// JobStep::jobstepThreadPool.invoke(ThreadPoolMonitor(&JobStep::jobstepThreadPool));
// }
int serverThreads = rm->getEmServerThreads(); int serverThreads = rm->getEmServerThreads();
int serverQueueSize = rm->getEmServerQueueSize(); int serverQueueSize = rm->getEmServerQueueSize();
int maxPct = rm->getEmMaxPct(); int maxPct = rm->getEmMaxPct();
int pauseSeconds = rm->getEmSecondsBetweenMemChecks(); int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
int priority = rm->getEmPriority(); int priority = rm->getEmPriority();
if (maxPct > 0) FEMsgHandler::threadPool.setMaxThreads(serverThreads);
FEMsgHandler::threadPool.setQueueSize(serverQueueSize);
FEMsgHandler::threadPool.setName("FEMsgHandler");
if (maxPct > 0)
startRssMon(maxPct, pauseSeconds); startRssMon(maxPct, pauseSeconds);
#ifndef _MSC_VER #ifndef _MSC_VER
@@ -1450,12 +1469,15 @@ int main(int argc, char* argv[])
} }
} }
threadpool::ThreadPool exeMgrThreadPool(serverThreads, serverQueueSize);
exeMgrThreadPool.setName("ExeMgrServer");
for (;;) for (;;)
{ {
IOSocket ios; IOSocket ios;
ios = mqs->accept(); ios = mqs->accept();
boost::thread thd(SessionThread(ios, ec, rm)); exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
} }
exeMgrThreadPool.wait();
return 0; return 0;
} }
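The long comment in this hunk (and the matching one in DMLProc above) explains the ordering problem: jobstepThreadPool is a static class member, so it is constructed before main() runs, but ResourceManager is a singleton whose isExeMgr flag is only known once the process creates it. The pool is therefore built with a compiled-in default and then sized and named here. The sketch below illustrates that construct-with-default, configure-in-main pattern; AppPool and loadConfiguredPoolSize() are invented names, not the real ThreadPool or ResourceManager API.

// Construct the shared static pool with a default; size and name it in main()
// once configuration is available. Toy types only.
#include <cstddef>
#include <iostream>
#include <string>

class AppPool
{
public:
    explicit AppPool(std::size_t maxThreads) : maxThreads_(maxThreads) {}
    void setMaxThreads(std::size_t n) { maxThreads_ = n; }
    void setName(const std::string& n) { name_ = n; }
    std::size_t maxThreads() const { return maxThreads_; }
    const std::string& name() const { return name_; }
private:
    std::size_t maxThreads_;
    std::string name_;
};

struct Step
{
    static AppPool pool;            // shared by every step, like JobStep::jobstepThreadPool
};

// Static initialization: no configuration yet, so use a compiled-in default
// (the real default is defaultJLThreadPoolSize = 100).
AppPool Step::pool(100);

std::size_t loadConfiguredPoolSize()
{
    return 250;                     // stand-in for rm->getJLThreadPoolSize()
}

int main()
{
    // By now the process knows whether it is ExeMgr, DMLProc, etc., and can
    // size and label the shared pool from its own configuration.
    Step::pool.setMaxThreads(loadConfiguredPoolSize());
    Step::pool.setName("ExeMgrJobList");
    std::cout << Step::pool.name() << ": " << Step::pool.maxThreads()
              << " max threads\n";
    return 0;
}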

View File

@@ -503,6 +503,7 @@
as many threads are available across all PMs. --> as many threads are available across all PMs. -->
<!-- <ProcessorThreadsPerScan>16</ProcessorThreadsPerScan> --> <!-- <ProcessorThreadsPerScan>16</ProcessorThreadsPerScan> -->
<MaxOutstandingRequests>20</MaxOutstandingRequests> <MaxOutstandingRequests>20</MaxOutstandingRequests>
<ThreadPoolSize>100</ThreadPoolSize>
</JobList> </JobList>
<TupleWSDL> <TupleWSDL>
<MaxSize>1M</MaxSize> <!-- Max size in bytes per bucket --> <MaxSize>1M</MaxSize> <!-- Max size in bytes per bucket -->
@@ -515,9 +516,9 @@
<!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 --> <!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 -->
</RowAggregation> </RowAggregation>
<CrossEngineSupport> <CrossEngineSupport>
<Host>unassigned</Host> <Host>127.0.0.1</Host>
<Port>3306</Port> <Port>3306</Port>
<User>unassigned</User> <User>root</User>
<Password></Password> <Password></Password>
</CrossEngineSupport> </CrossEngineSupport>
<QueryStats> <QueryStats>

View File

@@ -495,8 +495,9 @@
is 20 extents worth of work for the PMs to process at any given time. is 20 extents worth of work for the PMs to process at any given time.
ProcessorThreadsPerScan * MaxOutstandingRequests should be at least ProcessorThreadsPerScan * MaxOutstandingRequests should be at least
as many threads are available across all PMs. --> as many threads are available across all PMs. -->
<!-- <ProcessorThreadsPerScan>16</ProcessorThreadsPerScan> --> <!-- <ProcessorThreadsPerScan>16</ProcessorThreadsPerScan> -->
<MaxOutstandingRequests>20</MaxOutstandingRequests> <MaxOutstandingRequests>20</MaxOutstandingRequests>
<ThreadPoolSize>100</ThreadPoolSize>
</JobList> </JobList>
<TupleWSDL> <TupleWSDL>
<MaxSize>1M</MaxSize> <!-- Max size in bytes per bucket --> <MaxSize>1M</MaxSize> <!-- Max size in bytes per bucket -->
@@ -509,9 +510,9 @@
<!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 --> <!-- <RowAggrRowGroupsPerThread>20</RowAggrRowGroupsPerThread> --> <!-- Default value is 20 -->
</RowAggregation> </RowAggregation>
<CrossEngineSupport> <CrossEngineSupport>
<Host>unassigned</Host> <Host>127.0.0.1</Host>
<Port>3306</Port> <Port>3306</Port>
<User>unassigned</User> <User>root</User>
<Password></Password> <Password></Password>
</CrossEngineSupport> </CrossEngineSupport>
<QueryStats> <QueryStats>

View File

@@ -2032,6 +2032,7 @@ PrimitiveServer::PrimitiveServer(int serverThreads,
fCacheCount=cacheCount; fCacheCount=cacheCount;
fServerpool.setMaxThreads(fServerThreads); fServerpool.setMaxThreads(fServerThreads);
fServerpool.setQueueSize(fServerQueueSize); fServerpool.setQueueSize(fServerQueueSize);
fServerpool.setName("PrimitiveServer");
fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads, fProcessorPool.reset(new threadpool::PriorityThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0)); medPriorityThreads, lowPriorityThreads, 0));

View File

@@ -436,15 +436,15 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
uint8_t* msglenp = reinterpret_cast<uint8_t*>(&msglen); uint8_t* msglenp = reinterpret_cast<uint8_t*>(&msglen);
size_t mlread = 0; size_t mlread = 0;
bool myIsTimeOut = false; if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF
if (readToMagic(msecs, &myIsTimeOut, stats) == false) //indicates a timeout or EOF
{ {
if (!myIsTimeOut) // MCOL-480 The connector calls with timeout in a loop so that
logIoError("InetStreamSocket::read: EOF during readToMagic", 0); // it can check a killed flag. This means that for a long running query,
if (isTimeOut) // the following fills the warning log.
{ // if (isTimeOut && *isTimeOut)
*isTimeOut = myIsTimeOut; // {
} // logIoError("InetStreamSocket::read: timeout during readToMagic", 0);
// }
return SBS(new ByteStream(0)); return SBS(new ByteStream(0));
} }

View File

@@ -20,7 +20,6 @@
* *
* *
***********************************************************************/ ***********************************************************************/
#include <stdexcept> #include <stdexcept>
using namespace std; using namespace std;
@@ -28,39 +27,37 @@ using namespace std;
#include "messagelog.h" #include "messagelog.h"
using namespace logging; using namespace logging;
#define THREADPOOL_DLLEXPORT
#include "threadpool.h" #include "threadpool.h"
#undef THREADPOOL_DLLEXPORT #include <iomanip>
#include <sstream>
#include "boost/date_time/posix_time/posix_time_types.hpp"
namespace threadpool namespace threadpool
{ {
ThreadPool::ThreadPool() ThreadPool::ThreadPool()
:fMaxThreads( 0 ), fQueueSize( 0 ) :fMaxThreads( 0 ), fQueueSize( 0 )
{ {
init(); init();
} }
ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize ) ThreadPool::ThreadPool( size_t maxThreads, size_t queueSize )
:fMaxThreads( maxThreads ), fQueueSize( queueSize ) :fMaxThreads( maxThreads ), fQueueSize( queueSize )
{ {
init(); init();
if (fQueueSize == 0)
fQueueSize = fMaxThreads*2;
} }
ThreadPool::~ThreadPool() throw() ThreadPool::~ThreadPool() throw()
{ {
// delete fThreadCreated;
try try
{ {
stop(); stop();
} }
catch(...) catch (...)
{} {
}
} }
void ThreadPool::init() void ThreadPool::init()
@@ -68,11 +65,12 @@ void ThreadPool::init()
fThreadCount = 0; fThreadCount = 0;
fGeneralErrors = 0; fGeneralErrors = 0;
fFunctorErrors = 0; fFunctorErrors = 0;
waitingFunctorsSize = 0; waitingFunctorsSize = 0;
issued = 0; fIssued = 0;
fDebug = false;
fStop = false; fStop = false;
// fThreadCreated = new NoOp();
fNextFunctor = fWaitingFunctors.end(); fNextFunctor = fWaitingFunctors.end();
fNextHandle=1;
} }
void ThreadPool::setQueueSize(size_t queueSize) void ThreadPool::setQueueSize(size_t queueSize)
@@ -88,11 +86,6 @@ void ThreadPool::setMaxThreads(size_t maxThreads)
fMaxThreads = maxThreads; fMaxThreads = maxThreads;
} }
void ThreadPool::setThreadCreatedListener(const Functor_T &f)
{
// fThreadCreated = f;
}
void ThreadPool::stop() void ThreadPool::stop()
{ {
boost::mutex::scoped_lock lock1(fMutex); boost::mutex::scoped_lock lock1(fMutex);
@@ -111,44 +104,120 @@ void ThreadPool::wait()
while (waitingFunctorsSize > 0) while (waitingFunctorsSize > 0)
{ {
fThreadAvailable.wait(lock1); fThreadAvailable.wait(lock1);
//cerr << "woke!" << endl; //cerr << "woke!" << endl;
} }
} }
void ThreadPool::invoke(const Functor_T &threadfunc) void ThreadPool::join(uint64_t thrHandle)
{ {
boost::mutex::scoped_lock lock1(fMutex); boost::mutex::scoped_lock lock1(fMutex);
for(;;) while (waitingFunctorsSize > 0)
{ {
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
if (iter->hndl == thrHandle)
{
foundit = true;
break;
}
}
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
void ThreadPool::join(std::vector<uint64_t>& thrHandle)
{
boost::mutex::scoped_lock lock1(fMutex);
while (waitingFunctorsSize > 0)
{
Container_T::iterator iter;
Container_T::iterator end = fWaitingFunctors.end();
bool foundit = false;
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
{
foundit = false;
std::vector<uint64_t>::iterator thrIter;
std::vector<uint64_t>::iterator thrEnd = thrHandle.end();
for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
{
if (iter->hndl == *thrIter)
{
foundit = true;
break;
}
}
if (foundit == true)
{
break;
}
}
// If we didn't find any of the handles, then all are complete
if (!foundit)
{
break;
}
fThreadAvailable.wait(lock1);
}
}
uint64_t ThreadPool::invoke(const Functor_T &threadfunc)
{
boost::mutex::scoped_lock lock1(fMutex);
uint64_t thrHandle=0;
for (;;)
{
try try
{ {
if ( waitingFunctorsSize < fThreadCount) if (waitingFunctorsSize < fThreadCount)
{ {
// Don't create a thread unless it's needed. There // Don't create a thread unless it's needed. There
// is a thread available to service this request. // is a thread available to service this request.
addFunctor(threadfunc); thrHandle = addFunctor(threadfunc);
lock1.unlock(); lock1.unlock();
break; break;
} }
bool bAdded = false; bool bAdded = false;
if ( waitingFunctorsSize < fQueueSize) if (waitingFunctorsSize < fQueueSize || fQueueSize == 0)
{ {
// Don't create a thread unless you have to // Don't create a thread unless you have to
addFunctor(threadfunc); thrHandle = addFunctor(threadfunc);
bAdded = true; bAdded = true;
} }
if ( fThreadCount < fMaxThreads) // fQueueSize = 0 disables the queue and is an indicator to allow any number of threads to actually run.
if (fThreadCount < fMaxThreads || fQueueSize == 0)
{ {
++fThreadCount; ++fThreadCount;
lock1.unlock(); lock1.unlock();
fThreads.create_thread(beginThreadFunc(*this)); fThreads.create_thread(beginThreadFunc(*this));
if (fDebug)
{
ostringstream oss;
oss << "invoke: Starting thread " << fThreadCount << " max " << fMaxThreads
<< " queue " << fQueueSize;
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
if (bAdded) if (bAdded)
break; break;
@@ -165,9 +234,22 @@ void ThreadPool::invoke(const Functor_T &threadfunc)
break; break;
} }
if (fDebug)
{
logging::Message::Args args;
logging::Message message(5);
args.add("invoke: Blocked waiting for thread. Count ");
args.add(fThreadCount);
args.add("max ");
args.add(fMaxThreads);
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
}
fThreadAvailable.wait(lock1); fThreadAvailable.wait(lock1);
} }
catch(...) catch (...)
{ {
++fGeneralErrors; ++fGeneralErrors;
throw; throw;
@@ -175,17 +257,16 @@ void ThreadPool::invoke(const Functor_T &threadfunc)
} }
fNeedThread.notify_one(); fNeedThread.notify_one();
return thrHandle;
} }
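For reference, the reworked API above can be exercised as follows; this is a minimal sketch, assuming Functor_T accepts any callable convertible to boost::function0<void> (as the tp.cpp driver added later in this change does), with doWork() being purely illustrative:
#include <stdint.h>
#include <vector>
#include "threadpool.h"
static void doWork() { /* illustrative unit of work */ }
void example()
{
    threadpool::ThreadPool pool(8, 16);        // at most 8 threads, queue of 16
    uint64_t one = pool.invoke(doWork);        // invoke() now hands back a handle
    std::vector<uint64_t> several;
    several.push_back(pool.invoke(doWork));
    several.push_back(pool.invoke(doWork));
    pool.join(one);                            // wait for a single task
    pool.join(several);                        // wait for a specific set of tasks
    pool.wait();                               // or wait for everything still queued
}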
void ThreadPool::beginThread() throw() void ThreadPool::beginThread() throw()
{ {
try try
{ {
// fThreadCreated();
boost::mutex::scoped_lock lock1(fMutex); boost::mutex::scoped_lock lock1(fMutex);
boost::system_time timeout = boost::get_system_time()+boost::posix_time::minutes(10);
for(;;) for (;;)
{ {
if (fStop) if (fStop)
break; break;
@@ -193,51 +274,60 @@ void ThreadPool::beginThread() throw()
if (fNextFunctor == fWaitingFunctors.end()) if (fNextFunctor == fWaitingFunctors.end())
{ {
// Wait until someone needs a thread // Wait until someone needs a thread
fNeedThread.wait(lock1); // Add the timed wait for queueSize == 0 so we can idle away threads
// over fMaxThreads
if (fQueueSize > 0)
{
fNeedThread.wait(lock1);
}
else
{
// Wait no more than 10 minutes
if (!fNeedThread.timed_wait(lock1, timeout)) // false means it timed out
{
if (fThreadCount > fMaxThreads)
{
--fThreadCount;
return;
}
timeout = boost::get_system_time()+boost::posix_time::minutes(10);
}
}
} }
else else
{ {
/* Need to tune these magic #s */ // If there's anything waiting, run it
if (waitingFunctorsSize - fIssued > 0)
{
Container_T::iterator todo = fNextFunctor++;
++fIssued;
lock1.unlock();
try
{
todo->functor();
}
catch (exception &e)
{
++fFunctorErrors;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("ThreadPool: Caught exception during execution: ");
args.add(e.what());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
}
lock1.lock();
--fIssued;
--waitingFunctorsSize;
fWaitingFunctors.erase(todo);
}
vector<Container_T::iterator> todoList; timeout = boost::get_system_time()+boost::posix_time::minutes(10);
int i, num;
Container_T::const_iterator iter;
/* Use this to control how many jobs are issued to a single thread */
num = (waitingFunctorsSize - issued >= 1 ? 1 : 0);
for (i = 0; i < num; i++)
todoList.push_back(fNextFunctor++);
issued += num;
// cerr << "got " << num << " jobs." << endl;
// cerr << "got " << num << " jobs. waitingFunctorsSize=" <<
// waitingFunctorsSize << " issued=" << issued << " fThreadCount=" <<
// fThreadCount << endl;
lock1.unlock();
for (i = 0; i < num; i++) {
try {
(*todoList[i])();
}
catch(exception &e) {
++fFunctorErrors;
cerr << e.what() << endl;
}
}
lock1.lock();
issued -= num;
waitingFunctorsSize -= num;
for (i = 0; i < num; i++)
fWaitingFunctors.erase(todoList[i]);
/*
if (waitingFunctorsSize != fWaitingFunctors.size())
cerr << "size mismatch! fake size=" << waitingFunctorsSize <<
" real size=" << fWaitingFunctors.size() << endl;
*/
fThreadAvailable.notify_all(); fThreadAvailable.notify_all();
} }
} }
} }
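A short sketch of the two sizing modes this block implements (the constructor arguments are illustrative; the behaviour follows the header comments added in this change):
#include "threadpool.h"
// Bounded pool: at most 20 threads; once 40 tasks are queued, invoke() blocks
// until a slot frees up.
threadpool::ThreadPool boundedPool(20, 40);
// "Unbounded" pool: queueSize == 0 disables the queue, so invoke() never blocks
// and threads beyond maxThreads are created on demand; the timed_wait above lets
// those extra threads exit after ~10 idle minutes, so the thread count settles
// back toward maxThreads once the load drops.
threadpool::ThreadPool unboundedPool(20, 0);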
@@ -249,6 +339,7 @@ void ThreadPool::beginThread() throw()
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
#ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(5); logging::Message message(5);
args.add("beginThread: Caught exception: "); args.add("beginThread: Caught exception: ");
@@ -260,14 +351,14 @@ void ThreadPool::beginThread() throw()
logging::MessageLog ml(lid); logging::MessageLog ml(lid);
ml.logErrorMessage( message ); ml.logErrorMessage( message );
#endif
} }
catch(...) catch (...)
{ {
} }
} }
catch(...) catch (...)
{ {
++fGeneralErrors; ++fGeneralErrors;
@@ -275,6 +366,7 @@ void ThreadPool::beginThread() throw()
// Log the exception and exit this thread // Log the exception and exit this thread
try try
{ {
#ifndef NOLOGGING
logging::Message::Args args; logging::Message::Args args;
logging::Message message(6); logging::Message message(6);
args.add("beginThread: Caught unknown exception!"); args.add("beginThread: Caught unknown exception!");
@@ -285,29 +377,31 @@ void ThreadPool::beginThread() throw()
logging::MessageLog ml(lid); logging::MessageLog ml(lid);
ml.logErrorMessage( message ); ml.logErrorMessage( message );
#endif
} }
catch(...) catch (...)
{ {
} }
} }
} }
void ThreadPool::addFunctor(const Functor_T &func) uint64_t ThreadPool::addFunctor(const Functor_T &func)
{ {
bool bAtEnd = false; bool bAtEnd = false;
if (fNextFunctor == fWaitingFunctors.end()) if (fNextFunctor == fWaitingFunctors.end())
bAtEnd = true; bAtEnd = true;
fWaitingFunctors.push_back(func); PoolFunction_T poolFunction;
waitingFunctorsSize++; poolFunction.hndl = fNextHandle;
poolFunction.functor = func;
fWaitingFunctors.push_back(poolFunction);
waitingFunctorsSize++;
if (bAtEnd) if (bAtEnd)
{ {
--fNextFunctor; --fNextFunctor;
} }
return fNextHandle++;
} }
void ThreadPool::dump() void ThreadPool::dump()
@@ -317,4 +411,51 @@ void ThreadPool::dump()
std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl; std::cout << "Waiting functors: " << fWaitingFunctors.size() << std::endl;
} }
void ThreadPoolMonitor::operator()()
{
ostringstream filename;
filename << "/var/log/mariadb/columnstore/trace/ThreadPool_" << fPool->name() << ".log";
fLog = new ofstream(filename.str().c_str());
for (;;)
{
if (!fLog || !fLog->is_open())
{
ostringstream oss;
oss << "ThreadPoolMonitor " << fPool->name() << " has no file ";
logging::Message::Args args;
logging::Message message(0);
args.add(oss.str());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logWarningMessage( message );
return;
}
// Get a timestamp for output.
struct tm tm;
struct timeval tv;
gettimeofday(&tv, 0);
localtime_r(&tv.tv_sec, &tm);
(*fLog) << setfill('0')
<< setw(2) << tm.tm_hour << ':'
<< setw(2) << tm.tm_min << ':'
<< setw(2) << tm.tm_sec
<< '.'
<< setw(4) << tv.tv_usec/100
<< " Name " << fPool->fName
<< " Active " << fPool->waitingFunctorsSize
<< " Most " << fPool->fThreadCount
<< " Max " << fPool->fMaxThreads
<< " Q " << fPool->fQueueSize
<< endl;
// struct timespec req = { 0, 1000 * 100 }; //100 usec
// nanosleep(&req, 0);
sleep(2);
}
}
} // namespace threadpool } // namespace threadpool
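A minimal sketch of how the monitor class above is meant to be used; running it on a detached boost::thread and the pool name "ExampleStep" are assumptions for illustration, only the class itself is part of this change:
#include <boost/thread.hpp>
#include "threadpool.h"
void startMonitoredPool()
{
    // 'ExampleStep' is an illustrative name; it becomes part of the trace file name.
    static threadpool::ThreadPool pool(32, 64);
    pool.setName("ExampleStep");
    pool.setDebug(true);                         // optional: also log invoke() decisions
    // ThreadPoolMonitor::operator()() loops forever, appending a line of pool
    // statistics every 2 seconds to
    // /var/log/mariadb/columnstore/trace/ThreadPool_ExampleStep.log
    threadpool::ThreadPoolMonitor monitorFunctor(&pool);
    boost::thread monitorThread(monitorFunctor);
    monitorThread.detach();                      // let it run for the life of the process
}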

View File

@@ -31,7 +31,7 @@
#define THREADPOOL_H #define THREADPOOL_H
#include <string> #include <string>
#include <iostream> #include <fstream>
#include <cstdlib> #include <cstdlib>
#include <sstream> #include <sstream>
#include <stdexcept> #include <stdexcept>
@@ -55,7 +55,6 @@ namespace threadpool
* executing tasks. It is responsible for creating threads and tracking which threads are "busy" * executing tasks. It is responsible for creating threads and tracking which threads are "busy"
* and which are idle. Idle threads are utilized as "work" is added to the system. * and which are idle. Idle threads are utilized as "work" is added to the system.
*/ */
class ThreadPool class ThreadPool
{ {
public: public:
@@ -75,7 +74,10 @@ public:
* @param maxThreads the maximum number of threads in this pool. This is the maximum number * @param maxThreads the maximum number of threads in this pool. This is the maximum number
* of simultaneous operations that can go on. * of simultaneous operations that can go on.
* @param queueSize the maximum number of work tasks in the queue. This is the maximum * @param queueSize the maximum number of work tasks in the queue. This is the maximum
* number of jobs that can queue up in the work list before invoke() blocks. * number of jobs that can queue up in the work list before invoke() blocks.
* If 0, then threads never block and total threads may
* exceed maxThreads. Nothing waits. Thread count will
* idle down to maxThreads when less work is required.
*/ */
EXPORT explicit ThreadPool( size_t maxThreads, size_t queueSize ); EXPORT explicit ThreadPool( size_t maxThreads, size_t queueSize );
@@ -109,11 +111,6 @@ public:
*/ */
inline size_t getMaxThreads() const { return fMaxThreads; } inline size_t getMaxThreads() const { return fMaxThreads; }
/** @brief register a functor to be called when a new thread
* is created
*/
EXPORT void setThreadCreatedListener(const Functor_T &f) ;
/** @brief queue size accessor /** @brief queue size accessor
* *
*/ */
@@ -132,7 +129,7 @@ public:
* queueSize tasks already waiting, invoke() will block until a slot in the * queueSize tasks already waiting, invoke() will block until a slot in the
* queue comes free. * queue comes free.
*/ */
EXPORT void invoke(const Functor_T &threadfunc); EXPORT uint64_t invoke(const Functor_T &threadfunc);
/** @brief stop the threads /** @brief stop the threads
*/ */
@@ -142,20 +139,45 @@ public:
*/ */
EXPORT void wait(); EXPORT void wait();
/** @brief Wait for a specific thread
*/
EXPORT void join(uint64_t thrHandle);
/** @brief Wait for a set of specific threads
*/
EXPORT void join(std::vector<uint64_t>& thrHandle);
/** @brief for use in debugging /** @brief for use in debugging
*/ */
EXPORT void dump(); EXPORT void dump();
EXPORT std::string& name() {return fName;}
EXPORT void setName(std::string name) {fName = name;}
EXPORT void setName(const char* name) {fName = name;}
EXPORT bool debug() {return fDebug;}
EXPORT void setDebug(bool d) {fDebug = d;}
friend class ThreadPoolMonitor;
protected: protected:
private: private:
// Used internally to keep a handle associated with each functor for join()
struct PoolFunction_T
{
uint64_t hndl;
Functor_T functor;
};
/** @brief initialize data members /** @brief initialize data members
*/ */
void init(); void init();
/** @brief add a functor to the list /** @brief add a functor to the list
*/ */
void addFunctor(const Functor_T &func); uint64_t addFunctor(const Functor_T &func);
/** @brief thread entry point /** @brief thread entry point
*/ */
@@ -184,19 +206,18 @@ private:
struct NoOp struct NoOp
{ {
void operator () () const void operator () () const
{}} {}
; };
size_t fThreadCount; size_t fThreadCount;
size_t fMaxThreads; size_t fMaxThreads;
size_t fQueueSize; size_t fQueueSize;
typedef std::list<Functor_T> Container_T; typedef std::list<PoolFunction_T> Container_T;
Container_T fWaitingFunctors; Container_T fWaitingFunctors;
Container_T::iterator fNextFunctor; Container_T::iterator fNextFunctor;
// Functor_T * fThreadCreated;
uint32_t issued; uint32_t fIssued;
boost::mutex fMutex; boost::mutex fMutex;
boost::condition fThreadAvailable; // triggered when a thread is available boost::condition fThreadAvailable; // triggered when a thread is available
boost::condition fNeedThread; // triggered when a thread is needed boost::condition fNeedThread; // triggered when a thread is needed
@@ -206,7 +227,36 @@ private:
long fGeneralErrors; long fGeneralErrors;
long fFunctorErrors; long fFunctorErrors;
uint32_t waitingFunctorsSize; uint32_t waitingFunctorsSize;
uint64_t fNextHandle;
std::string fName; // Optional to add a name to the pool for debugging.
bool fDebug;
};
// This class, if instantiated, will continuously log details about the indicated threadpool
// The log will end up in /var/log/mariadb/columnstore/trace/ThreadPool_<name>.log
class ThreadPoolMonitor
{
public:
ThreadPoolMonitor(ThreadPool* pool) : fPool(pool), fLog(NULL)
{
}
~ThreadPoolMonitor()
{
if (fLog)
{
delete fLog;
}
}
void operator()();
private:
//defaults okay
//ThreadPoolMonitor(const ThreadPoolMonitor& rhs);
//ThreadPoolMonitor& operator=(const ThreadPoolMonitor& rhs);
ThreadPool* fPool;
std::ofstream* fLog;
}; };
} // namespace threadpool } // namespace threadpool

View File

@@ -1,222 +1,224 @@
<!DOCTYPE Project SYSTEM "http://www.slickedit.com/dtd/vse/10.0/vpj.dtd"> <!DOCTYPE Project SYSTEM "http://www.slickedit.com/dtd/vse/10.0/vpj.dtd">
<Project <Project
Version="10.0" Version="10.0"
VendorName="SlickEdit" VendorName="SlickEdit"
TemplateName="GNU C/C++" TemplateName="GNU C/C++"
WorkingDir="."> WorkingDir=".">
<Config <Config
Name="Debug" Name="Debug"
Type="gnuc" Type="gnuc"
DebugCallbackName="gdb" DebugCallbackName="gdb"
Version="1" Version="1"
OutputFile="%bdthreadpool.so" OutputFile="%bdthreadpool.so"
CompilerConfigName="Latest Version"> CompilerConfigName="Latest Version">
<Menu> <Menu>
<Target <Target
Name="Compile" Name="Compile"
MenuCaption="&amp;Compile" MenuCaption="&amp;Compile"
Dialog="_gnuc_options_form Compile" Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
OutputExts="*.o" OutputExts="*.o"
SaveOption="SaveCurrent" SaveOption="SaveCurrent"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine='g++ -c %xup %defd -g -o "%bd%n%oe" %i "%f"'/> <Exec CmdLine='g++ -c %xup %defd -g -o "%bd%n%oe" %i "%f"'/>
</Target> </Target>
<Target <Target
Name="Link" Name="Link"
MenuCaption="&amp;Link" MenuCaption="&amp;Link"
ShowOnMenu="Never" ShowOnMenu="Never"
Dialog="_gnuc_options_form Link" Dialog="_gnuc_options_form Link"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveCurrent" SaveOption="SaveCurrent"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine='g++ %xup -g -o "%o" %f %libs -shared -fPIC'/> <Exec CmdLine='g++ %xup -g -o "%o" %f %libs -shared -fPIC'/>
</Target> </Target>
<Target <Target
Name="Build" Name="Build"
MenuCaption="&amp;Build" MenuCaption="&amp;Build"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveWorkspaceFiles" SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine="make"/> <Exec CmdLine="make"/>
</Target> </Target>
<Target <Target
Name="Rebuild" Name="Rebuild"
MenuCaption="&amp;Rebuild" MenuCaption="&amp;Rebuild"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveWorkspaceFiles" SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine=""/> <Exec CmdLine=""/>
</Target> </Target>
<Target <Target
Name="Debug" Name="Debug"
MenuCaption="&amp;Debug" MenuCaption="&amp;Debug"
Dialog="_gnuc_options_form Run/Debug" Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1" BuildFirst="1"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveNone" SaveOption="SaveNone"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine=""/> <Exec CmdLine=""/>
</Target> </Target>
<Target <Target
Name="Execute" Name="Execute"
MenuCaption="E&amp;xecute" MenuCaption="E&amp;xecute"
Dialog="_gnuc_options_form Run/Debug" Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1" BuildFirst="1"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveWorkspaceFiles" SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine=""/> <Exec CmdLine=""/>
</Target> </Target>
<Target <Target
Name="dash" Name="dash"
MenuCaption="-" MenuCaption="-"
Deletable="0"> Deletable="0">
<Exec/> <Exec/>
</Target> </Target>
<Target <Target
Name="GNU C Options" Name="GNU C Options"
MenuCaption="GNU C &amp;Options..." MenuCaption="GNU C &amp;Options..."
ShowOnMenu="HideIfNoCmdLine" ShowOnMenu="HideIfNoCmdLine"
Deletable="0" Deletable="0"
SaveOption="SaveNone"> SaveOption="SaveNone">
<Exec <Exec
CmdLine="gnucoptions" CmdLine="gnucoptions"
Type="Slick-C"/> Type="Slick-C"/>
</Target> </Target>
</Menu> </Menu>
<List Name="GNUC Options"> <List Name="GNUC Options">
<Item <Item
Name="LinkerOutputType" Name="LinkerOutputType"
Value="SharedLibrary"/> Value="SharedLibrary"/>
</List> </List>
</Config> </Config>
<Config <Config
Name="Release" Name="Release"
Type="gnuc" Type="gnuc"
DebugCallbackName="gdb" DebugCallbackName="gdb"
Version="1" Version="1"
OutputFile="%bdthreadpool.so" OutputFile="%bdthreadpool.so"
CompilerConfigName="Latest Version"> CompilerConfigName="Latest Version">
<Menu> <Menu>
<Target <Target
Name="Compile" Name="Compile"
MenuCaption="&amp;Compile" MenuCaption="&amp;Compile"
Dialog="_gnuc_options_form Compile" Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
OutputExts="*.o" OutputExts="*.o"
SaveOption="SaveCurrent" SaveOption="SaveCurrent"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine='g++ -c %xup %defd -o "%bd%n%oe" %i "%f"'/> <Exec CmdLine='g++ -c %xup %defd -o "%bd%n%oe" %i "%f"'/>
</Target> </Target>
<Target <Target
Name="Link" Name="Link"
MenuCaption="&amp;Link" MenuCaption="&amp;Link"
ShowOnMenu="Never" ShowOnMenu="Never"
Dialog="_gnuc_options_form Link" Dialog="_gnuc_options_form Link"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveCurrent" SaveOption="SaveCurrent"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine='g++ %xup -o "%o" %f %libs -shared -fPIC'/> <Exec CmdLine='g++ %xup -o "%o" %f %libs -shared -fPIC'/>
</Target> </Target>
<Target <Target
Name="Build" Name="Build"
MenuCaption="&amp;Build" MenuCaption="&amp;Build"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveWorkspaceFiles" SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine="make"/> <Exec CmdLine="make"/>
</Target> </Target>
<Target <Target
Name="Rebuild" Name="Rebuild"
MenuCaption="&amp;Rebuild" MenuCaption="&amp;Rebuild"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveWorkspaceFiles" SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine=""/> <Exec CmdLine=""/>
</Target> </Target>
<Target <Target
Name="Debug" Name="Debug"
MenuCaption="&amp;Debug" MenuCaption="&amp;Debug"
Dialog="_gnuc_options_form Run/Debug" Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1" BuildFirst="1"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveNone" SaveOption="SaveNone"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine=""/> <Exec CmdLine=""/>
</Target> </Target>
<Target <Target
Name="Execute" Name="Execute"
MenuCaption="E&amp;xecute" MenuCaption="E&amp;xecute"
Dialog="_gnuc_options_form Run/Debug" Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1" BuildFirst="1"
CaptureOutputWith="ProcessBuffer" CaptureOutputWith="ProcessBuffer"
Deletable="0" Deletable="0"
SaveOption="SaveWorkspaceFiles" SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"> RunFromDir="%rw">
<Exec CmdLine=""/> <Exec CmdLine=""/>
</Target> </Target>
<Target <Target
Name="dash" Name="dash"
MenuCaption="-" MenuCaption="-"
Deletable="0"> Deletable="0">
<Exec/> <Exec/>
</Target> </Target>
<Target <Target
Name="GNU C Options" Name="GNU C Options"
MenuCaption="GNU C &amp;Options..." MenuCaption="GNU C &amp;Options..."
ShowOnMenu="HideIfNoCmdLine" ShowOnMenu="HideIfNoCmdLine"
Deletable="0" Deletable="0"
SaveOption="SaveNone"> SaveOption="SaveNone">
<Exec <Exec
CmdLine="gnucoptions" CmdLine="gnucoptions"
Type="Slick-C"/> Type="Slick-C"/>
</Target> </Target>
</Menu> </Menu>
<List Name="GNUC Options"> <List Name="GNUC Options">
<Item <Item
Name="LinkerOutputType" Name="LinkerOutputType"
Value="SharedLibrary"/> Value="SharedLibrary"/>
</List> </List>
</Config> </Config>
<Files> <Files>
<Folder <Folder
Name="Source Files" Name="Source Files"
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d"> Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
<F N="tdriver.cpp"/> <F N="prioritythreadpool.cpp"/>
<F N="threadpool.cpp"/> <F N="tdriver.cpp"/>
<F N="weightedthreadpool.cpp"/> <F N="threadpool.cpp"/>
<F N="wtp.cpp"/> <F N="weightedthreadpool.cpp"/>
</Folder> <F N="wtp.cpp"/>
<Folder </Folder>
Name="Header Files" <Folder
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if"> Name="Header Files"
<F N="threadpool.h"/> Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
<F N="weightedthreadpool.h"/> <F N="prioritythreadpool.h"/>
</Folder> <F N="threadpool.h"/>
<Folder <F N="weightedthreadpool.h"/>
Name="Resource Files" </Folder>
Filters="*.ico;*.cur;*.dlg"/> <Folder
<Folder Name="Resource Files"
Name="Bitmaps" Filters="*.ico;*.cur;*.dlg"/>
Filters="*.bmp"/> <Folder
<Folder Name="Bitmaps"
Name="Other Files" Filters="*.bmp"/>
Filters=""> <Folder
<F Name="Other Files"
N="Makefile" Filters="">
Type="Makefile"/> <F
</Folder> N="Makefile"
</Files> Type="Makefile"/>
</Folder>
</Files>
</Project> </Project>

130 utils/threadpool/tp.cpp Normal file
View File

@@ -0,0 +1,130 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <string>
#include <stdexcept>
#include <iostream>
#include <fstream>
using namespace std;
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
#include "threadpool.h"
int64_t thecount = 0;
boost::mutex mutex;
const string timeNow()
{
time_t outputTime = time(0);
struct tm ltm;
char buf[32]; //ctime(3) says at least 26
size_t len = 0;
#ifdef _MSC_VER
asctime_s(buf, 32, localtime_r(&outputTime, &ltm));
#else
asctime_r(localtime_r(&outputTime, &ltm), buf);
#endif
len = strlen(buf);
if (len > 0) --len;
if (buf[len] == '\n') buf[len] = 0;
return buf;
}
// Functor class
struct foo
{
int64_t fData;
int64_t fThd;
string start;
bool running;
void operator ()()
{
start = timeNow();
std::cout << "foo thd = " << fThd << " start " << start << std::endl;
for (int64_t i = 0; i < 1024*1024*(fThd+0)*128; i++)
// simulate some work
fData++;
boost::mutex::scoped_lock lock(mutex);
std::cout << "foo thd = " << fThd << " start " << start << " fin " << timeNow() << std::endl;
}
foo(int64_t i) : fThd(i), fData(i), running(true) {start=timeNow();}
foo(const foo& copy) : fData(copy.fData), fThd(copy.fThd), start(copy.start), running(copy.running) {std::cout << "new foo " << fThd << endl;}
~foo() {running=false;}
};
int main( int argc, char **argv)
{
threadpool::ThreadPool pool( 20, 10 );
std::vector<uint64_t> hndl;
hndl.reserve(10);
int t1 = hndl.capacity();
uint64_t testHndl;
uint64_t thdhndl=999;
int64_t thd = 1;
boost::function0<void> foofunc;
boost::function0<void> foofunc2;
for (int64_t y = 0; y < 1; y++)
{
foo bar(y);
// foofunc = bar;
// foofunc2 = foofunc;
std::cout << "Done with assign" << std::endl;
for (int64_t i = 0; i < 1; ++i)
{
bar.fThd=thd++;
thdhndl = pool.invoke(bar);
if (y<10)
{
hndl.push_back(thdhndl);
}
if (y == 0)
{
testHndl = thdhndl;
}
}
boost::mutex::scoped_lock lock(mutex);
}
// Wait until all of the queued up and in-progress work has finished
std::cout << "Threads for join " << hndl.size() << std::endl;
pool.dump();
std::cout << "*** JOIN 1 ***" << std::endl;
pool.join(testHndl);
pool.dump();
std::cout << "*** JOIN 10 ***" << std::endl;
pool.join(hndl);
pool.dump();
std::cout << "*** WAIT ***" << std::endl;
pool.wait();
pool.dump();
sleep(2);
return 0;
}

240 utils/threadpool/tp.vpj Normal file
View File

@@ -0,0 +1,240 @@
<!DOCTYPE Project SYSTEM "http://www.slickedit.com/dtd/vse/10.0/vpj.dtd">
<Project
Version="10.0"
VendorName="SlickEdit"
TemplateName="GNU C/C++"
WorkingDir="."
BuildSystem="vsbuild">
<Config
Name="Debug"
Type="gnuc"
DebugCallbackName="gdb"
Version="1"
OutputFile="%bdtp"
CompilerConfigName="Latest Version"
Defines='"/DNOLOGGING"'>
<Menu>
<Target
Name="Compile"
MenuCaption="&amp;Compile"
Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
OutputExts="*.o"
SaveOption="SaveCurrent"
RunFromDir="%rw">
<Exec CmdLine='g++ -c %xup %defd -g -o "%bd%n%oe" %i "%f"'/>
</Target>
<Target
Name="Link"
MenuCaption="&amp;Link"
ShowOnMenu="Never"
Dialog="_gnuc_options_form Link"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveCurrent"
RunFromDir="%rw">
<Exec CmdLine='g++ %xup -g -o "%o" %f %libs'/>
</Target>
<Target
Name="Build"
MenuCaption="&amp;Build"
Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"
ClearProcessBuffer="1">
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t build'/>
</Target>
<Target
Name="Rebuild"
MenuCaption="&amp;Rebuild"
Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"
ClearProcessBuffer="1">
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t rebuild'/>
</Target>
<Target
Name="Debug"
MenuCaption="&amp;Debug"
Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveNone"
RunFromDir="%rw"
ClearProcessBuffer="1">
<Exec CmdLine='vsdebugio -prog "%o"'/>
</Target>
<Target
Name="Execute"
MenuCaption="E&amp;xecute"
Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw">
<Exec CmdLine='"%o"'/>
</Target>
<Target
Name="dash"
MenuCaption="-"
Deletable="0">
<Exec/>
</Target>
<Target
Name="GNU C Options"
MenuCaption="GNU C &amp;Options..."
ShowOnMenu="HideIfNoCmdLine"
Deletable="0"
SaveOption="SaveNone">
<Exec
CmdLine="gnucoptions"
Type="Slick-C"/>
</Target>
</Menu>
<List Name="GNUC Options">
<Item
Name="LinkerOutputType"
Value="Executable"/>
</List>
<Includes>
<Include Dir="/usr/local/include"/>
</Includes>
<Libs PreObjects="0">
<Lib File="/usr/lib/libboost_thread.so"/>
</Libs>
</Config>
<Config
Name="Release"
Type="gnuc"
DebugCallbackName="gdb"
Version="1"
OutputFile="%bdtp"
CompilerConfigName="Latest Version"
Defines='"/DNOLOGGING"'>
<Menu>
<Target
Name="Compile"
MenuCaption="&amp;Compile"
Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
OutputExts="*.o"
SaveOption="SaveCurrent"
RunFromDir="%rw">
<Exec CmdLine='g++ -c %xup %defd -o "%bd%n%oe" %i "%f"'/>
</Target>
<Target
Name="Link"
MenuCaption="&amp;Link"
ShowOnMenu="Never"
Dialog="_gnuc_options_form Link"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveCurrent"
RunFromDir="%rw">
<Exec CmdLine='g++ %xup -o "%o" %f %libs'/>
</Target>
<Target
Name="Build"
MenuCaption="&amp;Build"
Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"
ClearProcessBuffer="1">
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t build'/>
</Target>
<Target
Name="Rebuild"
MenuCaption="&amp;Rebuild"
Dialog="_gnuc_options_form Compile"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw"
ClearProcessBuffer="1">
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t rebuild'/>
</Target>
<Target
Name="Debug"
MenuCaption="&amp;Debug"
Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveNone"
RunFromDir="%rw"
ClearProcessBuffer="1">
<Exec CmdLine='vsdebugio -prog "%o"'/>
</Target>
<Target
Name="Execute"
MenuCaption="E&amp;xecute"
Dialog="_gnuc_options_form Run/Debug"
BuildFirst="1"
CaptureOutputWith="ProcessBuffer"
Deletable="0"
SaveOption="SaveWorkspaceFiles"
RunFromDir="%rw">
<Exec CmdLine='"%o"'/>
</Target>
<Target
Name="dash"
MenuCaption="-"
Deletable="0">
<Exec/>
</Target>
<Target
Name="GNU C Options"
MenuCaption="GNU C &amp;Options..."
ShowOnMenu="HideIfNoCmdLine"
Deletable="0"
SaveOption="SaveNone">
<Exec
CmdLine="gnucoptions"
Type="Slick-C"/>
</Target>
</Menu>
<List Name="GNUC Options">
<Item
Name="LinkerOutputType"
Value="Executable"/>
</List>
<Includes>
<Include Dir="/usr/local/include"/>
</Includes>
<Libs PreObjects="0">
<Lib File="/usr/lib/libboost_thread.so"/>
</Libs>
</Config>
<Files>
<Folder
Name="Source Files"
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
<F N="threadpool.cpp"/>
<F N="tp.cpp"/>
</Folder>
<Folder
Name="Header Files"
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
<F N="threadpool.h"/>
</Folder>
<Folder
Name="Resource Files"
Filters="*.ico;*.cur;*.dlg"/>
<Folder
Name="Bitmaps"
Filters="*.bmp"/>
<Folder
Name="Other Files"
Filters=""/>
</Files>
</Project>

6 utils/threadpool/tp.vpw Normal file
View File

@@ -0,0 +1,6 @@
<!DOCTYPE Workspace SYSTEM "http://www.slickedit.com/dtd/vse/10.0/vpw.dtd">
<Workspace Version="10.0" VendorName="SlickEdit">
<Projects>
<Project File="tp.vpj"/>
</Projects>
</Workspace>