1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

[MCOL-4709] Disk-based aggregation

* Introduce multigeneration aggregation

* Do not save unused part of RGDatas to disk
* Add IO error explanation (strerror)

* Reduce memory usage while aggregating
* introduce in-memory generations for better memory utilization

* Try to keep the number of buckets low

* Refactor disk aggregation a bit
* pass calculated hash into RowAggregation
* try to keep some RGData with free space in memory

* do not dump more than half of rowgroups to disk if generations are
  allowed, instead start a new generation
* for each thread shift the first processed bucket at each iteration,
  so the generations start more evenly

* Unify temp data location

* Explicitly create temp subdirectories
  whether disk aggregation/join are enabled or not
This commit is contained in:
Alexey Antipovsky
2021-01-15 18:52:13 +03:00
parent 3537c0d635
commit 475104e4d3
24 changed files with 5932 additions and 906 deletions

View File

@ -55,6 +55,7 @@ const string ResourceManager::fExtentMapStr("ExtentMap");
//const string ResourceManager::fDMLProcStr("DMLProc");
//const string ResourceManager::fBatchInsertStr("BatchInsert");
const string ResourceManager::fOrderByLimitStr("OrderByLimit");
const string ResourceManager::fRowAggregationStr("RowAggregation");
ResourceManager* ResourceManager::fInstance = NULL;
boost::mutex mx;
@ -254,6 +255,10 @@ ResourceManager::ResourceManager(bool runningInExeMgr) :
fUseHdfs = true;
else
fUseHdfs = false;
fAllowedDiskAggregation = getBoolVal(fRowAggregationStr,
"AllowDiskBasedAggregation",
defaultAllowDiskAggregation);
}
int ResourceManager::getEmPriority() const

View File

@ -126,6 +126,8 @@ const uint64_t defaultOrderByLimitMaxMemory = 1 * 1024 * 1024 * 1024ULL;
const uint64_t defaultDECThrottleThreshold = 200000000; // ~200 MB
const uint8_t defaultUseCpimport = 1;
const bool defaultAllowDiskAggregation = false;
/** @brief ResourceManager
* Returns requested values from Config
*
@ -149,7 +151,7 @@ public:
/** @brief dtor
*/
virtual ~ResourceManager() { }
virtual ~ResourceManager() {}
typedef std::map <uint32_t, uint64_t> MemMap;
@ -177,6 +179,11 @@ public:
return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize);
}
// Returns true when disk-based aggregation is enabled.
// The value is read once at construction from the
// RowAggregation/AllowDiskBasedAggregation config entry
// (defaultAllowDiskAggregation == false) and cached in
// fAllowedDiskAggregation, so later config changes are not observed.
bool getAllowDiskAggregation() const
{
return fAllowedDiskAggregation;
}
int getHjMaxBuckets() const
{
return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets);
@ -391,7 +398,7 @@ public:
atomicops::atomicAdd(&totalUmMemLimit, amount);
atomicops::atomicAdd(sessionLimit.get(), amount);
}
inline int64_t availableMemory()
// Point-in-time snapshot of the remaining UM memory budget.
// NOTE(review): totalUmMemLimit is updated elsewhere via
// atomicops::atomicAdd, so this plain read is only an approximation
// under concurrency — callers should treat it as advisory.
inline int64_t availableMemory() const
{
return totalUmMemLimit;
}
@ -559,6 +566,8 @@ private:
template<typename IntType>
IntType getIntVal(const std::string& section, const std::string& name, IntType defval) const;
bool getBoolVal(const std::string& section, const std::string& name, bool defval) const;
void logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value = 0, uint32_t sessionId = 0);
/*static const*/ std::string fExeMgrStr;
@ -573,6 +582,7 @@ private:
/*static const*/ std::string fDMLProcStr;
/*static const*/ std::string fBatchInsertStr;
static const std::string fOrderByLimitStr;
static const std::string fRowAggregationStr;
config::Config* fConfig;
static ResourceManager* fInstance;
uint32_t fTraceFlags;
@ -604,6 +614,7 @@ private:
bool isExeMgr;
bool fUseHdfs;
bool fAllowedDiskAggregation{false};
};
@ -644,7 +655,11 @@ inline IntType ResourceManager::getIntVal(const std::string& section, const std:
return ( 0 == retStr.length() ? defval : fConfig->fromText(retStr) );
}
// Read a boolean setting from the configuration.
// An absent/empty entry yields `defval`; only the literal strings
// "y" and "Y" are treated as true, any other non-empty value is false.
inline bool ResourceManager::getBoolVal(const std::string& section, const std::string& name, bool defval) const
{
    const std::string value = fConfig->getConfig(section, name);

    if (value.empty())
        return defval;

    return (value == "y") || (value == "Y");
}
}

