1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

fix(aggregation, RAM): MCOL-5715 Changes the second phase aggregation. (#3171)

This patch changes the second phase aggregation pipeline - takes into
account current memory consumption.

Co-authored-by: Leonid Fedorov <79837786+mariadb-LeonidFedorov@users.noreply.github.com>
Co-authored-by: drrtuy <roman.nozdrin@mariadb.com>
This commit is contained in:
Denis Khalikov
2024-08-29 14:24:47 +03:00
committed by drrtuy
parent e0a01c6cf4
commit 928678499a

View File

@ -360,45 +360,86 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
if (threadID >= fNumOfBuckets)
return;
scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
uint32_t hashlen = fAggregator->aggMapKeyLength();
bool finishedSecondPhase = false;
bool diskAggAllowed = fRm->getAllowDiskAggregation();
const uint32_t maxRowsSize = rowgroup::rgCommonSize;
try
while (!finishedSecondPhase && !fEndOfResult)
{
RowAggregationDistinct* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
RowAggregationMultiDistinct* multiDist =
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
Row rowIn;
RowGroup* rowGroupIn = nullptr;
rowGroupIn = (aggDist->aggregator()->getOutputRowGroup());
uint32_t bucketID;
std::vector<std::unique_ptr<RGData>> rgDataVec;
scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
uint32_t hashlen = fAggregator->aggMapKeyLength();
bool outOfMemory = false;
size_t totalMemSizeConsumed = 0;
if (multiDist)
try
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(multiDist->subAggregators().size());
}
else
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(1);
}
RowAggregationDistinct* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
RowAggregationMultiDistinct* multiDist =
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
Row rowIn;
RowGroup* rowGroupIn = nullptr;
rowGroupIn = (aggDist->aggregator()->getOutputRowGroup());
uint32_t bucketID;
std::vector<std::unique_ptr<RGData>> rgDataVec;
// dispatch rows to bucket
if (multiDist)
{
for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
if (multiDist)
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rowGroupIn->initRow(&rowIn);
auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(multiDist->subAggregators().size());
}
else
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(1);
}
while (subDistAgg->nextOutputRowGroup())
// dispatch rows to bucket
if (multiDist)
{
for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
rowGroupIn->initRow(&rowIn);
auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());
while (subDistAgg->nextOutputRowGroup())
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.
// uint8_t* hashMapKey = rowIn.getData() + 2;
// bucketID = hash.operator()(hashMapKey) & fBucketMask;
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
const auto rgSize = diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize)
: rowGroupIn->getSizeWithStrings();
totalMemSizeConsumed += rgSize;
if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
{
outOfMemory = true;
break;
}
}
}
}
else
{
rowGroupIn->initRow(&rowIn);
auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());
while (subAgg->nextOutputRowGroup())
{
rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
rgDataVec.emplace_back(subAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
@ -408,87 +449,80 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
// bucketID = hash.operator()(hashMapKey) & fBucketMask;
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
}
}
else
{
rowGroupIn->initRow(&rowIn);
auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());
while (subAgg->nextOutputRowGroup())
{
rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
rgDataVec.emplace_back(subAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.
// uint8_t* hashMapKey = rowIn.getData() + 2;
// bucketID = hash.operator()(hashMapKey) & fBucketMask;
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
}
bool done = false;
// reset bucketDone[] to be false
// memset(bucketDone, 0, sizeof(bucketDone));
fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);
while (!done && !cancelled())
{
done = true;
for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
{
if (!bucketDone[c] && fAgg_mutex[c]->try_lock())
{
try
const auto rgSize =
diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize) : rowGroupIn->getSizeWithStrings();
totalMemSizeConsumed += rgSize;
if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
{
if (multiDist)
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
->doDistinctAggregation_rowVec(rowBucketVecs[c]);
else
dynamic_cast<RowAggregationDistinct*>(fAggregators[c].get())
->doDistinctAggregation_rowVec(rowBucketVecs[c][0]);
outOfMemory = true;
break;
}
catch (...)
}
}
if (!outOfMemory)
finishedSecondPhase = true;
bool done = false;
// reset bucketDone[] to be false
// memset(bucketDone, 0, sizeof(bucketDone));
fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);
while (!done && !cancelled())
{
done = true;
for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
{
if (!bucketDone[c] && fAgg_mutex[c]->try_lock())
{
try
{
if (multiDist)
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
->doDistinctAggregation_rowVec(rowBucketVecs[c]);
else
dynamic_cast<RowAggregationDistinct*>(fAggregators[c].get())
->doDistinctAggregation_rowVec(rowBucketVecs[c][0]);
}
catch (...)
{
fAgg_mutex[c]->unlock();
throw;
}
fAgg_mutex[c]->unlock();
throw;
bucketDone[c] = true;
rowBucketVecs[c][0].clear();
}
else if (!bucketDone[c])
{
done = false;
}
fAgg_mutex[c]->unlock();
bucketDone[c] = true;
rowBucketVecs[c][0].clear();
}
else if (!bucketDone[c])
{
done = false;
}
}
}
if (cancelled())
fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
if (cancelled())
{
fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
finishedSecondPhase = true;
fEndOfResult = true;
}
} // try
catch (...)
{
fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG,
"TupleAggregateStep::doThreadedSecondPhaseAggregate()");
fEndOfResult = true;
finishedSecondPhase = true;
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doThreadedSecondPhaseAggregate()");
fEndOfResult = true;
}
fDoneAggregate = true;