From 09926b31578bc14d61d6edd401b98a664a372eba Mon Sep 17 00:00:00 2001 From: drrtuy Date: Tue, 11 Mar 2025 19:21:44 +0000 Subject: [PATCH] feat(TNS, sorting, distinct): TNS now accounts data used by RGDatas and distinct maps. --- dbcon/joblist/limitedorderby.cpp | 2 +- dbcon/joblist/tupleannexstep.cpp | 90 +++++++++++++++++++++-------- dbcon/joblist/tupleannexstep.h | 1 + utils/loggingcpp/ErrorMessage.txt | 2 + utils/windowfunction/idborderby.cpp | 3 +- utils/windowfunction/idborderby.h | 15 +++++ 6 files changed, 87 insertions(+), 26 deletions(-) diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index a38968fef..5d9fe8c56 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -54,7 +54,7 @@ void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool { fRm = jobInfo.rm; fSessionMemLimit = jobInfo.umMemLimit; - fErrorCode = ERR_LIMIT_TOO_BIG; + fErrorCode = ERR_ORDERBY_TOO_BIG; // locate column position in the rowgroup map keyToIndexMap; diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 8912a6c20..46fb1309e 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -22,7 +22,7 @@ #include #include #include -#include +#include using namespace std; #include @@ -83,8 +83,7 @@ struct TAEq bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const; }; // TODO: Generalize these and put them back in utils/common/hasher.h -typedef tr1::unordered_set > - DistinctMap_t; +using TNSDistinctMap_t = std::unordered_set >; }; // namespace inline uint64_t TAHasher::operator()(const Row::Pointer& p) const @@ -462,7 +461,6 @@ void TupleAnnexStep::executeNoOrderBy() void TupleAnnexStep::executeNoOrderByWithDistinct() { utils::setThreadName("TNSwoOrdDist"); - scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); vector dataVec; vector dataVecSkip; RGData rgDataIn; @@ -472,6 +470,9 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() Row rowSkip; bool more = false; + auto alloc = fRm->getAllocator(); + std::unique_ptr distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), alloc)); + rgDataOut.reinit(fRowGroupOut); fRowGroupOut.setData(&rgDataOut); fRowGroupOut.resetRowGroup(0); @@ -508,7 +509,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() for (uint64_t i = 0; i < fRowGroupIn.getRowCount() && !cancelled() && !fLimitHit; ++i) { - pair inserted; + pair inserted; Row* rowPtr; if (distinctMap->size() < fLimitStart) @@ -544,6 +545,8 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() // allocate new RGData for skipped rows below the fLimitStart // offset (do not take it into account in RM assuming there // are few skipped rows + checkAndAllocateMemory4RGData(rowGroupSkip); + dataVecSkip.push_back(rgDataSkip); rgDataSkip.reinit(rowGroupSkip); rowGroupSkip.setData(&rgDataSkip); @@ -560,6 +563,7 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() if (UNLIKELY(fRowGroupOut.getRowCount() >= rowgroup::rgCommonSize)) { + checkAndAllocateMemory4RGData(fRowGroupOut); dataVec.push_back(rgDataOut); rgDataOut.reinit(fRowGroupOut); fRowGroupOut.setData(&rgDataOut); @@ -572,6 +576,9 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() more = fInputDL->next(fInputIterator, &rgDataIn); } + // to reduce memory consumption + dataVecSkip.clear(); + if (fRowGroupOut.getRowCount() > 0) dataVec.push_back(rgDataOut); @@ -581,6 +588,14 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() fRowGroupOut.setData(&rgDataOut); fOutputDL->insert(rgDataOut); } + while (!dataVec.empty()) + { + auto& rgData = dataVec.back(); + fRowGroupOut.setData(&rgData); + fRm->returnMemory(fRowGroupOut.getSizeWithStrings() - fRowGroupOut.getHeaderSize()); + fOutputDL->insert(rgData); + dataVec.pop_back(); + } } catch (...) { @@ -595,6 +610,16 @@ void TupleAnnexStep::executeNoOrderByWithDistinct() fOutputDL->endOfInput(); } +void TupleAnnexStep::checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup) +{ + uint64_t size = rowGroup.getSizeWithStrings() - rowGroup.getHeaderSize(); + if (!fRm->getMemory(size, false)) + { + cerr << IDBErrorInfo::instance()->errorMsg(ERR_TNS_DISTINCT_IS_TOO_BIG) << " @" << __FILE__ << ":" << __LINE__; + throw IDBExcept(ERR_TNS_DISTINCT_IS_TOO_BIG); + } +} + void TupleAnnexStep::executeWithOrderBy() { utils::setThreadName("TNSwOrd"); @@ -669,6 +694,10 @@ void TupleAnnexStep::executeWithOrderBy() { fRowsReturned += fRowGroupOut.getRowCount(); fOutputDL->insert(rgDataOut); + + // release RGData memory + size_t rgDataSize = fRowGroupOut.getSizeWithStrings() - fRowGroupOut.getHeaderSize(); + fOrderBy->returnRGDataMemory2RM(rgDataSize); } } } @@ -712,9 +741,10 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() // Calculate offset here fRowGroupOut.getRow(0, &fRowOut); - auto alloc = fRm->getAllocator(); - ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc); - scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); + auto allocSorting = fRm->getAllocator(); + ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, allocSorting); + auto allocDistinct = fRm->getAllocator(); + std::unique_ptr distinctMap(new TNSDistinctMap_t(10, TAHasher(this), TAEq(this), allocDistinct)); fRowGroupIn.initRow(&row1); fRowGroupIn.initRow(&row2); @@ -731,7 +761,7 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() fOrderByList[id]->getRule().revertRules(); ordering::SortingPQ& currentPQ = fOrderByList[id]->getQueue(); finalPQ.reserve(finalPQ.size() + currentPQ.size()); - pair inserted; + pair inserted; while (currentPQ.size()) { ordering::OrderByRow& topOBRow = const_cast(currentPQ.top()); @@ -868,14 +898,6 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() fOutputDL->endOfInput(); - StepTeleStats sts; - sts.query_uuid = fQueryUuid; - sts.step_uuid = fStepUuid; - sts.msg_type = StepTeleStats::ST_SUMMARY; - sts.total_units_of_work = sts.units_of_work_completed = 1; - sts.rows = fRowsReturned; - postStepSummaryTele(sts); - if (traceOn()) { if (dlTimes.FirstReadTime().tv_sec == 0) @@ -885,6 +907,20 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() dlTimes.setEndOfInputTime(); printCalTrace(); } + + // Release memory before ctor + for (uint64_t id = 1; id <= fMaxThreads; id++) + { + fOrderByList[id]->returnAllRGDataMemory2RM(); + } + + StepTeleStats sts; + sts.query_uuid = fQueryUuid; + sts.step_uuid = fStepUuid; + sts.msg_type = StepTeleStats::ST_SUMMARY; + sts.total_units_of_work = sts.units_of_work_completed = 1; + sts.rows = fRowsReturned; + postStepSummaryTele(sts); } /* @@ -1056,6 +1092,19 @@ void TupleAnnexStep::finalizeParallelOrderBy() fOutputDL->endOfInput(); + if (traceOn()) + { + dlTimes.setLastReadTime(); + dlTimes.setEndOfInputTime(); + printCalTrace(); + } + + // Release memory before ctor + for (uint64_t id = 1; id <= fMaxThreads; id++) + { + fOrderByList[id]->returnAllRGDataMemory2RM(); + } + StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; @@ -1063,13 +1112,6 @@ void TupleAnnexStep::finalizeParallelOrderBy() sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); - - if (traceOn()) - { - dlTimes.setLastReadTime(); - dlTimes.setEndOfInputTime(); - printCalTrace(); - } } void TupleAnnexStep::executeParallelOrderBy(uint64_t id) diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 133fdd466..f25947686 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -109,6 +109,7 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep void executeWithOrderBy(); void executeParallelOrderBy(uint64_t id); void executeNoOrderByWithDistinct(); + void checkAndAllocateMemory4RGData(const rowgroup::RowGroup& rowGroup); void formatMiniStats(); void printCalTrace(); void finalizeParallelOrderBy(); diff --git a/utils/loggingcpp/ErrorMessage.txt b/utils/loggingcpp/ErrorMessage.txt index c6f67b803..75092e1ba 100755 --- a/utils/loggingcpp/ErrorMessage.txt +++ b/utils/loggingcpp/ErrorMessage.txt @@ -110,6 +110,8 @@ 2061 ERR_NOT_SUPPORTED_GROUPBY_ORDERBY_EXPRESSION %1% is not in GROUP BY clause, not a column or an expression that contains function. +2063 ERR_TNS_DISTINCT_IS_TOO_BIG DISTINCT memory limit is exceeded whilst running TNS step. + # Sub-query errors 3001 ERR_NON_SUPPORT_SUB_QUERY_TYPE This subquery type is not supported yet. 3002 ERR_MORE_THAN_1_ROW Subquery returns more than 1 row. diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index 58a6bd605..88954a939 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -737,7 +737,8 @@ IdbOrderBy::IdbOrderBy() IdbOrderBy::~IdbOrderBy() { - if (fRm) + // returnRGDataMemory2RM() returns all memory before the dtor is called. + if (fRm && fMemSize > 0) fRm->returnMemory(fMemSize, fSessionMemLimit); // delete compare objects diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index 95d6044c3..fee09cf87 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -33,6 +33,7 @@ #include #include "countingallocator.h" +#include "resourcemanager.h" #include "rowgroup.h" #include "hasher.h" // #include "stlpoolallocator.h" @@ -433,6 +434,20 @@ class IdbOrderBy : public IdbCompare { return *fOrderByQueue; } + void returnAllRGDataMemory2RM() + { + while (!fOrderByQueue->empty()) + { + fOrderByQueue->pop(); + } + fRm->returnMemory(fMemSize, fSessionMemLimit); + fMemSize = 0; + } + void returnRGDataMemory2RM(const size_t rgDataSize) + { + fRm->returnMemory(rgDataSize, fSessionMemLimit); + fMemSize -= rgDataSize; + } CompareRule& getRule() { return fRule;