1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-521 Re-do changes for MariaDB 10.3 merge

This commit is contained in:
David Hall
2018-08-16 11:17:39 -05:00
parent 3e6f3568ec
commit 611cdb204d
6 changed files with 312 additions and 424 deletions

View File

@ -849,10 +849,9 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
idbassert(cc != NULL); // @bug5261
bool isNull = (ConstantColumn::NULLDATA == cc->type());
if (ac->aggOp() == ROWAGG_UDAF)
if (ac->aggOp() == AggregateColumn::UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
if (udafc)
{
constAggDataVec.push_back(
@ -1099,7 +1098,6 @@ void TupleAggregateStep::prep1PhaseAggregate(
uint32_t bigUintWidth = sizeof(uint64_t);
// For UDAF
uint32_t projColsUDAFIdx = 0;
uint32_t udafcParamIdx = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// for count column of average function
@ -1296,12 +1294,10 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
@ -1310,7 +1306,6 @@ void TupleAggregateStep::prep1PhaseAggregate(
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(1)prep1PhaseAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -1489,44 +1484,11 @@ void TupleAggregateStep::prep1PhaseAggregate(
precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
widthAgg.push_back(width[colProj]);
// If the param is const
if (udafc)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
}
else
{
throw QueryDataExcept("prep1PhaseAggregate: UDAF multi function with no parms", aggregateFuncErr);
}
++udafcParamIdx;
}
break;
@ -1901,7 +1863,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -2121,12 +2082,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
@ -2141,12 +2100,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
widthAgg.push_back(widthProj[colProj]);
multiParmIndexes.push_back(colAgg);
++colAgg;
// If the param is const
if (udafc)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
@ -2156,7 +2113,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
{
throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr);
}
++udafcParamIdx;
}
break;
@ -2211,7 +2167,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// locate the return column position in aggregated rowgroup
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
udafc = NULL;
@ -2256,19 +2211,16 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -2565,7 +2517,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx));
@ -2609,7 +2560,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
}
++outIdx;
} // for (i
@ -2860,7 +2810,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
++multiParms;
continue;
}
if (returnedColVec[k].first != distinctColKey)
continue;
@ -2881,7 +2830,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
f->fStatsFunction,
groupBySub.size() - 1,
f->fOutputColumnIndex,
f->fAuxColumnIndex - multiParms));
f->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
}
@ -2909,7 +2858,6 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
++multiParms;
continue;
}
// search non-distinct functions in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();
@ -2925,7 +2873,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
udafFuncCol->fUDAFContext,
udafFuncCol->fInputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex - multiParms));
udafFuncCol->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
else if ((f->fOutputColumnIndex == k) &&
@ -2947,7 +2895,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
f->fStatsFunction,
f->fInputColumnIndex,
f->fOutputColumnIndex,
f->fAuxColumnIndex - multiParms));
f->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
}
@ -3183,12 +3131,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
@ -3197,7 +3143,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(1)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -3420,12 +3365,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
@ -3439,12 +3382,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
colAggPm++;
// If the param is const
if (udafc)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
@ -3454,7 +3395,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
{
throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with no parms", aggregateFuncErr);
}
++udafcParamIdx;
}
break;
@ -3482,7 +3422,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
AGG_MAP aggDupFuncMap;
projColsUDAFIdx = 0;
// copy over the groupby vector
// update the outputColumnIndex if returned
for (uint64_t i = 0; i < groupByPm.size(); i++)
@ -3494,7 +3433,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
// locate the return column position in aggregated rowgroup from PM
// outIdx is i without the multi-columns,
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
uint32_t retKey = returnedColVec[i].first;
@ -3511,7 +3449,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
// Is this a UDAF? use the function as part of the key.
pUDAFFunc = NULL;
udafc = NULL;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
@ -3520,14 +3457,12 @@ void TupleAggregateStep::prep2PhasesAggregate(
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -3680,7 +3615,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx));
@ -3722,7 +3656,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (returnedColVec[i].second == AggregateColumn::AVG)
avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
}
++outIdx;
}
@ -4067,12 +4000,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
@ -4081,7 +4012,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -4300,12 +4230,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
@ -4320,12 +4248,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
widthAggPm.push_back(width[colProj]);
multiParmIndexes.push_back(colAggPm);
colAggPm++;
// If the param is const
if (udafc)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
@ -4335,7 +4261,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
{
throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr);
}
++udafcParamIdx;
}
break;
@ -4378,17 +4303,15 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (funcPm->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
if (!udafFuncCol)
{
throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
funct.reset(new RowUDAFFunctionCol(
udafFuncCol->fUDAFContext,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fOutputColumnIndex - multiParms,
udafFuncCol->fAuxColumnIndex - multiParms));
udafFuncCol->fOutputColumnIndex-multiParms,
udafFuncCol->fAuxColumnIndex-multiParms));
functionNoDistVec.push_back(funct);
pUDAFFunc = udafFuncCol->fUDAFContext.getFunction();
}
@ -4398,8 +4321,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
funcPm->fAggFunction,
funcPm->fStatsFunction,
funcPm->fOutputColumnIndex,
funcPm->fOutputColumnIndex - multiParms,
funcPm->fAuxColumnIndex - multiParms));
funcPm->fOutputColumnIndex-multiParms,
funcPm->fAuxColumnIndex-multiParms));
functionNoDistVec.push_back(funct);
pUDAFFunc = NULL;
}
@ -4412,7 +4335,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
{
continue;
}
oidsAggUm.push_back(oidsAggPm[idx]);
keysAggUm.push_back(keysAggPm[idx]);
scaleAggUm.push_back(scaleAggPm[idx]);
@ -4449,7 +4371,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// locate the return column position in aggregated rowgroup from PM
// outIdx is i without the multi-columns,
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
pUDAFFunc = NULL;
@ -4470,19 +4391,16 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
@ -4496,241 +4414,222 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (it != aggFuncMap.end())
{
colUm = it->second;
}
else
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[retKey] << "' isn't in tuple.";
cerr << "prep2PhasesDistinctAggregate: distinct " << emsg.str()
<< " oid=" << (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable;
if (jobInfo.keyInfo->tupleKeyVec[retKey].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView;
cerr << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
colUm = it->second - multiParms;
}
}
switch (aggOp)
if (colUm > -1) // Means we found a DISTINCT and have a column number
{
case ROWAGG_DISTINCT_AVG:
//avgFuncMap.insert(make_pair(key, funct));
case ROWAGG_DISTINCT_SUM:
switch (aggOp)
{
if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR ||
typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR ||
typeAggUm[colUm] == CalpontSystemCatalog::BLOB ||
typeAggUm[colUm] == CalpontSystemCatalog::TEXT ||
typeAggUm[colUm] == CalpontSystemCatalog::DATE ||
typeAggUm[colUm] == CalpontSystemCatalog::DATETIME ||
typeAggUm[colUm] == CalpontSystemCatalog::TIME)
case ROWAGG_DISTINCT_AVG:
//avgFuncMap.insert(make_pair(key, funct));
case ROWAGG_DISTINCT_SUM:
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeAggUm[colUm]));
string emsg = IDBErrorInfo::instance()->
errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR ||
typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR ||
typeAggUm[colUm] == CalpontSystemCatalog::BLOB ||
typeAggUm[colUm] == CalpontSystemCatalog::TEXT ||
typeAggUm[colUm] == CalpontSystemCatalog::DATE ||
typeAggUm[colUm] == CalpontSystemCatalog::DATETIME ||
typeAggUm[colUm] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeAggUm[colUm]));
string emsg = IDBErrorInfo::instance()->
errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
if (typeAggUm[colUm] != CalpontSystemCatalog::DOUBLE &&
typeAggUm[colUm] != CalpontSystemCatalog::FLOAT)
{
if (isUnsigned(typeAggUm[colUm]))
{
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
precisionAggDist.push_back(20);
}
else
{
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
precisionAggDist.push_back(19);
}
uint32_t scale = scaleAggUm[colUm];
// for int average, FE expects a decimal
if (aggOp == ROWAGG_DISTINCT_AVG)
scale = jobInfo.scaleOfAvg[retKey]; // scale += 4;
scaleAggDist.push_back(scale);
widthAggDist.push_back(bigIntWidth);
}
else
{
typeAggDist.push_back(typeAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
precisionAggDist.push_back(precisionAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
}
}
// PM: put the count column for avg next to the sum
// let fall through to add a count column for average function
//if (aggOp != ROWAGG_DISTINCT_AVG)
break;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
if (typeAggUm[colUm] != CalpontSystemCatalog::DOUBLE &&
typeAggUm[colUm] != CalpontSystemCatalog::FLOAT)
case ROWAGG_COUNT_DISTINCT_COL_NAME:
{
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
// work around count() in select subquery
precisionAggDist.push_back(9999);
if (isUnsigned(typeAggUm[colUm]))
{
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
precisionAggDist.push_back(20);
}
else
{
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
precisionAggDist.push_back(19);
}
uint32_t scale = scaleAggUm[colUm];
// for int average, FE expects a decimal
if (aggOp == ROWAGG_DISTINCT_AVG)
scale = jobInfo.scaleOfAvg[retKey]; // scale += 4;
scaleAggDist.push_back(scale);
widthAggDist.push_back(bigIntWidth);
}
else
{
typeAggDist.push_back(typeAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
precisionAggDist.push_back(precisionAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
}
}
// PM: put the count column for avg next to the sum
// let fall through to add a count column for average function
//if (aggOp != ROWAGG_DISTINCT_AVG)
break;
break;
case ROWAGG_COUNT_DISTINCT_COL_NAME:
default:
// cound happen if agg and agg distinct use same column.
colUm = -1;
break;
} // switch
}
// For non distinct aggregates
if (colUm == -1)
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
if (it != aggFuncMap.end())
{
colUm = it->second - multiParms;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
// work around count() in select subquery
precisionAggDist.push_back(9999);
if (isUnsigned(typeAggUm[colUm]))
{
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
}
else
{
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
}
widthAggDist.push_back(bigIntWidth);
keysAggDist.push_back(keysAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
precisionAggDist.push_back(precisionAggUm[colUm]);
typeAggDist.push_back(typeAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
colUm -= multiParms;
}
break;
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SUM:
case ROWAGG_AVG:
case ROWAGG_COUNT_ASTERISK:
case ROWAGG_COUNT_COL_NAME:
case ROWAGG_STATS:
case ROWAGG_BIT_AND:
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
case ROWAGG_CONSTANT:
default:
// not a direct hit -- a returned column is not already in the RG from PMs
else
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
bool returnColMissing = true;
if (it != aggFuncMap.end())
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
colUm = it->second;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(keysAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
precisionAggDist.push_back(precisionAggUm[colUm]);
typeAggDist.push_back(typeAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
colUm -= multiParms;
}
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
// not a direct hit -- a returned column is not already in the RG from PMs
else
{
bool returnColMissing = true;
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
if (it != aggFuncMap.end())
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc));
// false alarm
returnColMissing = false;
if (it != aggFuncMap.end())
colUm = it->second - multiParms;
if (aggOp == ROWAGG_SUM)
{
// false alarm
returnColMissing = false;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(scaleAggUm[colUm] >> 8);
precisionAggDist.push_back(precisionAggUm[colUm]);
typeAggDist.push_back(typeAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
}
else
{
// leave the count() to avg
aggOp = ROWAGG_COUNT_NO_OP;
colUm = it->second;
if (aggOp == ROWAGG_SUM)
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
if (isUnsigned(typeAggUm[colUm]))
{
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(scaleAggUm[colUm] >> 8);
precisionAggDist.push_back(precisionAggUm[colUm]);
typeAggDist.push_back(typeAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
precisionAggDist.push_back(20);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
}
else
{
// leave the count() to avg
aggOp = ROWAGG_COUNT_NO_OP;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
if (isUnsigned(typeAggUm[colUm]))
{
precisionAggDist.push_back(20);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
}
else
{
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
}
widthAggDist.push_back(bigIntWidth);
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
}
widthAggDist.push_back(bigIntWidth);
}
}
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(),
retKey) != jobInfo.expressionVec.end())
{
// a function on aggregation
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
widthAggDist.push_back(ti.width);
}
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(),
retKey) != jobInfo.expressionVec.end())
{
// a function on aggregation
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
{
// a window function
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
{
// a window function
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (aggOp == ROWAGG_CONSTANT)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (aggOp == ROWAGG_CONSTANT)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
returnColMissing = false;
}
if (returnColMissing)
{
Message::Args args;
args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()->
errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid="
<< (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias="
<< jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view="
<< jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function="
<< (int) aggOp << endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
} //else
} // switch
}
if (returnColMissing)
{
Message::Args args;
args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()->
errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid="
<< (int) jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias="
<< jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view="
<< jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function="
<< (int) aggOp << endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
} //else not a direct hit
} // else not a DISTINCT
// update groupby vector if the groupby column is a returned column
if (returnedColVec[i].second == 0)
@ -4757,7 +4656,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx));
@ -4801,7 +4699,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
}
++outIdx;
} // for (i
@ -5044,7 +4941,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
++multiParms;
continue;
}
if (returnedColVec[k].first != distinctColKey)
continue;
@ -5066,7 +4962,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
f->fStatsFunction,
groupBySub.size() - 1,
f->fOutputColumnIndex,
f->fAuxColumnIndex - multiParms));
f->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
}
@ -5092,7 +4988,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
++multiParms;
continue;
}
// search non-distinct functions in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
@ -5110,7 +5005,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
udafFuncCol->fUDAFContext,
udafFuncCol->fInputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex - multiParms));
udafFuncCol->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK ||
@ -5131,7 +5026,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
f->fStatsFunction,
f->fInputColumnIndex,
f->fOutputColumnIndex,
f->fAuxColumnIndex - multiParms));
f->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
}

