diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 8f7755ad9..be0e2009d 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -1097,7 +1097,8 @@ void TupleAggregateStep::prep1PhaseAggregate( uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); // For UDAF - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // for count column of average function @@ -1139,6 +1140,7 @@ void TupleAggregateStep::prep1PhaseAggregate( // populate the aggregate rowgroup AGG_MAP aggFuncMap; + uint64_t outIdx = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { @@ -1156,8 +1158,9 @@ void TupleAggregateStep::prep1PhaseAggregate( typeAgg.push_back(ti.dtype); widthAgg.push_back(ti.width); SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol( - aggOp, stats, 0, i, jobInfo.cntStarPos)); + aggOp, stats, 0, outIdx, jobInfo.cntStarPos)); functionVec.push_back(funct); + ++outIdx; continue; } @@ -1173,9 +1176,10 @@ void TupleAggregateStep::prep1PhaseAggregate( typeAgg.push_back(ti.dtype); widthAgg.push_back(width); SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol( - aggOp, stats, 0, i, -1)); + aggOp, stats, 0, outIdx, -1)); functionVec.push_back(funct); + ++outIdx; continue; } @@ -1221,16 +1225,17 @@ void TupleAggregateStep::prep1PhaseAggregate( widthAgg.push_back(width[colProj]); if (groupBy[it->second]->fOutputColumnIndex == (uint32_t) - 1) - groupBy[it->second]->fOutputColumnIndex = i; + groupBy[it->second]->fOutputColumnIndex = outIdx; else functionVec.push_back(SP_ROWAGG_FUNC_t( new RowAggFunctionCol( ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, - i, + outIdx, groupBy[it->second]->fOutputColumnIndex))); + ++outIdx; continue; } else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) != @@ -1243,6 +1248,7 @@ void TupleAggregateStep::prep1PhaseAggregate( 
precisionAgg.push_back(ti.precision); typeAgg.push_back(ti.dtype); widthAgg.push_back(ti.width); + ++outIdx; continue; } else if (jobInfo.groupConcatInfo.columns().find(key) != @@ -1255,6 +1261,7 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(width[colProj]); + ++outIdx; continue; } else if (jobInfo.windowSet.find(key) != jobInfo.windowSet.end()) @@ -1266,6 +1273,7 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(width[colProj]); + ++outIdx; continue; } else @@ -1286,16 +1294,16 @@ void TupleAggregateStep::prep1PhaseAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Create a RowAggFunctionCol (UDAF subtype) with the context. 
- funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i)); + funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx)); break; } } @@ -1306,7 +1314,7 @@ void TupleAggregateStep::prep1PhaseAggregate( } else { - funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, i)); + funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, outIdx)); } functionVec.push_back(funct); @@ -1477,6 +1485,14 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType()); widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth()); + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -1488,6 +1504,13 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(width[colProj]); + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -1520,6 +1543,11 @@ void TupleAggregateStep::prep1PhaseAggregate( { aggFuncMap.insert(make_pair(boost::make_tuple(key, aggOp, pUDAFFunc), funct->fOutputColumnIndex)); } + + if (aggOp != ROWAGG_MULTI_PARM) + { + ++outIdx; + } } // now fix the AVG function, locate the count(column) position @@ -1671,12 +1699,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( uint32_t bigIntWidth = sizeof(int64_t); // map key = column key, operation (enum), and UDAF pointer if UDAF. 
AGG_MAP aggFuncMap; - set avgSet; +// set avgSet; + list multiParmIndexes; // fOR udaf UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; // for count column of average function map avgFuncMap, avgDistFuncMap; @@ -1825,9 +1855,9 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( } // skip sum / count(column) if avg is also selected - if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && - (avgSet.find(aggKey) != avgSet.end())) - continue; +// if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && +// (avgSet.find(aggKey) != avgSet.end())) +// continue; if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG || @@ -1840,12 +1870,12 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { @@ -2063,7 +2093,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType()); widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth()); ++colAgg; - // UDAF Dummy holder for UserData struct + // Column for index of UDAF UserData struct oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); @@ -2071,6 +2101,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(CalpontSystemCatalog::UBIGINT); widthAgg.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAgg++; + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -2082,7 
+2120,15 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(widthProj[colProj]); + multiParmIndexes.push_back(colAgg); ++colAgg; + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -2122,7 +2168,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. AGG_MAP aggDupFuncMap; - pUDAFFunc = NULL; + projColsUDAFIdx = 0; + int64_t multiParms = 0; // copy over the groupby vector // update the outputColumnIndex if returned @@ -2133,8 +2180,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i)); } - projColsUDAFIndex = 0; // locate the return column position in aggregated rowgroup + uint64_t outIdx = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { udafc = NULL; @@ -2144,23 +2191,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); int colAgg = -1; - if (aggOp == ROWAGG_UDAF) + if (aggOp == ROWAGG_MULTI_PARM) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; - for (; it != jobInfo.projectionCols.end(); it++) - { - udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; - if (udafc) - { - pUDAFFunc = udafc->getContext().getFunction(); - break; - } - } - if (it == jobInfo.projectionCols.end()) - { - throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); - } + // Skip on final agg.: Extra parms for an aggregate have no work there. 
+ ++multiParms; + continue; } if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != @@ -2188,6 +2223,25 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( } } + if (aggOp == ROWAGG_UDAF) + { + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; + for (; it != jobInfo.projectionCols.end(); it++) + { + udafc = dynamic_cast((*it).get()); + projColsUDAFIdx++; + if (udafc) + { + pUDAFFunc = udafc->getContext().getFunction(); + break; + } + } + if (it == jobInfo.projectionCols.end()) + { + throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); + } + } + switch (aggOp) { case ROWAGG_DISTINCT_AVG: @@ -2438,7 +2492,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (returnColMissing) { Message::Args args; - args.add(keyName(i, retKey, jobInfo)); + args.add(keyName(outIdx, retKey, jobInfo)); string emsg = IDBErrorInfo::instance()-> errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); cerr << "prep1PhaseDistinctAggregate: " << emsg << " oid=" @@ -2462,7 +2516,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (jobInfo.groupByColVec[j] == retKey) { if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t) - 1) - groupByNoDist[j]->fOutputColumnIndex = i; + groupByNoDist[j]->fOutputColumnIndex = outIdx; else dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex; } @@ -2472,7 +2526,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (dupGroupbyIndex != -1) functionVec2.push_back(SP_ROWAGG_FUNC_t( new RowAggFunctionCol( - ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, i, dupGroupbyIndex))); + ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex))); } else { @@ -2480,11 +2534,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, i)); + funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx)); 
} else { - funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, i)); + funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, outIdx)); } if (aggOp == ROWAGG_COUNT_NO_OP) @@ -2521,6 +2575,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } + ++outIdx; } // for (i // now fix the AVG function, locate the count(column) position @@ -2538,7 +2593,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( } // there is avg(k), but no count(k) in the select list - uint64_t lastCol = returnedColVec.size(); + uint64_t lastCol = outIdx; for (map::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++) { @@ -2753,6 +2808,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k)); groupBySub.push_back(groupby); + // Keep a count of the parms after the first for any aggregate. + // These will be skipped and the count needs to be subtracted + // from where the aux column will be. 
+ int64_t multiParms = 0; + // tricky part : 2 function vectors // -- dummy function vector for sub-aggregator, which does distinct only // -- aggregate function on this distinct column for rowAggDist @@ -2760,6 +2820,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( for (uint64_t k = 0; k < returnedColVec.size(); k++) { + if (functionIdMap(returnedColVec[k].second) == ROWAGG_MULTI_PARM) // test column k (this loop's index) so multiParms counts the extra parms seen so far + { + ++multiParms; + continue; + } if (returnedColVec[k].first != distinctColKey) continue; @@ -2780,7 +2845,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( f->fStatsFunction, groupBySub.size() - 1, f->fOutputColumnIndex, - f->fAuxColumnIndex)); + f->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } } @@ -2799,9 +2864,15 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { vector functionSub1 = functionNoDistVec; vector functionSub2; + int64_t multiParms = 0; for (uint64_t k = 0; k < returnedColVec.size(); k++) { + if (functionIdMap(returnedColVec[k].second) == ROWAGG_MULTI_PARM) + { + ++multiParms; + continue; + } // search non-distinct functions in functionVec vector::iterator it = functionVec2.begin(); @@ -2817,7 +2888,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex, udafFuncCol->fOutputColumnIndex, - udafFuncCol->fAuxColumnIndex)); + udafFuncCol->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } else if ((f->fOutputColumnIndex == k) && @@ -2839,7 +2910,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( f->fStatsFunction, f->fInputColumnIndex, f->fOutputColumnIndex, - f->fAuxColumnIndex)); + f->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } } @@ -2893,7 +2964,8 @@ void TupleAggregateStep::prep2PhasesAggregate( set avgSet; vector >& returnedColVec = jobInfo.returnedColVec; // For UDAF - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc 
= NULL; @@ -3073,11 +3145,11 @@ void TupleAggregateStep::prep2PhasesAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -3305,6 +3377,14 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); widthAggPm.push_back(bigUintWidth); funct->fAuxColumnIndex = colAggPm++; + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -3317,6 +3397,13 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); colAggPm++; + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -3342,7 +3429,7 @@ void TupleAggregateStep::prep2PhasesAggregate( map avgFuncMap; AGG_MAP aggDupFuncMap; - projColsUDAFIndex = 0; + projColsUDAFIdx = 0; // copy over the groupby vector // update the outputColumnIndex if returned for (uint64_t i = 0; i < groupByPm.size(); i++) @@ -3372,12 +3459,12 @@ void TupleAggregateStep::prep2PhasesAggregate( udafc = NULL; if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ 
-3703,7 +3790,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( set avgSet, avgDistSet; vector >& returnedColVec = jobInfo.returnedColVec; // For UDAF - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; @@ -3919,11 +4007,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -4147,6 +4235,14 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); widthAggPm.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAggPm++; + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -4160,6 +4256,13 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( widthAggPm.push_back(width[colProj]); multiParmIndexes.push_back(colAggPm); colAggPm++; + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -4208,9 +4311,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( funct.reset(new RowUDAFFunctionCol( udafFuncCol->fUDAFContext, udafFuncCol->fOutputColumnIndex, - udafFuncCol->fOutputColumnIndex, + udafFuncCol->fOutputColumnIndex-multiParms, udafFuncCol->fAuxColumnIndex-multiParms)); functionNoDistVec.push_back(funct); + pUDAFFunc = udafFuncCol->fUDAFContext.getFunction(); } else { @@ 
-4218,9 +4322,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( funcPm->fAggFunction, funcPm->fStatsFunction, funcPm->fOutputColumnIndex, - funcPm->fOutputColumnIndex, + funcPm->fOutputColumnIndex-multiParms, funcPm->fAuxColumnIndex-multiParms)); functionNoDistVec.push_back(funct); + pUDAFFunc = NULL; } } @@ -4251,7 +4356,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // These will be skipped and the count needs to be subtracted // from where the aux column will be. int64_t multiParms = 0; - projColsUDAFIndex = 0; + projColsUDAFIdx = 0; // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. map avgFuncMap, avgDistFuncMap; @@ -4286,11 +4391,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -4436,6 +4541,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( precisionAggDist.push_back(precisionAggUm[colUm]); typeAggDist.push_back(typeAggUm[colUm]); widthAggDist.push_back(widthAggUm[colUm]); + colUm -= multiParms; } // not a direct hit -- a returned column is not already in the RG from PMs @@ -4472,8 +4578,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(retKey); scaleAggDist.push_back(0); - precisionAggDist.push_back(19); - typeAggDist.push_back(CalpontSystemCatalog::BIGINT); + if (isUnsigned(typeAggUm[colUm])) + { + precisionAggDist.push_back(20); + typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); + } + else + { + precisionAggDist.push_back(19); + typeAggDist.push_back(CalpontSystemCatalog::BIGINT); + } 
widthAggDist.push_back(bigIntWidth); } } diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 4a86dc218..d030d1855 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4097,426 +4097,429 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) try { - // special parsing for group_concat - if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) - { - Item_func_group_concat* gc = (Item_func_group_concat*)isp; + // special parsing for group_concat + if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) + { + Item_func_group_concat* gc = (Item_func_group_concat*)isp; vector orderCols; - RowColumn* rowCol = new RowColumn(); + RowColumn* rowCol = new RowColumn(); vector selCols; - uint32_t select_ctn = gc->count_field(); - ReturnedColumn* rc = NULL; + uint32_t select_ctn = gc->count_field(); + ReturnedColumn* rc = NULL; - for (uint32_t i = 0; i < select_ctn; i++) + for (uint32_t i = 0; i < select_ctn; i++) + { + rc = buildReturnedColumn(sfitempp[i], gwi, gwi.fatalParseError); + + if (!rc || gwi.fatalParseError) + { + if (ac) + delete ac; + return NULL; + } + + selCols.push_back(SRCP(rc)); + } + + ORDER** order_item, **end; + + for (order_item = gc->get_order(), + end = order_item + gc->order_field(); order_item < end; + order_item++) + { + Item* ord_col = *(*order_item)->item; + + if (ord_col->type() == Item::INT_ITEM) { - rc = buildReturnedColumn(sfitempp[i], gwi, gwi.fatalParseError); + Item_int* id = (Item_int*)ord_col; + + if (id->val_int() > (int)selCols.size()) + { + gwi.fatalParseError = true; + if (ac) + delete ac; + return NULL; + } + + rc = selCols[id->val_int() - 1]->clone(); + rc->orderPos(id->val_int() - 1); + } + else + { + rc = buildReturnedColumn(ord_col, gwi, gwi.fatalParseError); if (!rc || gwi.fatalParseError) { - if (ac) - delete ac; + if (ac) + delete ac; return NULL; } - - selCols.push_back(SRCP(rc)); } - ORDER** order_item, **end; + // 10.2 TODO: direction is 
now a tri-state flag + rc->asc((*order_item)->direction == ORDER::ORDER_ASC ? true : false); + orderCols.push_back(SRCP(rc)); + } - for (order_item = gc->get_order(), - end = order_item + gc->order_field(); order_item < end; - order_item++) - { - Item* ord_col = *(*order_item)->item; - - if (ord_col->type() == Item::INT_ITEM) - { - Item_int* id = (Item_int*)ord_col; - - if (id->val_int() > (int)selCols.size()) - { - gwi.fatalParseError = true; - if (ac) - delete ac; - return NULL; - } - - rc = selCols[id->val_int() - 1]->clone(); - rc->orderPos(id->val_int() - 1); - } - else - { - rc = buildReturnedColumn(ord_col, gwi, gwi.fatalParseError); - - if (!rc || gwi.fatalParseError) - { - if (ac) - delete ac; - return NULL; - } - } - - // 10.2 TODO: direction is now a tri-state flag - rc->asc((*order_item)->direction == ORDER::ORDER_ASC ? true : false); - orderCols.push_back(SRCP(rc)); - } - - rowCol->columnVec(selCols); - (dynamic_cast(ac))->orderCols(orderCols); - parm.reset(rowCol); + rowCol->columnVec(selCols); + (dynamic_cast(ac))->orderCols(orderCols); + parm.reset(rowCol); ac->aggParms().push_back(parm); - if (gc->str_separator()) - { - string separator; - separator.assign(gc->str_separator()->ptr(), gc->str_separator()->length()); - (dynamic_cast(ac))->separator(separator); - } - } - else + if (gc->str_separator()) { - for (uint32_t i = 0; i < isp->argument_count(); i++) + string separator; + separator.assign(gc->str_separator()->ptr(), gc->str_separator()->length()); + (dynamic_cast(ac))->separator(separator); + } + } + else + { + for (uint32_t i = 0; i < isp->argument_count(); i++) + { + Item* sfitemp = sfitempp[i]; + Item::Type sfitype = sfitemp->type(); + + switch (sfitype) { - Item* sfitemp = sfitempp[i]; - Item::Type sfitype = sfitemp->type(); - - switch (sfitype) + case Item::FIELD_ITEM: { - case Item::FIELD_ITEM: - { - Item_field* ifp = reinterpret_cast(sfitemp); - SimpleColumn* sc = buildSimpleColumn(ifp, gwi); + Item_field* ifp = 
reinterpret_cast(sfitemp); + SimpleColumn* sc = buildSimpleColumn(ifp, gwi); - if (!sc) - { - gwi.fatalParseError = true; - break; - } - - parm.reset(sc); - gwi.columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(ifp->field_name), parm)); - TABLE_LIST* tmp = (ifp->cached_table ? ifp->cached_table : 0); - gwi.tableMap[make_aliastable(sc->schemaName(), sc->tableName(), sc->tableAlias(), sc->isInfiniDB())] = make_pair(1, tmp); - break; - } - - case Item::INT_ITEM: - case Item::STRING_ITEM: - case Item::REAL_ITEM: - case Item::DECIMAL_ITEM: - { - // treat as count(*) - if (ac->aggOp() == AggregateColumn::COUNT) - ac->aggOp(AggregateColumn::COUNT_ASTERISK); - - ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError))); - break; - } - - case Item::NULL_ITEM: - { - parm.reset(new ConstantColumn("", ConstantColumn::NULLDATA)); - ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError))); - break; - } - - case Item::FUNC_ITEM: - { - Item_func* ifp = (Item_func*)sfitemp; - ReturnedColumn* rc = 0; - - // check count(1+1) case - vector tmpVec; - uint16_t parseInfo = 0; - parse_item(ifp, tmpVec, gwi.fatalParseError, parseInfo); - - if (parseInfo & SUB_BIT) - { - gwi.fatalParseError = true; - break; - } - else if (!gwi.fatalParseError && - !(parseInfo & AGG_BIT) && - !(parseInfo & AF_BIT) && - tmpVec.size() == 0) - { - rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError); - FunctionColumn* fc = dynamic_cast(rc); - - if ((fc && fc->functionParms().empty()) || !fc) - { - //ac->aggOp(AggregateColumn::COUNT_ASTERISK); - ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError); - - if (dynamic_cast(rc)) - { - //@bug5229. handle constant function on aggregate argument - ac->constCol(SRCP(rc)); - break; - } - } - } - - // MySQL carelessly allows correlated aggregate function on the WHERE clause. - // Here is the work around to deal with that inconsistence. 
- // e.g., SELECT (SELECT t.c FROM t1 AS t WHERE t.b=MAX(t1.b + 0)) FROM t1; - ClauseType clauseType = gwi.clauseType; - - if (gwi.clauseType == WHERE) - gwi.clauseType = HAVING; - - // @bug 3603. for cases like max(rand()). try to build function first. - if (!rc) - rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError); - - parm.reset(rc); - gwi.clauseType = clauseType; - - if (gwi.fatalParseError) - break; - - break; - } - - case Item::REF_ITEM: - { - ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError); - - if (rc) - { - parm.reset(rc); - break; - } - } - - default: + if (!sc) { gwi.fatalParseError = true; - //gwi.parseErrorText = "Non-supported Item in Aggregate function"; + break; + } + + parm.reset(sc); + gwi.columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(ifp->field_name), parm)); + TABLE_LIST* tmp = (ifp->cached_table ? ifp->cached_table : 0); + gwi.tableMap[make_aliastable(sc->schemaName(), sc->tableName(), sc->tableAlias(), sc->isInfiniDB())] = make_pair(1, tmp); + break; + } + + case Item::INT_ITEM: + case Item::STRING_ITEM: + case Item::REAL_ITEM: + case Item::DECIMAL_ITEM: + { + // treat as count(*) + if (ac->aggOp() == AggregateColumn::COUNT) + ac->aggOp(AggregateColumn::COUNT_ASTERISK); + parm.reset(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)); + ac->constCol(parm); + break; + } + + case Item::NULL_ITEM: + { + parm.reset(new ConstantColumn("", ConstantColumn::NULLDATA)); + ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError))); + break; + } + + case Item::FUNC_ITEM: + { + Item_func* ifp = (Item_func*)sfitemp; + ReturnedColumn* rc = 0; + + // check count(1+1) case + vector tmpVec; + uint16_t parseInfo = 0; + parse_item(ifp, tmpVec, gwi.fatalParseError, parseInfo); + + if (parseInfo & SUB_BIT) + { + gwi.fatalParseError = true; + break; + } + else if (!gwi.fatalParseError && + !(parseInfo & AGG_BIT) && + !(parseInfo & AF_BIT) && + tmpVec.size() == 0) + { + rc = 
buildFunctionColumn(ifp, gwi, gwi.fatalParseError); + FunctionColumn* fc = dynamic_cast(rc); + + if ((fc && fc->functionParms().empty()) || !fc) + { + //ac->aggOp(AggregateColumn::COUNT_ASTERISK); + ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError); + + if (dynamic_cast(rc)) + { + //@bug5229. handle constant function on aggregate argument + ac->constCol(SRCP(rc)); + break; + } + } + } + + // MySQL carelessly allows correlated aggregate function on the WHERE clause. + // Here is the work around to deal with that inconsistence. + // e.g., SELECT (SELECT t.c FROM t1 AS t WHERE t.b=MAX(t1.b + 0)) FROM t1; + ClauseType clauseType = gwi.clauseType; + + if (gwi.clauseType == WHERE) + gwi.clauseType = HAVING; + + // @bug 3603. for cases like max(rand()). try to build function first. + if (!rc) + rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError); + + parm.reset(rc); + gwi.clauseType = clauseType; + + if (gwi.fatalParseError) + break; + + break; + } + + case Item::REF_ITEM: + { + ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError); + + if (rc) + { + parm.reset(rc); + break; } } - if (gwi.fatalParseError) + default: { - if (gwi.parseErrorText.empty()) - { - Message::Args args; + gwi.fatalParseError = true; + //gwi.parseErrorText = "Non-supported Item in Aggregate function"; + } + } - if (item->name) - args.add(item->name); - else - args.add(""); + if (gwi.fatalParseError) + { + if (gwi.parseErrorText.empty()) + { + Message::Args args; - gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_AGG_ARGS, args); - } + if (item->name) + args.add(item->name); + else + args.add(""); + + gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_AGG_ARGS, args); + } if (ac) delete ac; - return NULL; - } + return NULL; + } if (parm) { // MCOL-1201 multi-argument aggregate ac->aggParms().push_back(parm); } - } } + } // Get result type // Modified for MCOL-1201 multi-argument aggregate if 
(ac->aggParms().size() > 0) - { + { // These are all one parm functions, so we can safely // use the first parm for result type. parm = ac->aggParms()[0]; - if (isp->sum_func() == Item_sum::AVG_FUNC || - isp->sum_func() == Item_sum::AVG_DISTINCT_FUNC) + if (isp->sum_func() == Item_sum::AVG_FUNC || + isp->sum_func() == Item_sum::AVG_DISTINCT_FUNC) + { + CalpontSystemCatalog::ColType ct = parm->resultType(); + + switch (ct.colDataType) { - CalpontSystemCatalog::ColType ct = parm->resultType(); + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + ct.colDataType = CalpontSystemCatalog::DECIMAL; + ct.colWidth = 8; + ct.scale += 4; + break; - switch (ct.colDataType) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - ct.colDataType = CalpontSystemCatalog::DECIMAL; - ct.colWidth = 8; - ct.scale += 4; - break; +#if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM - #if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + ct.colDataType = CalpontSystemCatalog::DOUBLE; + ct.colWidth = 8; + break; +#endif - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: 
- case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - ct.colDataType = CalpontSystemCatalog::DOUBLE; - ct.colWidth = 8; - break; - #endif - - default: - break; - } - - ac->resultType(ct); + default: + break; } - else if (isp->sum_func() == Item_sum::COUNT_FUNC || - isp->sum_func() == Item_sum::COUNT_DISTINCT_FUNC) + + ac->resultType(ct); + } + else if (isp->sum_func() == Item_sum::COUNT_FUNC || + isp->sum_func() == Item_sum::COUNT_DISTINCT_FUNC) + { + CalpontSystemCatalog::ColType ct; + ct.colDataType = CalpontSystemCatalog::BIGINT; + ct.colWidth = 8; + ct.scale = parm->resultType().scale; + ac->resultType(ct); + } + else if (isp->sum_func() == Item_sum::SUM_FUNC || + isp->sum_func() == Item_sum::SUM_DISTINCT_FUNC) + { + CalpontSystemCatalog::ColType ct = parm->resultType(); + + switch (ct.colDataType) { - CalpontSystemCatalog::ColType ct; - ct.colDataType = CalpontSystemCatalog::BIGINT; - ct.colWidth = 8; - ct.scale = parm->resultType().scale; - ac->resultType(ct); + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + ct.colDataType = CalpontSystemCatalog::BIGINT; + + // no break, let fall through + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + ct.colWidth = 8; + break; + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + ct.colDataType = CalpontSystemCatalog::UBIGINT; + ct.colWidth = 8; + break; + +#if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + ct.colDataType = CalpontSystemCatalog::DOUBLE; + ct.colWidth = 8; + break; +#endif + + default: + break; } - else if (isp->sum_func() == Item_sum::SUM_FUNC || - 
isp->sum_func() == Item_sum::SUM_DISTINCT_FUNC) - { - CalpontSystemCatalog::ColType ct = parm->resultType(); - switch (ct.colDataType) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - ct.colDataType = CalpontSystemCatalog::BIGINT; - - // no break, let fall through - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - ct.colWidth = 8; - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - ct.colDataType = CalpontSystemCatalog::UBIGINT; - ct.colWidth = 8; - break; - - #if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - ct.colDataType = CalpontSystemCatalog::DOUBLE; - ct.colWidth = 8; - break; - #endif - - default: - break; - } - - ac->resultType(ct); - } - else if (isp->sum_func() == Item_sum::STD_FUNC || - isp->sum_func() == Item_sum::VARIANCE_FUNC) - { - CalpontSystemCatalog::ColType ct; - ct.colDataType = CalpontSystemCatalog::DOUBLE; - ct.colWidth = 8; - ct.scale = 0; - ac->resultType(ct); - } - else if (isp->sum_func() == Item_sum::SUM_BIT_FUNC) - { - CalpontSystemCatalog::ColType ct; - ct.colDataType = CalpontSystemCatalog::BIGINT; - ct.colWidth = 8; - ct.scale = 0; - ct.precision = -16; // borrowed to indicate skip null value check on connector - ac->resultType(ct); - } - else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) - { - //Item_func_group_concat* gc = (Item_func_group_concat*)isp; - CalpontSystemCatalog::ColType ct; - ct.colDataType = CalpontSystemCatalog::VARCHAR; - ct.colWidth = isp->max_length; - ct.precision = 0; - ac->resultType(ct); - } - else - { - // UDAF result type will be set below. 
- ac->resultType(parm->resultType()); - } + ac->resultType(ct); + } + else if (isp->sum_func() == Item_sum::STD_FUNC || + isp->sum_func() == Item_sum::VARIANCE_FUNC) + { + CalpontSystemCatalog::ColType ct; + ct.colDataType = CalpontSystemCatalog::DOUBLE; + ct.colWidth = 8; + ct.scale = 0; + ac->resultType(ct); + } + else if (isp->sum_func() == Item_sum::SUM_BIT_FUNC) + { + CalpontSystemCatalog::ColType ct; + ct.colDataType = CalpontSystemCatalog::BIGINT; + ct.colWidth = 8; + ct.scale = 0; + ct.precision = -16; // borrowed to indicate skip null value check on connector + ac->resultType(ct); + } + else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC) + { + //Item_func_group_concat* gc = (Item_func_group_concat*)isp; + CalpontSystemCatalog::ColType ct; + ct.colDataType = CalpontSystemCatalog::VARCHAR; + ct.colWidth = isp->max_length; + ct.precision = 0; + ac->resultType(ct); } else { - ac->resultType(colType_MysqlToIDB(isp)); + // UDAF result type will be set below. + ac->resultType(parm->resultType()); } + } + else + { + ac->resultType(colType_MysqlToIDB(isp)); + } - // adjust decimal result type according to internalDecimalScale - if (gwi.internalDecimalScale >= 0 && ac->resultType().colDataType == CalpontSystemCatalog::DECIMAL) + // adjust decimal result type according to internalDecimalScale + if (gwi.internalDecimalScale >= 0 && ac->resultType().colDataType == CalpontSystemCatalog::DECIMAL) + { + CalpontSystemCatalog::ColType ct = ac->resultType(); + ct.scale = gwi.internalDecimalScale; + ac->resultType(ct); + } + + // check for same aggregate on the select list + ac->expressionId(ci->expressionId++); + + if (gwi.clauseType != SELECT) + { + for (uint32_t i = 0; i < gwi.returnedCols.size(); i++) { - CalpontSystemCatalog::ColType ct = ac->resultType(); - ct.scale = gwi.internalDecimalScale; - ac->resultType(ct); + if (*ac == gwi.returnedCols[i].get()) + ac->expressionId(gwi.returnedCols[i]->expressionId()); } + } - // check for same aggregate on the select list 
- ac->expressionId(ci->expressionId++); - - if (gwi.clauseType != SELECT) + // @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will + // be applied in ExeMgr. When the ExeMgr fix is available, this checking + // will be taken out. + if (isp->sum_func() != Item_sum::UDF_SUM_FUNC) { - for (uint32_t i = 0; i < gwi.returnedCols.size(); i++) - { - if (*ac == gwi.returnedCols[i].get()) - ac->expressionId(gwi.returnedCols[i]->expressionId()); - } + if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty()) + { + gwi.fatalParseError = true; + gwi.parseErrorText = "No project column found for aggregate function"; + if (ac) + delete ac; + return NULL; + } + else if (ac->constCol()) + { + gwi.count_asterisk_list.push_back(ac); + } } - // @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will - // be applied in ExeMgr. When the ExeMgr fix is available, this checking - // will be taken out. - if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty()) - { - gwi.fatalParseError = true; - gwi.parseErrorText = "No project column found for aggregate function"; - if (ac) - delete ac; - return NULL; - } - else if (ac->constCol()) - { - gwi.count_asterisk_list.push_back(ac); - } - - // For UDAF, populate the context and call the UDAF init() function. + // For UDAF, populate the context and call the UDAF init() function. // The return type is (should be) set in context by init(). 
- if (isp->sum_func() == Item_sum::UDF_SUM_FUNC) + if (isp->sum_func() == Item_sum::UDF_SUM_FUNC) + { + UDAFColumn* udafc = dynamic_cast(ac); + + if (udafc) { - UDAFColumn* udafc = dynamic_cast(ac); + mcsv1Context& context = udafc->getContext(); + context.setName(isp->func_name()); - if (udafc) - { - mcsv1Context& context = udafc->getContext(); - context.setName(isp->func_name()); - - // Set up the return type defaults for the call to init() - context.setResultType(udafc->resultType().colDataType); - context.setColWidth(udafc->resultType().colWidth); - context.setScale(udafc->resultType().scale); - context.setPrecision(udafc->resultType().precision); + // Set up the return type defaults for the call to init() + context.setResultType(udafc->resultType().colDataType); + context.setColWidth(udafc->resultType().colWidth); + context.setScale(udafc->resultType().scale); + context.setPrecision(udafc->resultType().precision); context.setParamCount(udafc->aggParms().size()); ColumnDatum colType; @@ -4533,7 +4536,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) colTypes[i] = colType; } - // Call the user supplied init() + // Call the user supplied init() mcsv1sdk::mcsv1_UDAF* udaf = context.getFunction(); if (!udaf) { @@ -4544,37 +4547,37 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) return NULL; } if (udaf->init(&context, colTypes) == mcsv1_UDAF::ERROR) - { - gwi.fatalParseError = true; - gwi.parseErrorText = udafc->getContext().getErrorMessage(); + { + gwi.fatalParseError = true; + gwi.parseErrorText = udafc->getContext().getErrorMessage(); if (ac) delete ac; - return NULL; - } + return NULL; + } // UDAF_OVER_REQUIRED means that this function is for Window // Function only. Reject it here in aggregate land. 
- if (udafc->getContext().getRunFlag(UDAF_OVER_REQUIRED)) - { - gwi.fatalParseError = true; - gwi.parseErrorText = - logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_WINDOW_FUNC_ONLY, - context.getName()); + if (udafc->getContext().getRunFlag(UDAF_OVER_REQUIRED)) + { + gwi.fatalParseError = true; + gwi.parseErrorText = + logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_WINDOW_FUNC_ONLY, + context.getName()); if (ac) delete ac; - return NULL; - } - - // Set the return type as set in init() - CalpontSystemCatalog::ColType ct; - ct.colDataType = context.getResultType(); - ct.colWidth = context.getColWidth(); - ct.scale = context.getScale(); - ct.precision = context.getPrecision(); - udafc->resultType(ct); + return NULL; } + + // Set the return type as set in init() + CalpontSystemCatalog::ColType ct; + ct.colDataType = context.getResultType(); + ct.colWidth = context.getColWidth(); + ct.scale = context.getScale(); + ct.precision = context.getPrecision(); + udafc->resultType(ct); } } + } catch (std::logic_error e) { gwi.fatalParseError = true; @@ -4744,6 +4747,7 @@ void gp_walk(const Item* item, void* arg) if (isp) { + // @bug 3669. 
trim trailing spaces for the compare value if (isp->result_type() == STRING_RESULT) { String val, *str = isp->val_str(&val); @@ -4754,7 +4758,10 @@ void gp_walk(const Item* item, void* arg) cval.assign(str->ptr(), str->length()); } + size_t spos = cval.find_last_not_of(" "); + if (spos != string::npos) + cval = cval.substr(0, spos + 1); gwip->rcWorkStack.push(new ConstantColumn(cval)); break; @@ -7908,8 +7915,15 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); return ER_CHECK_NOT_IMPLEMENTED; } - - (*coliter)->aggParms().push_back(minSc); + // Replace the last (presumably constant) object with minSc + if ((*coliter)->aggParms().empty()) + { + (*coliter)->aggParms().push_back(minSc); + } + else + { + (*coliter)->aggParms()[0] = minSc; + } } std::vector::iterator funciter; @@ -8075,9 +8089,9 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi) gwi.thd = thd; int status = getGroupPlan(gwi, select_lex, csep, gi); - cerr << "---------------- cp_get_group_plan EXECUTION PLAN ----------------" << endl; - cerr << *csep << endl ; - cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; +// cerr << "---------------- cp_get_group_plan EXECUTION PLAN ----------------" << endl; +// cerr << *csep << endl ; +// cerr << "-------------- EXECUTION PLAN END --------------\n" << endl; if (status > 0) return ER_INTERNAL_ERROR; diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 043dcaac2..bead74aff 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -1723,17 +1723,7 @@ void RowAggregation::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(pFunctionCol.get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colOut + 1, rowUDAF, i); - } - else - { - throw logic_error("(3)A UDAF function is called but there's no 
RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colOut + 1, i); break; } @@ -2012,31 +2002,60 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + valIn * valIn, colAux + 1); } -void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, - RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx) +void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, + int64_t colAux, uint64_t& funcColsIdx) { uint32_t paramCount = fRGContext.getParameterCount(); // The vector of parameters to be sent to the UDAF mcsv1sdk::ColumnDatum valsIn[paramCount]; uint32_t dataFlags[paramCount]; - + ConstantColumn* cc; + bool bIsNull = false; execplan::CalpontSystemCatalog::ColDataType colDataType; for (uint32_t i = 0; i < paramCount; ++i) { + // If UDAF_IGNORE_NULLS is on, bIsNull gets set the first time + // we find a null. We still need to eat the rest of the parameters + // to sync updateEntry + if (bIsNull) + { + ++funcColsIdx; + continue; + } + SP_ROWAGG_FUNC_t pFunctionCol = fFunctionCols[funcColsIdx]; mcsv1sdk::ColumnDatum& datum = valsIn[i]; // Turn on NULL flags dataFlags[i] = 0; - if (isNull(&fRowGroupIn, rowIn, colIn) == true) + + // If this particular parameter is a constant, then we need + // to acces the constant value rather than a row value. 
+ cc = NULL; + if (pFunctionCol->fpConstCol) + { + cc = dynamic_cast(pFunctionCol->fpConstCol.get()); + } + + if ((cc && cc->type() == ConstantColumn::NULLDATA) + || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true)) { if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) { - return; + bIsNull = true; + ++funcColsIdx; + continue; } dataFlags[i] |= mcsv1sdk::PARAM_IS_NULL; } - - colDataType = fRowGroupIn.getColTypes()[colIn]; - if (!fRGContext.isParamNull(i)) + + if (cc) + { + colDataType = cc->resultType().colDataType; + } + else + { + colDataType = fRowGroupIn.getColTypes()[colIn]; + } + if (!(dataFlags[i] & mcsv1sdk::PARAM_IS_NULL)) { switch (colDataType) { @@ -2045,13 +2064,38 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: case execplan::CalpontSystemCatalog::BIGINT: + { + datum.dataType = execplan::CalpontSystemCatalog::BIGINT; + if (cc) + { + datum.columnData = cc->getIntVal(const_cast(rowIn), bIsNull); + datum.scale = cc->resultType().scale; + datum.precision = cc->resultType().precision; + } + else + { + datum.columnData = rowIn.getIntField(colIn); + datum.scale = fRowGroupIn.getScale()[colIn]; + datum.precision = fRowGroupIn.getPrecision()[colIn]; + } + break; + } case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: { - datum.dataType = execplan::CalpontSystemCatalog::BIGINT; - datum.columnData = rowIn.getIntField(colIn); - datum.scale = fRowGroupIn.getScale()[colIn]; - datum.precision = fRowGroupIn.getPrecision()[colIn]; + datum.dataType = colDataType; + if (cc) + { + datum.columnData = cc->getDecimalVal(const_cast(rowIn), bIsNull).value; + datum.scale = cc->resultType().scale; + datum.precision = cc->resultType().precision; + } + else + { + datum.columnData = rowIn.getIntField(colIn); + datum.scale = fRowGroupIn.getScale()[colIn]; + datum.precision = fRowGroupIn.getPrecision()[colIn]; + } 
break; } @@ -2062,7 +2106,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::UBIGINT: { datum.dataType = execplan::CalpontSystemCatalog::UBIGINT; - datum.columnData = rowIn.getUintField(colIn); + if (cc) + { + datum.columnData = cc->getUintVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getUintField(colIn); + } break; } @@ -2070,7 +2121,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::UDOUBLE: { datum.dataType = execplan::CalpontSystemCatalog::DOUBLE; - datum.columnData = rowIn.getDoubleField(colIn); + if (cc) + { + datum.columnData = cc->getDoubleVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getDoubleField(colIn); + } break; } @@ -2078,22 +2136,55 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::UFLOAT: { datum.dataType = execplan::CalpontSystemCatalog::FLOAT; - datum.columnData = rowIn.getFloatField(colIn); + if (cc) + { + datum.columnData = cc->getFloatVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getFloatField(colIn); + } break; } case execplan::CalpontSystemCatalog::DATE: + { + datum.dataType = execplan::CalpontSystemCatalog::UBIGINT; + if (cc) + { + datum.columnData = cc->getDateIntVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getUintField(colIn); + } + break; + } case execplan::CalpontSystemCatalog::DATETIME: { datum.dataType = execplan::CalpontSystemCatalog::UBIGINT; - datum.columnData = rowIn.getUintField(colIn); + if (cc) + { + datum.columnData = cc->getDatetimeIntVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getUintField(colIn); + } break; } case execplan::CalpontSystemCatalog::TIME: { datum.dataType = execplan::CalpontSystemCatalog::BIGINT; - datum.columnData = rowIn.getIntField(colIn); + if (cc) 
+ { + datum.columnData = cc->getTimeIntVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getIntField(colIn); + } break; } @@ -2105,7 +2196,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::BLOB: { datum.dataType = colDataType; - datum.columnData = rowIn.getStringField(colIn); + if (cc) + { + datum.columnData = cc->getStrVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getStringField(colIn); + } break; } @@ -2147,6 +2245,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { + RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[funcColsIdx].get()); rowUDAF->bInterrupted = true; throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr); } @@ -2443,17 +2542,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[i].get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i); - } - else - { - throw logic_error("(5)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colAux, i); break; } @@ -3991,17 +4080,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[i].get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i); - } - else - { - throw logic_error("(6)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colAux, i); break; } @@ -4199,20 +4278,20 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut // colAux(in) - Where the UDAF userdata resides // rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance //------------------------------------------------------------------------------ -void 
RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, - RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx) +void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, + int64_t colAux, uint64_t& funcColsIdx) { static_any::any valOut; // Get the user data - boost::shared_ptr userData = rowIn.getUserData(colIn + 1); + boost::shared_ptr userDataIn = rowIn.getUserData(colIn+1); // Unlike other aggregates, the data isn't in colIn, so testing it for NULL // there won't help. In case of NULL, userData will be NULL. uint32_t flags[1]; flags[0] = 0; - if (!userData) + if (!userDataIn) { if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) { @@ -4230,11 +4309,12 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, // Call the UDAF subEvaluate method mcsv1sdk::mcsv1_UDAF::ReturnCode rc; - rc = fRGContext.getFunction()->subEvaluate(&fRGContext, userData.get()); + rc = fRGContext.getFunction()->subEvaluate(&fRGContext, userDataIn.get()); fRGContext.setUserData(NULL); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { + RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[funcColsIdx].get()); rowUDAF->bInterrupted = true; throw logging::IDBExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr); } @@ -4429,17 +4509,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[i].get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i); - } - else - { - throw logic_error("(7)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colAux, i); break; }