From 611cdb204dc5f7af6a8ab250f993b1aa0cccf71d Mon Sep 17 00:00:00 2001 From: David Hall Date: Thu, 16 Aug 2018 11:17:39 -0500 Subject: [PATCH] MCOL-521 Re-do changes for MariaDB 10.3 merge --- dbcon/joblist/tupleaggregatestep.cpp | 475 +++++++++++---------------- utils/udfsdk/CMakeLists.txt | 2 +- utils/udfsdk/mcsv1_udaf.cpp | 13 +- utils/udfsdk/mcsv1_udaf.h | 2 +- utils/udfsdk/udfmysql.cpp | 242 +++++++------- utils/udfsdk/udfsdk.vpj | 2 - 6 files changed, 312 insertions(+), 424 deletions(-) diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index da91919f0..429d5821d 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -849,10 +849,9 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo) idbassert(cc != NULL); // @bug5261 bool isNull = (ConstantColumn::NULLDATA == cc->type()); - if (ac->aggOp() == ROWAGG_UDAF) + if (ac->aggOp() == AggregateColumn::UDAF) { UDAFColumn* udafc = dynamic_cast(ac); - if (udafc) { constAggDataVec.push_back( @@ -1099,7 +1098,6 @@ void TupleAggregateStep::prep1PhaseAggregate( uint32_t bigUintWidth = sizeof(uint64_t); // For UDAF uint32_t projColsUDAFIdx = 0; - uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // for count column of average function @@ -1296,12 +1294,10 @@ void TupleAggregateStep::prep1PhaseAggregate( if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; - for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; - if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -1310,7 +1306,6 @@ void TupleAggregateStep::prep1PhaseAggregate( break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(1)prep1PhaseAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -1489,44 +1484,11 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType()); widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth()); - // If the first param is const - udafcParamIdx = 0; - ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - - if (cc) - { - funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; - } - - ++udafcParamIdx; break; } case ROWAGG_MULTI_PARM: { - oidsAgg.push_back(oidsProj[colProj]); - keysAgg.push_back(key); - scaleAgg.push_back(scaleProj[colProj]); - precisionAgg.push_back(precisionProj[colProj]); - typeAgg.push_back(typeProj[colProj]); - widthAgg.push_back(width[colProj]); - - // If the param is const - if (udafc) - { - ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - - if (cc) - { - funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; - } - } - else - { - throw QueryDataExcept("prep1PhaseAggregate: UDAF multi function with no parms", aggregateFuncErr); - } - - ++udafcParamIdx; } break; @@ -1901,7 +1863,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -2121,12 +2082,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // If the first param is const udafcParamIdx = 0; ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } - ++udafcParamIdx; break; } @@ -2141,12 +2100,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( widthAgg.push_back(widthProj[colProj]); multiParmIndexes.push_back(colAgg); ++colAgg; - // If the param is const if (udafc) { ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; @@ -2156,7 +2113,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr); } - ++udafcParamIdx; } break; @@ -2208,10 +2164,9 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( groupByNoDist.push_back(groupby); aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i)); } - + // locate the return column position in aggregated rowgroup uint64_t outIdx = 0; - for (uint64_t i = 0; i < returnedColVec.size(); i++) { udafc = NULL; @@ -2256,19 +2211,16 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; - for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; - if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -2565,7 +2517,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { // update the aggregate function vector SP_ROWAGG_FUNC_t funct; - if (aggOp == ROWAGG_UDAF) { funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx)); @@ -2609,7 +2560,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } - ++outIdx; } // for (i @@ -2860,7 +2810,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( ++multiParms; continue; } - if (returnedColVec[k].first != distinctColKey) continue; @@ -2881,7 +2830,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( f->fStatsFunction, groupBySub.size() - 1, f->fOutputColumnIndex, - f->fAuxColumnIndex - multiParms)); + f->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } } @@ -2909,7 +2858,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( ++multiParms; continue; } - // search non-distinct functions in functionVec vector::iterator it = functionVec2.begin(); @@ -2925,7 +2873,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex, udafFuncCol->fOutputColumnIndex, - udafFuncCol->fAuxColumnIndex - multiParms)); + udafFuncCol->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } else if ((f->fOutputColumnIndex == k) && @@ -2947,7 +2895,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( f->fStatsFunction, f->fInputColumnIndex, f->fOutputColumnIndex, - f->fAuxColumnIndex - multiParms)); + f->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } } @@ -3183,12 +3131,10 @@ void TupleAggregateStep::prep2PhasesAggregate( if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; - for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; - if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -3197,7 +3143,6 @@ void TupleAggregateStep::prep2PhasesAggregate( break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(1)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -3420,12 +3365,10 @@ void TupleAggregateStep::prep2PhasesAggregate( // If the first param is const udafcParamIdx = 0; ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } - ++udafcParamIdx; break; } @@ -3439,12 +3382,10 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); colAggPm++; - // If the param is const if (udafc) { ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; @@ -3454,7 +3395,6 @@ void TupleAggregateStep::prep2PhasesAggregate( { throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with no parms", aggregateFuncErr); } - ++udafcParamIdx; } break; @@ -3482,7 +3422,6 @@ void TupleAggregateStep::prep2PhasesAggregate( AGG_MAP aggDupFuncMap; projColsUDAFIdx = 0; - // copy over the groupby vector // update the outputColumnIndex if returned for (uint64_t i = 0; i < groupByPm.size(); i++) @@ -3494,7 +3433,6 @@ void TupleAggregateStep::prep2PhasesAggregate( // locate the return column position in aggregated rowgroup from PM // outIdx is i without the multi-columns, uint64_t outIdx = 0; - for (uint64_t i = 0; i < returnedColVec.size(); i++) { uint32_t retKey = returnedColVec[i].first; @@ -3511,7 +3449,6 @@ void TupleAggregateStep::prep2PhasesAggregate( // Is this a UDAF? use the function as part of the key. pUDAFFunc = NULL; udafc = NULL; - if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; @@ -3520,14 +3457,12 @@ void TupleAggregateStep::prep2PhasesAggregate( { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; - if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -3680,7 +3615,6 @@ void TupleAggregateStep::prep2PhasesAggregate( { // update the aggregate function vector SP_ROWAGG_FUNC_t funct; - if (aggOp == ROWAGG_UDAF) { funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx)); @@ -3722,7 +3656,6 @@ void TupleAggregateStep::prep2PhasesAggregate( if (returnedColVec[i].second == AggregateColumn::AVG) avgFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } - ++outIdx; } @@ -4067,12 +4000,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; - for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; - if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -4081,7 +4012,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -4300,12 +4230,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // If the first param is const udafcParamIdx = 0; ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } - ++udafcParamIdx; break; } @@ -4320,12 +4248,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( widthAggPm.push_back(width[colProj]); multiParmIndexes.push_back(colAggPm); colAggPm++; - // If the param is const if (udafc) { ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); - if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; @@ -4335,7 +4261,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( { throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr); } - ++udafcParamIdx; } break; @@ -4378,17 +4303,15 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (funcPm->fAggFunction == ROWAGG_UDAF) { RowUDAFFunctionCol* udafFuncCol = dynamic_cast(funcPm.get()); - if (!udafFuncCol) { - throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); + throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } - funct.reset(new RowUDAFFunctionCol( udafFuncCol->fUDAFContext, udafFuncCol->fOutputColumnIndex, - udafFuncCol->fOutputColumnIndex - multiParms, - udafFuncCol->fAuxColumnIndex - multiParms)); + udafFuncCol->fOutputColumnIndex-multiParms, + udafFuncCol->fAuxColumnIndex-multiParms)); functionNoDistVec.push_back(funct); pUDAFFunc = udafFuncCol->fUDAFContext.getFunction(); } @@ -4398,8 +4321,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( funcPm->fAggFunction, funcPm->fStatsFunction, funcPm->fOutputColumnIndex, - funcPm->fOutputColumnIndex - multiParms, - funcPm->fAuxColumnIndex - multiParms)); + funcPm->fOutputColumnIndex-multiParms, + funcPm->fAuxColumnIndex-multiParms)); functionNoDistVec.push_back(funct); pUDAFFunc = NULL; } @@ -4412,7 +4335,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( { continue; } - oidsAggUm.push_back(oidsAggPm[idx]); keysAggUm.push_back(keysAggPm[idx]); scaleAggUm.push_back(scaleAggPm[idx]); @@ -4449,7 +4371,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // locate the return column position in aggregated rowgroup from PM // outIdx is i without the multi-columns, uint64_t outIdx = 0; - for (uint64_t i = 0; i < returnedColVec.size(); i++) { pUDAFFunc = NULL; @@ -4470,19 +4391,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; - for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; - if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); break; } } - if (it == jobInfo.projectionCols.end()) { throw logic_error("(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); @@ -4496,241 +4414,222 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (it != aggFuncMap.end()) { - colUm = it->second; - } - else - { - ostringstream emsg; - emsg << "'" << jobInfo.keyInfo->tupleKeyToName[retKey] << "' isn't in tuple."; - cerr << "prep2PhasesDistinctAggregate: distinct " << emsg.str() - << " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId - << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable; - - if (jobInfo.keyInfo->tupleKeyVec[retKey].fView.length() > 0) - cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView; - - cerr << endl; - throw QueryDataExcept(emsg.str(), aggregateFuncErr); + colUm = it->second - multiParms; } } - switch (aggOp) + if (colUm > -1) // Means we found a DISTINCT and have a column number { - case ROWAGG_DISTINCT_AVG: - - //avgFuncMap.insert(make_pair(key, funct)); - case ROWAGG_DISTINCT_SUM: + switch (aggOp) { - if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR || - typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR || - typeAggUm[colUm] == CalpontSystemCatalog::BLOB || - typeAggUm[colUm] == CalpontSystemCatalog::TEXT || - typeAggUm[colUm] == CalpontSystemCatalog::DATE || - typeAggUm[colUm] == CalpontSystemCatalog::DATETIME || - typeAggUm[colUm] == CalpontSystemCatalog::TIME) + case ROWAGG_DISTINCT_AVG: + + //avgFuncMap.insert(make_pair(key, funct)); + case ROWAGG_DISTINCT_SUM: { - Message::Args args; - args.add("sum/average"); - args.add(colTypeIdString(typeAggUm[colUm])); - string emsg = IDBErrorInfo::instance()-> - errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); - cerr << "prep2PhasesDistinctAggregate: " << emsg << endl; - throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); + if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR || + typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR || + typeAggUm[colUm] == CalpontSystemCatalog::BLOB || + typeAggUm[colUm] == CalpontSystemCatalog::TEXT || + typeAggUm[colUm] == CalpontSystemCatalog::DATE || + typeAggUm[colUm] == CalpontSystemCatalog::DATETIME || + typeAggUm[colUm] == CalpontSystemCatalog::TIME) + { + Message::Args args; + args.add("sum/average"); + args.add(colTypeIdString(typeAggUm[colUm])); + string emsg = IDBErrorInfo::instance()-> + errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); + cerr << "prep2PhasesDistinctAggregate: " << emsg << endl; + throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); + } + + oidsAggDist.push_back(oidsAggUm[colUm]); + keysAggDist.push_back(retKey); + + if (typeAggUm[colUm] != CalpontSystemCatalog::DOUBLE && + typeAggUm[colUm] != CalpontSystemCatalog::FLOAT) + { + if (isUnsigned(typeAggUm[colUm])) + { + typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); + precisionAggDist.push_back(20); + } + else + { + typeAggDist.push_back(CalpontSystemCatalog::BIGINT); + precisionAggDist.push_back(19); + } + + uint32_t scale = scaleAggUm[colUm]; + + // for int average, FE expects a decimal + if (aggOp == ROWAGG_DISTINCT_AVG) + scale = jobInfo.scaleOfAvg[retKey]; // scale += 4; + + scaleAggDist.push_back(scale); + widthAggDist.push_back(bigIntWidth); + } + else + { + typeAggDist.push_back(typeAggUm[colUm]); + scaleAggDist.push_back(scaleAggUm[colUm]); + precisionAggDist.push_back(precisionAggUm[colUm]); + widthAggDist.push_back(widthAggUm[colUm]); + } } + // PM: put the count column for avg next to the sum + // let fall through to add a count column for average function + //if (aggOp != ROWAGG_DISTINCT_AVG) + break; - oidsAggDist.push_back(oidsAggUm[colUm]); - keysAggDist.push_back(retKey); - - if (typeAggUm[colUm] != CalpontSystemCatalog::DOUBLE && - typeAggUm[colUm] != CalpontSystemCatalog::FLOAT) + case ROWAGG_COUNT_DISTINCT_COL_NAME: { + oidsAggDist.push_back(oidsAggUm[colUm]); + keysAggDist.push_back(retKey); + scaleAggDist.push_back(0); + // work around count() in select subquery + precisionAggDist.push_back(9999); + if (isUnsigned(typeAggUm[colUm])) { typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); - precisionAggDist.push_back(20); } else { typeAggDist.push_back(CalpontSystemCatalog::BIGINT); - precisionAggDist.push_back(19); } - uint32_t scale = scaleAggUm[colUm]; - - // for int average, FE expects a decimal - if (aggOp == ROWAGG_DISTINCT_AVG) - scale = jobInfo.scaleOfAvg[retKey]; // scale += 4; - - scaleAggDist.push_back(scale); widthAggDist.push_back(bigIntWidth); } - else - { - typeAggDist.push_back(typeAggUm[colUm]); - scaleAggDist.push_back(scaleAggUm[colUm]); - precisionAggDist.push_back(precisionAggUm[colUm]); - widthAggDist.push_back(widthAggUm[colUm]); - } - } - // PM: put the count column for avg next to the sum - // let fall through to add a count column for average function - //if (aggOp != ROWAGG_DISTINCT_AVG) - break; + break; - case ROWAGG_COUNT_DISTINCT_COL_NAME: + default: + // cound happen if agg and agg distinct use same column. + colUm = -1; + break; + } // switch + } + // For non distinct aggregates + if (colUm == -1) + { + AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + + if (it != aggFuncMap.end()) { + colUm = it->second - multiParms; oidsAggDist.push_back(oidsAggUm[colUm]); - keysAggDist.push_back(retKey); - scaleAggDist.push_back(0); - // work around count() in select subquery - precisionAggDist.push_back(9999); - - if (isUnsigned(typeAggUm[colUm])) - { - typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); - } - else - { - typeAggDist.push_back(CalpontSystemCatalog::BIGINT); - } - - widthAggDist.push_back(bigIntWidth); + keysAggDist.push_back(keysAggUm[colUm]); + scaleAggDist.push_back(scaleAggUm[colUm]); + precisionAggDist.push_back(precisionAggUm[colUm]); + typeAggDist.push_back(typeAggUm[colUm]); + widthAggDist.push_back(widthAggUm[colUm]); + colUm -= multiParms; } - break; - case ROWAGG_MIN: - case ROWAGG_MAX: - case ROWAGG_SUM: - case ROWAGG_AVG: - case ROWAGG_COUNT_ASTERISK: - case ROWAGG_COUNT_COL_NAME: - case ROWAGG_STATS: - case ROWAGG_BIT_AND: - case ROWAGG_BIT_OR: - case ROWAGG_BIT_XOR: - case ROWAGG_CONSTANT: - default: + // not a direct hit -- a returned column is not already in the RG from PMs + else { - AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + bool returnColMissing = true; - if (it != aggFuncMap.end()) + // check if a SUM or COUNT covered by AVG + if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - colUm = it->second; - oidsAggDist.push_back(oidsAggUm[colUm]); - keysAggDist.push_back(keysAggUm[colUm]); - scaleAggDist.push_back(scaleAggUm[colUm]); - precisionAggDist.push_back(precisionAggUm[colUm]); - typeAggDist.push_back(typeAggUm[colUm]); - widthAggDist.push_back(widthAggUm[colUm]); - colUm -= multiParms; - } + it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); - // not a direct hit -- a returned column is not already in the RG from PMs - else - { - bool returnColMissing = true; - - // check if a SUM or COUNT covered by AVG - if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) + if (it != aggFuncMap.end()) { - it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); + // false alarm + returnColMissing = false; - if (it != aggFuncMap.end()) + colUm = it->second - multiParms; + + if (aggOp == ROWAGG_SUM) { - // false alarm - returnColMissing = false; + oidsAggDist.push_back(oidsAggUm[colUm]); + keysAggDist.push_back(retKey); + scaleAggDist.push_back(scaleAggUm[colUm] >> 8); + precisionAggDist.push_back(precisionAggUm[colUm]); + typeAggDist.push_back(typeAggUm[colUm]); + widthAggDist.push_back(widthAggUm[colUm]); + } + else + { + // leave the count() to avg + aggOp = ROWAGG_COUNT_NO_OP; - colUm = it->second; - - if (aggOp == ROWAGG_SUM) + oidsAggDist.push_back(oidsAggUm[colUm]); + keysAggDist.push_back(retKey); + scaleAggDist.push_back(0); + if (isUnsigned(typeAggUm[colUm])) { - oidsAggDist.push_back(oidsAggUm[colUm]); - keysAggDist.push_back(retKey); - scaleAggDist.push_back(scaleAggUm[colUm] >> 8); - precisionAggDist.push_back(precisionAggUm[colUm]); - typeAggDist.push_back(typeAggUm[colUm]); - widthAggDist.push_back(widthAggUm[colUm]); + precisionAggDist.push_back(20); + typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); } else { - // leave the count() to avg - aggOp = ROWAGG_COUNT_NO_OP; - - oidsAggDist.push_back(oidsAggUm[colUm]); - keysAggDist.push_back(retKey); - scaleAggDist.push_back(0); - - if (isUnsigned(typeAggUm[colUm])) - { - precisionAggDist.push_back(20); - typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); - } - else - { - precisionAggDist.push_back(19); - typeAggDist.push_back(CalpontSystemCatalog::BIGINT); - } - - widthAggDist.push_back(bigIntWidth); + precisionAggDist.push_back(19); + typeAggDist.push_back(CalpontSystemCatalog::BIGINT); } + widthAggDist.push_back(bigIntWidth); } } - else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), - retKey) != jobInfo.expressionVec.end()) - { - // a function on aggregation - TupleInfo ti = getTupleInfo(retKey, jobInfo); - oidsAggDist.push_back(ti.oid); - keysAggDist.push_back(retKey); - scaleAggDist.push_back(ti.scale); - precisionAggDist.push_back(ti.precision); - typeAggDist.push_back(ti.dtype); - widthAggDist.push_back(ti.width); + } + else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), + retKey) != jobInfo.expressionVec.end()) + { + // a function on aggregation + TupleInfo ti = getTupleInfo(retKey, jobInfo); + oidsAggDist.push_back(ti.oid); + keysAggDist.push_back(retKey); + scaleAggDist.push_back(ti.scale); + precisionAggDist.push_back(ti.precision); + typeAggDist.push_back(ti.dtype); + widthAggDist.push_back(ti.width); - returnColMissing = false; - } - else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end()) - { - // a window function - TupleInfo ti = getTupleInfo(retKey, jobInfo); - oidsAggDist.push_back(ti.oid); - keysAggDist.push_back(retKey); - scaleAggDist.push_back(ti.scale); - precisionAggDist.push_back(ti.precision); - typeAggDist.push_back(ti.dtype); - widthAggDist.push_back(ti.width); + returnColMissing = false; + } + else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end()) + { + // a window function + TupleInfo ti = getTupleInfo(retKey, jobInfo); + oidsAggDist.push_back(ti.oid); + keysAggDist.push_back(retKey); + scaleAggDist.push_back(ti.scale); + precisionAggDist.push_back(ti.precision); + typeAggDist.push_back(ti.dtype); + widthAggDist.push_back(ti.width); - returnColMissing = false; - } - else if (aggOp == ROWAGG_CONSTANT) - { - TupleInfo ti = getTupleInfo(retKey, jobInfo); - oidsAggDist.push_back(ti.oid); - keysAggDist.push_back(retKey); - scaleAggDist.push_back(ti.scale); - precisionAggDist.push_back(ti.precision); - typeAggDist.push_back(ti.dtype); - widthAggDist.push_back(ti.width); + returnColMissing = false; + } + else if (aggOp == ROWAGG_CONSTANT) + { + TupleInfo ti = getTupleInfo(retKey, jobInfo); + oidsAggDist.push_back(ti.oid); + keysAggDist.push_back(retKey); + scaleAggDist.push_back(ti.scale); + precisionAggDist.push_back(ti.precision); + typeAggDist.push_back(ti.dtype); + widthAggDist.push_back(ti.width); - returnColMissing = false; - } + returnColMissing = false; + } - if (returnColMissing) - { - Message::Args args; - args.add(keyName(outIdx, retKey, jobInfo)); - string emsg = IDBErrorInfo::instance()-> - errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); - cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid=" - << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias=" - << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view=" - << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" - << (int) aggOp << endl; - throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); - } - } //else - } // switch - } + if (returnColMissing) + { + Message::Args args; + args.add(keyName(outIdx, retKey, jobInfo)); + string emsg = IDBErrorInfo::instance()-> + errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); + cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid=" + << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias=" + << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view=" + << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" + << (int) aggOp << endl; + throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); + } + } //else not a direct hit + } // else not a DISTINCT // update groupby vector if the groupby column is a returned column if (returnedColVec[i].second == 0) @@ -4757,7 +4656,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( { // update the aggregate function vector SP_ROWAGG_FUNC_t funct; - if (aggOp == ROWAGG_UDAF) { funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx)); @@ -4801,7 +4699,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } - ++outIdx; } // for (i @@ -5044,7 +4941,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( ++multiParms; continue; } - if (returnedColVec[k].first != distinctColKey) continue; @@ -5066,7 +4962,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( f->fStatsFunction, groupBySub.size() - 1, f->fOutputColumnIndex, - f->fAuxColumnIndex - multiParms)); + f->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } } @@ -5092,7 +4988,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( ++multiParms; continue; } - // search non-distinct functions in functionVec vector::iterator it = functionVecUm.begin(); @@ -5110,7 +5005,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex, udafFuncCol->fOutputColumnIndex, - udafFuncCol->fAuxColumnIndex - multiParms)); + udafFuncCol->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK || @@ -5131,7 +5026,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( f->fStatsFunction, f->fInputColumnIndex, f->fOutputColumnIndex, - f->fAuxColumnIndex - multiParms)); + f->fAuxColumnIndex-multiParms)); functionSub2.push_back(funct); } } diff --git a/utils/udfsdk/CMakeLists.txt b/utils/udfsdk/CMakeLists.txt index ad4460977..c4d7fa574 100755 --- a/utils/udfsdk/CMakeLists.txt +++ b/utils/udfsdk/CMakeLists.txt @@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ########### next target ############### -set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp avg_mode.cpp regr_avgx.cpp avgx.cpp) +set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp median.cpp avg_mode.cpp avgx.cpp) add_definitions(-DMYSQL_DYNAMIC_PLUGIN) diff --git a/utils/udfsdk/mcsv1_udaf.cpp b/utils/udfsdk/mcsv1_udaf.cpp index 9e4596440..2a93cfad3 100644 --- a/utils/udfsdk/mcsv1_udaf.cpp +++ b/utils/udfsdk/mcsv1_udaf.cpp @@ -31,14 +31,21 @@ using namespace mcsv1sdk; * This is a temporary kludge until we get the library loader * task complete */ -UDAF_MAP UDAFMap::fm; #include "allnull.h" #include "ssq.h" +#include "median.h" #include "avg_mode.h" -#include "regr_avgx.h" #include "avgx.h" + +UDAF_MAP& UDAFMap::fm() +{ + static UDAF_MAP* m = new UDAF_MAP; + return *m; +} + UDAF_MAP& UDAFMap::getMap() { + UDAF_MAP& fm = UDAFMap::fm(); if (fm.size() > 0) { return fm; @@ -51,8 +58,8 @@ UDAF_MAP& UDAFMap::getMap() // the function names passed to the interface is always in lower case. fm["allnull"] = new allnull(); fm["ssq"] = new ssq(); + fm["median"] = new median(); fm["avg_mode"] = new avg_mode(); - fm["regr_avgx"] = new regr_avgx(); fm["avgx"] = new avgx(); return fm; diff --git a/utils/udfsdk/mcsv1_udaf.h b/utils/udfsdk/mcsv1_udaf.h index e09228d77..28db6808b 100644 --- a/utils/udfsdk/mcsv1_udaf.h +++ b/utils/udfsdk/mcsv1_udaf.h @@ -108,7 +108,7 @@ public: static EXPORT UDAF_MAP& getMap(); private: - static UDAF_MAP fm; + static UDAF_MAP& fm(); }; /** diff --git a/utils/udfsdk/udfmysql.cpp b/utils/udfsdk/udfmysql.cpp index 1c0fee1db..60da18a43 100644 --- a/utils/udfsdk/udfmysql.cpp +++ b/utils/udfsdk/udfmysql.cpp @@ -349,6 +349,78 @@ extern "C" return data->sumsq; } +//======================================================================= + + /** + * MEDIAN connector stub + */ +#ifdef _MSC_VER + __declspec(dllexport) +#endif + my_bool median_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 1) + { + strcpy(message, "median() requires one argument"); + return 1; + } + + /* + if (!(data = (struct ssq_data*) malloc(sizeof(struct ssq_data)))) + { + strmov(message,"Couldn't allocate memory"); + return 1; + } + data->sumsq = 0; + + initid->ptr = (char*)data; + */ + return 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void median_deinit(UDF_INIT* initid) + { +// free(initid->ptr); + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void + median_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), + char* message __attribute__((unused))) + { +// struct ssq_data* data = (struct ssq_data*)initid->ptr; +// data->sumsq = 0; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + void + median_add(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, + char* message __attribute__((unused))) + { +// struct ssq_data* data = (struct ssq_data*)initid->ptr; +// double val = cvtArgToDouble(args->arg_type[0], args->args[0]); +// data->sumsq = val*val; + } + +#ifdef _MSC_VER + __declspec(dllexport) +#endif + long long median(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + char* is_null, char* error __attribute__((unused))) + { +// struct ssq_data* data = (struct ssq_data*)initid->ptr; +// return data->sumsq; + return 0; + } + /** * avg_mode connector stub */ @@ -422,167 +494,83 @@ extern "C" //======================================================================= /** - * regr_avgx connector stub - */ - struct regr_avgx_data - { - double sumx; - int64_t cnt; - }; - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - my_bool regr_avgx_init(UDF_INIT* initid, UDF_ARGS* args, char* message) - { - struct regr_avgx_data* data; - - if (args->arg_count != 2) - { - strcpy(message, "regr_avgx() requires two arguments"); - return 1; - } - - if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data)))) - { - strmov(message, "Couldn't allocate memory"); - return 1; - } - - data->sumx = 0; - data->cnt = 0; - - initid->ptr = (char*)data; - return 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void regr_avgx_deinit(UDF_INIT* initid) - { - free(initid->ptr); - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void - regr_avgx_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), - char* message __attribute__((unused))) - { - struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr; - data->sumx = 0; - data->cnt = 0; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - void - regr_avgx_add(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, - char* message __attribute__((unused))) - { - // TODO test for NULL in x and y - struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr; - double xval = cvtArgToDouble(args->arg_type[1], args->args[0]); - ++data->cnt; - data->sumx += xval; - } - -#ifdef _MSC_VER - __declspec(dllexport) -#endif - long long regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), - char* is_null, char* error __attribute__((unused))) - { - struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr; - return data->sumx / data->cnt; - } - -//======================================================================= - - /** - * avgx connector stub. Exactly the same functionality as the - * built in avg() function. Use to test the performance of the - * API + * avgx connector stub. Exactly the same functionality as the + * built in avg() function. Use to test the performance of the + * API */ struct avgx_data { - double sumx; - int64_t cnt; + double sumx; + int64_t cnt; }; - -#ifdef _MSC_VER + + #ifdef _MSC_VER __declspec(dllexport) -#endif + #endif my_bool avgx_init(UDF_INIT* initid, UDF_ARGS* args, char* message) { - struct avgx_data* data; + struct avgx_data* data; + if (args->arg_count != 1) + { + strcpy(message,"avgx() requires one argument"); + return 1; + } - if (args->arg_count != 1) - { - strcpy(message, "avgx() requires one argument"); - return 1; - } - - if (!(data = (struct avgx_data*) malloc(sizeof(struct avgx_data)))) - { - strmov(message, "Couldn't allocate memory"); - return 1; - } - - data->sumx = 0; + if (!(data = (struct avgx_data*) malloc(sizeof(struct avgx_data)))) + { + strmov(message,"Couldn't allocate memory"); + return 1; + } + data->sumx = 0; data->cnt = 0; - initid->ptr = (char*)data; - return 0; + initid->ptr = (char*)data; + return 0; } -#ifdef _MSC_VER + #ifdef _MSC_VER __declspec(dllexport) -#endif + #endif void avgx_deinit(UDF_INIT* initid) { - free(initid->ptr); - } + free(initid->ptr); + } -#ifdef _MSC_VER + #ifdef _MSC_VER __declspec(dllexport) -#endif + #endif void avgx_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), - char* message __attribute__((unused))) + char* message __attribute__((unused))) { - struct avgx_data* data = (struct avgx_data*)initid->ptr; - data->sumx = 0; + struct avgx_data* data = (struct avgx_data*)initid->ptr; + data->sumx = 0; data->cnt = 0; } -#ifdef _MSC_VER + #ifdef _MSC_VER __declspec(dllexport) -#endif + #endif void avgx_add(UDF_INIT* initid, UDF_ARGS* args, - char* is_null, - char* message __attribute__((unused))) + char* is_null, + char* message __attribute__((unused))) { // TODO test for NULL in x and y - struct avgx_data* data = (struct avgx_data*)initid->ptr; - double xval = cvtArgToDouble(args->arg_type[1], args->args[0]); + struct avgx_data* data = (struct avgx_data*)initid->ptr; + double xval = cvtArgToDouble(args->arg_type[1], args->args[0]); ++data->cnt; - data->sumx += xval; + data->sumx += xval; } -#ifdef _MSC_VER + #ifdef _MSC_VER __declspec(dllexport) -#endif + #endif long long avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), - char* is_null, char* error __attribute__((unused))) + char* is_null, char* error __attribute__((unused))) { - struct avgx_data* data = (struct avgx_data*)initid->ptr; - return data->sumx / data->cnt; + struct avgx_data* data = (struct avgx_data*)initid->ptr; + return data->sumx / data->cnt; } } // vim:ts=4 sw=4: diff --git a/utils/udfsdk/udfsdk.vpj b/utils/udfsdk/udfsdk.vpj index fe1f3fd0e..1096f8431 100755 --- a/utils/udfsdk/udfsdk.vpj +++ b/utils/udfsdk/udfsdk.vpj @@ -207,7 +207,6 @@ - @@ -220,7 +219,6 @@ -