1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-2091 Special UDAF reset code for multi-distinct queries

This commit is contained in:
David Hall
2019-04-11 15:44:46 -05:00
parent 28e743bf38
commit f1b908abeb
2 changed files with 48 additions and 18 deletions

View File

@ -714,11 +714,8 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup*
// threads on the PM and by multiple threads on the UM. It must remain // threads on the PM and by multiple threads on the UM. It must remain
// thread safe. // thread safe.
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void RowAggregation::resetUDAF(uint64_t funcColID) void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF)
{ {
// Get the UDAF class pointer and store in the row definition object.
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColID].get());
// RowAggregation and its functions need to be re-entrant which means // RowAggregation and its functions need to be re-entrant which means
// each instance (thread) needs its own copy of the context object. // each instance (thread) needs its own copy of the context object.
// Note: operator=() doesn't copy userData. // Note: operator=() doesn't copy userData.
@ -786,7 +783,7 @@ void RowAggregation::initialize()
{ {
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{ {
resetUDAF(i); resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
} }
} }
} }
@ -838,7 +835,7 @@ void RowAggregation::aggReset()
{ {
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{ {
resetUDAF(i); resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
} }
} }
} }
@ -885,14 +882,28 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
inserted.first->second = RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1); inserted.first->second = RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data. // If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++) if (fOrigFunctionCols)
{ {
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) // This is a multi-distinct query and fFunctionCols may not
// contain all the UDAF we need to reset
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
{ {
resetUDAF(i); if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
}
}
}
else
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
} }
} }
// replace the key value with an equivalent copy, yes this is OK // replace the key value with an equivalent copy, yes this is OK
const_cast<RowPosition&>((inserted.first->first)) = pos; const_cast<RowPosition&>((inserted.first->first)) = pos;
} }
@ -946,14 +957,28 @@ void RowAggregation::aggregateRow(Row& row)
RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1); RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data. // If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++) if (fOrigFunctionCols)
{ {
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) // This is a multi-distinct query and fFunctionCols may not
// contain all the UDAF we need to reset
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
{ {
resetUDAF(i); if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
}
}
}
else
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
} }
} }
} }
else else
{ {
@ -4699,7 +4724,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
{ {
// backup the function column vector for finalize(). // backup the function column vector for finalize().
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols; vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
fOrigFunctionCols = &origFunctionCols;
// aggregate data from each sub-aggregator to distinct aggregator // aggregate data from each sub-aggregator to distinct aggregator
for (uint64_t i = 0; i < fSubAggregators.size(); ++i) for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
{ {
@ -4727,6 +4752,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
// restore the function column vector // restore the function column vector
fFunctionCols = origFunctionCols; fFunctionCols = origFunctionCols;
fOrigFunctionCols = NULL;
} }
@ -4734,7 +4760,8 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
{ {
// backup the function column vector for finalize(). // backup the function column vector for finalize().
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols; vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
fOrigFunctionCols = &origFunctionCols;
// aggregate data from each sub-aggregator to distinct aggregator // aggregate data from each sub-aggregator to distinct aggregator
for (uint64_t i = 0; i < fSubAggregators.size(); ++i) for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
{ {
@ -4751,9 +4778,9 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
inRows[i].clear(); inRows[i].clear();
} }
// restore the function column vector // restore the function column vector
fFunctionCols = origFunctionCols; fFunctionCols = origFunctionCols;
fOrigFunctionCols = NULL;
} }

View File

@ -630,7 +630,7 @@ protected:
if (fAggMapPtr) fAggMapPtr->clear(); if (fAggMapPtr) fAggMapPtr->clear();
} }
void resetUDAF(uint64_t funcColID); void resetUDAF(RowUDAFFunctionCol* rowUDAF);
inline bool isNull(const RowGroup* pRowGroup, const Row& row, int64_t col); inline bool isNull(const RowGroup* pRowGroup, const Row& row, int64_t col);
inline void makeAggFieldsNull(Row& row); inline void makeAggFieldsNull(Row& row);
@ -710,6 +710,9 @@ protected:
static const static_any::any& doubleTypeId; static const static_any::any& doubleTypeId;
static const static_any::any& longdoubleTypeId; static const static_any::any& longdoubleTypeId;
static const static_any::any& strTypeId; static const static_any::any& strTypeId;
// For UDAF along with multiple distinct columns
vector<SP_ROWAGG_FUNC_t>* fOrigFunctionCols = NULL;
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------