1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-521 Some more fixes for multi-parm aggregates. Add regr slope

This commit is contained in:
David Hall
2018-09-25 16:31:10 -05:00
parent 3fac7b1e19
commit d930a1e322
13 changed files with 776 additions and 305 deletions

View File

@ -76,21 +76,55 @@ namespace
struct cmpTuple
{
bool operator()(boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*> a,
boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*> b)
bool operator()(boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>* > a,
boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>* > b)
{
if (boost::get<0>(a) < boost::get<0>(b))
return true;
uint32_t keya = boost::get<0>(a);
uint32_t keyb = boost::get<0>(b);
int opa;
int opb;
mcsv1sdk::mcsv1_UDAF* pUDAFa;
mcsv1sdk::mcsv1_UDAF* pUDAFb;
if (boost::get<0>(a) == boost::get<0>(b))
// If key is less than
if (keya < keyb)
return true;
if (keya == keyb)
{
if (boost::get<1>(a) < boost::get<1>(b))
// test Op
opa = boost::get<1>(a);
opb = boost::get<1>(b);
if (opa < opb)
return true;
if (opa == opb)
{
// look at the UDAF object
pUDAFa = boost::get<2>(a);
pUDAFb = boost::get<2>(b);
if (pUDAFa < pUDAFb)
return true;
if (pUDAFa == pUDAFb)
{
if (pUDAFa == NULL)
return false;
std::vector<uint32_t>* paramKeysa = boost::get<3>(a);
std::vector<uint32_t>* paramKeysb = boost::get<3>(b);
if (boost::get<1>(a) == boost::get<1>(b))
return boost::get<2>(a) < boost::get<2>(b);
if (paramKeysa->size() < paramKeysb->size())
return true;
if (paramKeysa->size() == paramKeysb->size())
{
if (paramKeysa == NULL)
return false;
for (uint64_t i = 0; i < paramKeysa->size(); ++i)
{
if ((*paramKeysa)[i] < (*paramKeysb)[i])
return true;
}
}
}
}
}
return false;
}
};
@ -101,7 +135,7 @@ typedef vector<RowBucket> RowBucketVec;
// The AGG_MAP type is used to maintain a list of aggregate functions in order to
// detect duplicates. Since all UDAF have the same op type (ROWAGG_UDAF), we add in
// the function pointer in order to ensure uniqueness.
typedef map<boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*>, uint64_t, cmpTuple> AGG_MAP;
typedef map<boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>* >, uint64_t, cmpTuple> AGG_MAP;
inline RowAggFunctionType functionIdMap(int planFuncId)
{
@ -796,7 +830,6 @@ const string TupleAggregateStep::toString() const
return oss.str();
}
SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
{
SJSTEP spjs;
@ -1301,6 +1334,16 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx));
break;
@ -1502,7 +1545,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
}
// find if this func is a duplicate
AGG_MAP::iterator iter = aggFuncMap.find(boost::make_tuple(key, aggOp, pUDAFFunc));
AGG_MAP::iterator iter = aggFuncMap.find(boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggFuncMap.end())
{
@ -1519,7 +1562,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
}
else
{
aggFuncMap.insert(make_pair(boost::make_tuple(key, aggOp, pUDAFFunc), funct->fOutputColumnIndex));
aggFuncMap.insert(make_pair(boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex));
}
if (aggOp != ROWAGG_MULTI_PARM)
@ -1740,7 +1783,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
typeAgg.push_back(typeProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
colAgg++;
}
@ -1781,7 +1824,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
typeAgg.push_back(typeProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc), colAgg));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
colAgg++;
}
@ -1811,7 +1854,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(
aggOp, stats, colAgg, colAgg, -1));
functionVec1.push_back(funct);
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAgg));
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
colAgg++;
continue;
@ -1858,6 +1901,16 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
break;
@ -1874,11 +1927,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
}
// skip if this is a duplicate
if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end())
if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end())
continue;
functionVec1.push_back(funct);
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAgg));
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg));
switch (aggOp)
{
@ -2103,6 +2156,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// If the param is const
if (udafc)
{
if (udafcParamIdx > udafc->aggParms().size() - 1)
{
throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr);
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
@ -2162,7 +2219,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
groupByNoDist.push_back(groupby);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i));
}
// locate the return column position in aggregated rowgroup
@ -2186,7 +2243,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
jobInfo.distinctColVec.end() )
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc));
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -2218,6 +2275,16 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
break;
}
}
@ -2318,7 +2385,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
case ROWAGG_BIT_XOR:
default:
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -2349,7 +2416,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -2534,7 +2601,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
functionVec2.push_back(funct);
// find if this func is a duplicate
AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggDupFuncMap.end())
{
@ -2551,7 +2618,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
}
else
{
aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc),
aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}
@ -3048,7 +3115,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
colAggPm++;
}
@ -3089,7 +3156,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
colAggPm++;
}
@ -3138,6 +3205,16 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
@ -3154,11 +3231,11 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
// skip if this is a duplicate
if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end())
if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end())
continue;
functionVecPm.push_back(funct);
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm));
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
switch (aggOp)
{
@ -3385,6 +3462,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
// If the param is const
if (udafc)
{
if (udafcParamIdx > udafc->aggParms().size() - 1)
{
throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with too many parms", aggregateFuncErr);
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
@ -3460,6 +3541,16 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
break;
}
}
@ -3469,7 +3560,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
}
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -3490,7 +3581,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -3632,7 +3723,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
functionVecUm.push_back(funct);
// find if this func is a duplicate
AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggDupFuncMap.end())
{
@ -3649,7 +3740,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
else
{
aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc),
aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}
@ -3911,7 +4002,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
colAggPm++;
}
@ -3952,7 +4043,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc), colAggPm));
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm));
colAggPm++;
}
@ -4008,6 +4099,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
@ -4024,11 +4125,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
}
// skip if this is a duplicate
if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc)) != aggFuncMap.end())
if (aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end())
continue;
functionVecPm.push_back(funct);
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc), colAggPm-multiParm));
aggFuncMap.insert(make_pair(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm-multiParm));
switch (aggOp)
{
@ -4253,6 +4354,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// If the param is const
if (udafc)
{
if (udafcParamIdx > udafc->aggParms().size() - 1)
{
throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr);
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
@ -4400,6 +4505,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i+1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM;
++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
break;
}
}
@ -4412,7 +4527,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
jobInfo.distinctColVec.end() )
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc));
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -4515,7 +4630,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// For non distinct aggregates
if (colUm == -1)
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -4536,7 +4651,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
@ -4674,7 +4789,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
functionVecUm.push_back(funct);
// find if this func is a duplicate
AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
AGG_MAP::iterator iter = aggDupFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggDupFuncMap.end())
{
@ -4691,7 +4806,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
}
else
{
aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc),
aggDupFuncMap.insert(make_pair(boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}

View File

@ -323,6 +323,9 @@ string ConvertFuncName(Item_sum* item)
case Item_sum::LAG_FUNC:
return "LAG";
break;
default:
// We just don't handle it.
break;
};
return "";

