From c6dabe7eb5d0ad4f0625c6af604f3fbac6990f2b Mon Sep 17 00:00:00 2001 From: drrtuy Date: Mon, 10 Mar 2025 19:10:45 +0000 Subject: [PATCH] feat(TNS): distribute SortingPQ that supports CountingAllocator --- dbcon/joblist/groupconcat.cpp | 38 +++++++++++++++++------------ dbcon/joblist/jsonarrayagg.cpp | 38 +++++++++++++++++------------ dbcon/joblist/limitedorderby.cpp | 27 +++++++++++--------- dbcon/joblist/tupleannexstep.cpp | 9 +++++-- dbcon/joblist/tupleannexstep.h | 20 +-------------- utils/windowfunction/idborderby.cpp | 6 +++++ utils/windowfunction/idborderby.h | 28 +++++++++++++++------ 7 files changed, 93 insertions(+), 73 deletions(-) diff --git a/dbcon/joblist/groupconcat.cpp b/dbcon/joblist/groupconcat.cpp index a90598cf2..12ddeebda 100644 --- a/dbcon/joblist/groupconcat.cpp +++ b/dbcon/joblist/groupconcat.cpp @@ -768,6 +768,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row) if (concatColIsNull(row)) return; + auto& orderByQueue = getQueue(); // if the row count is less than the limit if (fCurrentLength < fGroupConcatLen) { @@ -776,7 +777,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row) int16_t estLen = lengthEstimate(fRow0); fRow0.setRid(estLen); OrderByRow newRow(fRow0, fRule); - fOrderByQueue.push(newRow); + orderByQueue.push(newRow); fCurrentLength += estLen; // add to the distinct map @@ -806,11 +807,11 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row) } } - else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData)) + else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData)) { - OrderByRow swapRow = fOrderByQueue.top(); + OrderByRow swapRow = orderByQueue.top(); fRow1.setData(swapRow.fData); - fOrderByQueue.pop(); + orderByQueue.pop(); fCurrentLength -= fRow1.getRelRid(); fRow2.setData(swapRow.fData); @@ -830,7 +831,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row) fRow2.setRid(estLen); fCurrentLength += estLen; - fOrderByQueue.push(swapRow); + orderByQueue.push(swapRow); } } @@ -838,9 +839,12 @@ void GroupConcatOrderBy::merge(GroupConcator* gc) { GroupConcatOrderBy* go = dynamic_cast(gc); - while (go->fOrderByQueue.empty() == false) + auto& orderByQueue = getQueue(); + auto mergeQueue = go->getQueue(); + + while (mergeQueue.empty() == false) { - const OrderByRow& row = go->fOrderByQueue.top(); + const OrderByRow& row = mergeQueue.top(); // check if the distinct row already exists if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end()) @@ -851,7 +855,7 @@ void GroupConcatOrderBy::merge(GroupConcator* gc) // if the row count is less than the limit else if (fCurrentLength < fGroupConcatLen) { - fOrderByQueue.push(row); + orderByQueue.push(row); row1.setData(row.fData); fCurrentLength += row1.getRelRid(); @@ -860,11 +864,11 @@ void GroupConcatOrderBy::merge(GroupConcator* gc) fDistinctMap->insert(row.fData); } - else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData)) + else if (fOrderByCond.size() > 0 && fRule.less(row.fData, orderByQueue.top().fData)) { - OrderByRow swapRow = fOrderByQueue.top(); + OrderByRow swapRow = orderByQueue.top(); row1.setData(swapRow.fData); - fOrderByQueue.pop(); + orderByQueue.pop(); fCurrentLength -= row1.getRelRid(); if (fDistinct) @@ -876,10 +880,10 @@ void GroupConcatOrderBy::merge(GroupConcator* gc) row1.setData(row.fData); fCurrentLength += row1.getRelRid(); - fOrderByQueue.push(row); + orderByQueue.push(row); } - go->fOrderByQueue.pop(); + mergeQueue.pop(); } } @@ -890,10 +894,12 @@ uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep) // need to reverse the order stack rowStack; - while (fOrderByQueue.size() > 0) + auto& orderByQueue = getQueue(); + + while (orderByQueue.size() > 0) { - rowStack.push(fOrderByQueue.top()); - fOrderByQueue.pop(); + rowStack.push(orderByQueue.top()); + orderByQueue.pop(); } size_t prevResultSize = 0; diff --git a/dbcon/joblist/jsonarrayagg.cpp b/dbcon/joblist/jsonarrayagg.cpp index c979e5ef6..84b3e036e 100644 --- a/dbcon/joblist/jsonarrayagg.cpp +++ b/dbcon/joblist/jsonarrayagg.cpp @@ -762,6 +762,8 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row) if (concatColIsNull(row)) return; + auto& orderByQueue = getQueue(); + // if the row count is less than the limit if (fCurrentLength < fGroupConcatLen) { @@ -770,7 +772,7 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row) int16_t estLen = lengthEstimate(fRow0); fRow0.setRid(estLen); OrderByRow newRow(fRow0, fRule); - fOrderByQueue.push(newRow); + orderByQueue.push(newRow); fCurrentLength += estLen; // add to the distinct map @@ -800,11 +802,11 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row) } } - else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData)) + else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData)) { - OrderByRow swapRow = fOrderByQueue.top(); + OrderByRow swapRow = orderByQueue.top(); fRow1.setData(swapRow.fData); - fOrderByQueue.pop(); + orderByQueue.pop(); fCurrentLength -= fRow1.getRelRid(); fRow2.setData(swapRow.fData); @@ -824,7 +826,7 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row) fRow2.setRid(estLen); fCurrentLength += estLen; - fOrderByQueue.push(swapRow); + orderByQueue.push(swapRow); } } @@ -832,9 +834,12 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc) { JsonArrayAggOrderBy* go = dynamic_cast(gc); - while (go->fOrderByQueue.empty() == false) + auto& orderByQueue = getQueue(); + auto mergeQueue = go->getQueue(); + + while (mergeQueue.empty() == false) { - const OrderByRow& row = go->fOrderByQueue.top(); + const OrderByRow& row = mergeQueue.top(); // check if the distinct row already exists if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end()) @@ -845,7 +850,7 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc) // if the row count is less than the limit else if (fCurrentLength < fGroupConcatLen) { - fOrderByQueue.push(row); + orderByQueue.push(row); row1.setData(row.fData); fCurrentLength += row1.getRelRid(); @@ -854,11 +859,11 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc) fDistinctMap->insert(row.fData); } - else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData)) + else if (fOrderByCond.size() > 0 && fRule.less(row.fData, orderByQueue.top().fData)) { - OrderByRow swapRow = fOrderByQueue.top(); + OrderByRow swapRow = orderByQueue.top(); row1.setData(swapRow.fData); - fOrderByQueue.pop(); + orderByQueue.pop(); fCurrentLength -= row1.getRelRid(); if (fDistinct) @@ -870,10 +875,10 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc) row1.setData(row.fData); fCurrentLength += row1.getRelRid(); - fOrderByQueue.push(row); + orderByQueue.push(row); } - go->fOrderByQueue.pop(); + mergeQueue.pop(); } } @@ -884,11 +889,12 @@ uint8_t* JsonArrayAggOrderBy::getResultImpl(const string&) // need to reverse the order stack rowStack; + auto& orderByQueue = getQueue(); - while (fOrderByQueue.size() > 0) + while (orderByQueue.size() > 0) { - rowStack.push(fOrderByQueue.top()); - fOrderByQueue.pop(); + rowStack.push(orderByQueue.top()); + orderByQueue.pop(); } if (rowStack.size() > 0) { diff --git a/dbcon/joblist/limitedorderby.cpp b/dbcon/joblist/limitedorderby.cpp index 8927e3866..a38968fef 100644 --- a/dbcon/joblist/limitedorderby.cpp +++ b/dbcon/joblist/limitedorderby.cpp @@ -109,12 +109,13 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row) if (fCount == 0) return; + auto& orderByQueue = getQueue(); // if the row count is less than the limit - if (fOrderByQueue.size() < fStart + fCount) + if (orderByQueue.size() < fStart + fCount) { copyRow(row, &fRow0); OrderByRow newRow(fRow0, fRule); - fOrderByQueue.push(newRow); + orderByQueue.push(newRow); uint64_t memSizeInc = sizeof(newRow); fUncommitedMemory += memSizeInc; @@ -155,20 +156,20 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row) } } - else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData)) + else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData)) { - OrderByRow swapRow = fOrderByQueue.top(); + OrderByRow swapRow = orderByQueue.top(); row1.setData(swapRow.fData); copyRow(row, &row1); if (fDistinct) { - fDistinctMap->erase(fOrderByQueue.top().fData); + fDistinctMap->erase(orderByQueue.top().fData); fDistinctMap->insert(row1.getPointer()); } - fOrderByQueue.pop(); - fOrderByQueue.push(swapRow); + orderByQueue.pop(); + orderByQueue.push(swapRow); } } @@ -194,7 +195,9 @@ void LimitedOrderBy::finalize() if (fRowGroup.getRowCount() > 0) fDataQueue.push(fData); - if (fOrderByQueue.size() > 0) + auto& orderByQueue = getQueue(); + + if (orderByQueue.size() > 0) { // *DRRTUY Very memory intensive. CS needs to account active // memory only and release memory if needed. @@ -210,7 +213,7 @@ void LimitedOrderBy::finalize() uint64_t offset = 0; uint64_t i = 0; // Reduce queue size by an offset value if it applicable. - uint64_t queueSizeWoOffset = fOrderByQueue.size() > fStart ? fOrderByQueue.size() - fStart : 0; + uint64_t queueSizeWoOffset = orderByQueue.size() > fStart ? orderByQueue.size() - fStart : 0; list tempRGDataList; if (fCount <= queueSizeWoOffset) @@ -239,15 +242,15 @@ void LimitedOrderBy::finalize() offset = offset != 0 ? offset - 1 : offset; fRowGroup.getRow(offset, &fRow0); - while ((fOrderByQueue.size() > fStart) && (i++ < fCount)) + while ((orderByQueue.size() > fStart) && (i++ < fCount)) { - const OrderByRow& topRow = fOrderByQueue.top(); + const OrderByRow& topRow = orderByQueue.top(); row1.setData(topRow.fData); copyRow(row1, &fRow0); fRowGroup.incRowCount(); offset--; fRow0.prevRow(rSize); - fOrderByQueue.pop(); + orderByQueue.pop(); // if RG has fRowsPerRG rows if (offset == (uint64_t)-1) diff --git a/dbcon/joblist/tupleannexstep.cpp b/dbcon/joblist/tupleannexstep.cpp index 9238c005c..8912a6c20 100644 --- a/dbcon/joblist/tupleannexstep.cpp +++ b/dbcon/joblist/tupleannexstep.cpp @@ -165,6 +165,8 @@ void TupleAnnexStep::setOutputRowGroup(const rowgroup::RowGroup& rg) void TupleAnnexStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo) { + // Initialize ResourceManager to acount memory usage. + fRm = jobInfo.rm; // Initialize structures used by separate workers uint64_t id = 1; fRowGroupIn = rgIn; @@ -709,7 +711,9 @@ void TupleAnnexStep::finalizeParallelOrderByDistinct() fRowGroupOut.resetRowGroup(0); // Calculate offset here fRowGroupOut.getRow(0, &fRowOut); - ordering::SortingPQ finalPQ; + + auto alloc = fRm->getAllocator(); + ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc); scoped_ptr distinctMap(new DistinctMap_t(10, TAHasher(this), TAEq(this))); fRowGroupIn.initRow(&row1); fRowGroupIn.initRow(&row2); @@ -903,7 +907,8 @@ void TupleAnnexStep::finalizeParallelOrderBy() uint32_t rowSize = 0; rowgroup::RGData rgDataOut; - ordering::SortingPQ finalPQ; + auto alloc = fRm->getAllocator(); + ordering::SortingPQ finalPQ(rowgroup::rgCommonSize, alloc); rgDataOut.reinit(fRowGroupOut, rowgroup::rgCommonSize); fRowGroupOut.setData(&rgDataOut); fRowGroupOut.resetRowGroup(0); diff --git a/dbcon/joblist/tupleannexstep.h b/dbcon/joblist/tupleannexstep.h index 255306092..133fdd466 100644 --- a/dbcon/joblist/tupleannexstep.h +++ b/dbcon/joblist/tupleannexstep.h @@ -171,25 +171,7 @@ class TupleAnnexStep : public JobStep, public TupleDeliveryStep std::vector fRunnersList; uint16_t fFinishedThreads; boost::mutex fParallelFinalizeMutex; -}; - -template -class reservablePQ : private std::priority_queue -{ - public: - typedef typename std::priority_queue::size_type size_type; - explicit reservablePQ(size_type capacity = 0) - { - reserve(capacity); - }; - void reserve(size_type capacity) - { - this->c.reserve(capacity); - } - size_type capacity() const - { - return this->c.capacity(); - } + joblist::ResourceManager* fRm; }; } // namespace joblist diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index 53a4da9ee..58a6bd605 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -772,6 +772,12 @@ void IdbOrderBy::initialize(const RowGroup& rg) fRowGroup.initRow(&row1); fRowGroup.initRow(&row2); + // These two blocks contain structs with memory accounting. + { + auto alloc = fRm->getAllocator(); + fOrderByQueue.reset(new SortingPQ(rowgroup::rgCommonSize, alloc)); + } + if (fDistinct) { auto alloc = fRm->getAllocator(); diff --git a/utils/windowfunction/idborderby.h b/utils/windowfunction/idborderby.h index 9380cd491..95d6044c3 100644 --- a/utils/windowfunction/idborderby.h +++ b/utils/windowfunction/idborderby.h @@ -22,6 +22,7 @@ #pragma once +#include #include #include #include @@ -34,7 +35,7 @@ #include "countingallocator.h" #include "rowgroup.h" #include "hasher.h" -#include "stlpoolallocator.h" +// #include "stlpoolallocator.h" // forward reference namespace joblist @@ -44,16 +45,26 @@ class ResourceManager; namespace ordering { -template , +template >, typename _Compare = std::less > -class reservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare> +class ReservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare> { public: typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type; - explicit reservablePQ(size_type capacity = 0) + explicit explicit ReservablePQ(size_type capacity, std::atomic* memoryLimit, + const int64_t checkPointStepSize = allocators::CheckPointStepSize, + const int64_t lowerBound = allocators::MemoryLimitLowerBound) + : std::priority_queue<_Tp, _Sequence, _Compare>(_Compare(), + _Sequence(allocators::CountingAllocator<_Tp>(memoryLimit, checkPointStepSize, lowerBound))) { reserve(capacity); - }; + } + explicit ReservablePQ(size_type capacity, allocators::CountingAllocator<_Tp> alloc) + : std::priority_queue<_Tp, _Sequence, _Compare>(_Compare(), + _Sequence(alloc)) + { + reserve(capacity); + } void reserve(size_type capacity) { this->c.reserve(capacity); @@ -73,7 +84,7 @@ class reservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare> class IdbCompare; class OrderByRow; -typedef reservablePQ SortingPQ; +using SortingPQ = ReservablePQ; // order by specification struct IdbSortSpec @@ -417,16 +428,17 @@ class IdbOrderBy : public IdbCompare { return fDistinct; } + // INV fOrderByQueue is always a valid pointer that is instantiated in constructor SortingPQ& getQueue() { - return fOrderByQueue; + return *fOrderByQueue; } CompareRule& getRule() { return fRule; } - SortingPQ fOrderByQueue; + std::unique_ptr fOrderByQueue = nullptr; protected: std::vector fOrderByCond;