View File

@ -129,7 +129,7 @@ struct cmpTuple
}
};
typedef vector<Row::Pointer> RowBucket;
typedef vector<std::pair<Row::Pointer, uint64_t>> RowBucket;
typedef vector<RowBucket> RowBucketVec;
// The AGG_MAP type is used to maintain a list of aggregate functions in order to
@ -402,6 +402,17 @@ TupleAggregateStep::TupleAggregateStep(
fNumOfThreads = fRm->aggNumThreads();
fNumOfBuckets = fRm->aggNumBuckets();
fNumOfRowGroups = fRm->aggNumRowGroups();
auto memLimit = std::min(fRm->availableMemory(), *fSessionMemLimit);
fNumOfBuckets = calcNumberOfBuckets(memLimit,
fNumOfThreads,
fNumOfBuckets,
fNumOfRowGroups,
fRowGroupIn.getRowSize(),
fRowGroupOut.getRowSize(),
fRm->getAllowDiskAggregation());
fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);
fMemUsage.reset(new uint64_t[fNumOfThreads]);
memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t));
@ -440,7 +451,7 @@ void TupleAggregateStep::initializeMultiThread()
for (i = 0; i < fNumOfBuckets; i++)
{
boost::mutex* lock = new boost::mutex();
boost::mutex* lock = new boost::mutex();
fAgg_mutex.push_back(lock);
fRowGroupOuts[i] = fRowGroupOut;
rgData.reinit(fRowGroupOut);
@ -481,9 +492,10 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
RowAggregationDistinct* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
RowAggregationMultiDistinct* multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
Row rowIn;
RowGroup* rowGroupIn = 0;
RowGroup* rowGroupIn = nullptr;
rowGroupIn = (aggDist->aggregator()->getOutputRowGroup());
uint32_t bucketID;
std::vector<std::unique_ptr<RGData>> rgDataVec;
if (multiDist)
{
@ -503,10 +515,12 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rowGroupIn->initRow(&rowIn);
auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());
while (dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get())->nextRowGroup())
while (subDistAgg->nextRowGroup())
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
@ -514,8 +528,9 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
// The key is the groupby columns, which are the leading columns.
//uint8_t* hashMapKey = rowIn.getData() + 2;
//bucketID = hash.operator()(hashMapKey) & fBucketMask;
bucketID = rowIn.hash(hashlen - 1) % fNumOfBuckets;
rowBucketVecs[bucketID][j].push_back(rowIn.getPointer());
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
@ -524,10 +539,12 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
else
{
rowGroupIn->initRow(&rowIn);
auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());
while (dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get())->nextRowGroup())
while (subAgg->nextRowGroup())
{
rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
rgDataVec.emplace_back(subAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
@ -535,8 +552,9 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
// The key is the groupby columns, which are the leading columns.
//uint8_t* hashMapKey = rowIn.getData() + 2;
//bucketID = hash.operator()(hashMapKey) & fBucketMask;
bucketID = rowIn.hash(hashlen - 1) % fNumOfBuckets;
rowBucketVecs[bucketID][0].push_back(rowIn.getPointer());
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
@ -971,7 +989,7 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
if (doUMOnly)
rgs.push_back(rgs[0]);
}
if (!doUMOnly)
{
if (distinctAgg == true)
@ -1013,7 +1031,7 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
// Setup the input JobstepAssoctiation -- the mechanism
// whereby the previous step feeds data to this step.
// Otherwise, we need to create one and hook to the
// Otherwise, we need to create one and hook to the
// previous step as well as this aggregate step.
spjs->stepId(step->stepId() + 1);
@ -1299,7 +1317,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
@ -1333,7 +1351,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(width[colProj]);
widthAgg.push_back(width[colProj]);
}
break;
@ -1836,7 +1854,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
@ -2140,7 +2158,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
groupByNoDist.push_back(groupby);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i));
}
// locate the return column position in aggregated rowgroup
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
@ -2198,7 +2216,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
@ -3120,7 +3138,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
@ -3430,7 +3448,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
@ -3732,7 +3750,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
for (uint64_t i = 0; i < oidsAggUm.size(); i++)
posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm,
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm,
csNumAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
rowAggUm->timeZone(jobInfo.timeZone);
@ -3744,7 +3762,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
for (uint64_t i = 0; i < oidsAggPm.size(); i++)
posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm,
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm,
csNumAggPm, scaleAggPm, precisionAggPm, jobInfo.stringTableThreshold);
SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
rowAggPm->timeZone(jobInfo.timeZone);
@ -4005,7 +4023,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
@ -4401,7 +4419,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
@ -4808,7 +4826,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
for (uint64_t i = 0; i < oidsAggUm.size(); i++)
posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm,
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm,
csNumAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit));
rowAggUm->timeZone(jobInfo.timeZone);
@ -4818,8 +4836,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
for (uint64_t i = 0; i < oidsAggDist.size(); i++)
posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist,
typeAggDist, csNumAggDist, scaleAggDist,
RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist,
typeAggDist, csNumAggDist, scaleAggDist,
precisionAggDist, jobInfo.stringTableThreshold);
SP_ROWAGG_DIST rowAggDist(new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
rowAggDist->timeZone(jobInfo.timeZone);
@ -5058,7 +5076,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
for (uint64_t i = 0; i < oidsAggPm.size(); i++)
posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm,
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm,
csNumAggPm, scaleAggPm, precisionAggPm, jobInfo.stringTableThreshold);
SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
rowAggPm->timeZone(jobInfo.timeZone);
@ -5100,7 +5118,7 @@ void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInf
uint64_t eid = -1;
if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) &&
(ac->aggColumnList().size() > 0) &&
(ac->aggColumnList().size() > 0) &&
(ac->windowfunctionColumnList().size() == 0))
{
const vector<SimpleColumn*>& scols = ac->simpleColumnList();
@ -5256,6 +5274,26 @@ void TupleAggregateStep::aggregateRowGroups()
}
}
// Finalization pass run by the finalizer thread pool: each worker scans
// every bucket and calls finalAggregation() on the aggregators whose
// mutex it can acquire. Work is distributed by try_lock, not by thread
// ID, so `threadID` is unused here (it is kept for the
// ThreadedAggregateFinalizer functor interface).
//
// NOTE(review): a bucket released by one worker can be re-locked and
// finalized again by a later worker — presumably finalAggregation() is
// idempotent or tolerant of repeat calls; confirm before changing this.
void TupleAggregateStep::threadedAggregateFinalize(uint32_t threadID)
{
for (uint32_t i = 0; i < fNumOfBuckets; ++i)
{
// Skip buckets another worker currently holds.
if (fAgg_mutex[i]->try_lock())
{
try
{
if (fAggregators[i])
fAggregators[i]->finalAggregation();
}
catch (...)
{
// Release the mutex before propagating so the lock is never leaked.
fAgg_mutex[i]->unlock();
throw;
}
fAgg_mutex[i]->unlock();
}
}
}
void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
{
@ -5268,9 +5306,10 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
vector<uint32_t> hashLens;
bool locked = false;
bool more = true;
RowGroupDL* dlIn = NULL;
RowGroupDL* dlIn = nullptr;
uint32_t rgVecShift = float(fNumOfBuckets) / fNumOfThreads * threadID;
RowAggregationMultiDistinct* multiDist = NULL;
RowAggregationMultiDistinct* multiDist = nullptr;
if (!fDoneAggregate)
{
@ -5279,7 +5318,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
if (dlIn == NULL)
if (dlIn == nullptr)
throw logic_error("Input is not RowGroup data list in delivery step.");
vector<RGData> rgDatas;
@ -5358,29 +5397,35 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
if (more)
{
fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] += fRowGroupIns[threadID].getSizeWithStrings();
fRowGroupIns[threadID].setData(&rgData);
fMemUsage[threadID] +=
fRowGroupIns[threadID].getSizeWithStrings();
if (!fRm->getMemory(fRowGroupIns[threadID].getSizeWithStrings(), fSessionMemLimit))
{
rgDatas.clear(); // to short-cut the rest of processing
abort();
more = false;
fEndOfResult = true;
bool diskAggAllowed = fRm->getAllowDiskAggregation();
if (!fRm->getMemory(
fRowGroupIns[threadID].getSizeWithStrings(),
fSessionMemLimit, !diskAggAllowed))
{
if (!diskAggAllowed)
{
rgDatas.clear(); // to short-cut the rest of processing
more = false;
fEndOfResult = true;
if (status() == 0)
{
errorMessage(IDBErrorInfo::instance()->errorMsg(
ERR_AGGREGATION_TOO_BIG));
status(ERR_AGGREGATION_TOO_BIG);
}
break;
}
else
{
rgDatas.push_back(rgData);
}
if (status() == 0)
{
errorMessage(IDBErrorInfo::instance()->errorMsg(
ERR_AGGREGATION_TOO_BIG));
status(ERR_AGGREGATION_TOO_BIG);
}
}
else
{
rgDatas.push_back(rgData);
}
break;
}
rgDatas.push_back(rgData);
}
else
{
@ -5429,8 +5474,9 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
// TBD This approach could potentiall
// put all values in on bucket.
bucketID = distRow[j].hash(hashLens[j] - 1) % fNumOfBuckets;
rowBucketVecs[bucketID][j].push_back(rowIn.getPointer());
uint64_t hash = rowgroup::hashRow(distRow[j], hashLens[j] - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
@ -5447,10 +5493,11 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.
// TBD This approach could potentiall
// TBD This approach could potential
// put all values in on bucket.
int bucketID = rowIn.hash(hashLens[0] - 1) % fNumOfBuckets;
rowBucketVecs[bucketID][0].push_back(rowIn.getPointer());
uint64_t hash = rowgroup::hashRow(rowIn, hashLens[0] - 1);
int bucketID = hash% fNumOfBuckets;
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
@ -5465,8 +5512,11 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
bool didWork = false;
done = true;
for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
// each thread starts from its own bucket for better distribution
uint32_t shift = (rgVecShift++) % fNumOfBuckets;
for (uint32_t ci = 0; ci < fNumOfBuckets && !cancelled(); ci++)
{
uint32_t c = (ci + shift) % fNumOfBuckets;
if (!fEndOfResult && !bucketDone[c] && fAgg_mutex[c]->try_lock())
{
try
@ -5484,9 +5534,9 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
throw;
}
fAgg_mutex[c]->unlock();
rowBucketVecs[c][0].clear();
bucketDone[c] = true;
fAgg_mutex[c]->unlock();
}
else if (!bucketDone[c])
{
@ -5519,7 +5569,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
handleException(std::current_exception(),
logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG,
"TupleAggregateStep::threadedAggregateRowGroups()");
"TupleAggregateStep::threadedAggregateRowGroups()[" + std::to_string(threadID) + "]");
fEndOfResult = true;
fDoneAggregate = true;
}
@ -5527,7 +5577,8 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
if (!locked) fMutex.lock();
while (more) more = dlIn->next(fInputIter, &rgData);
while (more)
more = dlIn->next(fInputIter, &rgData);
fMutex.unlock();
locked = false;
@ -5639,6 +5690,20 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
jobstepThreadPool.join(runners);
}
if (!cancelled())
{
vector<uint64_t> runners;
// use half of the threads because finalizing requires twice as
// much memory on average
uint32_t threads = std::max(1U, fNumOfThreads / 2);
runners.reserve(threads);
for (i = 0; i < threads; ++i)
{
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, i)));
}
jobstepThreadPool.join(runners);
}
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
{
// 2nd phase multi-threaded aggregate
@ -5700,7 +5765,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
}
else
{
RowAggregationDistinct* agg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
auto* agg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
if (!fEndOfResult)
{
@ -5713,27 +5778,26 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
// do the final aggregtion and deliver the results
// at least one RowGroup for aggregate results
// for "distinct without group by" case
if (agg != NULL)
if (agg != nullptr)
{
RowAggregationMultiDistinct* aggMultiDist =
auto* aggMultiDist =
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[i].get());
RowAggregationDistinct* aggDist =
auto* aggDist =
dynamic_cast<RowAggregationDistinct*>(fAggregators[i].get());
agg->aggregator(aggDist->aggregator());
if (aggMultiDist)
{
(dynamic_cast<RowAggregationMultiDistinct*>(agg))
->subAggregators(aggMultiDist->subAggregators());
->subAggregators(aggMultiDist->subAggregators());
}
agg->doDistinctAggregation();
}
// for "group by without distinct" case
else
{
fAggregator->resultDataVec().insert(
fAggregator->resultDataVec().end(),
fAggregators[i]->resultDataVec().begin(),
fAggregators[i]->resultDataVec().end());
fAggregator->append(fAggregators[i].get());
}
}
}

