From bd0f59910ae5ab5171527322110820c22f7becc3 Mon Sep 17 00:00:00 2001 From: Alexey Antipovsky Date: Wed, 21 May 2025 11:53:48 +0200 Subject: [PATCH] feat(PrimProc): MCOL-5950 Improve disk-based aggregation finalization (#3525) * feat(PrimProc): MCOL-5950 Improve disk-based aggregation finalization Iterate over the rows in the plain vector of RGData instead of iterating over the hashmap. This reduces the complexity and speeds up finalization (by up to the twice in the certain cases) * replace magic constant with muggle constant --- utils/rowgroup/rowstorage.cpp | 202 +++++++++++++++++----------------- utils/rowgroup/rowstorage.h | 2 +- 2 files changed, 104 insertions(+), 100 deletions(-) diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 7db0b8a59..8ab629873 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -19,12 +19,15 @@ #include #include #include +#include "branchpred.h" #include "rowgroup.h" #include #include #include "rowstorage.h" #include "robin_hood.h" +//#define DISK_AGG_DEBUG + namespace { int writeData(int fd, const char* buf, size_t sz) @@ -284,8 +287,10 @@ class MemManager release(fMemUsed); } - bool acquire(std::size_t amount) + bool acquire(ssize_t amount) { + if (UNLIKELY(-amount > fMemUsed)) + amount = -fMemUsed; return acquireImpl(amount); } void release(ssize_t amount = 0) @@ -332,16 +337,16 @@ class MemManager } protected: - virtual bool acquireImpl(std::size_t amount) + virtual bool acquireImpl(ssize_t amount) { fMemUsed += amount; return true; } - virtual void releaseImpl(std::size_t amount) + virtual void releaseImpl(ssize_t amount) { fMemUsed -= amount; } - ssize_t fMemUsed = 0; + ssize_t fMemUsed{0}; }; class RMMemManager : public MemManager @@ -389,7 +394,7 @@ class RMMemManager : public MemManager } protected: - bool acquireImpl(size_t amount) final + bool acquireImpl(ssize_t amount) final { if (amount) { @@ -402,7 +407,7 @@ class RMMemManager : public MemManager return true; } - void releaseImpl(size_t amount) override + void releaseImpl(ssize_t amount) override { if (amount) { @@ -858,8 +863,7 @@ class RowGroupStorage fRGDatas.pop_back(); fRowGroupOut->setData(rgdata.get()); - int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); - fMM->release(memSz); + fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows)); fLRU->remove(rgid); if (fRowGroupOut->getRowCount() == 0) continue; @@ -890,6 +894,50 @@ class RowGroupStorage fLRU->add(rgid); } + /** @brief Get the row at the specified position, skipping nonexistent rows + * + * @param idx(in, out) index (from 0) of the row, set to the actual index + * @param row(out) resulting row + */ + bool getRowForFinalization(uint64_t& idx, Row& row) + { + auto [rgid, rid] = rowIdxToGidRid(idx, fMaxRows); + while (rgid < fRGDatas.size()) + { + if (UNLIKELY(!fRGDatas[rgid])) + { + loadRG(rgid); + } + fRowGroupOut->setData(fRGDatas[rgid].get()); + if (UNLIKELY(rid >= fRowGroupOut->getRowCount())) + { + ++rgid; + rid = 0; + continue; + } + fRGDatas[rgid]->getRow(rid, &row); + idx = rowGidRidToIdx(rgid, rid, fMaxRows); + return true; + } + return false; + } + + size_t getRowsPerRG() const + { + return fMaxRows; + } + + void dropRGData(uint64_t rgid) + { + if (UNLIKELY(!fRGDatas[rgid])) + { + return; + } + fRowGroupOut->setData(fRGDatas[rgid].get()); + fMM->release(fRowGroupOut->getSizeWithStrings(fMaxRows)); + fRGDatas[rgid].reset(); + } + /** @brief Return a row and an index at the first free position. * * @param idx(out) index of the row @@ -1797,9 +1845,9 @@ void RowAggStorage::dump() if (!fEnabledDiskAggregation) return; - constexpr const int freeMemLimit = 50ULL * 1024ULL * 1024ULL; + constexpr int64_t freeMemLimit = 50LL * 1024LL * 1024LL; - const int64_t leaveFree = fNumOfInputRGPerThread * fRowGroupOut->getRowSize() * getBucketSize(); + const int64_t leaveFree = std::max(RGDataSizeType(freeMemLimit), fNumOfInputRGPerThread * fRowGroupOut->getSizeWithStrings(fMaxRows) * getBucketSize()); uint64_t freeAttempts{0}; int64_t freeMem = 0; while (true) @@ -2241,21 +2289,11 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) fKeysStorage = fStorage.get(); continue; } - std::unique_ptr prevHashes; - size_t prevSize; - size_t prevMask; - size_t prevMaxSize; - size_t prevHashMultiplier; - uint32_t prevInfoInc; - uint32_t prevInfoHashShift; - std::unique_ptr prevInfo; std::unique_ptr prevRowStorage; std::unique_ptr prevRealKeyRowStorage; RowGroupStorage* prevKeyRowStorage{nullptr}; - auto elems = calcSizeWithBuffer(fCurData->fMask + 1); - for (uint16_t prevGen = 0; prevGen < curGen; ++prevGen) { prevRowStorage.reset(fStorage->clone(prevGen)); @@ -2267,94 +2305,63 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) else prevKeyRowStorage = prevRowStorage.get(); - loadGeneration(prevGen, prevSize, prevMask, prevMaxSize, prevHashMultiplier, prevInfoInc, - prevInfoHashShift, prevInfo); - prevHashes = fCurData->fHashes->clone(prevMask + 1, prevGen, true); - - // iterate over current generation rows - uint64_t idx{}; - uint32_t info{}; - for (;; next(info, idx)) + if (fExtKeys) { - nextExisting(info, idx); + fKeysRowGroup->initRow(&tmpKeyRow); + } + auto& prevRow = fExtKeys ? tmpKeyRow : tmpRow; - if (idx >= elems) + // iterate over rows in prev generation + for (uint64_t rowidx = 0; prevKeyRowStorage->getRowForFinalization(rowidx, prevRow); ++rowidx) + { + auto [rgid, rid] = rowIdxToGidRid(rowidx, prevKeyRowStorage->getRowsPerRG()); + if (rgid != 0 && rid == 0) { - // done finalizing generation - break; + // start next RGData. At this point we don't need the previous one and can free the data + // as it has no changes to dump + prevKeyRowStorage->dropRGData(rgid - 1); + if (fExtKeys) { + prevKeyRowStorage->dropRGData(rgid - 1); + } } - const auto& pos = fCurData->fHashes->get(idx); - if (fKeysStorage->isFinalized(pos.idx)) + if (prevKeyRowStorage->isFinalized(rowidx)) { - // this row was already merged into newer generation, skip it continue; } - // now try to find row in the previous generation - auto [pinfo, pidx] = - rowHashToIdx(pos.hash, prevMask, prevHashMultiplier, prevInfoInc, prevInfoHashShift); - - while (pinfo < prevInfo[pidx]) + // TODO: store hashes in the RowGroupStorage? + uint64_t hash = hashRow(prevRow, fLastKeyCol); + auto [info, idx] = rowHashToIdx(hash); + nextWhileLess(info, idx); + constexpr uint64_t NOTFOUND_IDX = -1; + uint64_t curidx = NOTFOUND_IDX; + while (info == fCurData->fInfo[idx]) { - ++pidx; - pinfo += prevInfoInc; - } - if (prevInfo[pidx] != pinfo) - { - // there is no such row - continue; - } - - auto ppos = prevHashes->get(pidx); - while (prevInfo[pidx] == pinfo && pos.hash != ppos.hash) - { - // hashes are not equal => collision, try all matched rows - ++pidx; - pinfo += prevInfoInc; - ppos = prevHashes->get(pidx); - } - if (prevInfo[pidx] != pinfo || pos.hash != ppos.hash) - { - // no matches - continue; - } - - bool found = false; - auto& keyRow = fExtKeys ? fKeyRow : rowOut; - fKeysStorage->getRow(pos.idx, keyRow); - while (!found && prevInfo[pidx] == pinfo) - { - // try to find exactly match in case of hash collision - if (UNLIKELY(pos.hash != ppos.hash)) + auto& pos = fCurData->fHashes->get(idx); + if (pos.hash == hash) { - // hashes are not equal => no such row - ++pidx; - pinfo += prevInfoInc; - ppos = prevHashes->get(pidx); - continue; + auto& keyRow = fExtKeys ? fKeyRow : rowOut; + fKeysStorage->getRow(pos.idx, keyRow); + if (prevRow.equals(keyRow, fLastKeyCol)) + { + // Hallelujah! + curidx = pos.idx; + break; + } } - prevKeyRowStorage->getRow(ppos.idx, fExtKeys ? tmpKeyRow : tmpRow); - if (!keyRow.equals(fExtKeys ? tmpKeyRow : tmpRow, fLastKeyCol)) - { - ++pidx; - pinfo += prevInfoInc; - ppos = prevHashes->get(pidx); - continue; - } - found = true; + next(info, idx); } - if (!found) + if (curidx == NOTFOUND_IDX) { // nothing was found, go to the next row continue; } - if (UNLIKELY(prevKeyRowStorage->isFinalized(ppos.idx))) + if (UNLIKELY(fKeysStorage->isFinalized(curidx))) { - // just to be sure, it can NEVER happen continue; } @@ -2365,15 +2372,15 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) // 4 Mark the prev generation row as finalized if (fExtKeys) { - prevRowStorage->getRow(ppos.idx, tmpRow); - fStorage->getRow(pos.idx, rowOut); + prevRowStorage->getRow(rowidx, tmpRow); + fStorage->getRow(curidx, rowOut); } mergeFunc(tmpRow); genUpdated = true; - prevKeyRowStorage->markFinalized(ppos.idx); + prevKeyRowStorage->markFinalized(rowidx); if (fExtKeys) { - prevRowStorage->markFinalized(ppos.idx); + prevRowStorage->markFinalized(rowidx); } } @@ -2396,14 +2403,11 @@ void RowAggStorage::finalize(std::function mergeFunc, Row& rowOut) fKeysStorage->dumpAll(false); } - // swap current generation N with the prev generation N-1 - fCurData->fSize = prevSize; - fCurData->fMask = prevMask; - fCurData->fMaxSize = prevMaxSize; - fCurData->fInfoInc = prevInfoInc; - fCurData->fInfoHashShift = prevInfoHashShift; - fCurData->fInfo = std::move(prevInfo); - fCurData->fHashes = std::move(prevHashes); + // load previous generation (reusing RowGroupStorages) + loadGeneration(curGen - 1); + auto oh = std::move(fCurData->fHashes); + fCurData->fHashes = oh->clone(0, curGen - 1, true); + fStorage = std::move(prevRowStorage); if (fExtKeys) { diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h index 90a15b35c..72c20e932 100644 --- a/utils/rowgroup/rowstorage.h +++ b/utils/rowgroup/rowstorage.h @@ -109,7 +109,7 @@ class RowAggStorage */ bool getNextOutputRGData(std::unique_ptr& rgdata); - /** @brief TODO + /** @brief Merge generations together * * @param mergeFunc * @param rowOut