From 444cf4c65e4f210581e39c07ea41574a00fbc47a Mon Sep 17 00:00:00 2001 From: drrtuy Date: Sun, 24 Mar 2024 17:04:37 +0200 Subject: [PATCH] fix(aggregation, disk-based) MCOL-5691 distinct aggregate disk based (#3145) * fix(aggregation, disk-based): MCOL-5689 this fixes disk-based distinct aggregation functions Previously disk-based distinct aggregation functions produced incorrect results b/c there was no finalization applied for previous generations stored on disk. * fix(aggregation, disk-based): Fix disk-based COUNT(DISTINCT ...) queries. (Case 2). (Distinct & Multi-Distinct, Single- & Multi-Threaded). * fix(aggregation, disk-based): Fix disk-based DISTINCT & GROUP BY queries. (Case 1). (Distinct & Multi-Distinct, Single- & Multi-Threaded). --------- Co-authored-by: Theresa Hradilak Co-authored-by: Roman Nozdrin --- dbcon/joblist/tupleaggregatestep.cpp | 190 +++++++++++------ dbcon/joblist/tupleaggregatestep.h | 1 + utils/rowgroup/rowaggregation.cpp | 40 +++- utils/rowgroup/rowaggregation.h | 11 + utils/rowgroup/rowstorage.cpp | 271 +++++++++++++++++++----- utils/rowgroup/rowstorage.h | 12 ++ versioning/BRM/sessionmanagerserver.cpp | 1 + 7 files changed, 398 insertions(+), 128 deletions(-) diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index ab7f9027e..5754b85d7 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -289,6 +289,7 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup fNumOfBuckets = calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(), fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation()); + fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets); fMemUsage.reset(new uint64_t[fNumOfThreads]); @@ -392,7 +393,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID) rowGroupIn->initRow(&rowIn); auto* subDistAgg = dynamic_cast(multiDist->subAggregators()[j].get()); - while (subDistAgg->nextRowGroup()) + while (subDistAgg->nextOutputRowGroup()) { rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup()); rgDataVec.emplace_back(subDistAgg->moveCurrentRGData()); @@ -416,7 +417,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID) rowGroupIn->initRow(&rowIn); auto* subAgg = dynamic_cast(aggDist->aggregator().get()); - while (subAgg->nextRowGroup()) + while (subAgg->nextOutputRowGroup()) { rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData()); rgDataVec.emplace_back(subAgg->moveCurrentRGData()); @@ -571,7 +572,7 @@ bool TupleAggregateStep::nextDeliveredRowGroup() { for (; fBucketNum < fNumOfBuckets; fBucketNum++) { - while (fAggregators[fBucketNum]->nextRowGroup()) + while (fAggregators[fBucketNum]->nextOutputRowGroup()) { fAggregators[fBucketNum]->finalize(); fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData()); @@ -5708,14 +5709,27 @@ void TupleAggregateStep::doAggregate() return; } +/** @brief Aggregate input row groups in two-phase multi-threaded aggregation. + * In second phase handle three different aggregation cases differently: + * 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM (DISTINCT col1) AND at least one + * GROUP BY column + * 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column + * 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column + * DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp. + */ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp) { - uint32_t i; - RGData rgData; + // initialize return value variable uint64_t rowCount = 0; try { + /* + * Phase 1: Distribute input rows to different buckets depending on the hash value of the group by columns + * per row. Then distribute buckets equally on aggregators in fAggregators. (Number of fAggregators == + * fNumOfBuckets). Each previously created hash bucket is represented as one RowGroup in a fAggregator. + */ + if (!fDoneAggregate) { initializeMultiThread(); @@ -5724,9 +5738,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp runners.reserve(fNumOfThreads); // to prevent a resize during use // Start the aggregator threads - for (i = 0; i < fNumOfThreads; i++) + for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++) { - runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))); + runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum))); } // Now wait for all those threads @@ -5740,18 +5754,28 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp // much memory on average uint32_t threads = std::max(1U, fNumOfThreads / 2); runners.reserve(threads); - for (i = 0; i < threads; ++i) + for (uint32_t threadNum = 0; threadNum < threads; ++threadNum) { - runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, i))); + runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum))); } jobstepThreadPool.join(runners); } - if (dynamic_cast(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0) + /* + * Phase 2: Depending on query type (see below) do aggregation per previously created RowGroup of rows + * that need to aggregated and output results. + */ + + auto* distinctAggregator = dynamic_cast(fAggregator.get()); + const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0; + + // Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY column + // e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2; + if (distinctAggregator && hasGroupByColumns) { - // 2nd phase multi-threaded aggregate if (!fEndOfResult) { + // Do multi-threaded second phase aggregation (per row group created for GROUP BY statement) if (!fDoneAggregate) { vector runners; // thread pool handles @@ -5759,97 +5783,114 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads; uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1); - // uint32_t bucketsPerThread = 1; - // uint32_t numThreads = fNumOfBuckets; runners.reserve(numThreads); - for (i = 0; i < numThreads; i++) + for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++) { runners.push_back(jobstepThreadPool.invoke( - ThreadedSecondPhaseAggregator(this, i * bucketsPerThread, bucketsPerThread))); + ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread))); } jobstepThreadPool.join(runners); } + // Deliver results fDoneAggregate = true; bool done = true; - - while (nextDeliveredRowGroup()) + while (nextDeliveredRowGroup() && !cancelled()) { done = false; rowCount = fRowGroupOut.getRowCount(); if (rowCount != 0) { - if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) - pruneAuxColumns(); - - if (dlp) - { - rgData = fRowGroupDelivered.duplicate(); - dlp->insert(rgData); - } - else - { - bs.restart(); - fRowGroupDelivered.serializeRGData(bs); + if (!cleanUpAndOutputRowGroup(bs, dlp)) break; - } } done = true; } if (done) + { fEndOfResult = true; + } } } - else + // Case 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column + // e.g. SELECT SUM(DISTINCT col1) FROM test; + else if (distinctAggregator) { - auto* agg = dynamic_cast(fAggregator.get()); - if (!fEndOfResult) { if (!fDoneAggregate) { - for (i = 0; i < fNumOfBuckets; i++) + // Do aggregation over all row groups. As all row groups need to be aggregated together there is no + // easy way of multi-threading this and it's done in a single thread for now. + for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++) { if (fEndOfResult == false) { - // do the final aggregtion and deliver the results - // at least one RowGroup for aggregate results - // for "distinct without group by" case - if (agg != nullptr) - { - auto* aggMultiDist = dynamic_cast(fAggregators[i].get()); - auto* aggDist = dynamic_cast(fAggregators[i].get()); - agg->aggregator(aggDist->aggregator()); + // The distinctAggregator accumulates the aggregation results of all row groups by being added + // all row groups of each bucket aggregator and doing an aggregation step after each addition. + auto* bucketMultiDistinctAggregator = + dynamic_cast(fAggregators[bucketNum].get()); + auto* bucketDistinctAggregator = + dynamic_cast(fAggregators[bucketNum].get()); + distinctAggregator->aggregator(bucketDistinctAggregator->aggregator()); - if (aggMultiDist) - { - (dynamic_cast(agg)) - ->subAggregators(aggMultiDist->subAggregators()); - } - - agg->doDistinctAggregation(); - } - // for "group by without distinct" case - else + if (bucketMultiDistinctAggregator) { - fAggregator->append(fAggregators[i].get()); + (dynamic_cast(distinctAggregator)) + ->subAggregators(bucketMultiDistinctAggregator->subAggregators()); } + + distinctAggregator->aggregator()->finalAggregation(); + distinctAggregator->doDistinctAggregation(); } } } + // Deliver results fDoneAggregate = true; + bool done = true; + while (fAggregator->nextRowGroup() && !cancelled()) + { + done = false; + fAggregator->finalize(); + rowCount = fRowGroupOut.getRowCount(); + fRowsReturned += rowCount; + fRowGroupDelivered.setData(fRowGroupOut.getRGData()); + + if (rowCount != 0) + { + if (!cleanUpAndOutputRowGroup(bs, dlp)) + break; + } + done = true; + } + if (done) + fEndOfResult = true; + } + } + // CASE 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column + // e.g. SELECT SUM(col1) FROM test GROUP BY col2; + // Do aggregation over all row groups. As all row groups need to be aggregated together there is no + // easy way of multi-threading this and it's done in a single thread for now. + else if (hasGroupByColumns) + { + if (!fEndOfResult && !fDoneAggregate) + { + for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum) + { + fAggregator->append(fAggregators[bucketNum].get()); + } } + fDoneAggregate = true; bool done = true; - //@bug4459 while (fAggregator->nextRowGroup() && !cancelled()) { done = false; @@ -5860,22 +5901,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp if (rowCount != 0) { - if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) - pruneAuxColumns(); - - if (dlp) - { - rgData = fRowGroupDelivered.duplicate(); - dlp->insert(rgData); - } - else - { - bs.restart(); - fRowGroupDelivered.serializeRGData(bs); + if (!cleanUpAndOutputRowGroup(bs, dlp)) break; - } } - done = true; } @@ -5884,7 +5912,14 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp fEndOfResult = true; } } - } // try + else + { + throw logic_error( + "TupleAggregateStep::doThreadedAggregate: No DISTINCT columns nested into aggregation function " + "or " + "GROUP BY columns found. Should not reach here."); + } + } catch (...) { handleException(std::current_exception(), logging::tupleAggregateStepErr, @@ -5924,6 +5959,23 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp return rowCount; } +bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp) +{ + if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) + pruneAuxColumns(); + + if (dlp) + { + RGData rgData = fRowGroupDelivered.duplicate(); + dlp->insert(rgData); + return true; + } + + bs.restart(); + fRowGroupDelivered.serializeRGData(bs); + return false; +} + void TupleAggregateStep::pruneAuxColumns() { uint64_t rowCount = fRowGroupOut.getRowCount(); diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index a0ddcfdfb..18dc22a91 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -161,6 +161,7 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep void doThreadedSecondPhaseAggregate(uint32_t threadID); bool nextDeliveredRowGroup(); void pruneAuxColumns(); + bool cleanUpAndOutputRowGroup(messageqcpp::ByteStream& bs, RowGroupDL* dlp); void formatMiniStats(); void printCalTrace(); template diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index a0a707f83..89c709c85 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -4099,6 +4099,18 @@ bool RowAggregationUM::nextRowGroup() return more; } +bool RowAggregationUM::nextOutputRowGroup() +{ + bool more = fRowAggStorage->getNextOutputRGData(fCurRGData); + + if (more) + { + fRowGroupOut->setData(fCurRGData.get()); + } + + return more; +} + //------------------------------------------------------------------------------ // Row Aggregation constructor used on UM // For 2nd phase of two-phase case, from partial RG to final aggregated RG @@ -4558,19 +4570,29 @@ void RowAggregationDistinct::addRowGroup(const RowGroup* pRows, //------------------------------------------------------------------------------ void RowAggregationDistinct::doDistinctAggregation() { - while (dynamic_cast(fAggregator.get())->nextRowGroup()) + auto* umAggregator = dynamic_cast(fAggregator.get()); + if (umAggregator) { - fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData()); - - Row rowIn; - fRowGroupIn.initRow(&rowIn); - fRowGroupIn.getRow(0, &rowIn); - - for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow()) + while (umAggregator->nextOutputRowGroup()) { - aggregateRow(rowIn); + fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData()); + + Row rowIn; + fRowGroupIn.initRow(&rowIn); + fRowGroupIn.getRow(0, &rowIn); + + for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow()) + { + aggregateRow(rowIn); + } } } + else + { + std::ostringstream errmsg; + errmsg << "RowAggregationDistinct: incorrect fAggregator class."; + cerr << errmsg.str() << endl; + } } void RowAggregationDistinct::doDistinctAggregation_rowVec(vector>& inRows) diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index 72789e498..799e09b24 100644 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -679,6 +679,17 @@ class RowAggregationUM : public RowAggregation */ bool nextRowGroup(); + /** @brief Returns aggregated rows in a RowGroup as long as there are still not returned result RowGroups. + * + * This function should be called repeatedly until false is returned (meaning end of data). + * Returns data from in-memory storage, as well as spilled data from disk. If disk-based aggregation is + * happening, finalAggregation() should be called before returning result RowGroups to finalize the used + * RowAggStorages, merge different spilled generations and obtain correct aggregation results. + * + * @returns True if there are more result RowGroups, else false if all results have been returned. + */ + bool nextOutputRowGroup(); + /** @brief Add an aggregator for DISTINCT aggregation */ void distinctAggregator(const boost::shared_ptr& da) diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 550788973..fe07fe8b7 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "rowgroup.h" #include #include @@ -79,6 +80,11 @@ std::string errorString(int errNo) auto* buf = strerror_r(errNo, tmp, sizeof(tmp)); return {buf}; } + +size_t findFirstSetBit(const uint64_t mask) +{ + return __builtin_ffsll(mask); +} } // anonymous namespace namespace rowgroup @@ -552,7 +558,7 @@ class Dumper class RowGroupStorage { public: - using RGDataStorage = std::vector>; + using RGDataStorage = std::vector; public: /** @brief Default constructor @@ -613,6 +619,54 @@ class RowGroupStorage return fRowGroupOut->getSizeWithStrings(fMaxRows); } + // This shifts data within RGData such that it compacts the non finalized rows + PosOpos shiftRowsInRowGroup(RGDataUnPtr& rgdata, uint64_t fgid, uint64_t tgid) + { + uint64_t pos = 0; + uint64_t opos = 0; + + fRowGroupOut->setData(rgdata.get()); + for (auto i = fgid; i < tgid; ++i) + { + if ((i - fgid) * HashMaskElements >= fRowGroupOut->getRowCount()) + break; + uint64_t mask = ~fFinalizedRows[i]; + if ((i - fgid + 1) * HashMaskElements > fRowGroupOut->getRowCount()) + { + mask &= (~0ULL) >> ((i - fgid + 1) * HashMaskElements - fRowGroupOut->getRowCount()); + } + opos = (i - fgid) * HashMaskElements; + + if (mask == ~0ULL) + { + if (LIKELY(pos != opos)) + moveRows(rgdata.get(), pos, opos, HashMaskElements); + pos += HashMaskElements; + continue; + } + + if (mask == 0) + continue; + + while (mask != 0) + { + // find position until block full of not finalized rows. + size_t b = findFirstSetBit(mask); + size_t e = findFirstSetBit(~(mask >> b)) + b; + if (UNLIKELY(e >= HashMaskElements)) + mask = 0; + else + mask >>= e; + if (LIKELY(pos != opos + b - 1)) + moveRows(rgdata.get(), pos, opos + b - 1, e - b); + pos += e - b; + opos += e; + } + --opos; + } + return {pos, opos}; + } + /** @brief Take away RGDatas from another RowGroupStorage * * If some of the RGDatas is not in the memory do not load them, @@ -626,7 +680,7 @@ class RowGroupStorage } void append(RowGroupStorage* o) { - std::unique_ptr rgd; + RGDataUnPtr rgd; std::string ofname; while (o->getNextRGData(rgd, ofname)) { @@ -666,11 +720,130 @@ class RowGroupStorage } } + /** @brief Get the last RGData from fRGDatas, remove it from the vector and return its id. + * + * @param rgdata The RGData to be retrieved + */ + uint64_t getLastRGData(RGDataUnPtr& rgdata) + { + assert(!fRGDatas.empty()); + uint64_t rgid = fRGDatas.size() - 1; + rgdata = std::move(fRGDatas[rgid]); + fRGDatas.pop_back(); + return rgid; + } + + static FgidTgid calculateGids(const uint64_t rgid, const uint64_t fMaxRows) + { + // Calculate from first and last uint64_t entry in fFinalizedRows BitMap + // which contains information about rows in the RGData. + uint64_t fgid = rgid * fMaxRows / HashMaskElements; + uint64_t tgid = fgid + fMaxRows / HashMaskElements; + return {fgid, tgid}; + } + + /** @brief Used to output aggregation results from memory and disk in the current generation in the form of + * RGData. Returns next RGData, loads from disk if necessary. Skips finalized rows as they would contain + * duplicate results, compacts actual rows into start of RGData and adapts number of rows transmitted in + * RGData. + * @returns A pointer to the next RGData or an empty pointer if there are no more RGDatas in this + * generation. + */ + bool getNextOutputRGData(RGDataUnPtr& rgdata) + { + if (UNLIKELY(fRGDatas.empty())) + { + fMM->release(); + return false; + } + + while (!fRGDatas.empty()) + { + auto rgid = getLastRGData(rgdata); + auto [fgid, tgid] = calculateGids(rgid, fMaxRows); + + if (fFinalizedRows.size() <= fgid) + { + // There are no finalized rows in this RGData. We can just return it. + // Load from disk if necessary and unlink DumpFile. + if (!rgdata) + { + loadRG(rgid, rgdata, true); + } + return true; + } + + if (tgid >= fFinalizedRows.size()) + fFinalizedRows.resize(tgid + 1, 0ULL); + + // Check if there are rows to process + bool hasReturnRows = false; + for (auto i = fgid; i < tgid; ++i) + { + if (fFinalizedRows[i] != ~0ULL) + { + // Not all rows are finalized, we have to return at least parts of this RGData + hasReturnRows = true; + break; + } + } + + if (rgdata) + { + // RGData is currently in memory + if (!hasReturnRows) + { + // All rows are finalized, don't return this RGData + continue; + } + } + else + { + if (hasReturnRows) + { + // Load RGData from disk, unlink dump file and continue processing + loadRG(rgid, rgdata, true); + } + else + { + // All rows are finalized. Unlink dump file and continue search for return RGData + unlink(makeRGFilename(rgid).c_str()); + continue; + } + } + + auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid); + + // Nothing got shifted at all -> all rows must be finalized. If all rows finalized remove + // RGData and file and don't give it out. + if (pos == 0) + { + fLRU->remove(rgid); + unlink(makeRGFilename(rgid).c_str()); + continue; + } + + // set RGData with number of not finalized rows which have been compacted at front of RGData + fRowGroupOut->setData(rgdata.get()); + fRowGroupOut->setRowCount(pos); + int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); + + // Release the memory used by the current rgdata from this MemoryManager. + fMM->release(memSz); + unlink(makeRGFilename(rgid).c_str()); + + // to periodically clean up freed memory so it can be used by other threads. + fLRU->remove(rgid); + return true; + } + return false; + } + /** @brief Returns next RGData, load it from disk if necessary. * * @returns pointer to the next RGData or empty pointer if there is nothing */ - std::unique_ptr getNextRGData() + RGDataUnPtr getNextRGData() { while (!fRGDatas.empty()) { @@ -1030,7 +1203,7 @@ class RowGroupStorage * @param fname(out) Filename of the dump if it's not in the memory * @returns true if there is available RGData */ - bool getNextRGData(std::unique_ptr& rgdata, std::string& fname) + bool getNextRGData(RGDataUnPtr& rgdata, std::string& fname) { if (UNLIKELY(fRGDatas.empty())) { @@ -1039,12 +1212,9 @@ class RowGroupStorage } while (!fRGDatas.empty()) { - uint64_t rgid = fRGDatas.size() - 1; - rgdata = std::move(fRGDatas[rgid]); - fRGDatas.pop_back(); + auto rgid = getLastRGData(rgdata); + auto [fgid, tgid] = calculateGids(rgid, fMaxRows); - uint64_t fgid = rgid * fMaxRows / 64; - uint64_t tgid = fgid + fMaxRows / 64; if (fFinalizedRows.size() > fgid) { if (tgid >= fFinalizedRows.size()) @@ -1068,45 +1238,7 @@ class RowGroupStorage continue; } - uint64_t pos = 0; - uint64_t opos = 0; - fRowGroupOut->setData(rgdata.get()); - for (auto i = fgid; i < tgid; ++i) - { - if ((i - fgid) * 64 >= fRowGroupOut->getRowCount()) - break; - uint64_t mask = ~fFinalizedRows[i]; - if ((i - fgid + 1) * 64 > fRowGroupOut->getRowCount()) - { - mask &= (~0ULL) >> ((i - fgid + 1) * 64 - fRowGroupOut->getRowCount()); - } - opos = (i - fgid) * 64; - if (mask == ~0ULL) - { - if (LIKELY(pos != opos)) - moveRows(rgdata.get(), pos, opos, 64); - pos += 64; - continue; - } - - if (mask == 0) - continue; - - while (mask != 0) - { - size_t b = __builtin_ffsll(mask); - size_t e = __builtin_ffsll(~(mask >> b)) + b; - if (UNLIKELY(e >= 64)) - mask = 0; - else - mask >>= e; - if (LIKELY(pos != opos + b - 1)) - moveRows(rgdata.get(), pos, opos + b - 1, e - b); - pos += e - b; - opos += e; - } - --opos; - } + auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid); if (pos == 0) { @@ -1119,6 +1251,7 @@ class RowGroupStorage fRowGroupOut->setRowCount(pos); } + // Release the memory used by the current rgdata. if (rgdata) { fRowGroupOut->setData(rgdata.get()); @@ -1130,6 +1263,7 @@ class RowGroupStorage { fname = makeRGFilename(rgid); } + // to periodically clean up freed memory so it can be used by other threads. fLRU->remove(rgid); return true; } @@ -1169,7 +1303,7 @@ class RowGroupStorage loadRG(rgid, fRGDatas[rgid]); } - void loadRG(uint64_t rgid, std::unique_ptr& rgdata, bool unlinkDump = false) + void loadRG(uint64_t rgid, RGDataUnPtr& rgdata, bool unlinkDump = false) { auto fname = makeRGFilename(rgid); @@ -1736,7 +1870,7 @@ void RowAggStorage::append(RowAggStorage& other) } } -std::unique_ptr RowAggStorage::getNextRGData() +RGDataUnPtr RowAggStorage::getNextRGData() { if (!fStorage) { @@ -1747,6 +1881,43 @@ std::unique_ptr RowAggStorage::getNextRGData() return fStorage->getNextRGData(); } +bool RowAggStorage::getNextOutputRGData(RGDataUnPtr& rgdata) +{ + if (!fStorage) + { + return {}; + } + + cleanup(); + freeData(); + + // fGeneration is an unsigned int, we need a signed int for a comparison >= 0 + int32_t gen = fGeneration; + while (gen >= 0) + { + bool moreInGeneration = fStorage->getNextOutputRGData(rgdata); + + if (moreInGeneration) + { + fRowGroupOut->setData(rgdata.get()); + return true; + } + + // all generations have been emptied + if (fGeneration == 0) + { + break; + } + + // current generation has no more RGDatas to return + // load earlier generation and continue with returning its RGDatas + gen--; + fGeneration--; + fStorage.reset(fStorage->clone(fGeneration)); + } + return false; +} + void RowAggStorage::freeData() { for (auto& data : fGens) diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h index 50151dc61..90a15b35c 100644 --- a/utils/rowgroup/rowstorage.h +++ b/utils/rowgroup/rowstorage.h @@ -20,6 +20,7 @@ #include "resourcemanager.h" #include "rowgroup.h" #include "idbcompress.h" +#include #include #include #include @@ -35,10 +36,15 @@ class RowPosHashStorage; using RowPosHashStoragePtr = std::unique_ptr; class RowGroupStorage; +using RGDataUnPtr = std::unique_ptr; +using PosOpos = std::pair; +using FgidTgid = std::pair; + uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol); constexpr const size_t MaxConstStrSize = 2048ULL; constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1; +constexpr const uint64_t HashMaskElements = 64ULL; class RowAggStorage { @@ -97,6 +103,12 @@ class RowAggStorage */ std::unique_ptr getNextRGData(); + /** @brief Remove last RGData from in-memory storage or disk. + * Iterates over all generations on disk if available. + * @returns True if RGData is returned in parameter or false if no more RGDatas can be returned. + */ + bool getNextOutputRGData(std::unique_ptr& rgdata); + /** @brief TODO * * @param mergeFunc diff --git a/versioning/BRM/sessionmanagerserver.cpp b/versioning/BRM/sessionmanagerserver.cpp index 427bafd69..bf0c7ccf9 100644 --- a/versioning/BRM/sessionmanagerserver.cpp +++ b/versioning/BRM/sessionmanagerserver.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include