View File

@ -105,6 +105,7 @@ private:
uint64_t doThreadedAggregate(messageqcpp::ByteStream& bs, RowGroupDL* dlp);
void aggregateRowGroups();
void threadedAggregateRowGroups(uint32_t threadID);
void threadedAggregateFinalize(uint32_t threadID);
void doThreadedSecondPhaseAggregate(uint32_t threadID);
bool nextDeliveredRowGroup();
void pruneAuxColumns();
@ -156,7 +157,9 @@ private:
{}
void operator()()
{
utils::setThreadName("TASThrAggr");
std::string t{"TASThrAggr"};
t.append(std::to_string(fThreadID));
utils::setThreadName(t.c_str());
fStep->threadedAggregateRowGroups(fThreadID);
}
@ -164,6 +167,26 @@ private:
uint32_t fThreadID;
};
// Thread-pool functor for the aggregation finalization phase.
// Names its thread "TASThrFin<id>" and delegates the actual work to
// TupleAggregateStep::threadedAggregateFinalize().
class ThreadedAggregateFinalizer
{
public:
    ThreadedAggregateFinalizer(TupleAggregateStep* step, uint32_t threadID) :
        fStep(step),
        fThreadID(threadID)
    {}

    void operator()()
    {
        // Build the per-thread name in one expression, then hand off.
        const std::string name = "TASThrFin" + std::to_string(fThreadID);
        utils::setThreadName(name.c_str());
        fStep->threadedAggregateFinalize(fThreadID);
    }

    TupleAggregateStep* fStep;
    uint32_t fThreadID;
};
class ThreadedSecondPhaseAggregator
{
public: