You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
fix(aggregation, disk-based) MCOL-5691 distinct aggregate disk based (#3145)
* fix(aggregation, disk-based): MCOL-5689 this fixes disk-based distinct aggregation functions Previously disk-based distinct aggregation functions produced incorrect results b/c there was no finalization applied for previous generations stored on disk. * fix(aggregation, disk-based): Fix disk-based COUNT(DISTINCT ...) queries. (Case 2). (Distinct & Multi-Distinct, Single- & Multi-Threaded). * fix(aggregation, disk-based): Fix disk-based DISTINCT & GROUP BY queries. (Case 1). (Distinct & Multi-Distinct, Single- & Multi-Threaded). --------- Co-authored-by: Theresa Hradilak <theresa.hradilak@gmail.com> Co-authored-by: Roman Nozdrin <rnozdrin@mariadb.com>
This commit is contained in:
@ -289,6 +289,7 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup
|
||||
fNumOfBuckets =
|
||||
calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(),
|
||||
fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation());
|
||||
|
||||
fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);
|
||||
|
||||
fMemUsage.reset(new uint64_t[fNumOfThreads]);
|
||||
@ -392,7 +393,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
|
||||
rowGroupIn->initRow(&rowIn);
|
||||
auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());
|
||||
|
||||
while (subDistAgg->nextRowGroup())
|
||||
while (subDistAgg->nextOutputRowGroup())
|
||||
{
|
||||
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
|
||||
rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
|
||||
@ -416,7 +417,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
|
||||
rowGroupIn->initRow(&rowIn);
|
||||
auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());
|
||||
|
||||
while (subAgg->nextRowGroup())
|
||||
while (subAgg->nextOutputRowGroup())
|
||||
{
|
||||
rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
|
||||
rgDataVec.emplace_back(subAgg->moveCurrentRGData());
|
||||
@ -571,7 +572,7 @@ bool TupleAggregateStep::nextDeliveredRowGroup()
|
||||
{
|
||||
for (; fBucketNum < fNumOfBuckets; fBucketNum++)
|
||||
{
|
||||
while (fAggregators[fBucketNum]->nextRowGroup())
|
||||
while (fAggregators[fBucketNum]->nextOutputRowGroup())
|
||||
{
|
||||
fAggregators[fBucketNum]->finalize();
|
||||
fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
|
||||
@ -5708,14 +5709,27 @@ void TupleAggregateStep::doAggregate()
|
||||
return;
|
||||
}
|
||||
|
||||
/** @brief Aggregate input row groups in two-phase multi-threaded aggregation.
|
||||
* In second phase handle three different aggregation cases differently:
|
||||
* 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM (DISTINCT col1) AND at least one
|
||||
* GROUP BY column
|
||||
* 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
|
||||
* 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
|
||||
* DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp.
|
||||
*/
|
||||
uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp)
|
||||
{
|
||||
uint32_t i;
|
||||
RGData rgData;
|
||||
// initialize return value variable
|
||||
uint64_t rowCount = 0;
|
||||
|
||||
try
|
||||
{
|
||||
/*
|
||||
* Phase 1: Distribute input rows to different buckets depending on the hash value of the group by columns
|
||||
* per row. Then distribute buckets equally on aggregators in fAggregators. (Number of fAggregators ==
|
||||
* fNumOfBuckets). Each previously created hash bucket is represented as one RowGroup in a fAggregator.
|
||||
*/
|
||||
|
||||
if (!fDoneAggregate)
|
||||
{
|
||||
initializeMultiThread();
|
||||
@ -5724,9 +5738,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
|
||||
runners.reserve(fNumOfThreads); // to prevent a resize during use
|
||||
|
||||
// Start the aggregator threads
|
||||
for (i = 0; i < fNumOfThreads; i++)
|
||||
for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++)
|
||||
{
|
||||
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i)));
|
||||
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum)));
|
||||
}
|
||||
|
||||
// Now wait for all those threads
|
||||
@ -5740,18 +5754,28 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
|
||||
// much memory on average
|
||||
uint32_t threads = std::max(1U, fNumOfThreads / 2);
|
||||
runners.reserve(threads);
|
||||
for (i = 0; i < threads; ++i)
|
||||
for (uint32_t threadNum = 0; threadNum < threads; ++threadNum)
|
||||
{
|
||||
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, i)));
|
||||
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum)));
|
||||
}
|
||||
jobstepThreadPool.join(runners);
|
||||
}
|
||||
|
||||
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0)
|
||||
/*
|
||||
* Phase 2: Depending on query type (see below) do aggregation per previously created RowGroup of rows
|
||||
* that need to aggregated and output results.
|
||||
*/
|
||||
|
||||
auto* distinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
|
||||
const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0;
|
||||
|
||||
// Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY column
|
||||
// e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2;
|
||||
if (distinctAggregator && hasGroupByColumns)
|
||||
{
|
||||
// 2nd phase multi-threaded aggregate
|
||||
if (!fEndOfResult)
|
||||
{
|
||||
// Do multi-threaded second phase aggregation (per row group created for GROUP BY statement)
|
||||
if (!fDoneAggregate)
|
||||
{
|
||||
vector<uint64_t> runners; // thread pool handles
|
||||
@ -5759,97 +5783,114 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
|
||||
|
||||
uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads;
|
||||
uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1);
|
||||
// uint32_t bucketsPerThread = 1;
|
||||
// uint32_t numThreads = fNumOfBuckets;
|
||||
|
||||
runners.reserve(numThreads);
|
||||
|
||||
for (i = 0; i < numThreads; i++)
|
||||
for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++)
|
||||
{
|
||||
runners.push_back(jobstepThreadPool.invoke(
|
||||
ThreadedSecondPhaseAggregator(this, i * bucketsPerThread, bucketsPerThread)));
|
||||
ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread)));
|
||||
}
|
||||
|
||||
jobstepThreadPool.join(runners);
|
||||
}
|
||||
|
||||
// Deliver results
|
||||
fDoneAggregate = true;
|
||||
bool done = true;
|
||||
|
||||
while (nextDeliveredRowGroup())
|
||||
while (nextDeliveredRowGroup() && !cancelled())
|
||||
{
|
||||
done = false;
|
||||
rowCount = fRowGroupOut.getRowCount();
|
||||
|
||||
if (rowCount != 0)
|
||||
{
|
||||
if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
|
||||
pruneAuxColumns();
|
||||
|
||||
if (dlp)
|
||||
{
|
||||
rgData = fRowGroupDelivered.duplicate();
|
||||
dlp->insert(rgData);
|
||||
}
|
||||
else
|
||||
{
|
||||
bs.restart();
|
||||
fRowGroupDelivered.serializeRGData(bs);
|
||||
if (!cleanUpAndOutputRowGroup(bs, dlp))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
done = true;
|
||||
}
|
||||
|
||||
if (done)
|
||||
{
|
||||
fEndOfResult = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
// Case 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
|
||||
// e.g. SELECT SUM(DISTINCT col1) FROM test;
|
||||
else if (distinctAggregator)
|
||||
{
|
||||
auto* agg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
|
||||
|
||||
if (!fEndOfResult)
|
||||
{
|
||||
if (!fDoneAggregate)
|
||||
{
|
||||
for (i = 0; i < fNumOfBuckets; i++)
|
||||
// Do aggregation over all row groups. As all row groups need to be aggregated together there is no
|
||||
// easy way of multi-threading this and it's done in a single thread for now.
|
||||
for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++)
|
||||
{
|
||||
if (fEndOfResult == false)
|
||||
{
|
||||
// do the final aggregtion and deliver the results
|
||||
// at least one RowGroup for aggregate results
|
||||
// for "distinct without group by" case
|
||||
if (agg != nullptr)
|
||||
{
|
||||
auto* aggMultiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[i].get());
|
||||
auto* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[i].get());
|
||||
agg->aggregator(aggDist->aggregator());
|
||||
// The distinctAggregator accumulates the aggregation results of all row groups by being added
|
||||
// all row groups of each bucket aggregator and doing an aggregation step after each addition.
|
||||
auto* bucketMultiDistinctAggregator =
|
||||
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[bucketNum].get());
|
||||
auto* bucketDistinctAggregator =
|
||||
dynamic_cast<RowAggregationDistinct*>(fAggregators[bucketNum].get());
|
||||
distinctAggregator->aggregator(bucketDistinctAggregator->aggregator());
|
||||
|
||||
if (aggMultiDist)
|
||||
{
|
||||
(dynamic_cast<RowAggregationMultiDistinct*>(agg))
|
||||
->subAggregators(aggMultiDist->subAggregators());
|
||||
}
|
||||
|
||||
agg->doDistinctAggregation();
|
||||
}
|
||||
// for "group by without distinct" case
|
||||
else
|
||||
if (bucketMultiDistinctAggregator)
|
||||
{
|
||||
fAggregator->append(fAggregators[i].get());
|
||||
(dynamic_cast<RowAggregationMultiDistinct*>(distinctAggregator))
|
||||
->subAggregators(bucketMultiDistinctAggregator->subAggregators());
|
||||
}
|
||||
|
||||
distinctAggregator->aggregator()->finalAggregation();
|
||||
distinctAggregator->doDistinctAggregation();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver results
|
||||
fDoneAggregate = true;
|
||||
bool done = true;
|
||||
while (fAggregator->nextRowGroup() && !cancelled())
|
||||
{
|
||||
done = false;
|
||||
fAggregator->finalize();
|
||||
rowCount = fRowGroupOut.getRowCount();
|
||||
fRowsReturned += rowCount;
|
||||
fRowGroupDelivered.setData(fRowGroupOut.getRGData());
|
||||
|
||||
if (rowCount != 0)
|
||||
{
|
||||
if (!cleanUpAndOutputRowGroup(bs, dlp))
|
||||
break;
|
||||
}
|
||||
done = true;
|
||||
}
|
||||
if (done)
|
||||
fEndOfResult = true;
|
||||
}
|
||||
}
|
||||
// CASE 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
|
||||
// e.g. SELECT SUM(col1) FROM test GROUP BY col2;
|
||||
// Do aggregation over all row groups. As all row groups need to be aggregated together there is no
|
||||
// easy way of multi-threading this and it's done in a single thread for now.
|
||||
else if (hasGroupByColumns)
|
||||
{
|
||||
if (!fEndOfResult && !fDoneAggregate)
|
||||
{
|
||||
for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum)
|
||||
{
|
||||
fAggregator->append(fAggregators[bucketNum].get());
|
||||
}
|
||||
}
|
||||
|
||||
fDoneAggregate = true;
|
||||
bool done = true;
|
||||
|
||||
//@bug4459
|
||||
while (fAggregator->nextRowGroup() && !cancelled())
|
||||
{
|
||||
done = false;
|
||||
@ -5860,22 +5901,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
|
||||
|
||||
if (rowCount != 0)
|
||||
{
|
||||
if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
|
||||
pruneAuxColumns();
|
||||
|
||||
if (dlp)
|
||||
{
|
||||
rgData = fRowGroupDelivered.duplicate();
|
||||
dlp->insert(rgData);
|
||||
}
|
||||
else
|
||||
{
|
||||
bs.restart();
|
||||
fRowGroupDelivered.serializeRGData(bs);
|
||||
if (!cleanUpAndOutputRowGroup(bs, dlp))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
done = true;
|
||||
}
|
||||
|
||||
@ -5884,7 +5912,14 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
|
||||
fEndOfResult = true;
|
||||
}
|
||||
}
|
||||
} // try
|
||||
else
|
||||
{
|
||||
throw logic_error(
|
||||
"TupleAggregateStep::doThreadedAggregate: No DISTINCT columns nested into aggregation function "
|
||||
"or "
|
||||
"GROUP BY columns found. Should not reach here.");
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
handleException(std::current_exception(), logging::tupleAggregateStepErr,
|
||||
@ -5924,6 +5959,23 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
|
||||
return rowCount;
|
||||
}
|
||||
|
||||
bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp)
|
||||
{
|
||||
if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
|
||||
pruneAuxColumns();
|
||||
|
||||
if (dlp)
|
||||
{
|
||||
RGData rgData = fRowGroupDelivered.duplicate();
|
||||
dlp->insert(rgData);
|
||||
return true;
|
||||
}
|
||||
|
||||
bs.restart();
|
||||
fRowGroupDelivered.serializeRGData(bs);
|
||||
return false;
|
||||
}
|
||||
|
||||
void TupleAggregateStep::pruneAuxColumns()
|
||||
{
|
||||
uint64_t rowCount = fRowGroupOut.getRowCount();
|
||||
|
@ -161,6 +161,7 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep
|
||||
void doThreadedSecondPhaseAggregate(uint32_t threadID);
|
||||
bool nextDeliveredRowGroup();
|
||||
void pruneAuxColumns();
|
||||
bool cleanUpAndOutputRowGroup(messageqcpp::ByteStream& bs, RowGroupDL* dlp);
|
||||
void formatMiniStats();
|
||||
void printCalTrace();
|
||||
template <class GroupByMap>
|
||||
|
Reference in New Issue
Block a user