diff --git a/dbcon/execplan/udafcolumn.cpp b/dbcon/execplan/udafcolumn.cpp index d58c4fb97..5862b469f 100755 --- a/dbcon/execplan/udafcolumn.cpp +++ b/dbcon/execplan/udafcolumn.cpp @@ -55,7 +55,7 @@ UDAFColumn::UDAFColumn(const uint32_t sessionID): } UDAFColumn::UDAFColumn(const UDAFColumn& rhs, const uint32_t sessionID): - AggregateColumn(dynamic_cast(rhs)) + AggregateColumn(dynamic_cast(rhs), sessionID), context(rhs.context) { } diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 1d7df956d..ccd21a917 100755 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -74,10 +74,30 @@ using namespace querytele; namespace { +struct cmpTuple { + bool operator()(tuple a, + tuple b) + { + if (get<0>(a) < get<0>(b)) + return true; + if (get<0>(a) == get<0>(b)) + { + if (get<1>(a) < get<1>(b)) + return true; + if (get<1>(a) == get<1>(b)) + return get<2>(a) < get<2>(b); + } + return false; + } +}; typedef vector RowBucket; typedef vector RowBucketVec; +// The AGG_MAP type is used to maintain a list of aggregate functions in order to +// detect duplicates. Since all UDAF have the same op type (ROWAGG_UDAF), we add in +// the function pointer in order to ensure uniqueness. +typedef map, uint64_t, cmpTuple> AGG_MAP; inline RowAggFunctionType functionIdMap(int planFuncId) { @@ -697,12 +717,15 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo) ConstantColumn* cc = dynamic_cast(ac->constCol().get()); idbassert(cc != NULL); // @bug5261 bool isNull = (ConstantColumn::NULLDATA == cc->type()); - UDAFColumn* udafc = dynamic_cast(ac); - if (udafc) + if (ac->aggOp() == ROWAGG_UDAF) { - constAggDataVec.push_back( - ConstantAggData(cc->constval(), udafc->getContext().getName(), - functionIdMap(ac->aggOp()), isNull)); + UDAFColumn* udafc = dynamic_cast(ac); + if (udafc) + { + constAggDataVec.push_back( + ConstantAggData(cc->constval(), udafc->getContext().getName(), + functionIdMap(ac->aggOp()), isNull)); + } } else { @@ -936,6 +959,7 @@ void TupleAggregateStep::prep1PhaseAggregate( uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); + mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // for count column of average function map avgFuncMap; @@ -972,7 +996,7 @@ void TupleAggregateStep::prep1PhaseAggregate( } // populate the aggregate rowgroup - map, uint64_t> aggFuncMap; + AGG_MAP aggFuncMap; for (uint64_t i = 0; i < returnedColVec.size(); i++) { RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); @@ -1112,7 +1136,7 @@ void TupleAggregateStep::prep1PhaseAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.nonConstCols[i].get()); + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); if (udafc) { // Create a RowAggFunctionCol (UDAF subtype) with the context. @@ -1274,6 +1298,8 @@ void TupleAggregateStep::prep1PhaseAggregate( { throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } + pUDAFFunc = udafFuncCol->fUDAFContext.getFunction(); + // Return column oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); @@ -1294,14 +1320,15 @@ void TupleAggregateStep::prep1PhaseAggregate( } // find if this func is a duplicate - map, uint64_t>::iterator iter = - aggFuncMap.find(make_pair(key, aggOp)); + AGG_MAP::iterator iter = aggFuncMap.find(make_tuple(key, aggOp, pUDAFFunc)); if (iter != aggFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; + else if (funct->fAggFunction == ROWAGG_UDAF) + funct->fAggFunction = ROWAGG_DUP_UDAF; else funct->fAggFunction = ROWAGG_DUP_FUNCT; @@ -1309,7 +1336,7 @@ void TupleAggregateStep::prep1PhaseAggregate( } else { - aggFuncMap.insert(make_pair(make_pair(key, aggOp), funct->fOutputColumnIndex)); + aggFuncMap.insert(make_pair(make_tuple(key, aggOp, pUDAFFunc), funct->fOutputColumnIndex)); } } @@ -1453,7 +1480,9 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( vector groupBy, groupByNoDist; vector functionVec1, functionVec2, functionNoDistVec; uint32_t bigIntWidth = sizeof(int64_t); - map, uint64_t> aggFuncMap; + // map key = column key, operation (enum), and UDAF pointer if UDAF. + AGG_MAP aggFuncMap; + mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; set avgSet; // for count column of average function @@ -1506,7 +1535,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(widthProj[colProj]); - aggFuncMap.insert(make_pair(make_pair(keysAgg[colAgg], 0), colAgg)); + aggFuncMap.insert(make_pair(make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg)); colAgg++; } @@ -1544,13 +1573,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(widthProj[colProj]); - aggFuncMap.insert(make_pair(make_pair(keysAgg[colAgg], 0), colAgg)); + aggFuncMap.insert(make_pair(make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg)); colAgg++; } // vectors for aggregate functions for (uint64_t i = 0; i < aggColVec.size(); i++) { + pUDAFFunc = NULL; uint32_t aggKey = aggColVec[i].first; RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second); @@ -1573,7 +1603,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol( aggOp, stats, colAgg, colAgg, -1)); functionVec1.push_back(funct); - aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAgg)); + aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAgg)); colAgg++; continue; @@ -1607,9 +1637,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.nonConstCols[i].get()); + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); if (udafc) { + pUDAFFunc = udafc->getContext().getFunction(); // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); } @@ -1620,15 +1651,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( } else { - // skip if this is a duplicate - if (aggFuncMap.find(make_pair(aggKey, aggOp)) != aggFuncMap.end()) - continue; - funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg)); } - functionVec1.push_back(funct); + // skip if this is a duplicate + if (aggFuncMap.find(make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end()) + continue; - aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAgg)); + functionVec1.push_back(funct); + aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAgg)); switch (aggOp) { @@ -1853,7 +1883,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. - map, uint64_t> aggDupFuncMap; + AGG_MAP aggDupFuncMap; + pUDAFFunc = NULL; // copy over the groupby vector // update the outputColumnIndex if returned @@ -1861,12 +1892,13 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1)); groupByNoDist.push_back(groupby); - aggFuncMap.insert(make_pair(make_pair(keysAgg[i], 0), i)); + aggFuncMap.insert(make_pair(make_tuple(keysAgg[i], 0, pUDAFFunc), i)); } // locate the return column position in aggregated rowgroup for (uint64_t i = 0; i < returnedColVec.size(); i++) { + pUDAFFunc = NULL; uint32_t retKey = returnedColVec[i].first; RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); @@ -1874,7 +1906,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != jobInfo.distinctColVec.end() ) { - map, uint64_t>::iterator it = aggFuncMap.find(make_pair(retKey, 0)); + AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, 0, pUDAFFunc)); if (it != aggFuncMap.end()) { colAgg = it->second; @@ -1977,8 +2009,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( case ROWAGG_BIT_XOR: default: { - map, uint64_t>::iterator it = - aggFuncMap.find(make_pair(retKey, aggOp)); + AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc)); if (it != aggFuncMap.end()) { colAgg = it->second; @@ -2005,7 +2036,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - it = aggFuncMap.find(make_pair(returnedColVec[i].first, ROWAGG_AVG)); + it = aggFuncMap.find(make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); if (it != aggFuncMap.end()) { // false alarm @@ -2175,21 +2206,22 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( functionVec2.push_back(funct); // find if this func is a duplicate - map, uint64_t>::iterator iter = - aggDupFuncMap.find(make_pair(retKey, aggOp)); + AGG_MAP::iterator iter = aggDupFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc)); if (iter != aggDupFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; - else if (funct->fAggFunction != ROWAGG_UDAF) + else if (funct->fAggFunction == ROWAGG_UDAF) + funct->fAggFunction = ROWAGG_DUP_UDAF; + else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { - aggDupFuncMap.insert(make_pair(make_pair(retKey, aggOp), + aggDupFuncMap.insert(make_pair(make_tuple(retKey, aggOp, pUDAFFunc), funct->fOutputColumnIndex)); } @@ -2587,7 +2619,8 @@ void TupleAggregateStep::prep2PhasesAggregate( vector functionVecPm, functionVecUm; uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); - map, uint64_t> aggFuncMap; + AGG_MAP aggFuncMap; + mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // associate the columns between projected RG and aggregate RG on PM // populated the aggregate columns @@ -2637,7 +2670,7 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm)); + aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); colAggPm++; } @@ -2675,7 +2708,7 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm)); + aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); colAggPm++; } @@ -2686,6 +2719,7 @@ void TupleAggregateStep::prep2PhasesAggregate( if (returnedColVec[i].second == 0) continue; + pUDAFFunc = NULL; uint32_t aggKey = returnedColVec[i].first; RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); @@ -2717,9 +2751,10 @@ void TupleAggregateStep::prep2PhasesAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.nonConstCols[i].get()); + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); if (udafc) { + pUDAFFunc = udafc->getContext().getFunction(); // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); } @@ -2730,15 +2765,14 @@ void TupleAggregateStep::prep2PhasesAggregate( } else { - // skip if this is a duplicate - if (aggFuncMap.find(make_pair(aggKey, aggOp)) != aggFuncMap.end()) - continue; - funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); } - functionVecPm.push_back(funct); + // skip if this is a duplicate + if (aggFuncMap.find(make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end()) + continue; - aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAggPm)); + functionVecPm.push_back(funct); + aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm)); switch (aggOp) { @@ -2948,7 +2982,7 @@ void TupleAggregateStep::prep2PhasesAggregate( // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. map avgFuncMap; - map, uint64_t> aggDupFuncMap; + AGG_MAP aggDupFuncMap; // copy over the groupby vector // update the outputColumnIndex if returned @@ -2966,8 +3000,16 @@ void TupleAggregateStep::prep2PhasesAggregate( RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); int colPm = -1; - map, uint64_t>::iterator it = - aggFuncMap.find(make_pair(retKey, aggOp)); + // Is this a UDAF? use the function as part of the key. + mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; + if (aggOp == ROWAGG_UDAF) + { + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); + if (udafc) + pUDAFFunc = udafc->getContext().getFunction(); + } + + AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc)); if (it != aggFuncMap.end()) { colPm = it->second; @@ -2987,7 +3029,7 @@ void TupleAggregateStep::prep2PhasesAggregate( // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - it = aggFuncMap.find(make_pair(returnedColVec[i].first, ROWAGG_AVG)); + it = aggFuncMap.find(make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); if (it != aggFuncMap.end()) { // false alarm @@ -3112,7 +3154,7 @@ void TupleAggregateStep::prep2PhasesAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.nonConstCols[i].get()); + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, i)); } else @@ -3126,21 +3168,22 @@ void TupleAggregateStep::prep2PhasesAggregate( functionVecUm.push_back(funct); // find if this func is a duplicate - map, uint64_t>::iterator iter = - aggDupFuncMap.find(make_pair(retKey, aggOp)); + AGG_MAP::iterator iter = aggDupFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc)); if (iter != aggDupFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; - else if (funct->fAggFunction != ROWAGG_UDAF) + else if (funct->fAggFunction == ROWAGG_UDAF) + funct->fAggFunction = ROWAGG_DUP_UDAF; + else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { - aggDupFuncMap.insert(make_pair(make_pair(retKey, aggOp), + aggDupFuncMap.insert(make_pair(make_tuple(retKey, aggOp, pUDAFFunc), funct->fOutputColumnIndex)); } @@ -3328,7 +3371,9 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( vector functionVecPm, functionNoDistVec, functionVecUm; uint32_t bigIntWidth = sizeof(int64_t); - map, uint64_t> aggFuncMap, avgFuncDistMap; + map, uint64_t> avgFuncDistMap; + AGG_MAP aggFuncMap; + mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // associate the columns between projected RG and aggregate RG on PM // populated the aggregate columns @@ -3378,7 +3423,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm)); + aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); colAggPm++; } @@ -3416,7 +3461,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(make_pair(keysAggPm[colAggPm], 0), colAggPm)); + aggFuncMap.insert(make_pair(make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); colAggPm++; } @@ -3428,6 +3473,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_CONSTANT) continue; + pUDAFFunc = NULL; uint32_t aggKey = aggColVec[i].first; if (projColPosMap.find(aggKey) == projColPosMap.end()) { @@ -3457,9 +3503,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.nonConstCols[i].get()); + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); if (udafc) { + pUDAFFunc = udafc->getContext().getFunction(); // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); } @@ -3470,15 +3517,14 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( } else { - // skip if this is a duplicate - if (aggFuncMap.find(make_pair(aggKey, aggOp)) != aggFuncMap.end()) - continue; - funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); } - functionVecPm.push_back(funct); + // skip if this is a duplicate + if (aggFuncMap.find(make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end()) + continue; - aggFuncMap.insert(make_pair(make_pair(aggKey, aggOp), colAggPm)); + functionVecPm.push_back(funct); + aggFuncMap.insert(make_pair(make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm)); switch (aggOp) { @@ -3734,7 +3780,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. map avgFuncMap, avgDistFuncMap; - map, uint64_t> aggDupFuncMap; + AGG_MAP aggDupFuncMap; // copy over the groupby vector for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) @@ -3746,6 +3792,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // locate the return column position in aggregated rowgroup from PM for (uint64_t i = 0; i < returnedColVec.size(); i++) { + pUDAFFunc = NULL; uint32_t retKey = returnedColVec[i].first; RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); @@ -3754,7 +3801,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != jobInfo.distinctColVec.end() ) { - map, uint64_t>::iterator it = aggFuncMap.find(make_pair(retKey, 0)); + AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, 0, pUDAFFunc)); if (it != aggFuncMap.end()) { colUm = it->second; @@ -3862,8 +3909,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( case ROWAGG_CONSTANT: default: { - map, uint64_t>::iterator it = - aggFuncMap.find(make_pair(retKey, aggOp)); + AGG_MAP::iterator it = aggFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc)); if (it != aggFuncMap.end()) { colUm = it->second; @@ -3883,7 +3929,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - it = aggFuncMap.find(make_pair(returnedColVec[i].first, ROWAGG_AVG)); + it = aggFuncMap.find(make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); if (it != aggFuncMap.end()) { // false alarm @@ -3997,7 +4043,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.nonConstCols[i].get()); + UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); + pUDAFFunc = udafc->getContext().getFunction(); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, i)); } else @@ -4012,22 +4059,23 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( functionVecUm.push_back(funct); // find if this func is a duplicate - map, uint64_t>::iterator iter = - aggDupFuncMap.find(make_pair(retKey, aggOp)); + AGG_MAP::iterator iter = aggDupFuncMap.find(make_tuple(retKey, aggOp, pUDAFFunc)); if (iter != aggDupFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; - else if (funct->fAggFunction != ROWAGG_UDAF) + else if (funct->fAggFunction == ROWAGG_UDAF) + funct->fAggFunction = ROWAGG_DUP_UDAF; + else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { - aggDupFuncMap.insert(make_pair(make_pair(retKey, aggOp), + aggDupFuncMap.insert(make_pair(make_tuple(retKey, aggOp, pUDAFFunc), funct->fOutputColumnIndex)); } diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index c7dd036af..4649adc81 100755 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -1553,6 +1553,7 @@ void RowAggregation::updateEntry(const Row& rowIn) case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_AVG: case ROWAGG_DUP_STATS: + case ROWAGG_DUP_UDAF: case ROWAGG_CONSTANT: case ROWAGG_GROUP_CONCAT: break; @@ -2110,6 +2111,8 @@ void RowAggregationUM::finalize() if (fHasUDAF) { calculateUDAFColumns(); + // copy the duplicate UDAF, if any + fixDuplicates(ROWAGG_DUP_UDAF); } if (fGroupConcat.size() > 0) @@ -2208,6 +2211,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn) case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_AVG: case ROWAGG_DUP_STATS: + case ROWAGG_DUP_UDAF: case ROWAGG_CONSTANT: break; @@ -3511,6 +3515,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn) case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_AVG: case ROWAGG_DUP_STATS: + case ROWAGG_DUP_UDAF: case ROWAGG_CONSTANT: break; @@ -3728,12 +3733,18 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, // Call the UDAF subEvaluate method mcsv1sdk::mcsv1_UDAF::ReturnCode rc; - rc = rgContext.getFunction()->subEvaluate(&rgContext, rowIn.getUserData(colIn+1).get()); + boost::shared_ptr userData = rowIn.getUserData(colIn+1); + if (!userData) + { + rowUDAF->bInterrupted = true; + throw logic_error("UDAF subevaluate : No userData"); + } + rc = rgContext.getFunction()->subEvaluate(&rgContext, userData.get()); rgContext.setUserData(NULL); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { rowUDAF->bInterrupted = true; - throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr); + throw logging::IDBExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr); } } @@ -3917,6 +3928,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn) case ROWAGG_DUP_FUNCT: case ROWAGG_DUP_AVG: case ROWAGG_DUP_STATS: + case ROWAGG_DUP_UDAF: case ROWAGG_CONSTANT: break; diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index e5b230c93..c77d4ce6f 100755 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -116,10 +116,10 @@ enum RowAggFunctionType // ROWAGG_DUP_FUNCT : copy data before AVG calculation, because SUM may share by AVG // ROWAGG_DUP_AVG : copy data after AVG calculation ROWAGG_COUNT_NO_OP, // COUNT(column_name), but leave count() to AVG - ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG, in select + ROWAGG_DUP_FUNCT, // duplicate aggregate Function(), except AVG and UDAF, in select ROWAGG_DUP_AVG, // duplicate AVG(column_name) in select - ROWAGG_DUP_STATS // duplicate statistics functions in select - + ROWAGG_DUP_STATS, // duplicate statistics functions in select + ROWAGG_DUP_UDAF // duplicate UDAF function in select };