View File

@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
########### next target ###############
set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp avg_mode.cpp regr_avgx.cpp avgx.cpp)
set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp median.cpp avg_mode.cpp avgx.cpp)
add_definitions(-DMYSQL_DYNAMIC_PLUGIN)

View File

@ -31,14 +31,21 @@ using namespace mcsv1sdk;
* This is a temporary kludge until we get the library loader
* task complete
*/
UDAF_MAP UDAFMap::fm;
#include "allnull.h"
#include "ssq.h"
#include "median.h"
#include "avg_mode.h"
#include "regr_avgx.h"
#include "avgx.h"
UDAF_MAP& UDAFMap::fm()
{
static UDAF_MAP* m = new UDAF_MAP;
return *m;
}
UDAF_MAP& UDAFMap::getMap()
{
UDAF_MAP& fm = UDAFMap::fm();
if (fm.size() > 0)
{
return fm;
@ -51,8 +58,8 @@ UDAF_MAP& UDAFMap::getMap()
// the function names passed to the interface is always in lower case.
fm["allnull"] = new allnull();
fm["ssq"] = new ssq();
fm["median"] = new median();
fm["avg_mode"] = new avg_mode();
fm["regr_avgx"] = new regr_avgx();
fm["avgx"] = new avgx();
return fm;

View File

@ -108,7 +108,7 @@ public:
static EXPORT UDAF_MAP& getMap();
private:
static UDAF_MAP fm;
static UDAF_MAP& fm();
};
/**

View File

@ -349,6 +349,78 @@ extern "C"
return data->sumsq;
}
//=======================================================================
/**
* MEDIAN connector stub
*/
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool median_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 1)
{
strcpy(message, "median() requires one argument");
return 1;
}
/*
if (!(data = (struct ssq_data*) malloc(sizeof(struct ssq_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->sumsq = 0;
initid->ptr = (char*)data;
*/
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void median_deinit(UDF_INIT* initid)
{
// free(initid->ptr);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
median_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
{
// struct ssq_data* data = (struct ssq_data*)initid->ptr;
// data->sumsq = 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
median_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// struct ssq_data* data = (struct ssq_data*)initid->ptr;
// double val = cvtArgToDouble(args->arg_type[0], args->args[0]);
// data->sumsq = val*val;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long median(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
// struct ssq_data* data = (struct ssq_data*)initid->ptr;
// return data->sumsq;
return 0;
}
/**
* avg_mode connector stub
*/
@ -419,88 +491,6 @@ extern "C"
return 0;
}
//=======================================================================
/**
* regr_avgx connector stub
*/
struct regr_avgx_data
{
double sumx;
int64_t cnt;
};
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool regr_avgx_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct regr_avgx_data* data;
if (args->arg_count != 2)
{
strcpy(message, "regr_avgx() requires two arguments");
return 1;
}
if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data))))
{
strmov(message, "Couldn't allocate memory");
return 1;
}
data->sumx = 0;
data->cnt = 0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void regr_avgx_deinit(UDF_INIT* initid)
{
free(initid->ptr);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_avgx_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
{
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
data->sumx = 0;
data->cnt = 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_avgx_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// TODO test for NULL in x and y
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
++data->cnt;
data->sumx += xval;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
long long regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
return data->sumx / data->cnt;
}
//=======================================================================
/**
@ -510,79 +500,77 @@ extern "C"
*/
struct avgx_data
{
double sumx;
int64_t cnt;
double sumx;
int64_t cnt;
};
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
my_bool avgx_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct avgx_data* data;
struct avgx_data* data;
if (args->arg_count != 1)
{
strcpy(message,"avgx() requires one argument");
return 1;
}
if (args->arg_count != 1)
{
strcpy(message, "avgx() requires one argument");
return 1;
}
if (!(data = (struct avgx_data*) malloc(sizeof(struct avgx_data))))
{
strmov(message, "Couldn't allocate memory");
return 1;
}
data->sumx = 0;
if (!(data = (struct avgx_data*) malloc(sizeof(struct avgx_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->sumx = 0;
data->cnt = 0;
initid->ptr = (char*)data;
return 0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void avgx_deinit(UDF_INIT* initid)
{
free(initid->ptr);
free(initid->ptr);
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void
avgx_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
char* message __attribute__((unused)))
{
struct avgx_data* data = (struct avgx_data*)initid->ptr;
data->sumx = 0;
struct avgx_data* data = (struct avgx_data*)initid->ptr;
data->sumx = 0;
data->cnt = 0;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void
avgx_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
char* is_null,
char* message __attribute__((unused)))
{
// TODO test for NULL in x and y
struct avgx_data* data = (struct avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
struct avgx_data* data = (struct avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
++data->cnt;
data->sumx += xval;
data->sumx += xval;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
long long avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
char* is_null, char* error __attribute__((unused)))
{
struct avgx_data* data = (struct avgx_data*)initid->ptr;
return data->sumx / data->cnt;
struct avgx_data* data = (struct avgx_data*)initid->ptr;
return data->sumx / data->cnt;
}
}
// vim:ts=4 sw=4:

View File

@ -207,7 +207,6 @@
<F N="avgx.cpp"/>
<F N="mcsv1_udaf.cpp"/>
<F N="median.cpp"/>
<F N="regr_avgx.cpp"/>
<F N="ssq.cpp"/>
<F N="udfmysql.cpp"/>
<F N="udfsdk.cpp"/>
@ -220,7 +219,6 @@
<F N="avgx.h"/>
<F N="mcsv1_udaf.h"/>
<F N="median.h"/>
<F N="regr_avgx.h"/>
<F N="ssq.h"/>
<F N="udfsdk.h"/>
</Folder>