You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-4172 MultiDistinctRowAggregation didn't honor multiple UDAF in projection
::doUDAF() doesn't crash anymore trying to access fRGContextColl[] elements that doesn't exist running RowAggregationMultiDistinct::doAggregate()
This commit is contained in:
@ -772,6 +772,9 @@ void RowAggregation::initialize()
|
||||
// save the original output rowgroup data as primary row data
|
||||
fPrimaryRowData = fRowGroupOut->getRGData();
|
||||
|
||||
// Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx
|
||||
fRGContextColl.resize(fFunctionCols.size());
|
||||
|
||||
// Need map only if groupby list is not empty.
|
||||
if (!fGroupByCols.empty())
|
||||
{
|
||||
@ -784,8 +787,6 @@ void RowAggregation::initialize()
|
||||
{
|
||||
fRowGroupOut->setRowCount(1);
|
||||
attachGroupConcatAg();
|
||||
// Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx
|
||||
fRGContextColl.resize(fFunctionCols.size());
|
||||
// For UDAF, reset the data
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
@ -867,7 +868,8 @@ void RowAggregationUM::aggReset()
|
||||
}
|
||||
|
||||
|
||||
void RowAggregationUM::aggregateRowWithRemap(Row& row)
|
||||
void RowAggregationUM::aggregateRowWithRemap(Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
pair<ExtKeyMap_t::iterator, bool> inserted;
|
||||
RowPosition pos(RowPosition::MSB, 0);
|
||||
@ -925,20 +927,22 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
|
||||
fResultDataVec[pos.group]->getRow(pos.row, &fRow);
|
||||
}
|
||||
|
||||
updateEntry(row);
|
||||
updateEntry(row, rgContextColl);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void RowAggregationUM::aggregateRow(Row& row)
|
||||
void RowAggregationUM::aggregateRow(Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
if (UNLIKELY(fKeyOnHeap))
|
||||
aggregateRowWithRemap(row);
|
||||
aggregateRowWithRemap(row, rgContextColl);
|
||||
else
|
||||
RowAggregation::aggregateRow(row);
|
||||
RowAggregation::aggregateRow(row, rgContextColl);
|
||||
}
|
||||
|
||||
void RowAggregation::aggregateRow(Row& row)
|
||||
void RowAggregation::aggregateRow(Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
// groupby column list is not empty, find the entry.
|
||||
if (!fGroupByCols.empty())
|
||||
@ -1002,7 +1006,7 @@ void RowAggregation::aggregateRow(Row& row)
|
||||
}
|
||||
}
|
||||
|
||||
updateEntry(row);
|
||||
updateEntry(row, rgContextColl);
|
||||
}
|
||||
|
||||
|
||||
@ -1804,8 +1808,10 @@ void RowAggregation::deserialize(messageqcpp::ByteStream& bs)
|
||||
// NULL values are recognized and ignored for all agg functions except for
|
||||
// COUNT(*), which counts all rows regardless of value.
|
||||
// rowIn(in) - Row to be included in aggregation.
|
||||
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregation::updateEntry(const Row& rowIn)
|
||||
void RowAggregation::updateEntry(const Row& rowIn,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
@ -1862,7 +1868,7 @@ void RowAggregation::updateEntry(const Row& rowIn)
|
||||
|
||||
case ROWAGG_UDAF:
|
||||
{
|
||||
doUDAF(rowIn, colIn, colOut, colOut + 1, i);
|
||||
doUDAF(rowIn, colIn, colOut, colOut + 1, i, rgContextColl);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -2087,10 +2093,21 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
|
||||
fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + valIn * valIn, colAux + 1);
|
||||
}
|
||||
|
||||
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
int64_t colAux, uint64_t& funcColsIdx)
|
||||
void RowAggregation::doUDAF(const Row& rowIn,
|
||||
int64_t colIn,
|
||||
int64_t colOut,
|
||||
int64_t colAux,
|
||||
uint64_t& funcColsIdx,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
uint32_t paramCount = fRGContextColl[funcColsIdx].getParameterCount();
|
||||
std::vector<mcsv1sdk::mcsv1Context>* udafContextsCollPtr = &fRGContextColl;
|
||||
if (UNLIKELY(rgContextColl != nullptr))
|
||||
{
|
||||
udafContextsCollPtr = rgContextColl;
|
||||
}
|
||||
|
||||
std::vector<mcsv1sdk::mcsv1Context>& udafContextsColl = *udafContextsCollPtr;
|
||||
uint32_t paramCount = udafContextsColl[funcColsIdx].getParameterCount();
|
||||
// doUDAF changes funcColsIdx to skip UDAF arguments so the real UDAF
|
||||
// column idx is the initial value of the funcColsIdx
|
||||
uint64_t origFuncColsIdx = funcColsIdx;
|
||||
@ -2119,7 +2136,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
if ((cc && cc->type() == execplan::ConstantColumn::NULLDATA)
|
||||
|| (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
|
||||
{
|
||||
if (fRGContextColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
if (udafContextsColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
{
|
||||
// When Ignore nulls, if there are multiple parameters and any
|
||||
// one of them is NULL, we ignore the entry. We need to increment
|
||||
@ -2361,7 +2378,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
default:
|
||||
{
|
||||
std::ostringstream errmsg;
|
||||
errmsg << "RowAggregation " << fRGContextColl[origFuncColsIdx].getName() <<
|
||||
errmsg << "RowAggregation " << udafContextsColl[origFuncColsIdx].getName() <<
|
||||
": No logic for data type: " << colDataType;
|
||||
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
||||
break;
|
||||
@ -2387,19 +2404,19 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
}
|
||||
|
||||
// The intermediate values are stored in userData referenced by colAux.
|
||||
fRGContextColl[origFuncColsIdx].setDataFlags(dataFlags);
|
||||
fRGContextColl[origFuncColsIdx].setUserData(fRow.getUserData(colAux));
|
||||
udafContextsColl[origFuncColsIdx].setDataFlags(dataFlags);
|
||||
udafContextsColl[origFuncColsIdx].setUserData(fRow.getUserData(colAux));
|
||||
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
rc = fRGContextColl[origFuncColsIdx].getFunction()->nextValue(&fRGContextColl[origFuncColsIdx],
|
||||
rc = udafContextsColl[origFuncColsIdx].getFunction()->nextValue(&udafContextsColl[origFuncColsIdx],
|
||||
valsIn);
|
||||
fRGContextColl[origFuncColsIdx].setUserData(NULL);
|
||||
udafContextsColl[origFuncColsIdx].setUserData(NULL);
|
||||
|
||||
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
||||
{
|
||||
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[origFuncColsIdx].get());
|
||||
rowUDAF->bInterrupted = true;
|
||||
throw logging::QueryDataExcept(fRGContextColl[origFuncColsIdx].getErrorMessage(),
|
||||
throw logging::QueryDataExcept(udafContextsColl[origFuncColsIdx].getErrorMessage(),
|
||||
logging::aggregateFuncErr);
|
||||
}
|
||||
}
|
||||
@ -2630,8 +2647,10 @@ void RowAggregationUM::attachGroupConcatAg()
|
||||
// Update the aggregation totals in the internal hashmap for the specified row.
|
||||
// NULL values are recognized and ignored for all agg functions except for count
|
||||
// rowIn(in) - Row to be included in aggregation.
|
||||
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregationUM::updateEntry(const Row& rowIn)
|
||||
void RowAggregationUM::updateEntry(const Row& rowIn,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
@ -2699,7 +2718,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn)
|
||||
|
||||
case ROWAGG_UDAF:
|
||||
{
|
||||
doUDAF(rowIn, colIn, colOut, colAux, i);
|
||||
doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -3237,7 +3256,6 @@ void RowAggregationUM::calculateUDAFColumns()
|
||||
RowUDAFFunctionCol* rowUDAF = NULL;
|
||||
static_any::any valOut;
|
||||
|
||||
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction != ROWAGG_UDAF)
|
||||
@ -4281,8 +4299,10 @@ RowAggregationUMP2::~RowAggregationUMP2()
|
||||
// Update the aggregation totals in the internal hashmap for the specified row.
|
||||
// NULL values are recognized and ignored for all agg functions except for count
|
||||
// rowIn(in) - Row to be included in aggregation.
|
||||
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregationUMP2::updateEntry(const Row& rowIn)
|
||||
void RowAggregationUMP2::updateEntry(const Row& rowIn,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
@ -4348,7 +4368,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn)
|
||||
|
||||
case ROWAGG_UDAF:
|
||||
{
|
||||
doUDAF(rowIn, colIn, colOut, colAux, i);
|
||||
doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -4560,11 +4580,23 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut
|
||||
// colOut(in) - column in the output row group
|
||||
// colAux(in) - Where the UDAF userdata resides
|
||||
// rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance
|
||||
// rgContextColl(in) - ptr to a vector that brings UDAF contextx in
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
int64_t colAux, uint64_t& funcColsIdx)
|
||||
void RowAggregationUMP2::doUDAF(const Row& rowIn,
|
||||
int64_t colIn,
|
||||
int64_t colOut,
|
||||
int64_t colAux,
|
||||
uint64_t& funcColsIdx,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
static_any::any valOut;
|
||||
std::vector<mcsv1sdk::mcsv1Context>* udafContextsCollPtr = &fRGContextColl;
|
||||
if (UNLIKELY(rgContextColl != nullptr))
|
||||
{
|
||||
udafContextsCollPtr = rgContextColl;
|
||||
}
|
||||
|
||||
std::vector<mcsv1sdk::mcsv1Context>& udafContextsColl = *udafContextsCollPtr;
|
||||
|
||||
// Get the user data
|
||||
boost::shared_ptr<mcsv1sdk::UserData> userDataIn = rowIn.getUserData(colIn + 1);
|
||||
@ -4577,7 +4609,7 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
|
||||
if (!userDataIn)
|
||||
{
|
||||
if (fRGContextColl[funcColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
if (udafContextsColl[funcColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
{
|
||||
return;
|
||||
}
|
||||
@ -4586,14 +4618,14 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
flags[0] |= mcsv1sdk::PARAM_IS_NULL;
|
||||
}
|
||||
|
||||
fRGContextColl[funcColsIdx].setDataFlags(flags);
|
||||
udafContextsColl[funcColsIdx].setDataFlags(flags);
|
||||
|
||||
// The intermediate values are stored in colAux.
|
||||
fRGContextColl[funcColsIdx].setUserData(fRow.getUserData(colAux));
|
||||
udafContextsColl[funcColsIdx].setUserData(fRow.getUserData(colAux));
|
||||
|
||||
// Call the UDAF subEvaluate method
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
rc = fRGContextColl[funcColsIdx].getFunction()->subEvaluate(&fRGContextColl[funcColsIdx], userDataIn.get());
|
||||
rc = udafContextsColl[funcColsIdx].getFunction()->subEvaluate(&udafContextsColl[funcColsIdx], userDataIn.get());
|
||||
fRGContext.setUserData(NULL);
|
||||
|
||||
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
||||
@ -4712,8 +4744,10 @@ void RowAggregationDistinct::doDistinctAggregation_rowVec(vector<Row::Pointer>&
|
||||
// Update the aggregation totals in the internal hashmap for the specified row.
|
||||
// for non-DISTINCT columns works partially aggregated results
|
||||
// rowIn(in) - Row to be included in aggregation.
|
||||
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregationDistinct::updateEntry(const Row& rowIn)
|
||||
void RowAggregationDistinct::updateEntry(const Row& rowIn,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
||||
{
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
@ -4795,7 +4829,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
|
||||
|
||||
case ROWAGG_UDAF:
|
||||
{
|
||||
doUDAF(rowIn, colIn, colOut, colAux, i);
|
||||
doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -5083,6 +5117,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
|
||||
{
|
||||
fFunctionCols = fSubFunctions[i];
|
||||
fRowGroupIn = fSubRowGroups[i];
|
||||
auto* rgContextColl = fSubAggregators[i]->rgContextColl();
|
||||
Row rowIn;
|
||||
fRowGroupIn.initRow(&rowIn);
|
||||
|
||||
@ -5098,14 +5133,14 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
|
||||
|
||||
for (uint64_t j = 0; j < fRowGroupIn.getRowCount(); ++j, rowIn.nextRow())
|
||||
{
|
||||
aggregateRow(rowIn);
|
||||
aggregateRow(rowIn, rgContextColl);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// restore the function column vector
|
||||
fFunctionCols = origFunctionCols;
|
||||
fOrigFunctionCols = NULL;
|
||||
fOrigFunctionCols = nullptr;
|
||||
}
|
||||
|
||||
|
||||
@ -5120,20 +5155,21 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
|
||||
{
|
||||
fFunctionCols = fSubFunctions[i];
|
||||
fRowGroupIn = fSubRowGroups[i];
|
||||
auto* rgContextColl = fSubAggregators[i]->rgContextColl();
|
||||
Row rowIn;
|
||||
fRowGroupIn.initRow(&rowIn);
|
||||
|
||||
for (uint64_t j = 0; j < inRows[i].size(); ++j)
|
||||
{
|
||||
rowIn.setData(inRows[i][j]);
|
||||
aggregateRow(rowIn);
|
||||
aggregateRow(rowIn, rgContextColl);
|
||||
}
|
||||
|
||||
inRows[i].clear();
|
||||
}
|
||||
// restore the function column vector
|
||||
fFunctionCols = origFunctionCols;
|
||||
fOrigFunctionCols = NULL;
|
||||
fOrigFunctionCols = nullptr;
|
||||
}
|
||||
|
||||
|
||||
|
@ -605,7 +605,8 @@ public:
|
||||
return fResultDataVec;
|
||||
}
|
||||
|
||||
virtual void aggregateRow(Row& row);
|
||||
virtual void aggregateRow(Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
||||
inline uint32_t aggMapKeyLength()
|
||||
{
|
||||
return fAggMapKeyCount;
|
||||
@ -619,19 +620,29 @@ public:
|
||||
{
|
||||
return fTimeZone;
|
||||
}
|
||||
inline std::vector<mcsv1sdk::mcsv1Context>* rgContextColl()
|
||||
{
|
||||
return &fRGContextColl;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void initialize();
|
||||
virtual void initMapData(const Row& row);
|
||||
virtual void attachGroupConcatAg();
|
||||
|
||||
virtual void updateEntry(const Row& row);
|
||||
virtual void updateEntry(const Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
||||
virtual void doMinMax(const Row&, int64_t, int64_t, int);
|
||||
virtual void doSum(const Row&, int64_t, int64_t, int);
|
||||
virtual void doAvg(const Row&, int64_t, int64_t, int64_t);
|
||||
virtual void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
||||
virtual void doBitOp(const Row&, int64_t, int64_t, int);
|
||||
virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx);
|
||||
virtual void doUDAF(const Row&,
|
||||
int64_t,
|
||||
int64_t,
|
||||
int64_t,
|
||||
uint64_t& funcColsIdx,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
||||
virtual bool countSpecial(const RowGroup* pRG)
|
||||
{
|
||||
fRow.setUintField<8>(fRow.getUintField<8>(0) + pRG->getRowCount(), 0);
|
||||
@ -823,19 +834,21 @@ public:
|
||||
return fGroupConcat;
|
||||
}
|
||||
|
||||
void aggregateRow(Row&);
|
||||
//void initialize();
|
||||
void aggregateRow(Row&,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
||||
virtual void aggReset();
|
||||
|
||||
void setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut);
|
||||
|
||||
protected:
|
||||
// virtual methods from base
|
||||
void initialize();
|
||||
void aggregateRowWithRemap(Row&);
|
||||
void initialize() override;
|
||||
void updateEntry(const Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
||||
|
||||
void aggregateRowWithRemap(Row&,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr);
|
||||
|
||||
void attachGroupConcatAg();
|
||||
void updateEntry(const Row& row);
|
||||
bool countSpecial(const RowGroup* pRG)
|
||||
{
|
||||
fRow.setIntField<8>(
|
||||
@ -948,12 +961,18 @@ public:
|
||||
|
||||
protected:
|
||||
// virtual methods from base
|
||||
void updateEntry(const Row& row);
|
||||
void updateEntry(const Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
||||
void doAvg(const Row&, int64_t, int64_t, int64_t);
|
||||
void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
||||
void doGroupConcat(const Row&, int64_t, int64_t);
|
||||
void doBitOp(const Row&, int64_t, int64_t, int);
|
||||
void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx);
|
||||
void doUDAF(const Row&,
|
||||
int64_t,
|
||||
int64_t,
|
||||
int64_t,
|
||||
uint64_t& funcColsIdx,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
||||
bool countSpecial(const RowGroup* pRG)
|
||||
{
|
||||
return false;
|
||||
@ -1021,8 +1040,8 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
// virtual methods from base
|
||||
void updateEntry(const Row& row);
|
||||
void updateEntry(const Row& row,
|
||||
std::vector<mcsv1sdk::mcsv1Context>* rgContextColl = nullptr) override;
|
||||
|
||||
boost::shared_ptr<RowAggregation> fAggregator;
|
||||
RowGroup fRowGroupDist;
|
||||
|
Reference in New Issue
Block a user