diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index cb231965f..c0160745c 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -360,45 +360,86 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID) if (threadID >= fNumOfBuckets) return; - scoped_array rowBucketVecs(new RowBucketVec[fNumOfBuckets]); - scoped_array bucketDone(new bool[fNumOfBuckets]); - uint32_t hashlen = fAggregator->aggMapKeyLength(); + bool finishedSecondPhase = false; + bool diskAggAllowed = fRm->getAllowDiskAggregation(); + const uint32_t maxRowsSize = rowgroup::rgCommonSize; - try + while (!finishedSecondPhase && !fEndOfResult) { - RowAggregationDistinct* aggDist = dynamic_cast(fAggregators[threadID].get()); - RowAggregationMultiDistinct* multiDist = - dynamic_cast(fAggregators[threadID].get()); - Row rowIn; - RowGroup* rowGroupIn = nullptr; - rowGroupIn = (aggDist->aggregator()->getOutputRowGroup()); - uint32_t bucketID; - std::vector> rgDataVec; + scoped_array rowBucketVecs(new RowBucketVec[fNumOfBuckets]); + scoped_array bucketDone(new bool[fNumOfBuckets]); + uint32_t hashlen = fAggregator->aggMapKeyLength(); + bool outOfMemory = false; + size_t totalMemSizeConsumed = 0; - if (multiDist) + try { - for (uint32_t i = 0; i < fNumOfBuckets; i++) - rowBucketVecs[i].resize(multiDist->subAggregators().size()); - } - else - { - for (uint32_t i = 0; i < fNumOfBuckets; i++) - rowBucketVecs[i].resize(1); - } + RowAggregationDistinct* aggDist = dynamic_cast(fAggregators[threadID].get()); + RowAggregationMultiDistinct* multiDist = + dynamic_cast(fAggregators[threadID].get()); + Row rowIn; + RowGroup* rowGroupIn = nullptr; + rowGroupIn = (aggDist->aggregator()->getOutputRowGroup()); + uint32_t bucketID; + std::vector> rgDataVec; - // dispatch rows to bucket - if (multiDist) - { - for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++) + if (multiDist) { - rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup()); - rowGroupIn->initRow(&rowIn); - auto* subDistAgg = dynamic_cast(multiDist->subAggregators()[j].get()); + for (uint32_t i = 0; i < fNumOfBuckets; i++) + rowBucketVecs[i].resize(multiDist->subAggregators().size()); + } + else + { + for (uint32_t i = 0; i < fNumOfBuckets; i++) + rowBucketVecs[i].resize(1); + } - while (subDistAgg->nextOutputRowGroup()) + // dispatch rows to bucket + if (multiDist) + { + for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++) { rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup()); - rgDataVec.emplace_back(subDistAgg->moveCurrentRGData()); + rowGroupIn->initRow(&rowIn); + auto* subDistAgg = dynamic_cast(multiDist->subAggregators()[j].get()); + + while (subDistAgg->nextOutputRowGroup()) + { + rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup()); + rgDataVec.emplace_back(subDistAgg->moveCurrentRGData()); + rowGroupIn->getRow(0, &rowIn); + + for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i) + { + // The key is the groupby columns, which are the leading columns. + // uint8_t* hashMapKey = rowIn.getData() + 2; + // bucketID = hash.operator()(hashMapKey) & fBucketMask; + uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1); + bucketID = hash % fNumOfBuckets; + rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash); + rowIn.nextRow(); + } + + const auto rgSize = diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize) + : rowGroupIn->getSizeWithStrings(); + totalMemSizeConsumed += rgSize; + if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed)) + { + outOfMemory = true; + break; + } + } + } + } + else + { + rowGroupIn->initRow(&rowIn); + auto* subAgg = dynamic_cast(aggDist->aggregator().get()); + + while (subAgg->nextOutputRowGroup()) + { + rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData()); + rgDataVec.emplace_back(subAgg->moveCurrentRGData()); rowGroupIn->getRow(0, &rowIn); for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i) @@ -408,87 +449,80 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID) // bucketID = hash.operator()(hashMapKey) & fBucketMask; uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1); bucketID = hash % fNumOfBuckets; - rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash); + rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash); rowIn.nextRow(); } - } - } - } - else - { - rowGroupIn->initRow(&rowIn); - auto* subAgg = dynamic_cast(aggDist->aggregator().get()); - while (subAgg->nextOutputRowGroup()) - { - rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData()); - rgDataVec.emplace_back(subAgg->moveCurrentRGData()); - rowGroupIn->getRow(0, &rowIn); - - for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i) - { - // The key is the groupby columns, which are the leading columns. - // uint8_t* hashMapKey = rowIn.getData() + 2; - // bucketID = hash.operator()(hashMapKey) & fBucketMask; - uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1); - bucketID = hash % fNumOfBuckets; - rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash); - rowIn.nextRow(); - } - } - } - - bool done = false; - - // reset bucketDone[] to be false - // memset(bucketDone, 0, sizeof(bucketDone)); - fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false); - - while (!done && !cancelled()) - { - done = true; - - for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++) - { - if (!bucketDone[c] && fAgg_mutex[c]->try_lock()) - { - try + const auto rgSize = + diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize) : rowGroupIn->getSizeWithStrings(); + totalMemSizeConsumed += rgSize; + if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed)) { - if (multiDist) - dynamic_cast(fAggregators[c].get()) - ->doDistinctAggregation_rowVec(rowBucketVecs[c]); - else - dynamic_cast(fAggregators[c].get()) - ->doDistinctAggregation_rowVec(rowBucketVecs[c][0]); + outOfMemory = true; + break; } - catch (...) + } + } + + if (!outOfMemory) + finishedSecondPhase = true; + + bool done = false; + // reset bucketDone[] to be false + // memset(bucketDone, 0, sizeof(bucketDone)); + fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false); + + while (!done && !cancelled()) + { + done = true; + + for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++) + { + if (!bucketDone[c] && fAgg_mutex[c]->try_lock()) { + try + { + if (multiDist) + dynamic_cast(fAggregators[c].get()) + ->doDistinctAggregation_rowVec(rowBucketVecs[c]); + else + dynamic_cast(fAggregators[c].get()) + ->doDistinctAggregation_rowVec(rowBucketVecs[c][0]); + } + catch (...) + { + fAgg_mutex[c]->unlock(); + throw; + } + fAgg_mutex[c]->unlock(); - throw; + bucketDone[c] = true; + rowBucketVecs[c][0].clear(); + } + else if (!bucketDone[c]) + { + done = false; } - - fAgg_mutex[c]->unlock(); - bucketDone[c] = true; - rowBucketVecs[c][0].clear(); - } - else if (!bucketDone[c]) - { - done = false; } } - } - if (cancelled()) + fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit); + if (cancelled()) + { + fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit); + finishedSecondPhase = true; + fEndOfResult = true; + } + } // try + catch (...) { + fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit); + handleException(std::current_exception(), logging::tupleAggregateStepErr, + logging::ERR_AGGREGATION_TOO_BIG, + "TupleAggregateStep::doThreadedSecondPhaseAggregate()"); fEndOfResult = true; + finishedSecondPhase = true; } - - } // try - catch (...) - { - handleException(std::current_exception(), logging::tupleAggregateStepErr, - logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doThreadedSecondPhaseAggregate()"); - fEndOfResult = true; } fDoneAggregate = true;