From c00daa93bd0ac4f12f8e2423f6ff4b512657a4a7 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Fri, 16 Oct 2020 12:13:00 +0000 Subject: [PATCH] MCOL-4172 MultiDistinctRowAggregation didn't honor multiple UDAF in projection ::doUDAF() doesn't crash anymore trying to access fRGContextColl[] elements that doesn't exist running RowAggregationMultiDistinct::doAggregate() --- utils/rowgroup/rowaggregation.cpp | 112 ++++++++++++++++++++---------- utils/rowgroup/rowaggregation.h | 45 ++++++++---- 2 files changed, 106 insertions(+), 51 deletions(-) diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index f51fcd022..05c30f6cb 100755 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -772,6 +772,9 @@ void RowAggregation::initialize() // save the original output rowgroup data as primary row data fPrimaryRowData = fRowGroupOut->getRGData(); + // Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx + fRGContextColl.resize(fFunctionCols.size()); + // Need map only if groupby list is not empty. if (!fGroupByCols.empty()) { @@ -784,8 +787,6 @@ void RowAggregation::initialize() { fRowGroupOut->setRowCount(1); attachGroupConcatAg(); - // Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx - fRGContextColl.resize(fFunctionCols.size()); // For UDAF, reset the data for (uint64_t i = 0; i < fFunctionCols.size(); i++) { @@ -867,7 +868,8 @@ void RowAggregationUM::aggReset() } -void RowAggregationUM::aggregateRowWithRemap(Row& row) +void RowAggregationUM::aggregateRowWithRemap(Row& row, + std::vector* rgContextColl) { pair inserted; RowPosition pos(RowPosition::MSB, 0); @@ -925,20 +927,22 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row) fResultDataVec[pos.group]->getRow(pos.row, &fRow); } - updateEntry(row); + updateEntry(row, rgContextColl); } -void RowAggregationUM::aggregateRow(Row& row) +void RowAggregationUM::aggregateRow(Row& row, + std::vector* rgContextColl) { if (UNLIKELY(fKeyOnHeap)) - aggregateRowWithRemap(row); + aggregateRowWithRemap(row, rgContextColl); else - RowAggregation::aggregateRow(row); + RowAggregation::aggregateRow(row, rgContextColl); } -void RowAggregation::aggregateRow(Row& row) +void RowAggregation::aggregateRow(Row& row, + std::vector* rgContextColl) { // groupby column list is not empty, find the entry. if (!fGroupByCols.empty()) @@ -1002,7 +1006,7 @@ void RowAggregation::aggregateRow(Row& row) } } - updateEntry(row); + updateEntry(row, rgContextColl); } @@ -1804,8 +1808,10 @@ void RowAggregation::deserialize(messageqcpp::ByteStream& bs) // NULL values are recognized and ignored for all agg functions except for // COUNT(*), which counts all rows regardless of value. // rowIn(in) - Row to be included in aggregation. +// rgContextColl(in) - ptr to a vector of UDAF contexts //------------------------------------------------------------------------------ -void RowAggregation::updateEntry(const Row& rowIn) +void RowAggregation::updateEntry(const Row& rowIn, + std::vector* rgContextColl) { for (uint64_t i = 0; i < fFunctionCols.size(); i++) { @@ -1862,7 +1868,7 @@ void RowAggregation::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - doUDAF(rowIn, colIn, colOut, colOut + 1, i); + doUDAF(rowIn, colIn, colOut, colOut + 1, i, rgContextColl); break; } @@ -2087,10 +2093,21 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + valIn * valIn, colAux + 1); } -void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, - int64_t colAux, uint64_t& funcColsIdx) +void RowAggregation::doUDAF(const Row& rowIn, + int64_t colIn, + int64_t colOut, + int64_t colAux, + uint64_t& funcColsIdx, + std::vector* rgContextColl) { - uint32_t paramCount = fRGContextColl[funcColsIdx].getParameterCount(); + std::vector* udafContextsCollPtr = &fRGContextColl; + if (UNLIKELY(rgContextColl != nullptr)) + { + udafContextsCollPtr = rgContextColl; + } + + std::vector& udafContextsColl = *udafContextsCollPtr; + uint32_t paramCount = udafContextsColl[funcColsIdx].getParameterCount(); // doUDAF changes funcColsIdx to skip UDAF arguments so the real UDAF // column idx is the initial value of the funcColsIdx uint64_t origFuncColsIdx = funcColsIdx; @@ -2119,7 +2136,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, if ((cc && cc->type() == execplan::ConstantColumn::NULLDATA) || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true)) { - if (fRGContextColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) + if (udafContextsColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) { // When Ignore nulls, if there are multiple parameters and any // one of them is NULL, we ignore the entry. We need to increment @@ -2361,7 +2378,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, default: { std::ostringstream errmsg; - errmsg << "RowAggregation " << fRGContextColl[origFuncColsIdx].getName() << + errmsg << "RowAggregation " << udafContextsColl[origFuncColsIdx].getName() << ": No logic for data type: " << colDataType; throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr); break; @@ -2387,19 +2404,19 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, } // The intermediate values are stored in userData referenced by colAux. - fRGContextColl[origFuncColsIdx].setDataFlags(dataFlags); - fRGContextColl[origFuncColsIdx].setUserData(fRow.getUserData(colAux)); + udafContextsColl[origFuncColsIdx].setDataFlags(dataFlags); + udafContextsColl[origFuncColsIdx].setUserData(fRow.getUserData(colAux)); mcsv1sdk::mcsv1_UDAF::ReturnCode rc; - rc = fRGContextColl[origFuncColsIdx].getFunction()->nextValue(&fRGContextColl[origFuncColsIdx], + rc = udafContextsColl[origFuncColsIdx].getFunction()->nextValue(&udafContextsColl[origFuncColsIdx], valsIn); - fRGContextColl[origFuncColsIdx].setUserData(NULL); + udafContextsColl[origFuncColsIdx].setUserData(NULL); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[origFuncColsIdx].get()); rowUDAF->bInterrupted = true; - throw logging::QueryDataExcept(fRGContextColl[origFuncColsIdx].getErrorMessage(), + throw logging::QueryDataExcept(udafContextsColl[origFuncColsIdx].getErrorMessage(), logging::aggregateFuncErr); } } @@ -2630,8 +2647,10 @@ void RowAggregationUM::attachGroupConcatAg() // Update the aggregation totals in the internal hashmap for the specified row. // NULL values are recognized and ignored for all agg functions except for count // rowIn(in) - Row to be included in aggregation. +// rgContextColl(in) - ptr to a vector of UDAF contexts //------------------------------------------------------------------------------ -void RowAggregationUM::updateEntry(const Row& rowIn) +void RowAggregationUM::updateEntry(const Row& rowIn, + std::vector* rgContextColl) { for (uint64_t i = 0; i < fFunctionCols.size(); i++) { @@ -2699,7 +2718,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - doUDAF(rowIn, colIn, colOut, colAux, i); + doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl); break; } @@ -3237,7 +3256,6 @@ void RowAggregationUM::calculateUDAFColumns() RowUDAFFunctionCol* rowUDAF = NULL; static_any::any valOut; - for (uint64_t i = 0; i < fFunctionCols.size(); i++) { if (fFunctionCols[i]->fAggFunction != ROWAGG_UDAF) @@ -4281,8 +4299,10 @@ RowAggregationUMP2::~RowAggregationUMP2() // Update the aggregation totals in the internal hashmap for the specified row. // NULL values are recognized and ignored for all agg functions except for count // rowIn(in) - Row to be included in aggregation. +// rgContextColl(in) - ptr to a vector of UDAF contexts //------------------------------------------------------------------------------ -void RowAggregationUMP2::updateEntry(const Row& rowIn) +void RowAggregationUMP2::updateEntry(const Row& rowIn, + std::vector* rgContextColl) { for (uint64_t i = 0; i < fFunctionCols.size(); i++) { @@ -4348,7 +4368,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - doUDAF(rowIn, colIn, colOut, colAux, i); + doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl); break; } @@ -4560,11 +4580,23 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut // colOut(in) - column in the output row group // colAux(in) - Where the UDAF userdata resides // rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance +// rgContextColl(in) - ptr to a vector that brings UDAF contextx in //------------------------------------------------------------------------------ -void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, - int64_t colAux, uint64_t& funcColsIdx) +void RowAggregationUMP2::doUDAF(const Row& rowIn, + int64_t colIn, + int64_t colOut, + int64_t colAux, + uint64_t& funcColsIdx, + std::vector* rgContextColl) { static_any::any valOut; + std::vector* udafContextsCollPtr = &fRGContextColl; + if (UNLIKELY(rgContextColl != nullptr)) + { + udafContextsCollPtr = rgContextColl; + } + + std::vector& udafContextsColl = *udafContextsCollPtr; // Get the user data boost::shared_ptr userDataIn = rowIn.getUserData(colIn + 1); @@ -4577,7 +4609,7 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, if (!userDataIn) { - if (fRGContextColl[funcColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) + if (udafContextsColl[funcColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) { return; } @@ -4586,14 +4618,14 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, flags[0] |= mcsv1sdk::PARAM_IS_NULL; } - fRGContextColl[funcColsIdx].setDataFlags(flags); + udafContextsColl[funcColsIdx].setDataFlags(flags); // The intermediate values are stored in colAux. - fRGContextColl[funcColsIdx].setUserData(fRow.getUserData(colAux)); + udafContextsColl[funcColsIdx].setUserData(fRow.getUserData(colAux)); // Call the UDAF subEvaluate method mcsv1sdk::mcsv1_UDAF::ReturnCode rc; - rc = fRGContextColl[funcColsIdx].getFunction()->subEvaluate(&fRGContextColl[funcColsIdx], userDataIn.get()); + rc = udafContextsColl[funcColsIdx].getFunction()->subEvaluate(&udafContextsColl[funcColsIdx], userDataIn.get()); fRGContext.setUserData(NULL); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) @@ -4712,8 +4744,10 @@ void RowAggregationDistinct::doDistinctAggregation_rowVec(vector& // Update the aggregation totals in the internal hashmap for the specified row. // for non-DISTINCT columns works partially aggregated results // rowIn(in) - Row to be included in aggregation. +// rgContextColl(in) - ptr to a vector of UDAF contexts //------------------------------------------------------------------------------ -void RowAggregationDistinct::updateEntry(const Row& rowIn) +void RowAggregationDistinct::updateEntry(const Row& rowIn, + std::vector* rgContextColl) { for (uint64_t i = 0; i < fFunctionCols.size(); i++) { @@ -4795,7 +4829,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - doUDAF(rowIn, colIn, colOut, colAux, i); + doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl); break; } @@ -5083,6 +5117,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation() { fFunctionCols = fSubFunctions[i]; fRowGroupIn = fSubRowGroups[i]; + auto* rgContextColl = fSubAggregators[i]->rgContextColl(); Row rowIn; fRowGroupIn.initRow(&rowIn); @@ -5098,14 +5133,14 @@ void RowAggregationMultiDistinct::doDistinctAggregation() for (uint64_t j = 0; j < fRowGroupIn.getRowCount(); ++j, rowIn.nextRow()) { - aggregateRow(rowIn); + aggregateRow(rowIn, rgContextColl); } } } // restore the function column vector fFunctionCols = origFunctionCols; - fOrigFunctionCols = NULL; + fOrigFunctionCols = nullptr; } @@ -5120,20 +5155,21 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vectorrgContextColl(); Row rowIn; fRowGroupIn.initRow(&rowIn); for (uint64_t j = 0; j < inRows[i].size(); ++j) { rowIn.setData(inRows[i][j]); - aggregateRow(rowIn); + aggregateRow(rowIn, rgContextColl); } inRows[i].clear(); } // restore the function column vector fFunctionCols = origFunctionCols; - fOrigFunctionCols = NULL; + fOrigFunctionCols = nullptr; } diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index 34cd6fd03..d41cd87f8 100644 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -605,7 +605,8 @@ public: return fResultDataVec; } - virtual void aggregateRow(Row& row); + virtual void aggregateRow(Row& row, + std::vector* rgContextColl = nullptr); inline uint32_t aggMapKeyLength() { return fAggMapKeyCount; @@ -619,19 +620,29 @@ public: { return fTimeZone; } + inline std::vector* rgContextColl() + { + return &fRGContextColl; + } protected: virtual void initialize(); virtual void initMapData(const Row& row); virtual void attachGroupConcatAg(); - virtual void updateEntry(const Row& row); + virtual void updateEntry(const Row& row, + std::vector* rgContextColl = nullptr); virtual void doMinMax(const Row&, int64_t, int64_t, int); virtual void doSum(const Row&, int64_t, int64_t, int); virtual void doAvg(const Row&, int64_t, int64_t, int64_t); virtual void doStatistics(const Row&, int64_t, int64_t, int64_t); virtual void doBitOp(const Row&, int64_t, int64_t, int); - virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx); + virtual void doUDAF(const Row&, + int64_t, + int64_t, + int64_t, + uint64_t& funcColsIdx, + std::vector* rgContextColl = nullptr); virtual bool countSpecial(const RowGroup* pRG) { fRow.setUintField<8>(fRow.getUintField<8>(0) + pRG->getRowCount(), 0); @@ -823,19 +834,21 @@ public: return fGroupConcat; } - void aggregateRow(Row&); - //void initialize(); + void aggregateRow(Row&, + std::vector* rgContextColl = nullptr) override; virtual void aggReset(); void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut); protected: - // virtual methods from base - void initialize(); - void aggregateRowWithRemap(Row&); + void initialize() override; + void updateEntry(const Row& row, + std::vector* rgContextColl = nullptr) override; + + void aggregateRowWithRemap(Row&, + std::vector* rgContextColl = nullptr); void attachGroupConcatAg(); - void updateEntry(const Row& row); bool countSpecial(const RowGroup* pRG) { fRow.setIntField<8>( @@ -948,12 +961,18 @@ public: protected: // virtual methods from base - void updateEntry(const Row& row); + void updateEntry(const Row& row, + std::vector* rgContextColl = nullptr) override; void doAvg(const Row&, int64_t, int64_t, int64_t); void doStatistics(const Row&, int64_t, int64_t, int64_t); void doGroupConcat(const Row&, int64_t, int64_t); void doBitOp(const Row&, int64_t, int64_t, int); - void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx); + void doUDAF(const Row&, + int64_t, + int64_t, + int64_t, + uint64_t& funcColsIdx, + std::vector* rgContextColl = nullptr) override; bool countSpecial(const RowGroup* pRG) { return false; @@ -1021,8 +1040,8 @@ public: } protected: - // virtual methods from base - void updateEntry(const Row& row); + void updateEntry(const Row& row, + std::vector* rgContextColl = nullptr) override; boost::shared_ptr fAggregator; RowGroup fRowGroupDist;