View File

@ -87,6 +87,9 @@ CREATE FUNCTION mcssystemreadonly RETURNS INTEGER soname 'libcalmysql.so';
CREATE AGGREGATE FUNCTION regr_avgx RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_avgy RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_count RETURNS INTEGER soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_slope RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_intercept RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION distinct_count RETURNS INTEGER soname 'libudf_mysql.so';
CREATE DATABASE IF NOT EXISTS infinidb_vtable;

View File

@ -44,6 +44,7 @@
<Project File="primitives/primproc/primproc.vpj"/>
<Project File="procmgr/procmgr.vpj"/>
<Project File="procmon/procmon.vpj"/>
<Project File="utils/regr/regr.vpj"/>
<Project File="oamapps/replayTransactionLog/ReplayTransactionLog.vpj"/>
<Project File="oamapps/resourceMonitor/resourceMonitor.vpj"/>
<Project File="utils/rowgroup/rowgroup.vpj"/>

26
utils/regr/CMakeLists.txt Executable file
View File

@ -0,0 +1,26 @@
include_directories( ${ENGINE_COMMON_INCLUDES}
../../dbcon/mysql )
########### next target ###############
set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept)
add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
add_library(regr SHARED ${regr_LIB_SRCS} )
set_target_properties(regr PROPERTIES VERSION 1.1.0 SOVERSION 1)
install(TARGETS regr DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
set(regr_mysql_LIB_SRCS regrmysql.cpp)
add_library(regr_mysql SHARED ${regr_mysql_LIB_SRCS})
set_target_properties(regr_mysql PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS regr_mysql DESTINATION ${ENGINE_LIBDIR} COMPONENT storage-engine)

View File

@ -197,6 +197,9 @@
<F N="regr_avgx.cpp"/>
<F N="regr_avgy.cpp"/>
<F N="regr_count.cpp"/>
<F N="regr_intercept.cpp"/>
<F N="regr_slope.cpp"/>
<F N="regrmysql.cpp"/>
</Folder>
<Folder
Name="Header Files"
@ -204,6 +207,8 @@
<F N="regr_avgx.h"/>
<F N="regr_avgy.h"/>
<F N="regr_count.h"/>
<F N="regr_intercept.h"/>
<F N="regr_slope.h"/>
</Folder>
<Folder
Name="Resource Files"

View File

@ -84,77 +84,12 @@ mcsv1_UDAF::ReturnCode regr_avgx::reset(mcsv1Context* context)
mcsv1_UDAF::ReturnCode regr_avgx::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
static_any::any& valIn_y = valsIn[0].columnData;
static_any::any& valIn_x = valsIn[1].columnData;
struct regr_avgx_data* data = (struct regr_avgx_data*)context->getUserData()->data;
DATATYPE val = 0.0;
if (context->isParamNull(0) || context->isParamNull(1))
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.compatible(longTypeId))
{
val = valIn_x.cast<long>();
}
else if (valIn_x.compatible(charTypeId))
{
val = valIn_x.cast<char>();
}
else if (valIn_x.compatible(scharTypeId))
{
val = valIn_x.cast<signed char>();
}
else if (valIn_x.compatible(shortTypeId))
{
val = valIn_x.cast<short>();
}
else if (valIn_x.compatible(intTypeId))
{
val = valIn_x.cast<int>();
}
else if (valIn_x.compatible(llTypeId))
{
val = valIn_x.cast<long long>();
}
else if (valIn_x.compatible(ucharTypeId))
{
val = valIn_x.cast<unsigned char>();
}
else if (valIn_x.compatible(ushortTypeId))
{
val = valIn_x.cast<unsigned short>();
}
else if (valIn_x.compatible(uintTypeId))
{
val = valIn_x.cast<unsigned int>();
}
else if (valIn_x.compatible(ulongTypeId))
{
val = valIn_x.cast<unsigned long>();
}
else if (valIn_x.compatible(ullTypeId))
{
val = valIn_x.cast<unsigned long long>();
}
else if (valIn_x.compatible(floatTypeId))
{
val = valIn_x.cast<float>();
}
else if (valIn_x.compatible(doubleTypeId))
{
val = valIn_x.cast<double>();
}
DATATYPE val = convertAnyTo<double>(valIn_x);
// For decimal types, we need to move the decimal point.
uint32_t scale = valsIn[1].scale;
if (val != 0 && scale > 0)
{
val /= pow(10.0, (double)scale);
@ -202,76 +137,12 @@ mcsv1_UDAF::ReturnCode regr_avgx::evaluate(mcsv1Context* context, static_any::an
mcsv1_UDAF::ReturnCode regr_avgx::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
static_any::any& valIn_y = valsDropped[0].columnData;
static_any::any& valIn_x = valsDropped[1].columnData;
struct regr_avgx_data* data = (struct regr_avgx_data*)context->getUserData()->data;
DATATYPE val = 0.0;
if (context->isParamNull(0) || context->isParamNull(1))
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.empty() || valIn_y.empty())
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.compatible(charTypeId))
{
val = valIn_x.cast<char>();
}
else if (valIn_x.compatible(scharTypeId))
{
val = valIn_x.cast<signed char>();
}
else if (valIn_x.compatible(shortTypeId))
{
val = valIn_x.cast<short>();
}
else if (valIn_x.compatible(intTypeId))
{
val = valIn_x.cast<int>();
}
else if (valIn_x.compatible(longTypeId))
{
val = valIn_x.cast<long>();
}
else if (valIn_x.compatible(llTypeId))
{
val = valIn_x.cast<long long>();
}
else if (valIn_x.compatible(ucharTypeId))
{
val = valIn_x.cast<unsigned char>();
}
else if (valIn_x.compatible(ushortTypeId))
{
val = valIn_x.cast<unsigned short>();
}
else if (valIn_x.compatible(uintTypeId))
{
val = valIn_x.cast<unsigned int>();
}
else if (valIn_x.compatible(ulongTypeId))
{
val = valIn_x.cast<unsigned long>();
}
else if (valIn_x.compatible(ullTypeId))
{
val = valIn_x.cast<unsigned long long>();
}
else if (valIn_x.compatible(floatTypeId))
{
val = valIn_x.cast<float>();
}
else if (valIn_x.compatible(doubleTypeId))
{
val = valIn_x.cast<double>();
}
double val = convertAnyTo<double>(valIn_x);
// For decimal types, we need to move the decimal point.
uint32_t scale = valsDropped[1].scale;
if (val != 0 && scale > 0)
{
val /= pow(10.0, (double)scale);

View File

@ -85,75 +85,11 @@ mcsv1_UDAF::ReturnCode regr_avgy::reset(mcsv1Context* context)
mcsv1_UDAF::ReturnCode regr_avgy::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
static_any::any& valIn_y = valsIn[0].columnData;
static_any::any& valIn_x = valsIn[1].columnData;
struct regr_avgy_data* data = (struct regr_avgy_data*)context->getUserData()->data;
DATATYPE val = 0.0;
if (context->isParamNull(0) || context->isParamNull(1))
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_y.compatible(longTypeId))
{
val = valIn_y.cast<long>();
}
else if (valIn_y.compatible(charTypeId))
{
val = valIn_y.cast<char>();
}
else if (valIn_y.compatible(scharTypeId))
{
val = valIn_y.cast<signed char>();
}
else if (valIn_y.compatible(shortTypeId))
{
val = valIn_y.cast<short>();
}
else if (valIn_y.compatible(intTypeId))
{
val = valIn_y.cast<int>();
}
else if (valIn_y.compatible(llTypeId))
{
val = valIn_y.cast<long long>();
}
else if (valIn_y.compatible(ucharTypeId))
{
val = valIn_y.cast<unsigned char>();
}
else if (valIn_y.compatible(ushortTypeId))
{
val = valIn_y.cast<unsigned short>();
}
else if (valIn_y.compatible(uintTypeId))
{
val = valIn_y.cast<unsigned int>();
}
else if (valIn_y.compatible(ulongTypeId))
{
val = valIn_y.cast<unsigned long>();
}
else if (valIn_y.compatible(ullTypeId))
{
val = valIn_y.cast<unsigned long long>();
}
else if (valIn_y.compatible(floatTypeId))
{
val = valIn_y.cast<float>();
}
else if (valIn_y.compatible(doubleTypeId))
{
val = valIn_y.cast<double>();
}
double val = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scale = valsIn[0].scale;
if (val != 0 && scale > 0)
{
val /= pow(10.0, (double)scale);
@ -199,75 +135,11 @@ mcsv1_UDAF::ReturnCode regr_avgy::evaluate(mcsv1Context* context, static_any::an
mcsv1_UDAF::ReturnCode regr_avgy::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
static_any::any& valIn_y = valsDropped[0].columnData;
static_any::any& valIn_x = valsDropped[1].columnData;
struct regr_avgy_data* data = (struct regr_avgy_data*)context->getUserData()->data;
DATATYPE val = 0.0;
if (context->isParamNull(0) || context->isParamNull(1))
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.empty() || valIn_y.empty())
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_y.compatible(charTypeId))
{
val = valIn_y.cast<char>();
}
else if (valIn_y.compatible(scharTypeId))
{
val = valIn_y.cast<signed char>();
}
else if (valIn_y.compatible(shortTypeId))
{
val = valIn_y.cast<short>();
}
else if (valIn_y.compatible(intTypeId))
{
val = valIn_y.cast<int>();
}
else if (valIn_y.compatible(longTypeId))
{
val = valIn_y.cast<long>();
}
else if (valIn_y.compatible(llTypeId))
{
val = valIn_y.cast<long long>();
}
else if (valIn_y.compatible(ucharTypeId))
{
val = valIn_y.cast<unsigned char>();
}
else if (valIn_y.compatible(ushortTypeId))
{
val = valIn_y.cast<unsigned short>();
}
else if (valIn_y.compatible(uintTypeId))
{
val = valIn_y.cast<unsigned int>();
}
else if (valIn_y.compatible(ulongTypeId))
{
val = valIn_y.cast<unsigned long>();
}
else if (valIn_y.compatible(ullTypeId))
{
val = valIn_y.cast<unsigned long long>();
}
else if (valIn_y.compatible(floatTypeId))
{
val = valIn_y.cast<float>();
}
else if (valIn_y.compatible(doubleTypeId))
{
val = valIn_y.cast<double>();
}
double val = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scale = valsDropped[0].scale;
if (val != 0 && scale > 0)
{
val /= pow(10.0, (double)scale);

197
utils/regr/regr_slope.cpp Normal file
View File

@ -0,0 +1,197 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sstream>
#include <cstring>
#include <typeinfo>
#include "regr_slope.h"
#include "bytestream.h"
#include "objectreader.h"
using namespace mcsv1sdk;
class Add_regr_slope_ToUDAFMap
{
public:
Add_regr_slope_ToUDAFMap()
{
UDAFMap::getMap()["regr_slope"] = new regr_slope();
}
};
static Add_regr_slope_ToUDAFMap addToMap;
// Use the simple data model
struct regr_slope_data
{
uint64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumxy; // sum of (x*y)
};
mcsv1_UDAF::ReturnCode regr_slope::init(mcsv1Context* context,
ColumnDatum* colTypes)
{
if (context->getParameterCount() != 2)
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("regr_slope() with other than 2 arguments");
return mcsv1_UDAF::ERROR;
}
context->setUserDataSize(sizeof(regr_slope_data));
context->setResultType(CalpontSystemCatalog::DOUBLE);
context->setColWidth(8);
if (colTypes[0].scale)
{
context->setScale(colTypes[0].scale + 8);
context->setPrecision(19);
}
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_slope::reset(mcsv1Context* context)
{
struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data;
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_slope::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
static_any::any& valIn_y = valsIn[0].columnData;
static_any::any& valIn_x = valsIn[1].columnData;
struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data;
double valx = 0.0;
double valy = 0.0;
valx = convertAnyTo<double>(valIn_x);
valy = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scaley = valsIn[0].scale;
if (valy != 0 && scaley > 0)
{
valy /= pow(10.0, (double)scaley);
}
data->sumy += valy;
// For decimal types, we need to move the decimal point.
uint32_t scalex = valsIn[1].scale;
if (valx != 0 && scalex > 0)
{
valx /= pow(10.0, (double)scaley);
}
data->sumx += valx;
data->sumx2 += valx*valx;
data->sumxy += valx*valy;
++data->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_slope::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
{
if (!userDataIn)
{
return mcsv1_UDAF::SUCCESS;
}
struct regr_slope_data* outData = (struct regr_slope_data*)context->getUserData()->data;
struct regr_slope_data* inData = (struct regr_slope_data*)userDataIn->data;
outData->sumx += inData->sumx;
outData->sumx2 += inData->sumx2;
outData->sumy += inData->sumy;
outData->sumxy += inData->sumxy;
outData->cnt += inData->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_slope::evaluate(mcsv1Context* context, static_any::any& valOut)
{
struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data;
double N = data->cnt;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumxy = data->sumxy;
double variance = (N * sumx2) - (sumx * sumx);
if (variance != 0)
{
valOut = ((N * sumxy) - (sumx * sumy)) / variance;
}
}
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_slope::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
static_any::any& valIn_y = valsDropped[0].columnData;
static_any::any& valIn_x = valsDropped[1].columnData;
struct regr_slope_data* data = (struct regr_slope_data*)context->getUserData()->data;
double valx = 0.0;
double valy = 0.0;
valx = convertAnyTo<double>(valIn_x);
valy = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scaley = valsDropped[0].scale;
if (valy != 0 && scaley > 0)
{
valy /= pow(10.0, (double)scaley);
}
data->sumy -= valy;
// For decimal types, we need to move the decimal point.
uint32_t scalex = valsDropped[1].scale;
if (valx != 0 && scalex > 0)
{
valx /= pow(10.0, (double)scaley);
}
data->sumx -= valx;
data->sumx2 -= valx*valx;
data->sumxy -= valx*valy;
--data->cnt;
return mcsv1_UDAF::SUCCESS;
}

88
utils/regr/regr_slope.h Normal file
View File

@ -0,0 +1,88 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id$
*
* regr_slope.h
***********************************************************************/
/**
* Columnstore interface for for the regr_slope function
*
*
* CREATE AGGREGATE FUNCTION regr_slope returns REAL
* soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_slope
#define HEADER_regr_slope
#include <cstdlib>
#include <string>
#include <vector>
#ifdef _MSC_VER
#include <unordered_map>
#else
#include <tr1/unordered_map>
#endif
#include "mcsv1_udaf.h"
#include "calpontsystemcatalog.h"
#include "windowfunctioncolumn.h"
using namespace execplan;
#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace mcsv1sdk
{
// Return the regr_slope value of the dataset
class regr_slope : public mcsv1_UDAF
{
public:
// Defaults OK
regr_slope() : mcsv1_UDAF() {};
virtual ~regr_slope() {};
virtual ReturnCode init(mcsv1Context* context,
ColumnDatum* colTypes);
virtual ReturnCode reset(mcsv1Context* context);
virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn);
virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
protected:
};
}; // namespace
#undef EXPORT
#endif // HEADER_regr_slope.h

View File

@ -199,7 +199,7 @@ extern "C"
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
double regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
@ -283,7 +283,7 @@ extern "C"
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long regr_avgy(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
double regr_avgy(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct regr_avgy_data* data = (struct regr_avgy_data*)initid->ptr;
@ -368,7 +368,225 @@ extern "C"
struct regr_count_data* data = (struct regr_count_data*)initid->ptr;
return data->cnt;
}
//=======================================================================
/**
* regr_slope connector stub
*/
struct regr_slope_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool regr_slope_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct regr_slope_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_slope() requires two arguments");
return 1;
}
if (!(data = (struct regr_slope_data*) malloc(sizeof(struct regr_slope_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void regr_slope_deinit(UDF_INIT* initid)
{
free(initid->ptr);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_slope_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
{
struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr;
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_slope_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// Test for NULL in x and y
if (args->args[0] == 0 || args->args[1] == 0)
{
return;
}
struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr;
double yval = cvtArgToDouble(args->arg_type[0], args->args[0]);
double xval = cvtArgToDouble(args->arg_type[1], args->args[1]);
data->sumy += yval;
data->sumx += xval;
data->sumx2 += xval*xval;
data->sumxy += xval*yval;
++data->cnt;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
double regr_slope(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr;
double N = data->cnt;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumxy = data->sumxy;
double variance = (N * sumx2) - (sumx * sumx);
if (variance)
{
return ((N * sumxy) - (sumx * sumy)) / variance;
}
}
*is_null = 1;
return 0;
}
//=======================================================================
/**
* regr_intercept connector stub
*/
struct regr_intercept_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool regr_intercept_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct regr_intercept_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_intercept() requires two arguments");
return 1;
}
if (!(data = (struct regr_intercept_data*) malloc(sizeof(struct regr_intercept_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void regr_intercept_deinit(UDF_INIT* initid)
{
free(initid->ptr);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_intercept_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
{
struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr;
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_intercept_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// Test for NULL in x and y
if (args->args[0] == 0 || args->args[1] == 0)
{
return;
}
struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr;
double yval = cvtArgToDouble(args->arg_type[0], args->args[0]);
double xval = cvtArgToDouble(args->arg_type[1], args->args[1]);
data->sumy += yval;
data->sumx += xval;
data->sumx2 += xval*xval;
data->sumxy += xval*yval;
++data->cnt;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
double regr_intercept(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr;
double N = data->cnt;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumxy = data->sumxy;
double variance = (N * sumx2) - (sumx * sumx);
if (variance)
{
double slope = ((N * sumxy) - (sumx * sumy)) / variance;
return (sumy - (slope * sumx)) / N;
}
}
*is_null = 1;
return 0;
}
}
// vim:ts=4 sw=4:

View File

@ -381,6 +381,7 @@ private:
std::string functionName;
mcsv1sdk::mcsv1_UDAF* func;
int32_t fParamCount;
std::vector<uint32_t> paramKeys;
public:
// For use by the framework
@ -403,6 +404,7 @@ public:
EXPORT mcsv1sdk::mcsv1_UDAF* getFunction() const;
EXPORT boost::shared_ptr<UserData> getUserDataSP();
EXPORT void setParamCount(int32_t paramCount);
std::vector<uint32_t>* getParamKeys();
};
// Since aggregate functions can operate on any data type, we use the following structure
@ -606,6 +608,9 @@ public:
virtual ReturnCode createUserData(UserData*& userdata, int32_t& length);
protected:
// some handy conversion routines
template<typename T>
T convertAnyTo(static_any::any&);
// These are handy for testing the actual type of static_any
static const static_any::any& charTypeId;
static const static_any::any& scharTypeId;
@ -948,6 +953,11 @@ inline void mcsv1Context::setParamCount(int32_t paramCount)
fParamCount = paramCount;
}
inline std::vector<uint32_t>* mcsv1Context::getParamKeys()
{
return &paramKeys;
}
inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
return NOT_IMPLEMENTED;
@ -960,6 +970,68 @@ inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::createUserData(UserData*& userData, in
return SUCCESS;
}
// Handy helper functions
template<typename T>
inline T mcsv1_UDAF::convertAnyTo(static_any::any& valIn)
{
T val;
if (valIn.compatible(longTypeId))
{
val = valIn.cast<long>();
}
else if (valIn.compatible(charTypeId))
{
val = valIn.cast<char>();
}
else if (valIn.compatible(scharTypeId))
{
val = valIn.cast<signed char>();
}
else if (valIn.compatible(shortTypeId))
{
val = valIn.cast<short>();
}
else if (valIn.compatible(intTypeId))
{
val = valIn.cast<int>();
}
else if (valIn.compatible(llTypeId))
{
val = valIn.cast<long long>();
}
else if (valIn.compatible(ucharTypeId))
{
val = valIn.cast<unsigned char>();
}
else if (valIn.compatible(ushortTypeId))
{
val = valIn.cast<unsigned short>();
}
else if (valIn.compatible(uintTypeId))
{
val = valIn.cast<unsigned int>();
}
else if (valIn.compatible(ulongTypeId))
{
val = valIn.cast<unsigned long>();
}
else if (valIn.compatible(ullTypeId))
{
val = valIn.cast<unsigned long long>();
}
else if (valIn.compatible(floatTypeId))
{
val = valIn.cast<float>();
}
else if (valIn.compatible(doubleTypeId))
{
val = valIn.cast<double>();
}
return val;
}
}; // namespace mcssdk
#undef EXPORT