You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-926 Handle duplicate function detection for UDAF
This commit is contained in:
@ -55,7 +55,7 @@ UDAFColumn::UDAFColumn(const uint32_t sessionID):
|
||||
}
|
||||
|
||||
UDAFColumn::UDAFColumn(const UDAFColumn& rhs, const uint32_t sessionID):
|
||||
AggregateColumn(dynamic_cast<const AggregateColumn&>(rhs))
|
||||
AggregateColumn(dynamic_cast<const AggregateColumn&>(rhs), sessionID), context(rhs.context)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -74,10 +74,30 @@ using namespace querytele;
|
||||
namespace
|
||||
{
|
||||
|
||||
struct cmpTuple {
|
||||
bool operator()(tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*> a,
|
||||
tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*> b)
|
||||
{
|
||||
if (get<0>(a) < get<0>(b))
|
||||
return true;
|
||||
if (get<0>(a) == get<0>(b))
|
||||
{
|
||||
if (get<1>(a) < get<1>(b))
|
||||
return true;
|
||||
if (get<1>(a) == get<1>(b))
|
||||
return get<2>(a) < get<2>(b);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
typedef vector<Row::Pointer> RowBucket;
|
||||
typedef vector<RowBucket> RowBucketVec;
|
||||
|
||||
// The AGG_MAP type is used to maintain a list of aggregate functions in order to
|
||||
// detect duplicates. Since all UDAFs have the same op type (ROWAGG_UDAF), we add in
|
||||
// the function pointer in order to ensure uniqueness.
|
||||
typedef map<tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*>, uint64_t, cmpTuple> AGG_MAP;
|
||||
|
||||
inline RowAggFunctionType functionIdMap(int planFuncId)
|
||||
{
|
||||
@ -697,12 +717,15 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(ac->constCol().get());
|
||||
idbassert(cc != NULL); // @bug5261
|
||||
bool isNull = (ConstantColumn::NULLDATA == cc->type());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
|
||||
if (udafc)
|
||||
if (ac->aggOp() == ROWAGG_UDAF)
|
||||
{
|
||||
constAggDataVec.push_back(
|
||||
ConstantAggData(cc->constval(), udafc->getContext().getName(),
|
||||
functionIdMap(ac->aggOp()), isNull));
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
|
||||
if (udafc)
|
||||
{
|
||||
constAggDataVec.push_back(
|
||||
ConstantAggData(cc->constval(), udafc->getContext().getName(),
|
||||
functionIdMap(ac->aggOp()), isNull));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -936,6 +959,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
uint32_t bigIntWidth = sizeof(int64_t);
|
||||
uint32_t bigUintWidth = sizeof(uint64_t);
|
||||
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
// for count column of average function
|
||||
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
|
||||
|
||||
@ -972,7 +996,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
}
|
||||
|
||||
// populate the aggregate rowgroup
|
||||
map<pair<uint32_t, int>, uint64_t> aggFuncMap;
|
||||
AGG_MAP aggFuncMap;
|
||||
for (uint64_t i = 0; i < returnedColVec.size(); i++)
|
||||
{
|
||||
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
|
||||
@ -1112,7 +1136,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
SP_ROWAGG_FUNC_t funct;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
if (udafc)
|
||||
{
|
||||
// Create a RowAggFunctionCol (UDAF subtype) with the context.
|
||||
@ -1274,6 +1298,8 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
{
|
||||
throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
|
||||
}
|
||||
pUDAFFunc = udafFuncCol->fUDAFContext.getFunction();
|
||||
|
||||
// Return column
|
||||
oidsAgg.push_back(oidsProj[colProj]);
|
||||
keysAgg.push_back(key);
|
||||
@ -1294,14 +1320,15 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
}
|
||||
|
||||
// find if this func is a duplicate
|
||||
map<pair<uint32_t, int>, uint64_t>::iterator iter =
|
||||
aggFuncMap.find(make_pair(key, aggOp));
|
||||
AGG_MAP::iterator iter = aggFuncMap.find(make_tuple(key, aggOp, pUDAFFunc));
|
||||
if (iter != aggFuncMap.end())
|
||||
{
|
||||
if (funct->fAggFunction == ROWAGG_AVG)
|
||||
funct->fAggFunction = ROWAGG_DUP_AVG;
|
||||
else if (funct->fAggFunction == ROWAGG_STATS)
|
||||
funct->fAggFunction = ROWAGG_DUP_STATS;
|
||||
else if (funct->fAggFunction == ROWAGG_UDAF)
|
||||
funct->fAggFunction = ROWAGG_DUP_UDAF;
|
||||
else
|
||||
funct->fAggFunction = ROWAGG_DUP_FUNCT;
|
||||
|
||||
@ -1309,7 +1336,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
}
|
||||
else
|
||||
{
|
||||
aggFuncMap.insert(make_pair(make_pair(key, aggOp), funct->fOutputColumnIndex));
|
||||
aggFuncMap.insert(make_pair(make_tuple(key, aggOp, pUDAFFunc), funct->fOutputColumnIndex));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1453,7 +1480,9 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
vector<SP_ROWAGG_GRPBY_t> groupBy, groupByNoDist;
|
||||
vector<SP_ROWAGG_FUNC_t> functionVec1, functionVec2, functionNoDistVec;
|
||||
uint32_t bigIntWidth = sizeof(int64_t);
|
||||
map<pair<uint32_t, int>, uint64_t> aggFuncMap;
|
||||
// map key = column key, operation (enum), and UDAF pointer if UDAF.
|
||||
AGG_MAP aggFuncMap;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
set<uint32_t> avgSet;
|
||||
|
||||
// for count column of average function
|
||||
@ -1506,7 +1535,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
typeAgg.push_back(typeProj[colProj]);
|
||||
widthAgg.push_back(widthProj[colProj]);
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAgg[colAgg], 0), colAgg));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg));
|
||||
colAgg++;
|
||||
}
|
||||
|
||||
@ -1544,13 +1573,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
typeAgg.push_back(typeProj[colProj]);
|
||||
widthAgg.push_back(widthProj[colProj]);
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAgg[colAgg], 0), colAgg));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg));
|
||||
colAgg++;
|
||||
}
|
||||
|
||||
// vectors for aggregate functions
|
||||
for (uint64_t i = 0; i < aggColVec.size(); i++)
|
||||
{
|
||||
pUDAFFunc = NULL;
|
||||
uint32_t aggKey = aggColVec[i].first;
|
||||
RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
|
||||
RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
|
||||
@ -1573,7 +1603,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
|
||||
aggOp, stats, colAgg, colAgg, -1));
|
||||
functionVec1.push_back(funct);
|
||||
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAgg));
|
||||
aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAgg));
|
||||
colAgg++;
|
||||
|
||||
continue;
|
||||
@ -1607,9 +1637,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
SP_ROWAGG_FUNC_t funct;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
// Create a RowAggFunctionCol (UDAF subtype) with the context.
|
||||
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
|
||||
}
|
||||
@ -1620,15 +1651,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
}
|
||||
else
|
||||
{
|
||||
// skip if this is a duplicate
|
||||
if (aggFuncMap.find(make_pair(aggKey, aggOp)) != aggFuncMap.end())
|
||||
continue;
|
||||
|
||||
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
|
||||
}
|
||||
functionVec1.push_back(funct);
|
||||
// skip if this is a duplicate
|
||||
if (aggFuncMap.find(make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end())
|
||||
continue;
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAgg));
|
||||
functionVec1.push_back(funct);
|
||||
aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAgg));
|
||||
|
||||
switch (aggOp)
|
||||
{
|
||||
@ -1853,7 +1883,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
{
|
||||
// check if the count column for AVG is also a returned column,
|
||||
// if so, replace the "-1" to actual position in returned vec.
|
||||
map<pair<uint32_t, RowAggFunctionType>, uint64_t> aggDupFuncMap;
|
||||
AGG_MAP aggDupFuncMap;
|
||||
pUDAFFunc = NULL;
|
||||
|
||||
// copy over the groupby vector
|
||||
// update the outputColumnIndex if returned
|
||||
@ -1861,12 +1892,13 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
{
|
||||
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
|
||||
groupByNoDist.push_back(groupby);
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAgg[i], 0), i));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAgg[i], 0, pUDAFFunc), i));
|
||||
}
|
||||
|
||||
// locate the return column position in aggregated rowgroup
|
||||
for (uint64_t i = 0; i < returnedColVec.size(); i++)
|
||||
{
|
||||
pUDAFFunc = NULL;
|
||||
uint32_t retKey = returnedColVec[i].first;
|
||||
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
|
||||
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
|
||||
@ -1874,7 +1906,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
|
||||
jobInfo.distinctColVec.end() )
|
||||
{
|
||||
map<pair<uint32_t, int>, uint64_t>::iterator it = aggFuncMap.find(make_pair(retKey, 0));
|
||||
AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, 0, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
colAgg = it->second;
|
||||
@ -1977,8 +2009,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
case ROWAGG_BIT_XOR:
|
||||
default:
|
||||
{
|
||||
map<pair<uint32_t, int>, uint64_t>::iterator it =
|
||||
aggFuncMap.find(make_pair(retKey, aggOp));
|
||||
AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
colAgg = it->second;
|
||||
@ -2005,7 +2036,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
// check if a SUM or COUNT covered by AVG
|
||||
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
|
||||
{
|
||||
it = aggFuncMap.find(make_pair(returnedColVec[i].first, ROWAGG_AVG));
|
||||
it = aggFuncMap.find(make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
// false alarm
|
||||
@ -2175,21 +2206,22 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
functionVec2.push_back(funct);
|
||||
|
||||
// find if this func is a duplicate
|
||||
map<pair<uint32_t, RowAggFunctionType>, uint64_t>::iterator iter =
|
||||
aggDupFuncMap.find(make_pair(retKey, aggOp));
|
||||
AGG_MAP::iterator iter = aggDupFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc));
|
||||
if (iter != aggDupFuncMap.end())
|
||||
{
|
||||
if (funct->fAggFunction == ROWAGG_AVG)
|
||||
funct->fAggFunction = ROWAGG_DUP_AVG;
|
||||
else if (funct->fAggFunction == ROWAGG_STATS)
|
||||
funct->fAggFunction = ROWAGG_DUP_STATS;
|
||||
else if (funct->fAggFunction != ROWAGG_UDAF)
|
||||
else if (funct->fAggFunction == ROWAGG_UDAF)
|
||||
funct->fAggFunction = ROWAGG_DUP_UDAF;
|
||||
else
|
||||
funct->fAggFunction = ROWAGG_DUP_FUNCT;
|
||||
funct->fAuxColumnIndex = iter->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
aggDupFuncMap.insert(make_pair(make_pair(retKey, aggOp),
|
||||
aggDupFuncMap.insert(make_pair(make_tuple(retKey, aggOp, pUDAFFunc),
|
||||
funct->fOutputColumnIndex));
|
||||
}
|
||||
|
||||
@ -2587,7 +2619,8 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
vector<SP_ROWAGG_FUNC_t> functionVecPm, functionVecUm;
|
||||
uint32_t bigIntWidth = sizeof(int64_t);
|
||||
uint32_t bigUintWidth = sizeof(uint64_t);
|
||||
map<pair<uint32_t, int>, uint64_t> aggFuncMap;
|
||||
AGG_MAP aggFuncMap;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
|
||||
// associate the columns between projected RG and aggregate RG on PM
|
||||
// populated the aggregate columns
|
||||
@ -2637,7 +2670,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
typeAggPm.push_back(typeProj[colProj]);
|
||||
widthAggPm.push_back(width[colProj]);
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
|
||||
colAggPm++;
|
||||
}
|
||||
|
||||
@ -2675,7 +2708,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
typeAggPm.push_back(typeProj[colProj]);
|
||||
widthAggPm.push_back(width[colProj]);
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
|
||||
colAggPm++;
|
||||
}
|
||||
|
||||
@ -2686,6 +2719,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
if (returnedColVec[i].second == 0)
|
||||
continue;
|
||||
|
||||
pUDAFFunc = NULL;
|
||||
uint32_t aggKey = returnedColVec[i].first;
|
||||
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
|
||||
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
|
||||
@ -2717,9 +2751,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
SP_ROWAGG_FUNC_t funct;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
// Create a RowAggFunctionCol (UDAF subtype) with the context.
|
||||
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
|
||||
}
|
||||
@ -2730,15 +2765,14 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
}
|
||||
else
|
||||
{
|
||||
// skip if this is a duplicate
|
||||
if (aggFuncMap.find(make_pair(aggKey, aggOp)) != aggFuncMap.end())
|
||||
continue;
|
||||
|
||||
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
|
||||
}
|
||||
functionVecPm.push_back(funct);
|
||||
// skip if this is a duplicate
|
||||
if (aggFuncMap.find(make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end())
|
||||
continue;
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAggPm));
|
||||
functionVecPm.push_back(funct);
|
||||
aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm));
|
||||
|
||||
switch (aggOp)
|
||||
{
|
||||
@ -2948,7 +2982,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
// check if the count column for AVG is also a returned column,
|
||||
// if so, replace the "-1" to actual position in returned vec.
|
||||
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
|
||||
map<pair<uint32_t, RowAggFunctionType>, uint64_t> aggDupFuncMap;
|
||||
AGG_MAP aggDupFuncMap;
|
||||
|
||||
// copy over the groupby vector
|
||||
// update the outputColumnIndex if returned
|
||||
@ -2966,8 +3000,16 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
|
||||
int colPm = -1;
|
||||
|
||||
map<pair<uint32_t, int>, uint64_t>::iterator it =
|
||||
aggFuncMap.find(make_pair(retKey, aggOp));
|
||||
// Is this a UDAF? use the function as part of the key.
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
if (udafc)
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
}
|
||||
|
||||
AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
colPm = it->second;
|
||||
@ -2987,7 +3029,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
// check if a SUM or COUNT covered by AVG
|
||||
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
|
||||
{
|
||||
it = aggFuncMap.find(make_pair(returnedColVec[i].first, ROWAGG_AVG));
|
||||
it = aggFuncMap.find(make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
// false alarm
|
||||
@ -3112,7 +3154,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
SP_ROWAGG_FUNC_t funct;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, i));
|
||||
}
|
||||
else
|
||||
@ -3126,21 +3168,22 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
functionVecUm.push_back(funct);
|
||||
|
||||
// find if this func is a duplicate
|
||||
map<pair<uint32_t, RowAggFunctionType>, uint64_t>::iterator iter =
|
||||
aggDupFuncMap.find(make_pair(retKey, aggOp));
|
||||
AGG_MAP::iterator iter = aggDupFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc));
|
||||
if (iter != aggDupFuncMap.end())
|
||||
{
|
||||
if (funct->fAggFunction == ROWAGG_AVG)
|
||||
funct->fAggFunction = ROWAGG_DUP_AVG;
|
||||
else if (funct->fAggFunction == ROWAGG_STATS)
|
||||
funct->fAggFunction = ROWAGG_DUP_STATS;
|
||||
else if (funct->fAggFunction != ROWAGG_UDAF)
|
||||
else if (funct->fAggFunction == ROWAGG_UDAF)
|
||||
funct->fAggFunction = ROWAGG_DUP_UDAF;
|
||||
else
|
||||
funct->fAggFunction = ROWAGG_DUP_FUNCT;
|
||||
funct->fAuxColumnIndex = iter->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
aggDupFuncMap.insert(make_pair(make_pair(retKey, aggOp),
|
||||
aggDupFuncMap.insert(make_pair(make_tuple(retKey, aggOp, pUDAFFunc),
|
||||
funct->fOutputColumnIndex));
|
||||
}
|
||||
|
||||
@ -3328,7 +3371,9 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
vector<SP_ROWAGG_FUNC_t> functionVecPm, functionNoDistVec, functionVecUm;
|
||||
|
||||
uint32_t bigIntWidth = sizeof(int64_t);
|
||||
map<pair<uint32_t, int>, uint64_t> aggFuncMap, avgFuncDistMap;
|
||||
map<pair<uint32_t, int>, uint64_t> avgFuncDistMap;
|
||||
AGG_MAP aggFuncMap;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
|
||||
// associate the columns between projected RG and aggregate RG on PM
|
||||
// populated the aggregate columns
|
||||
@ -3378,7 +3423,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
typeAggPm.push_back(typeProj[colProj]);
|
||||
widthAggPm.push_back(width[colProj]);
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
|
||||
colAggPm++;
|
||||
}
|
||||
|
||||
@ -3416,7 +3461,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
typeAggPm.push_back(typeProj[colProj]);
|
||||
widthAggPm.push_back(width[colProj]);
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm));
|
||||
aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
|
||||
colAggPm++;
|
||||
}
|
||||
|
||||
@ -3428,6 +3473,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
if (aggOp == ROWAGG_CONSTANT)
|
||||
continue;
|
||||
|
||||
pUDAFFunc = NULL;
|
||||
uint32_t aggKey = aggColVec[i].first;
|
||||
if (projColPosMap.find(aggKey) == projColPosMap.end())
|
||||
{
|
||||
@ -3457,9 +3503,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
SP_ROWAGG_FUNC_t funct;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
// Create a RowAggFunctionCol (UDAF subtype) with the context.
|
||||
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
|
||||
}
|
||||
@ -3470,15 +3517,14 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
}
|
||||
else
|
||||
{
|
||||
// skip if this is a duplicate
|
||||
if (aggFuncMap.find(make_pair(aggKey, aggOp)) != aggFuncMap.end())
|
||||
continue;
|
||||
|
||||
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
|
||||
}
|
||||
functionVecPm.push_back(funct);
|
||||
// skip if this is a duplicate
|
||||
if (aggFuncMap.find(make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end())
|
||||
continue;
|
||||
|
||||
aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAggPm));
|
||||
functionVecPm.push_back(funct);
|
||||
aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm));
|
||||
|
||||
switch (aggOp)
|
||||
{
|
||||
@ -3734,7 +3780,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
// check if the count column for AVG is also a returned column,
|
||||
// if so, replace the "-1" to actual position in returned vec.
|
||||
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
|
||||
map<pair<uint32_t, RowAggFunctionType>, uint64_t> aggDupFuncMap;
|
||||
AGG_MAP aggDupFuncMap;
|
||||
|
||||
// copy over the groupby vector
|
||||
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
|
||||
@ -3746,6 +3792,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
// locate the return column position in aggregated rowgroup from PM
|
||||
for (uint64_t i = 0; i < returnedColVec.size(); i++)
|
||||
{
|
||||
pUDAFFunc = NULL;
|
||||
uint32_t retKey = returnedColVec[i].first;
|
||||
|
||||
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
|
||||
@ -3754,7 +3801,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
|
||||
jobInfo.distinctColVec.end() )
|
||||
{
|
||||
map<pair<uint32_t, int>, uint64_t>::iterator it = aggFuncMap.find(make_pair(retKey, 0));
|
||||
AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, 0, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
colUm = it->second;
|
||||
@ -3862,8 +3909,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
case ROWAGG_CONSTANT:
|
||||
default:
|
||||
{
|
||||
map<pair<uint32_t, int>, uint64_t>::iterator it =
|
||||
aggFuncMap.find(make_pair(retKey, aggOp));
|
||||
AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
colUm = it->second;
|
||||
@ -3883,7 +3929,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
// check if a SUM or COUNT covered by AVG
|
||||
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
|
||||
{
|
||||
it = aggFuncMap.find(make_pair(returnedColVec[i].first, ROWAGG_AVG));
|
||||
it = aggFuncMap.find(make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
|
||||
if (it != aggFuncMap.end())
|
||||
{
|
||||
// false alarm
|
||||
@ -3997,7 +4043,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
SP_ROWAGG_FUNC_t funct;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.nonConstCols[i].get());
|
||||
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, i));
|
||||
}
|
||||
else
|
||||
@ -4012,22 +4059,23 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
functionVecUm.push_back(funct);
|
||||
|
||||
// find if this func is a duplicate
|
||||
map<pair<uint32_t, RowAggFunctionType>, uint64_t>::iterator iter =
|
||||
aggDupFuncMap.find(make_pair(retKey, aggOp));
|
||||
AGG_MAP::iterator iter = aggDupFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc));
|
||||
if (iter != aggDupFuncMap.end())
|
||||
{
|
||||
if (funct->fAggFunction == ROWAGG_AVG)
|
||||
funct->fAggFunction = ROWAGG_DUP_AVG;
|
||||
else if (funct->fAggFunction == ROWAGG_STATS)
|
||||
funct->fAggFunction = ROWAGG_DUP_STATS;
|
||||
else if (funct->fAggFunction != ROWAGG_UDAF)
|
||||
else if (funct->fAggFunction == ROWAGG_UDAF)
|
||||
funct->fAggFunction = ROWAGG_DUP_UDAF;
|
||||
else
|
||||
funct->fAggFunction = ROWAGG_DUP_FUNCT;
|
||||
|
||||
funct->fAuxColumnIndex = iter->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
aggDupFuncMap.insert(make_pair(make_pair(retKey, aggOp),
|
||||
aggDupFuncMap.insert(make_pair(make_tuple(retKey, aggOp, pUDAFFunc),
|
||||
funct->fOutputColumnIndex));
|
||||
}
|
||||
|
||||
|
@ -1553,6 +1553,7 @@ void RowAggregation::updateEntry(const Row& rowIn)
|
||||
case ROWAGG_DUP_FUNCT:
|
||||
case ROWAGG_DUP_AVG:
|
||||
case ROWAGG_DUP_STATS:
|
||||
case ROWAGG_DUP_UDAF:
|
||||
case ROWAGG_CONSTANT:
|
||||
case ROWAGG_GROUP_CONCAT:
|
||||
break;
|
||||
@ -2110,6 +2111,8 @@ void RowAggregationUM::finalize()
|
||||
if (fHasUDAF)
|
||||
{
|
||||
calculateUDAFColumns();
|
||||
// copy the duplicate UDAF, if any
|
||||
fixDuplicates(ROWAGG_DUP_UDAF);
|
||||
}
|
||||
|
||||
if (fGroupConcat.size() > 0)
|
||||
@ -2208,6 +2211,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn)
|
||||
case ROWAGG_DUP_FUNCT:
|
||||
case ROWAGG_DUP_AVG:
|
||||
case ROWAGG_DUP_STATS:
|
||||
case ROWAGG_DUP_UDAF:
|
||||
case ROWAGG_CONSTANT:
|
||||
break;
|
||||
|
||||
@ -3511,6 +3515,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn)
|
||||
case ROWAGG_DUP_FUNCT:
|
||||
case ROWAGG_DUP_AVG:
|
||||
case ROWAGG_DUP_STATS:
|
||||
case ROWAGG_DUP_UDAF:
|
||||
case ROWAGG_CONSTANT:
|
||||
break;
|
||||
|
||||
@ -3728,12 +3733,18 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
|
||||
// Call the UDAF subEvaluate method
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
rc = rgContext.getFunction()->subEvaluate(&rgContext, rowIn.getUserData(colIn+1).get());
|
||||
boost::shared_ptr<mcsv1sdk::UserData> userData = rowIn.getUserData(colIn+1);
|
||||
if (!userData)
|
||||
{
|
||||
rowUDAF->bInterrupted = true;
|
||||
throw logic_error("UDAF subevaluate : No userData");
|
||||
}
|
||||
rc = rgContext.getFunction()->subEvaluate(&rgContext, userData.get());
|
||||
rgContext.setUserData(NULL);
|
||||
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
||||
{
|
||||
rowUDAF->bInterrupted = true;
|
||||
throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
|
||||
throw logging::IDBExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3917,6 +3928,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
|
||||
case ROWAGG_DUP_FUNCT:
|
||||
case ROWAGG_DUP_AVG:
|
||||
case ROWAGG_DUP_STATS:
|
||||
case ROWAGG_DUP_UDAF:
|
||||
case ROWAGG_CONSTANT:
|
||||
break;
|
||||
|
||||
|
@ -116,10 +116,10 @@ enum RowAggFunctionType
|
||||
// ROWAGG_DUP_FUNCT : copy data before AVG calculation, because SUM may share by AVG
|
||||
// ROWAGG_DUP_AVG : copy data after AVG calculation
|
||||
ROWAGG_COUNT_NO_OP, // COUNT(column_name), but leave count() to AVG
|
||||
ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG, in select
|
||||
ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG and UDAF, in select
|
||||
ROWAGG_DUP_AVG, // duplicate AVG(column_name) in select
|
||||
ROWAGG_DUP_STATS // duplicate statistics functions in select
|
||||
|
||||
ROWAGG_DUP_STATS, // duplicate statistics functions in select
|
||||
ROWAGG_DUP_UDAF // duplicate UDAF function in select
|
||||
};
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user