From d930a1e32222e0c56f19edc2b2c4ed006664ebde Mon Sep 17 00:00:00 2001 From: David Hall Date: Tue, 25 Sep 2018 16:31:10 -0500 Subject: [PATCH] MCOL-521 Some more fixes for multi-parm aggregates. Add regr slope --- dbcon/joblist/tupleaggregatestep.cpp | 197 +++++++++++++++++++----- dbcon/mysql/ha_window_function.cpp | 3 + dbcon/mysql/install_calpont_mysql.sh | 3 + genii.vpw | 1 + utils/regr/CMakeLists.txt | 26 ++++ utils/regr/regr.vpj | 5 + utils/regr/regr_avgx.cpp | 133 +--------------- utils/regr/regr_avgy.cpp | 132 +--------------- utils/regr/regr_slope.cpp | 197 ++++++++++++++++++++++++ utils/regr/regr_slope.h | 88 +++++++++++ utils/regr/regrmysql.cpp | 222 ++++++++++++++++++++++++++- utils/rowgroup/rowaggregation.h | 2 +- utils/udfsdk/mcsv1_udaf.h | 72 +++++++++ 13 files changed, 776 insertions(+), 305 deletions(-) create mode 100755 utils/regr/CMakeLists.txt create mode 100644 utils/regr/regr_slope.cpp create mode 100644 utils/regr/regr_slope.h diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 97a577f4f..da4f73823 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -76,21 +76,55 @@ namespace struct cmpTuple { - bool operator()(boost::tuple a, - boost::tuple b) + bool operator()(boost::tuple* > a, + boost::tuple* > b) { - if (boost::get<0>(a) < boost::get<0>(b)) + uint32_t keya = boost::get<0>(a); + uint32_t keyb = boost::get<0>(b); + int opa; + int opb; + mcsv1sdk::mcsv1_UDAF* pUDAFa; + mcsv1sdk::mcsv1_UDAF* pUDAFb; + + // If key is less than + if (keya < keyb) return true; - - if (boost::get<0>(a) == boost::get<0>(b)) + if (keya == keyb) { - if (boost::get<1>(a) < boost::get<1>(b)) + // test Op + opa = boost::get<1>(a); + opb = boost::get<1>(b); + if (opa < opb) return true; + if (opa == opb) + { + // look at the UDAF object + pUDAFa = boost::get<2>(a); + pUDAFb = boost::get<2>(b); + if (pUDAFa < pUDAFb) + return true; + if (pUDAFa == pUDAFb) + { + if (pUDAFa == NULL) + return false; + std::vector* paramKeysa = boost::get<3>(a); + std::vector* paramKeysb = boost::get<3>(b); - if (boost::get<1>(a) == boost::get<1>(b)) - return boost::get<2>(a) < boost::get<2>(b); + if (paramKeysa->size() < paramKeysb->size()) + return true; + if (paramKeysa->size() == paramKeysb->size()) + { + if (paramKeysa == NULL) + return false; + for (uint64_t i = 0; i < paramKeysa->size(); ++i) + { + if ((*paramKeysa)[i] < (*paramKeysb)[i]) + return true; + } + } + } + } } - return false; } }; @@ -101,7 +135,7 @@ typedef vector RowBucketVec; // The AGG_MAP type is used to maintain a list of aggregate functions in order to // detect duplicates. Since all UDAF have the same op type (ROWAGG_UDAF), we add in // the function pointer in order to ensure uniqueness. -typedef map, uint64_t, cmpTuple> AGG_MAP; +typedef map* >, uint64_t, cmpTuple> AGG_MAP; inline RowAggFunctionType functionIdMap(int planFuncId) { @@ -796,7 +830,6 @@ const string TupleAggregateStep::toString() const return oss.str(); } - SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo) { SJSTEP spjs; @@ -1301,6 +1334,16 @@ void TupleAggregateStep::prep1PhaseAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); + } + } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx)); break; @@ -1502,7 +1545,7 @@ void TupleAggregateStep::prep1PhaseAggregate( } // find if this func is a duplicate - AGG_MAP::iterator iter = aggFuncMap.find(boost::make_tuple(key, aggOp, pUDAFFunc)); + AGG_MAP::iterator iter = aggFuncMap.find(boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggFuncMap.end()) { @@ -1519,7 +1562,7 @@ void TupleAggregateStep::prep1PhaseAggregate( } else { - aggFuncMap.insert(make_pair(boost::make_tuple(key, aggOp, pUDAFFunc), funct->fOutputColumnIndex)); + aggFuncMap.insert(make_pair(boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } if (aggOp != ROWAGG_MULTI_PARM) @@ -1740,7 +1783,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(widthProj[colProj]); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); colAgg++; } @@ -1781,7 +1824,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(widthProj[colProj]); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); colAgg++; } @@ -1811,7 +1854,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol( aggOp, stats, colAgg, colAgg, -1)); functionVec1.push_back(funct); - aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAgg)); + aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); colAgg++; continue; @@ -1858,6 +1901,16 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(aggColVec[k].first); + } + } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); break; @@ -1874,11 +1927,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( } // skip if this is a duplicate - if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end()) + if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end()) continue; functionVec1.push_back(funct); - aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAgg)); + aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); switch (aggOp) { @@ -2103,6 +2156,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // If the param is const if (udafc) { + if (udafcParamIdx > udafc->aggParms().size() - 1) + { + throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr); + } ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); if (cc) { @@ -2162,7 +2219,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1)); groupByNoDist.push_back(groupby); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i)); } // locate the return column position in aggregated rowgroup @@ -2186,7 +2243,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != jobInfo.distinctColVec.end() ) { - AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc)); + AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -2218,6 +2275,16 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); + } + } break; } } @@ -2318,7 +2385,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( case ROWAGG_BIT_XOR: default: { - AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -2349,7 +2416,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); + it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -2534,7 +2601,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( functionVec2.push_back(funct); // find if this func is a duplicate - AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggDupFuncMap.end()) { @@ -2551,7 +2618,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( } else { - aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc), + aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } @@ -3048,7 +3115,7 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } @@ -3089,7 +3156,7 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } @@ -3138,6 +3205,16 @@ void TupleAggregateStep::prep2PhasesAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(aggColVec[k].first); + } + } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); break; @@ -3154,11 +3231,11 @@ void TupleAggregateStep::prep2PhasesAggregate( } // skip if this is a duplicate - if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end()) + if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end()) continue; functionVecPm.push_back(funct); - aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm)); + aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); switch (aggOp) { @@ -3385,6 +3462,10 @@ void TupleAggregateStep::prep2PhasesAggregate( // If the param is const if (udafc) { + if (udafcParamIdx > udafc->aggParms().size() - 1) + { + throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with too many parms", aggregateFuncErr); + } ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); if (cc) { @@ -3460,6 +3541,16 @@ void TupleAggregateStep::prep2PhasesAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); + } + } break; } } @@ -3469,7 +3560,7 @@ void TupleAggregateStep::prep2PhasesAggregate( } } - AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -3490,7 +3581,7 @@ void TupleAggregateStep::prep2PhasesAggregate( // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); + it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -3632,7 +3723,7 @@ void TupleAggregateStep::prep2PhasesAggregate( functionVecUm.push_back(funct); // find if this func is a duplicate - AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggDupFuncMap.end()) { @@ -3649,7 +3740,7 @@ void TupleAggregateStep::prep2PhasesAggregate( } else { - aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc), + aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } @@ -3911,7 +4002,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } @@ -3952,7 +4043,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); - aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm)); + aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } @@ -4008,6 +4099,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(aggColVec[k].first); + } + } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); break; @@ -4024,11 +4125,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( } // skip if this is a duplicate - if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end()) + if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end()) continue; functionVecPm.push_back(funct); - aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm-multiParm)); + aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm-multiParm)); switch (aggOp) { @@ -4253,6 +4354,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // If the param is const if (udafc) { + if (udafcParamIdx > udafc->aggParms().size() - 1) + { + throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr); + } ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); if (cc) { @@ -4400,6 +4505,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); + // Save the multi-parm keys for dup-detection. + if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) + { + for (uint64_t k = i+1; + k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; + ++k) + { + udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); + } + } break; } } @@ -4412,7 +4527,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != jobInfo.distinctColVec.end() ) { - AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc)); + AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -4515,7 +4630,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // For non distinct aggregates if (colUm == -1) { - AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -4536,7 +4651,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { - it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc)); + it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { @@ -4674,7 +4789,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( functionVecUm.push_back(funct); // find if this func is a duplicate - AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); + AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggDupFuncMap.end()) { @@ -4691,7 +4806,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( } else { - aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc), + aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } diff --git a/dbcon/mysql/ha_window_function.cpp b/dbcon/mysql/ha_window_function.cpp index 0c57ce8bc..f4a95bbc3 100644 --- a/dbcon/mysql/ha_window_function.cpp +++ b/dbcon/mysql/ha_window_function.cpp @@ -323,6 +323,9 @@ string ConvertFuncName(Item_sum* item) case Item_sum::LAG_FUNC: return "LAG"; break; + default: + // We just don't handle it. + break; }; return ""; diff --git a/dbcon/mysql/install_calpont_mysql.sh b/dbcon/mysql/install_calpont_mysql.sh index d9c1290fb..311e03784 100755 --- a/dbcon/mysql/install_calpont_mysql.sh +++ b/dbcon/mysql/install_calpont_mysql.sh @@ -87,6 +87,9 @@ CREATE FUNCTION mcssystemreadonly RETURNS INTEGER soname 'libcalmysql.so'; CREATE AGGREGATE FUNCTION regr_avgx RETURNS REAL soname 'libregr_mysql.so'; CREATE AGGREGATE FUNCTION regr_avgy RETURNS REAL soname 'libregr_mysql.so'; CREATE AGGREGATE FUNCTION regr_count RETURNS INTEGER soname 'libregr_mysql.so'; +CREATE AGGREGATE FUNCTION regr_slope RETURNS REAL soname 'libregr_mysql.so'; +CREATE AGGREGATE FUNCTION regr_intercept RETURNS REAL soname 'libregr_mysql.so'; + CREATE AGGREGATE FUNCTION distinct_count RETURNS INTEGER soname 'libudf_mysql.so'; CREATE DATABASE IF NOT EXISTS infinidb_vtable; diff --git a/genii.vpw b/genii.vpw index 69e258339..686b01ed7 100644 --- a/genii.vpw +++ b/genii.vpw @@ -44,6 +44,7 @@ + diff --git a/utils/regr/CMakeLists.txt b/utils/regr/CMakeLists.txt new file mode 100755 index 000000000..47db83c63 --- /dev/null +++ b/utils/regr/CMakeLists.txt @@ -0,0 +1,26 @@ + +include_directories( ${ENGINE_COMMON_INCLUDES} + ../../dbcon/mysql ) + +########### next target ############### + +set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept) + +add_definitions(-DMYSQL_DYNAMIC_PLUGIN) + +add_library(regr SHARED ${regr_LIB_SRCS} ) + +set_target_properties(regr PROPERTIES VERSION 1.1.0 SOVERSION 1) + +install(TARGETS regr DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) + + + +set(regr_mysql_LIB_SRCS regrmysql.cpp) + +add_library(regr_mysql SHARED ${regr_mysql_LIB_SRCS}) + +set_target_properties(regr_mysql PROPERTIES VERSION 1.0.0 SOVERSION 1) + +install(TARGETS regr_mysql DESTINATION ${ENGINE_LIBDIR} COMPONENT storage-engine) + diff --git a/utils/regr/regr.vpj b/utils/regr/regr.vpj index d99a1d436..a22c54ced 100644 --- a/utils/regr/regr.vpj +++ b/utils/regr/regr.vpj @@ -197,6 +197,9 @@ + + + + + getUserData()->data; - DATATYPE val = 0.0; - - if (context->isParamNull(0) || context->isParamNull(1)) - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - - if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - - if (valIn_x.compatible(longTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(charTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(scharTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(shortTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(intTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(llTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ucharTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ushortTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(uintTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ulongTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ullTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(floatTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(doubleTypeId)) - { - val = valIn_x.cast(); - } + DATATYPE val = convertAnyTo(valIn_x); // For decimal types, we need to move the decimal point. uint32_t scale = valsIn[1].scale; - if (val != 0 && scale > 0) { val /= pow(10.0, (double)scale); @@ -202,76 +137,12 @@ mcsv1_UDAF::ReturnCode regr_avgx::evaluate(mcsv1Context* context, static_any::an mcsv1_UDAF::ReturnCode regr_avgx::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) { - static_any::any& valIn_y = valsDropped[0].columnData; static_any::any& valIn_x = valsDropped[1].columnData; struct regr_avgx_data* data = (struct regr_avgx_data*)context->getUserData()->data; - DATATYPE val = 0.0; - - if (context->isParamNull(0) || context->isParamNull(1)) - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - if (valIn_x.empty() || valIn_y.empty()) - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - - if (valIn_x.compatible(charTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(scharTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(shortTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(intTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(longTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(llTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ucharTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ushortTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(uintTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ulongTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(ullTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(floatTypeId)) - { - val = valIn_x.cast(); - } - else if (valIn_x.compatible(doubleTypeId)) - { - val = valIn_x.cast(); - } + double val = convertAnyTo(valIn_x); // For decimal types, we need to move the decimal point. uint32_t scale = valsDropped[1].scale; - if (val != 0 && scale > 0) { val /= pow(10.0, (double)scale); diff --git a/utils/regr/regr_avgy.cpp b/utils/regr/regr_avgy.cpp index 667019a33..69c654acf 100644 --- a/utils/regr/regr_avgy.cpp +++ b/utils/regr/regr_avgy.cpp @@ -85,75 +85,11 @@ mcsv1_UDAF::ReturnCode regr_avgy::reset(mcsv1Context* context) mcsv1_UDAF::ReturnCode regr_avgy::nextValue(mcsv1Context* context, ColumnDatum* valsIn) { static_any::any& valIn_y = valsIn[0].columnData; - static_any::any& valIn_x = valsIn[1].columnData; struct regr_avgy_data* data = (struct regr_avgy_data*)context->getUserData()->data; - DATATYPE val = 0.0; - - if (context->isParamNull(0) || context->isParamNull(1)) - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - - if (valIn_y.compatible(longTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(charTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(scharTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(shortTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(intTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(llTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ucharTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ushortTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(uintTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ulongTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ullTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(floatTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(doubleTypeId)) - { - val = valIn_y.cast(); - } + double val = convertAnyTo(valIn_y); // For decimal types, we need to move the decimal point. uint32_t scale = valsIn[0].scale; - if (val != 0 && scale > 0) { val /= pow(10.0, (double)scale); @@ -199,75 +135,11 @@ mcsv1_UDAF::ReturnCode regr_avgy::evaluate(mcsv1Context* context, static_any::an mcsv1_UDAF::ReturnCode regr_avgy::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) { static_any::any& valIn_y = valsDropped[0].columnData; - static_any::any& valIn_x = valsDropped[1].columnData; struct regr_avgy_data* data = (struct regr_avgy_data*)context->getUserData()->data; - DATATYPE val = 0.0; - - if (context->isParamNull(0) || context->isParamNull(1)) - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - if (valIn_x.empty() || valIn_y.empty()) - { - return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. - } - - if (valIn_y.compatible(charTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(scharTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(shortTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(intTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(longTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(llTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ucharTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ushortTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(uintTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ulongTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(ullTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(floatTypeId)) - { - val = valIn_y.cast(); - } - else if (valIn_y.compatible(doubleTypeId)) - { - val = valIn_y.cast(); - } + double val = convertAnyTo(valIn_y); // For decimal types, we need to move the decimal point. uint32_t scale = valsDropped[0].scale; - if (val != 0 && scale > 0) { val /= pow(10.0, (double)scale); diff --git a/utils/regr/regr_slope.cpp b/utils/regr/regr_slope.cpp new file mode 100644 index 000000000..c4178c56b --- /dev/null +++ b/utils/regr/regr_slope.cpp @@ -0,0 +1,197 @@ +/* Copyright (C) 2017 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include "regr_slope.h" +#include "bytestream.h" +#include "objectreader.h" + +using namespace mcsv1sdk; + +class Add_regr_slope_ToUDAFMap +{ +public: + Add_regr_slope_ToUDAFMap() + { + UDAFMap::getMap()["regr_slope"] = new regr_slope(); + } +}; + +static Add_regr_slope_ToUDAFMap addToMap; + +// Use the simple data model +struct regr_slope_data +{ + uint64_t cnt; + double sumx; + double sumx2; // sum of (x squared) + double sumy; + double sumxy; // sum of (x*y) +}; + + +mcsv1_UDAF::ReturnCode regr_slope::init(mcsv1Context* context, + ColumnDatum* colTypes) +{ + if (context->getParameterCount() != 2) + { + // The error message will be prepended with + // "The storage engine for the table doesn't support " + context->setErrorMessage("regr_slope() with other than 2 arguments"); + return mcsv1_UDAF::ERROR; + } + + context->setUserDataSize(sizeof(regr_slope_data)); + context->setResultType(CalpontSystemCatalog::DOUBLE); + context->setColWidth(8); + if (colTypes[0].scale) + { + context->setScale(colTypes[0].scale + 8); + context->setPrecision(19); + } + context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS); + return mcsv1_UDAF::SUCCESS; + +} + +mcsv1_UDAF::ReturnCode regr_slope::reset(mcsv1Context* context) +{ + struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data; + data->cnt = 0; + data->sumx = 0.0; + data->sumx2 = 0.0; + data->sumy = 0.0; + return mcsv1_UDAF::SUCCESS; +} + +mcsv1_UDAF::ReturnCode regr_slope::nextValue(mcsv1Context* context, ColumnDatum* valsIn) +{ + static_any::any& valIn_y = valsIn[0].columnData; + static_any::any& valIn_x = valsIn[1].columnData; + struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data; + double valx = 0.0; + double valy = 0.0; + + valx = convertAnyTo(valIn_x); + valy = convertAnyTo(valIn_y); + + // For decimal types, we need to move the decimal point. + uint32_t scaley = valsIn[0].scale; + + if (valy != 0 && scaley > 0) + { + valy /= pow(10.0, (double)scaley); + } + + data->sumy += valy; + + // For decimal types, we need to move the decimal point. + uint32_t scalex = valsIn[1].scale; + + if (valx != 0 && scalex > 0) + { + valx /= pow(10.0, (double)scaley); + } + + data->sumx += valx; + data->sumx2 += valx*valx; + + data->sumxy += valx*valy; + ++data->cnt; + + return mcsv1_UDAF::SUCCESS; +} + +mcsv1_UDAF::ReturnCode regr_slope::subEvaluate(mcsv1Context* context, const UserData* userDataIn) +{ + if (!userDataIn) + { + return mcsv1_UDAF::SUCCESS; + } + + struct regr_slope_data* outData = (struct regr_slope_data*)context->getUserData()->data; + struct regr_slope_data* inData = (struct regr_slope_data*)userDataIn->data; + + outData->sumx += inData->sumx; + outData->sumx2 += inData->sumx2; + outData->sumy += inData->sumy; + outData->sumxy += inData->sumxy; + outData->cnt += inData->cnt; + + return mcsv1_UDAF::SUCCESS; +} + +mcsv1_UDAF::ReturnCode regr_slope::evaluate(mcsv1Context* context, static_any::any& valOut) +{ + struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data; + double N = data->cnt; + if (N > 0) + { + double sumx = data->sumx; + double sumy = data->sumy; + double sumx2 = data->sumx2; + double sumxy = data->sumxy; + double variance = (N * sumx2) - (sumx * sumx); + if (variance != 0) + { + valOut = ((N * sumxy) - (sumx * sumy)) / variance; + } + } + return mcsv1_UDAF::SUCCESS; +} + +mcsv1_UDAF::ReturnCode regr_slope::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) +{ + static_any::any& valIn_y = valsDropped[0].columnData; + static_any::any& valIn_x = valsDropped[1].columnData; + struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data; + + double valx = 0.0; + double valy = 0.0; + + valx = convertAnyTo(valIn_x); + valy = convertAnyTo(valIn_y); + + // For decimal types, we need to move the decimal point. + uint32_t scaley = valsDropped[0].scale; + + if (valy != 0 && scaley > 0) + { + valy /= pow(10.0, (double)scaley); + } + + data->sumy -= valy; + + // For decimal types, we need to move the decimal point. + uint32_t scalex = valsDropped[1].scale; + + if (valx != 0 && scalex > 0) + { + valx /= pow(10.0, (double)scaley); + } + + data->sumx -= valx; + data->sumx2 -= valx*valx; + + data->sumxy -= valx*valy; + --data->cnt; + + return mcsv1_UDAF::SUCCESS; +} + diff --git a/utils/regr/regr_slope.h b/utils/regr/regr_slope.h new file mode 100644 index 000000000..9c148d895 --- /dev/null +++ b/utils/regr/regr_slope.h @@ -0,0 +1,88 @@ +/* Copyright (C) 2017 MariaDB Corporaton + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +/*********************************************************************** +* $Id$ +* +* regr_slope.h +***********************************************************************/ + +/** + * Columnstore interface for for the regr_slope function + * + * + * CREATE AGGREGATE FUNCTION regr_slope returns REAL + * soname 'libregr_mysql.so'; + * + */ +#ifndef HEADER_regr_slope +#define HEADER_regr_slope + +#include +#include +#include +#ifdef _MSC_VER +#include +#else +#include +#endif + +#include "mcsv1_udaf.h" +#include "calpontsystemcatalog.h" +#include "windowfunctioncolumn.h" +using namespace execplan; + +#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT) +#define EXPORT __declspec(dllexport) +#else +#define EXPORT +#endif + +namespace mcsv1sdk +{ + +// Return the regr_slope value of the dataset + +class regr_slope : public mcsv1_UDAF +{ +public: + // Defaults OK + regr_slope() : mcsv1_UDAF() {}; + virtual ~regr_slope() {}; + + virtual ReturnCode init(mcsv1Context* context, + ColumnDatum* colTypes); + + virtual ReturnCode reset(mcsv1Context* context); + + virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn); + + virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn); + + virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut); + + virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped); + +protected: +}; + +}; // namespace + +#undef EXPORT + +#endif // HEADER_regr_slope.h + diff --git a/utils/regr/regrmysql.cpp b/utils/regr/regrmysql.cpp index 6870f3050..3b048f14d 100644 --- a/utils/regr/regrmysql.cpp +++ b/utils/regr/regrmysql.cpp @@ -199,7 +199,7 @@ extern "C" #ifdef _MSC_VER __declspec(dllexport) #endif - long long regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + double regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), char* is_null, char* error __attribute__((unused))) { struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr; @@ -283,7 +283,7 @@ extern "C" #ifdef _MSC_VER __declspec(dllexport) #endif - long long regr_avgy(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + double regr_avgy(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), char* is_null, char* error __attribute__((unused))) { struct regr_avgy_data* data = (struct regr_avgy_data*)initid->ptr; @@ -368,7 +368,225 @@ extern "C" struct regr_count_data* data = (struct regr_count_data*)initid->ptr; return data->cnt; } + //======================================================================= + + /** + * regr_slope connector stub + */ + struct regr_slope_data + { + int64_t cnt; + double sumx; + double sumx2; // sum of (x squared) + double sumy; + double sumxy; // sum of (x*y) + }; + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + my_bool regr_slope_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + struct regr_slope_data* data; + if (args->arg_count != 2) + { + strcpy(message,"regr_slope() requires two arguments"); + return 1; + } + + if (!(data = (struct regr_slope_data*) malloc(sizeof(struct regr_slope_data)))) + { + strmov(message,"Couldn't allocate memory"); + return 1; + } + data->cnt = 0; + data->sumx = 0.0; + data->sumx2 = 0.0; + data->sumy = 0.0; + + initid->ptr = (char*)data; + return 0; + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + void regr_slope_deinit(UDF_INIT* initid) + { + free(initid->ptr); + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + void + regr_slope_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), + char* message __attribute__((unused))) + { + struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr; + data->cnt = 0; + data->sumx = 0.0; + data->sumx2 = 0.0; + data->sumy = 0.0; + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + void + regr_slope_add(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, + char* message __attribute__((unused))) + { + // Test for NULL in x and y + if (args->args[0] == 0 || args->args[1] == 0) + { + return; + } + struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr; + double yval = cvtArgToDouble(args->arg_type[0], args->args[0]); + double xval = cvtArgToDouble(args->arg_type[1], args->args[1]); + data->sumy += yval; + data->sumx += xval; + data->sumx2 += xval*xval; + data->sumxy += xval*yval; + ++data->cnt; + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + double regr_slope(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + char* is_null, char* error __attribute__((unused))) + { + struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr; + double N = data->cnt; + if (N > 0) + { + double sumx = data->sumx; + double sumy = data->sumy; + double sumx2 = data->sumx2; + double sumxy = data->sumxy; + double variance = (N * sumx2) - (sumx * sumx); + if (variance) + { + return ((N * sumxy) - (sumx * sumy)) / variance; + } + } + *is_null = 1; + return 0; + } + +//======================================================================= + + /** + * regr_intercept connector stub + */ + struct regr_intercept_data + { + int64_t cnt; + double sumx; + double sumx2; // sum of (x squared) + double sumy; + double sumxy; // sum of (x*y) + }; + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + my_bool regr_intercept_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + struct regr_intercept_data* data; + if (args->arg_count != 2) + { + strcpy(message,"regr_intercept() requires two arguments"); + return 1; + } + + if (!(data = (struct regr_intercept_data*) malloc(sizeof(struct regr_intercept_data)))) + { + strmov(message,"Couldn't allocate memory"); + return 1; + } + data->cnt = 0; + data->sumx = 0.0; + data->sumx2 = 0.0; + data->sumy = 0.0; + + initid->ptr = (char*)data; + return 0; + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + void regr_intercept_deinit(UDF_INIT* initid) + { + free(initid->ptr); + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + void + regr_intercept_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), + char* message __attribute__((unused))) + { + struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr; + data->cnt = 0; + data->sumx = 0.0; + data->sumx2 = 0.0; + data->sumy = 0.0; + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + void + regr_intercept_add(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, + char* message __attribute__((unused))) + { + // Test for NULL in x and y + if (args->args[0] == 0 || args->args[1] == 0) + { + return; + } + struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr; + double yval = cvtArgToDouble(args->arg_type[0], args->args[0]); + double xval = cvtArgToDouble(args->arg_type[1], args->args[1]); + data->sumy += yval; + data->sumx += xval; + data->sumx2 += xval*xval; + data->sumxy += xval*yval; + ++data->cnt; + } + + #ifdef _MSC_VER + __declspec(dllexport) + #endif + double regr_intercept(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + char* is_null, char* error __attribute__((unused))) + { + struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr; + double N = data->cnt; + if (N > 0) + { + double sumx = data->sumx; + double sumy = data->sumy; + double sumx2 = data->sumx2; + double sumxy = data->sumxy; + double variance = (N * sumx2) - (sumx * sumx); + if (variance) + { + double slope = ((N * sumxy) - (sumx * sumy)) / variance; + return (sumy - (slope * sumx)) / N; + } + } + *is_null = 1; + return 0; + } } // vim:ts=4 sw=4: diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index b593239cd..e039d5c2a 100644 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -242,7 +242,7 @@ struct RowUDAFFunctionCol : public RowAggFunctionCol mcsv1sdk::mcsv1Context fUDAFContext; // The UDAF context bool bInterrupted; // Shared by all the threads -}; + }; inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const { diff --git a/utils/udfsdk/mcsv1_udaf.h b/utils/udfsdk/mcsv1_udaf.h index 073b5164a..ec0d0cb79 100644 --- a/utils/udfsdk/mcsv1_udaf.h +++ b/utils/udfsdk/mcsv1_udaf.h @@ -381,6 +381,7 @@ private: std::string functionName; mcsv1sdk::mcsv1_UDAF* func; int32_t fParamCount; + std::vector paramKeys; public: // For use by the framework @@ -403,6 +404,7 @@ public: EXPORT mcsv1sdk::mcsv1_UDAF* getFunction() const; EXPORT boost::shared_ptr getUserDataSP(); EXPORT void setParamCount(int32_t paramCount); + std::vector* getParamKeys(); }; // Since aggregate functions can operate on any data type, we use the following structure @@ -606,6 +608,9 @@ public: virtual ReturnCode createUserData(UserData*& userdata, int32_t& length); protected: + // some handy conversion routines + template + T convertAnyTo(static_any::any&); // These are handy for testing the actual type of static_any static const static_any::any& charTypeId; static const static_any::any& scharTypeId; @@ -948,6 +953,11 @@ inline void mcsv1Context::setParamCount(int32_t paramCount) fParamCount = paramCount; } +inline std::vector* mcsv1Context::getParamKeys() +{ + return ¶mKeys; +} + inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) { return NOT_IMPLEMENTED; @@ -960,6 +970,68 @@ inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::createUserData(UserData*& userData, in return SUCCESS; } + + +// Handy helper functions +template +inline T mcsv1_UDAF::convertAnyTo(static_any::any& valIn) +{ + T val; + if (valIn.compatible(longTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(charTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(scharTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(shortTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(intTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(llTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(ucharTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(ushortTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(uintTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(ulongTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(ullTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(floatTypeId)) + { + val = valIn.cast(); + } + else if (valIn.compatible(doubleTypeId)) + { + val = valIn.cast(); + } + return val; +} + }; // namespace mcssdk #undef EXPORT