1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-20 01:42:27 +03:00

MCOL-523 Add UDAF and UDAnF SDK

This commit is contained in:
David Hall
2017-07-26 11:53:08 -05:00
parent bbad7882d2
commit a38b098f2a
72 changed files with 10264 additions and 4521 deletions

430
dbcon/joblist/tupleaggregatestep.cpp Normal file → Executable file
View File

@@ -47,6 +47,7 @@ using namespace config;
#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "udafcolumn.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "constantcolumn.h"
@@ -100,6 +101,7 @@ inline RowAggFunctionType functionIdMap(int planFuncId)
case AggregateColumn::BIT_XOR: return ROWAGG_BIT_XOR;
case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT;
case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT;
case AggregateColumn::UDAF: return ROWAGG_UDAF;
default: return ROWAGG_FUNCT_UNDEFINE;
}
}
@@ -695,8 +697,18 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(ac->constCol().get());
idbassert(cc != NULL); // @bug5261
bool isNull = (ConstantColumn::NULLDATA == cc->type());
constAggDataVec.push_back(
ConstantAggData(cc->constval(), functionIdMap(ac->aggOp()), isNull));
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.deliveredCols[idx].get());
if (udafc)
{
constAggDataVec.push_back(
ConstantAggData(cc->constval(), udafc->getContext().getName(),
functionIdMap(ac->aggOp()), isNull));
}
else
{
constAggDataVec.push_back(
ConstantAggData(cc->constval(), functionIdMap(ac->aggOp()), isNull));
}
}
}
}
@@ -1097,7 +1109,24 @@ void TupleAggregateStep::prep1PhaseAggregate(
}
}
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colProj, i));
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
if (udafc)
{
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i));
}
else
{
throw logic_error("prep1PhasesAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, i));
}
functionVec.push_back(funct);
switch (aggOp)
@@ -1224,6 +1253,23 @@ void TupleAggregateStep::prep1PhaseAggregate(
}
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
break;
}
default:
{
ostringstream emsg;
@@ -1285,14 +1331,34 @@ void TupleAggregateStep::prep1PhaseAggregate(
}
}
// add auxiliary fields for statistics functions
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVec.size(); i++)
{
uint64_t j = functionVec[i]->fInputColumnIndex;
if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
{
// UDAF user data
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVec[i]->fAuxColumnIndex = lastCol++;
oidsAgg.push_back(oidsAgg[j]);
keysAgg.push_back(keysAgg[j]);
scaleAgg.push_back(0);
precisionAgg.push_back(0);
precisionAgg.push_back(0);
typeAgg.push_back(CalpontSystemCatalog::VARBINARY);
widthAgg.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
continue;
}
if (functionVec[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVec[i]->fAuxColumnIndex = lastCol;
uint64_t j = functionVec[i]->fInputColumnIndex;
// sum(x)
oidsAgg.push_back(oidsAgg[j]);
@@ -1527,7 +1593,33 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
continue;
uint64_t colProj = projColPosMap[aggKey];
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
if (udafc)
{
mcsv1sdk::UDAF_MAP::iterator funcIter = mcsv1sdk::UDAFMap::getMap().find(udafc->getContext().getName());
if (UNLIKELY(funcIter == mcsv1sdk::UDAFMap::getMap().end()))
{
std::ostringstream errmsg;
errmsg << "prep1PhaseDistinctAggregate: A UDAF function, " << udafc->getContext().getName() <<
", is called but there's no entry in UDAF_MAP";
throw std::logic_error(errmsg.str());
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
}
else
{
throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
}
functionVec1.push_back(funct);
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAgg));
@@ -1671,6 +1763,32 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
}
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error("prep1PhaseDistinctAggregate A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
colAgg++;
// UDAF Dummy holder for UserData struct
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(0);
precisionAgg.push_back(0);
typeAgg.push_back(CalpontSystemCatalog::VARBINARY);
widthAgg.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes
funct->fAuxColumnIndex = colAgg++;
break;
}
default:
{
ostringstream emsg;
@@ -2090,14 +2208,33 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
}
}
// add auxiliary fields for statistics functions
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVec2.size(); i++)
{
uint64_t j = functionVec2[i]->fInputColumnIndex;
if (functionVec2[i]->fAggFunction == ROWAGG_UDAF)
{
// Dummy Column for UDAF user data
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get());
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVec2[i]->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(oidsAggDist[j]); // Dummy?
keysAggDist.push_back(keysAggDist[j]); // Dummy?
scaleAggDist.push_back(0);
precisionAggDist.push_back(0);
typeAggDist.push_back(CalpontSystemCatalog::VARBINARY);
widthAggDist.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
continue;
}
if (functionVec2[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVec2[i]->fAuxColumnIndex = lastCol;
uint64_t j = functionVec2[i]->fInputColumnIndex;
// sum(x)
oidsAggDist.push_back(oidsAggDist[j]);
@@ -2344,22 +2481,22 @@ void TupleAggregateStep::prep2PhasesAggregate(
{
// check if there are any aggregate columns
// a vector that has the aggregate function to be done by PM
vector<pair<uint32_t, int> > aggColVec;
// vector<pair<uint32_t, int> > aggColVec;
set<uint32_t> avgSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
// for (uint64_t i = 0; i < returnedColVec.size(); i++)
// {
// skip if not an aggregation column
if (returnedColVec[i].second == 0)
continue;
// if (returnedColVec[i].second == 0)
// continue;
aggColVec.push_back(returnedColVec[i]);
// aggColVec.push_back(returnedColVec[i]);
// remember if a column has an average function,
// with avg function, no need for separate sum or count_column_name
if (returnedColVec[i].second == AggregateColumn::AVG)
avgSet.insert(returnedColVec[i].first);
}
// if (returnedColVec[i].second == AggregateColumn::AVG)
// avgSet.insert(returnedColVec[i].first);
// }
// populate the aggregate rowgroup on PM and UM
// PM: projectedRG -> aggregateRGPM
@@ -2480,11 +2617,15 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
// vectors for aggregate functions
for (uint64_t i = 0; i < aggColVec.size(); i++)
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
uint32_t aggKey = aggColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
// skip if not an aggregation column
if (returnedColVec[i].second == 0)
continue;
uint32_t aggKey = returnedColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
// skip on PM if this is a constant
if (aggOp == ROWAGG_CONSTANT)
@@ -2504,7 +2645,8 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) &&
(avgSet.find(aggKey) != avgSet.end()))
(returnedColVec[i].second == AggregateColumn::AVG))
// (avgSet.find(aggKey) != avgSet.end()))
// skip sum / count(column) if avg is also selected
continue;
@@ -2513,7 +2655,24 @@ void TupleAggregateStep::prep2PhasesAggregate(
continue;
uint64_t colProj = projColPosMap[aggKey];
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
if (udafc)
{
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
}
else
{
throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
}
functionVecPm.push_back(funct);
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAggPm));
@@ -2667,7 +2826,31 @@ void TupleAggregateStep::prep2PhasesAggregate(
colAggPm++;
}
break;
case ROWAGG_UDAF:
{
// Return column
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
}
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
colAggPm++;
// Dummy Column for UDAF UserData struct
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(0);
typeAggPm.push_back(CalpontSystemCatalog::VARBINARY);
widthAggPm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes
funct->fAuxColumnIndex = colAggPm++;
break;
}
default:
{
ostringstream emsg;
@@ -2849,7 +3032,16 @@ void TupleAggregateStep::prep2PhasesAggregate(
// update the aggregate function vector
else
{
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colPm, i));
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, i));
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, i));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
funct->fAuxColumnIndex = colPm;
else if (aggOp == ROWAGG_CONSTANT)
@@ -2909,14 +3101,33 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
}
// add auxiliary fields for statistics functions
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
uint64_t j = functionVecUm[i]->fInputColumnIndex;
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
{
// Dummy column for UDAF user data
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVecUm[i]->fAuxColumnIndex = lastCol++;
oidsAggUm.push_back(oidsAggUm[j]); // Dummy?
keysAggUm.push_back(keysAggUm[j]); // Dummy?
scaleAggUm.push_back(0);
precisionAggUm.push_back(0);
typeAggUm.push_back(CalpontSystemCatalog::VARBINARY);
widthAggUm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
continue;
}
if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVecUm[i]->fAuxColumnIndex = lastCol;
uint64_t j = functionVecUm[i]->fInputColumnIndex;
// sum(x)
oidsAggUm.push_back(oidsAggUm[j]);
@@ -3170,7 +3381,32 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
continue;
uint64_t colProj = projColPosMap[aggKey];
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
if (udafc)
{
mcsv1sdk::UDAF_MAP::iterator funcIter = mcsv1sdk::UDAFMap::getMap().find(udafc->getContext().getName());
if (UNLIKELY(funcIter == mcsv1sdk::UDAFMap::getMap().end()))
{
std::ostringstream errmsg;
errmsg << "prep2PhasesAggregate: A UDAF function, " << udafc->getContext().getName() <<
", is called but there's no entry in UDAF_MAP";
throw std::logic_error(errmsg.str());
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
}
else
{
throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no UDAFColumn");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
}
functionVecPm.push_back(funct);
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAggPm));
@@ -3314,6 +3550,32 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
}
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
colAggPm++;
// Dummy column for UDAF UserData struct
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(0);
typeAggPm.push_back(CalpontSystemCatalog::VARBINARY);
widthAggPm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes
funct->fAuxColumnIndex = colAggPm++;
break;
}
default:
{
ostringstream emsg;
@@ -3337,13 +3599,27 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
for (uint32_t idx = 0; idx < functionVecPm.size(); idx++)
{
SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx];
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
funcPm->fAggFunction,
funcPm->fStatsFunction,
funcPm->fOutputColumnIndex,
funcPm->fOutputColumnIndex,
funcPm->fAuxColumnIndex));
// UDAF support
if (funcPm->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
funct.reset(new RowUDAFFunctionCol(
udafFuncCol->fUDAFContext,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex));
}
else
{
funct.reset(new RowAggFunctionCol(
funcPm->fAggFunction,
funcPm->fStatsFunction,
funcPm->fOutputColumnIndex,
funcPm->fOutputColumnIndex,
funcPm->fAuxColumnIndex));
}
functionNoDistVec.push_back(funct);
}
@@ -3611,7 +3887,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// update the aggregate function vector
else
{
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colUm, i));
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, i));
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, i));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
funct->fAuxColumnIndex = colUm;
else if (aggOp == ROWAGG_CONSTANT)
@@ -3704,14 +3989,33 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
}
}
// add auxiliary fields for statistics functions
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
uint64_t j = functionVecUm[i]->fInputColumnIndex;
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
{
// Dummy column for UDAF user data
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVecUm[i]->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(oidsAggUm[j]); // Dummy?
keysAggDist.push_back(keysAggUm[j]); // Dummy?
scaleAggDist.push_back(0);
precisionAggDist.push_back(0);
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
widthAggDist.push_back(bigIntWidth);
widthAggUm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
continue;
}
if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVecUm[i]->fAuxColumnIndex = lastCol;
uint64_t j = functionVecUm[i]->fInputColumnIndex;
// sum(x)
oidsAggDist.push_back(oidsAggDist[j]);
@@ -3880,27 +4184,39 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
while (it != functionVecUm.end())
{
SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t f = *it++;
if ((f->fOutputColumnIndex == k) &&
(f->fAggFunction == ROWAGG_COUNT_ASTERISK ||
f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
f->fAggFunction == ROWAGG_SUM ||
f->fAggFunction == ROWAGG_AVG ||
f->fAggFunction == ROWAGG_MIN ||
f->fAggFunction == ROWAGG_MAX ||
f->fAggFunction == ROWAGG_STATS ||
f->fAggFunction == ROWAGG_BIT_AND ||
f->fAggFunction == ROWAGG_BIT_OR ||
f->fAggFunction == ROWAGG_BIT_XOR ||
f->fAggFunction == ROWAGG_CONSTANT))
if (f->fOutputColumnIndex == k)
{
SP_ROWAGG_FUNC_t funct(
new RowAggFunctionCol(
f->fAggFunction,
f->fStatsFunction,
f->fInputColumnIndex,
f->fOutputColumnIndex,
f->fAuxColumnIndex));
if (f->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
funct.reset(new RowUDAFFunctionCol(
udafFuncCol->fUDAFContext,
udafFuncCol->fInputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex));
}
else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK ||
f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
f->fAggFunction == ROWAGG_SUM ||
f->fAggFunction == ROWAGG_AVG ||
f->fAggFunction == ROWAGG_MIN ||
f->fAggFunction == ROWAGG_MAX ||
f->fAggFunction == ROWAGG_STATS ||
f->fAggFunction == ROWAGG_BIT_AND ||
f->fAggFunction == ROWAGG_BIT_OR ||
f->fAggFunction == ROWAGG_BIT_XOR ||
f->fAggFunction == ROWAGG_CONSTANT)
{
funct.reset(
new RowAggFunctionCol(
f->fAggFunction,
f->fStatsFunction,
f->fInputColumnIndex,
f->fOutputColumnIndex,
f->fAuxColumnIndex));
}
functionSub2.push_back(funct);
}
}
@@ -4292,6 +4608,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
{
fRowGroupIns[threadID].getRow(0, &rowIn);
rowIn.setUserDataStore(rgDatas[c].getUserDataStore());
for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
{
for (uint64_t k = 0;
@@ -4313,6 +4630,7 @@ void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
{
fRowGroupIns[threadID].setData(&rgDatas[c]);
fRowGroupIns[threadID].getRow(0, &rowIn);
rowIn.setUserDataStore(rgDatas[c].getUserDataStore());
for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.