diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index cb231965f..6978fda77 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -291,7 +291,6 @@ TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup fNumOfBuckets = calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(), fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation()); - fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets); fMemUsage.reset(new uint64_t[fNumOfThreads]); @@ -395,7 +394,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID) rowGroupIn->initRow(&rowIn); auto* subDistAgg = dynamic_cast(multiDist->subAggregators()[j].get()); - while (subDistAgg->nextOutputRowGroup()) + while (subDistAgg->nextRowGroup()) { rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup()); rgDataVec.emplace_back(subDistAgg->moveCurrentRGData()); @@ -419,7 +418,7 @@ void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID) rowGroupIn->initRow(&rowIn); auto* subAgg = dynamic_cast(aggDist->aggregator().get()); - while (subAgg->nextOutputRowGroup()) + while (subAgg->nextRowGroup()) { rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData()); rgDataVec.emplace_back(subAgg->moveCurrentRGData()); @@ -574,7 +573,7 @@ bool TupleAggregateStep::nextDeliveredRowGroup() { for (; fBucketNum < fNumOfBuckets; fBucketNum++) { - while (fAggregators[fBucketNum]->nextOutputRowGroup()) + while (fAggregators[fBucketNum]->nextRowGroup()) { fAggregators[fBucketNum]->finalize(); fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData()); @@ -5704,27 +5703,14 @@ void TupleAggregateStep::doAggregate() return; } -/** @brief Aggregate input row groups in two-phase multi-threaded aggregation. - * In second phase handle three different aggregation cases differently: - * 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM (DISTINCT col1) AND at least one - * GROUP BY column - * 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column - * 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column - * DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp. - */ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp) { - // initialize return value variable + uint32_t i; + RGData rgData; uint64_t rowCount = 0; try { - /* - * Phase 1: Distribute input rows to different buckets depending on the hash value of the group by columns - * per row. Then distribute buckets equally on aggregators in fAggregators. (Number of fAggregators == - * fNumOfBuckets). Each previously created hash bucket is represented as one RowGroup in a fAggregator. - */ - if (!fDoneAggregate) { initializeMultiThread(); @@ -5733,9 +5719,9 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp runners.reserve(fNumOfThreads); // to prevent a resize during use // Start the aggregator threads - for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++) + for (i = 0; i < fNumOfThreads; i++) { - runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum))); + runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, i))); } // Now wait for all those threads @@ -5749,28 +5735,18 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp // much memory on average uint32_t threads = std::max(1U, fNumOfThreads / 2); runners.reserve(threads); - for (uint32_t threadNum = 0; threadNum < threads; ++threadNum) + for (i = 0; i < threads; ++i) { - runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum))); + runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, i))); } jobstepThreadPool.join(runners); } - /* - * Phase 2: Depending on query type (see below) do aggregation per previously created RowGroup of rows - * that need to aggregated and output results. - */ - - auto* distinctAggregator = dynamic_cast(fAggregator.get()); - const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0; - - // Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY column - // e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2; - if (distinctAggregator && hasGroupByColumns) + if (dynamic_cast(fAggregator.get()) && fAggregator->aggMapKeyLength() > 0) { + // 2nd phase multi-threaded aggregate if (!fEndOfResult) { - // Do multi-threaded second phase aggregation (per row group created for GROUP BY statement) if (!fDoneAggregate) { vector runners; // thread pool handles @@ -5778,114 +5754,97 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads; uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1); + // uint32_t bucketsPerThread = 1; + // uint32_t numThreads = fNumOfBuckets; runners.reserve(numThreads); - for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++) + for (i = 0; i < numThreads; i++) { runners.push_back(jobstepThreadPool.invoke( - ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread))); + ThreadedSecondPhaseAggregator(this, i * bucketsPerThread, bucketsPerThread))); } jobstepThreadPool.join(runners); } - // Deliver results fDoneAggregate = true; bool done = true; - while (nextDeliveredRowGroup() && !cancelled()) + + while (nextDeliveredRowGroup()) { done = false; rowCount = fRowGroupOut.getRowCount(); if (rowCount != 0) { - if (!cleanUpAndOutputRowGroup(bs, dlp)) + if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) + pruneAuxColumns(); + + if (dlp) + { + rgData = fRowGroupDelivered.duplicate(); + dlp->insert(rgData); + } + else + { + bs.restart(); + fRowGroupDelivered.serializeRGData(bs); break; + } } done = true; } if (done) - { fEndOfResult = true; - } } } - // Case 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column - // e.g. SELECT SUM(DISTINCT col1) FROM test; - else if (distinctAggregator) + else { + auto* agg = dynamic_cast(fAggregator.get()); + if (!fEndOfResult) { if (!fDoneAggregate) { - // Do aggregation over all row groups. As all row groups need to be aggregated together there is no - // easy way of multi-threading this and it's done in a single thread for now. - for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++) + for (i = 0; i < fNumOfBuckets; i++) { if (fEndOfResult == false) { - // The distinctAggregator accumulates the aggregation results of all row groups by being added - // all row groups of each bucket aggregator and doing an aggregation step after each addition. - auto* bucketMultiDistinctAggregator = - dynamic_cast(fAggregators[bucketNum].get()); - auto* bucketDistinctAggregator = - dynamic_cast(fAggregators[bucketNum].get()); - distinctAggregator->aggregator(bucketDistinctAggregator->aggregator()); - - if (bucketMultiDistinctAggregator) + // do the final aggregtion and deliver the results + // at least one RowGroup for aggregate results + // for "distinct without group by" case + if (agg != nullptr) { - (dynamic_cast(distinctAggregator)) - ->subAggregators(bucketMultiDistinctAggregator->subAggregators()); - } + auto* aggMultiDist = dynamic_cast(fAggregators[i].get()); + auto* aggDist = dynamic_cast(fAggregators[i].get()); + agg->aggregator(aggDist->aggregator()); - distinctAggregator->aggregator()->finalAggregation(); - distinctAggregator->doDistinctAggregation(); + if (aggMultiDist) + { + (dynamic_cast(agg)) + ->subAggregators(aggMultiDist->subAggregators()); + } + + agg->doDistinctAggregation(); + } + // for "group by without distinct" case + else + { + fAggregator->append(fAggregators[i].get()); + } } } } - // Deliver results fDoneAggregate = true; - bool done = true; - while (fAggregator->nextRowGroup() && !cancelled()) - { - done = false; - fAggregator->finalize(); - rowCount = fRowGroupOut.getRowCount(); - fRowsReturned += rowCount; - fRowGroupDelivered.setData(fRowGroupOut.getRGData()); - - if (rowCount != 0) - { - if (!cleanUpAndOutputRowGroup(bs, dlp)) - break; - } - done = true; - } - if (done) - fEndOfResult = true; - } - } - // CASE 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column - // e.g. SELECT SUM(col1) FROM test GROUP BY col2; - // Do aggregation over all row groups. As all row groups need to be aggregated together there is no - // easy way of multi-threading this and it's done in a single thread for now. - else if (hasGroupByColumns) - { - if (!fEndOfResult && !fDoneAggregate) - { - for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum) - { - fAggregator->append(fAggregators[bucketNum].get()); - } } - fDoneAggregate = true; bool done = true; + //@bug4459 while (fAggregator->nextRowGroup() && !cancelled()) { done = false; @@ -5896,9 +5855,22 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp if (rowCount != 0) { - if (!cleanUpAndOutputRowGroup(bs, dlp)) + if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) + pruneAuxColumns(); + + if (dlp) + { + rgData = fRowGroupDelivered.duplicate(); + dlp->insert(rgData); + } + else + { + bs.restart(); + fRowGroupDelivered.serializeRGData(bs); break; + } } + done = true; } @@ -5907,14 +5879,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp fEndOfResult = true; } } - else - { - throw logic_error( - "TupleAggregateStep::doThreadedAggregate: No DISTINCT columns nested into aggregation function " - "or " - "GROUP BY columns found. Should not reach here."); - } - } + } // try catch (...) { handleException(std::current_exception(), logging::tupleAggregateStepErr, @@ -5954,23 +5919,6 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp return rowCount; } -bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp) -{ - if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) - pruneAuxColumns(); - - if (dlp) - { - RGData rgData = fRowGroupDelivered.duplicate(); - dlp->insert(rgData); - return true; - } - - bs.restart(); - fRowGroupDelivered.serializeRGData(bs); - return false; -} - void TupleAggregateStep::pruneAuxColumns() { uint64_t rowCount = fRowGroupOut.getRowCount(); diff --git a/dbcon/joblist/tupleaggregatestep.h b/dbcon/joblist/tupleaggregatestep.h index 18dc22a91..a0ddcfdfb 100644 --- a/dbcon/joblist/tupleaggregatestep.h +++ b/dbcon/joblist/tupleaggregatestep.h @@ -161,7 +161,6 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep void doThreadedSecondPhaseAggregate(uint32_t threadID); bool nextDeliveredRowGroup(); void pruneAuxColumns(); - bool cleanUpAndOutputRowGroup(messageqcpp::ByteStream& bs, RowGroupDL* dlp); void formatMiniStats(); void printCalTrace(); template diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 82051474a..2bf858f69 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -57,7 +57,7 @@ #include "rowstorage.h" //..comment out NDEBUG to enable assertions, uncomment NDEBUG to disable -// #define NDEBUG +//#define NDEBUG #include "mcs_decimal.h" using namespace std; @@ -315,7 +315,7 @@ void RowAggregation::updateStringMinMax(utils::NullString val1, utils::NullStrin if (val1.isNull()) { // as any comparison with NULL is false, it should not affect min/max ranges. - return; // do nothing. + return ; // do nothing. } CHARSET_INFO* cs = fRow.getCharset(col); int tmp = cs->strnncoll(val1.str(), val1.length(), val2.str(), val2.length()); @@ -810,9 +810,8 @@ void RowAggregation::aggregateRow(Row& row, const uint64_t* hash, std::vector* rgContextColl) { uint32_t cnt = fRollupFlag ? fGroupByCols.size() : 1; - for (uint32_t z = 0; z < cnt; z++) - { - // groupby column list is not empty, find the entry. + for (uint32_t z = 0; z < cnt; z++) { + // groupby column list is not empty, find the entry. if (!fGroupByCols.empty()) { bool is_new_row; @@ -857,8 +856,7 @@ void RowAggregation::aggregateRow(Row& row, const uint64_t* hash, updateEntry(row, rgContextColl); // these quantities are unsigned and comparing z and cnt - 1 can be incorrect // because cnt can be zero. - if ((z + 1 < cnt)) - { + if ((z + 1 < cnt)) { // if we are rolling up, we mark appropriate field as NULL and also increment // value in the "mark" column, so that we can differentiate between data and // various rollups. @@ -1171,8 +1169,8 @@ void RowAggregation::doMinMax(const Row& rowIn, int64_t colIn, int64_t colOut, i { if (LIKELY(rowIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH)) { - updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(), - colOut, funcType); + updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(), colOut, + funcType); } else if (rowIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH) { @@ -2122,9 +2120,10 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu long double mean = fRow.getLongDoubleField(colAux); long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1); volatile long double delta = valIn - mean; - mean += delta / count; + mean += delta/count; scaledMomentum2 += delta * (valIn - mean); + fRow.setDoubleField(count, colOut); fRow.setLongDoubleField(mean, colAux); fRow.setLongDoubleField(scaledMomentum2, colAux + 1); @@ -2174,7 +2173,8 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int cc = dynamic_cast(fFunctionCols[funcColsIdx]->fpConstCol.get()); } - if ((cc && cc->isNull()) || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true)) + if ((cc && cc->isNull()) || + (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true)) { if (udafContextsColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) { @@ -2500,8 +2500,7 @@ void RowAggregation::loadEmptySet(messageqcpp::ByteStream& bs) //------------------------------------------------------------------------------ RowAggregationUM::RowAggregationUM(const vector& rowAggGroupByCols, const vector& rowAggFunctionCols, - joblist::ResourceManager* r, boost::shared_ptr sessionLimit, - bool withRollup) + joblist::ResourceManager* r, boost::shared_ptr sessionLimit, bool withRollup) : RowAggregation(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup) , fHasAvg(false) , fHasStatsFunc(false) @@ -3229,7 +3228,7 @@ void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut) case execplan::CalpontSystemCatalog::CHAR: case execplan::CalpontSystemCatalog::VARCHAR: - case execplan::CalpontSystemCatalog::TEXT: fRow.setStringField(strOut, colOut); break; + case execplan::CalpontSystemCatalog::TEXT: fRow.setStringField(strOut, colOut); break; case execplan::CalpontSystemCatalog::VARBINARY: case execplan::CalpontSystemCatalog::CLOB: @@ -4221,26 +4220,13 @@ bool RowAggregationUM::nextRowGroup() return more; } -bool RowAggregationUM::nextOutputRowGroup() -{ - bool more = fRowAggStorage->getNextOutputRGData(fCurRGData); - - if (more) - { - fRowGroupOut->setData(fCurRGData.get()); - } - - return more; -} - //------------------------------------------------------------------------------ // Row Aggregation constructor used on UM // For 2nd phase of two-phase case, from partial RG to final aggregated RG //------------------------------------------------------------------------------ RowAggregationUMP2::RowAggregationUMP2(const vector& rowAggGroupByCols, const vector& rowAggFunctionCols, - joblist::ResourceManager* r, boost::shared_ptr sessionLimit, - bool withRollup) + joblist::ResourceManager* r, boost::shared_ptr sessionLimit, bool withRollup) : RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup) { } @@ -4464,8 +4450,7 @@ void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, { if (LIKELY(cnt > 0)) { - int128_t valOut = fRow.getTSInt128Field(colOut).getValue(); - ; + int128_t valOut = fRow.getTSInt128Field(colOut).getValue();; int128_t sum = valOut + wideValue; fRow.setInt128Field(sum, colOut); fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux); @@ -4524,8 +4509,7 @@ void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t c { volatile long double delta = mean - blockMean; nextMean = (mean * count + blockMean * blockCount) / nextCount; - nextScaledMomentum2 = - scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount); + nextScaledMomentum2 = scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount); } fRow.setDoubleField(nextCount, colOut); fRow.setLongDoubleField(nextMean, colAux); @@ -4698,29 +4682,19 @@ void RowAggregationDistinct::addRowGroup(const RowGroup* pRows, //------------------------------------------------------------------------------ void RowAggregationDistinct::doDistinctAggregation() { - auto* umAggregator = dynamic_cast(fAggregator.get()); - if (umAggregator) + while (dynamic_cast(fAggregator.get())->nextRowGroup()) { - while (umAggregator->nextOutputRowGroup()) + fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData()); + + Row rowIn; + fRowGroupIn.initRow(&rowIn); + fRowGroupIn.getRow(0, &rowIn); + + for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow()) { - fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData()); - - Row rowIn; - fRowGroupIn.initRow(&rowIn); - fRowGroupIn.getRow(0, &rowIn); - - for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow()) - { - aggregateRow(rowIn); - } + aggregateRow(rowIn); } } - else - { - std::ostringstream errmsg; - errmsg << "RowAggregationDistinct: incorrect fAggregator class."; - cerr << errmsg.str() << endl; - } } void RowAggregationDistinct::doDistinctAggregation_rowVec(vector>& inRows) diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index 2441c23f2..a2246a14f 100644 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -681,17 +681,6 @@ class RowAggregationUM : public RowAggregation */ bool nextRowGroup(); - /** @brief Returns aggregated rows in a RowGroup as long as there are still not returned result RowGroups. - * - * This function should be called repeatedly until false is returned (meaning end of data). - * Returns data from in-memory storage, as well as spilled data from disk. If disk-based aggregation is - * happening, finalAggregation() should be called before returning result RowGroups to finalize the used - * RowAggStorages, merge different spilled generations and obtain correct aggregation results. - * - * @returns True if there are more result RowGroups, else false if all results have been returned. - */ - bool nextOutputRowGroup(); - /** @brief Add an aggregator for DISTINCT aggregation */ void distinctAggregator(const boost::shared_ptr& da) diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index 9b9e47d1c..585dc8ed0 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include "rowgroup.h" #include #include @@ -80,11 +79,6 @@ std::string errorString(int errNo) auto* buf = strerror_r(errNo, tmp, sizeof(tmp)); return {buf}; } - -size_t findFirstSetBit(const uint64_t mask) -{ - return __builtin_ffsll(mask); -} } // anonymous namespace namespace rowgroup @@ -558,7 +552,7 @@ class Dumper class RowGroupStorage { public: - using RGDataStorage = std::vector; + using RGDataStorage = std::vector>; public: /** @brief Default constructor @@ -619,54 +613,6 @@ class RowGroupStorage return fRowGroupOut->getSizeWithStrings(fMaxRows); } - // This shifts data within RGData such that it compacts the non finalized rows - PosOpos shiftRowsInRowGroup(RGDataUnPtr& rgdata, uint64_t fgid, uint64_t tgid) - { - uint64_t pos = 0; - uint64_t opos = 0; - - fRowGroupOut->setData(rgdata.get()); - for (auto i = fgid; i < tgid; ++i) - { - if ((i - fgid) * HashMaskElements >= fRowGroupOut->getRowCount()) - break; - uint64_t mask = ~fFinalizedRows[i]; - if ((i - fgid + 1) * HashMaskElements > fRowGroupOut->getRowCount()) - { - mask &= (~0ULL) >> ((i - fgid + 1) * HashMaskElements - fRowGroupOut->getRowCount()); - } - opos = (i - fgid) * HashMaskElements; - - if (mask == ~0ULL) - { - if (LIKELY(pos != opos)) - moveRows(rgdata.get(), pos, opos, HashMaskElements); - pos += HashMaskElements; - continue; - } - - if (mask == 0) - continue; - - while (mask != 0) - { - // find position until block full of not finalized rows. - size_t b = findFirstSetBit(mask); - size_t e = findFirstSetBit(~(mask >> b)) + b; - if (UNLIKELY(e >= HashMaskElements)) - mask = 0; - else - mask >>= e; - if (LIKELY(pos != opos + b - 1)) - moveRows(rgdata.get(), pos, opos + b - 1, e - b); - pos += e - b; - opos += e; - } - --opos; - } - return {pos, opos}; - } - /** @brief Take away RGDatas from another RowGroupStorage * * If some of the RGDatas is not in the memory do not load them, @@ -680,7 +626,7 @@ class RowGroupStorage } void append(RowGroupStorage* o) { - RGDataUnPtr rgd; + std::unique_ptr rgd; std::string ofname; while (o->getNextRGData(rgd, ofname)) { @@ -720,130 +666,11 @@ class RowGroupStorage } } - /** @brief Get the last RGData from fRGDatas, remove it from the vector and return its id. - * - * @param rgdata The RGData to be retrieved - */ - uint64_t getLastRGData(RGDataUnPtr& rgdata) - { - assert(!fRGDatas.empty()); - uint64_t rgid = fRGDatas.size() - 1; - rgdata = std::move(fRGDatas[rgid]); - fRGDatas.pop_back(); - return rgid; - } - - static FgidTgid calculateGids(const uint64_t rgid, const uint64_t fMaxRows) - { - // Calculate from first and last uint64_t entry in fFinalizedRows BitMap - // which contains information about rows in the RGData. - uint64_t fgid = rgid * fMaxRows / HashMaskElements; - uint64_t tgid = fgid + fMaxRows / HashMaskElements; - return {fgid, tgid}; - } - - /** @brief Used to output aggregation results from memory and disk in the current generation in the form of - * RGData. Returns next RGData, loads from disk if necessary. Skips finalized rows as they would contain - * duplicate results, compacts actual rows into start of RGData and adapts number of rows transmitted in - * RGData. - * @returns A pointer to the next RGData or an empty pointer if there are no more RGDatas in this - * generation. - */ - bool getNextOutputRGData(RGDataUnPtr& rgdata) - { - if (UNLIKELY(fRGDatas.empty())) - { - fMM->release(); - return false; - } - - while (!fRGDatas.empty()) - { - auto rgid = getLastRGData(rgdata); - auto [fgid, tgid] = calculateGids(rgid, fMaxRows); - - if (fFinalizedRows.size() <= fgid) - { - // There are no finalized rows in this RGData. We can just return it. - // Load from disk if necessary and unlink DumpFile. - if (!rgdata) - { - loadRG(rgid, rgdata, true); - } - return true; - } - - if (tgid >= fFinalizedRows.size()) - fFinalizedRows.resize(tgid + 1, 0ULL); - - // Check if there are rows to process - bool hasReturnRows = false; - for (auto i = fgid; i < tgid; ++i) - { - if (fFinalizedRows[i] != ~0ULL) - { - // Not all rows are finalized, we have to return at least parts of this RGData - hasReturnRows = true; - break; - } - } - - if (rgdata) - { - // RGData is currently in memory - if (!hasReturnRows) - { - // All rows are finalized, don't return this RGData - continue; - } - } - else - { - if (hasReturnRows) - { - // Load RGData from disk, unlink dump file and continue processing - loadRG(rgid, rgdata, true); - } - else - { - // All rows are finalized. Unlink dump file and continue search for return RGData - unlink(makeRGFilename(rgid).c_str()); - continue; - } - } - - auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid); - - // Nothing got shifted at all -> all rows must be finalized. If all rows finalized remove - // RGData and file and don't give it out. - if (pos == 0) - { - fLRU->remove(rgid); - unlink(makeRGFilename(rgid).c_str()); - continue; - } - - // set RGData with number of not finalized rows which have been compacted at front of RGData - fRowGroupOut->setData(rgdata.get()); - fRowGroupOut->setRowCount(pos); - int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); - - // Release the memory used by the current rgdata from this MemoryManager. - fMM->release(memSz); - unlink(makeRGFilename(rgid).c_str()); - - // to periodically clean up freed memory so it can be used by other threads. - fLRU->remove(rgid); - return true; - } - return false; - } - /** @brief Returns next RGData, load it from disk if necessary. * * @returns pointer to the next RGData or empty pointer if there is nothing */ - RGDataUnPtr getNextRGData() + std::unique_ptr getNextRGData() { while (!fRGDatas.empty()) { @@ -1203,7 +1030,7 @@ class RowGroupStorage * @param fname(out) Filename of the dump if it's not in the memory * @returns true if there is available RGData */ - bool getNextRGData(RGDataUnPtr& rgdata, std::string& fname) + bool getNextRGData(std::unique_ptr& rgdata, std::string& fname) { if (UNLIKELY(fRGDatas.empty())) { @@ -1212,9 +1039,12 @@ class RowGroupStorage } while (!fRGDatas.empty()) { - auto rgid = getLastRGData(rgdata); - auto [fgid, tgid] = calculateGids(rgid, fMaxRows); + uint64_t rgid = fRGDatas.size() - 1; + rgdata = std::move(fRGDatas[rgid]); + fRGDatas.pop_back(); + uint64_t fgid = rgid * fMaxRows / 64; + uint64_t tgid = fgid + fMaxRows / 64; if (fFinalizedRows.size() > fgid) { if (tgid >= fFinalizedRows.size()) @@ -1238,7 +1068,45 @@ class RowGroupStorage continue; } - auto [pos, opos] = shiftRowsInRowGroup(rgdata, fgid, tgid); + uint64_t pos = 0; + uint64_t opos = 0; + fRowGroupOut->setData(rgdata.get()); + for (auto i = fgid; i < tgid; ++i) + { + if ((i - fgid) * 64 >= fRowGroupOut->getRowCount()) + break; + uint64_t mask = ~fFinalizedRows[i]; + if ((i - fgid + 1) * 64 > fRowGroupOut->getRowCount()) + { + mask &= (~0ULL) >> ((i - fgid + 1) * 64 - fRowGroupOut->getRowCount()); + } + opos = (i - fgid) * 64; + if (mask == ~0ULL) + { + if (LIKELY(pos != opos)) + moveRows(rgdata.get(), pos, opos, 64); + pos += 64; + continue; + } + + if (mask == 0) + continue; + + while (mask != 0) + { + size_t b = __builtin_ffsll(mask); + size_t e = __builtin_ffsll(~(mask >> b)) + b; + if (UNLIKELY(e >= 64)) + mask = 0; + else + mask >>= e; + if (LIKELY(pos != opos + b - 1)) + moveRows(rgdata.get(), pos, opos + b - 1, e - b); + pos += e - b; + opos += e; + } + --opos; + } if (pos == 0) { @@ -1251,7 +1119,6 @@ class RowGroupStorage fRowGroupOut->setRowCount(pos); } - // Release the memory used by the current rgdata. if (rgdata) { fRowGroupOut->setData(rgdata.get()); @@ -1263,7 +1130,6 @@ class RowGroupStorage { fname = makeRGFilename(rgid); } - // to periodically clean up freed memory so it can be used by other threads. fLRU->remove(rgid); return true; } @@ -1303,7 +1169,7 @@ class RowGroupStorage loadRG(rgid, fRGDatas[rgid]); } - void loadRG(uint64_t rgid, RGDataUnPtr& rgdata, bool unlinkDump = false) + void loadRG(uint64_t rgid, std::unique_ptr& rgdata, bool unlinkDump = false) { auto fname = makeRGFilename(rgid); @@ -1871,7 +1737,7 @@ void RowAggStorage::append(RowAggStorage& other) } } -RGDataUnPtr RowAggStorage::getNextRGData() +std::unique_ptr RowAggStorage::getNextRGData() { if (!fStorage) { @@ -1882,43 +1748,6 @@ RGDataUnPtr RowAggStorage::getNextRGData() return fStorage->getNextRGData(); } -bool RowAggStorage::getNextOutputRGData(RGDataUnPtr& rgdata) -{ - if (!fStorage) - { - return {}; - } - - cleanup(); - freeData(); - - // fGeneration is an unsigned int, we need a signed int for a comparison >= 0 - int32_t gen = fGeneration; - while (gen >= 0) - { - bool moreInGeneration = fStorage->getNextOutputRGData(rgdata); - - if (moreInGeneration) - { - fRowGroupOut->setData(rgdata.get()); - return true; - } - - // all generations have been emptied - if (fGeneration == 0) - { - break; - } - - // current generation has no more RGDatas to return - // load earlier generation and continue with returning its RGDatas - gen--; - fGeneration--; - fStorage.reset(fStorage->clone(fGeneration)); - } - return false; -} - void RowAggStorage::freeData() { for (auto& data : fGens) diff --git a/utils/rowgroup/rowstorage.h b/utils/rowgroup/rowstorage.h index 90a15b35c..50151dc61 100644 --- a/utils/rowgroup/rowstorage.h +++ b/utils/rowgroup/rowstorage.h @@ -20,7 +20,6 @@ #include "resourcemanager.h" #include "rowgroup.h" #include "idbcompress.h" -#include #include #include #include @@ -36,15 +35,10 @@ class RowPosHashStorage; using RowPosHashStoragePtr = std::unique_ptr; class RowGroupStorage; -using RGDataUnPtr = std::unique_ptr; -using PosOpos = std::pair; -using FgidTgid = std::pair; - uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol); constexpr const size_t MaxConstStrSize = 2048ULL; constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1; -constexpr const uint64_t HashMaskElements = 64ULL; class RowAggStorage { @@ -103,12 +97,6 @@ class RowAggStorage */ std::unique_ptr getNextRGData(); - /** @brief Remove last RGData from in-memory storage or disk. - * Iterates over all generations on disk if available. - * @returns True if RGData is returned in parameter or false if no more RGDatas can be returned. - */ - bool getNextOutputRGData(std::unique_ptr& rgdata); - /** @brief TODO * * @param mergeFunc diff --git a/versioning/BRM/sessionmanagerserver.cpp b/versioning/BRM/sessionmanagerserver.cpp index bf0c7ccf9..427bafd69 100644 --- a/versioning/BRM/sessionmanagerserver.cpp +++ b/versioning/BRM/sessionmanagerserver.cpp @@ -28,7 +28,6 @@ #include #include #include -#include #include #include