You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-08 14:22:09 +03:00
MCOL-1201 Add support for UDAF multiple parm constants
This commit is contained in:
@@ -38,6 +38,8 @@ class ByteStream;
|
||||
*/
|
||||
namespace execplan
|
||||
{
|
||||
class ConstantColumn;
|
||||
|
||||
/**
|
||||
* @brief A class to represent a constant return column
|
||||
*
|
||||
|
@@ -405,7 +405,7 @@ uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add)
|
||||
|
||||
if (add)
|
||||
{
|
||||
// setTupleInfo first if add is ture, ok if already set.
|
||||
// setTupleInfo first if add is true, ok if already set.
|
||||
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
|
||||
|
||||
if (sc != NULL)
|
||||
|
@@ -300,6 +300,7 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
||||
{
|
||||
const ArithmeticColumn* ac = NULL;
|
||||
const FunctionColumn* fc = NULL;
|
||||
const ConstantColumn* cc = NULL;
|
||||
uint64_t eid = -1;
|
||||
CalpontSystemCatalog::ColType ct;
|
||||
ExpressionStep* es = new ExpressionStep(jobInfo);
|
||||
@@ -316,6 +317,11 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
||||
eid = fc->expressionId();
|
||||
ct = fc->resultType();
|
||||
}
|
||||
else if ((cc = dynamic_cast<const ConstantColumn*>(retCols[i].get())) != NULL)
|
||||
{
|
||||
eid = cc->expressionId();
|
||||
ct = cc->resultType();
|
||||
}
|
||||
else
|
||||
{
|
||||
std::ostringstream errmsg;
|
||||
@@ -1004,7 +1010,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
|
||||
for (uint32_t parm = 0; parm < aggParms.size(); ++parm)
|
||||
{
|
||||
if (aggc->constCol().get() != NULL)
|
||||
// Only do the optimization of converting to count(*) if
|
||||
// there is only one parameter.
|
||||
if (aggParms.size() == 1 && aggc->constCol().get() != NULL)
|
||||
{
|
||||
// replace the aggregate on constant with a count(*)
|
||||
SRCP clone;
|
||||
|
@@ -1097,7 +1097,8 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
uint32_t bigIntWidth = sizeof(int64_t);
|
||||
uint32_t bigUintWidth = sizeof(uint64_t);
|
||||
// For UDAF
|
||||
uint32_t projColsUDAFIndex = 0;
|
||||
uint32_t projColsUDAFIdx = 0;
|
||||
uint32_t udafcParamIdx = 0;
|
||||
UDAFColumn* udafc = NULL;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
// for count column of average function
|
||||
@@ -1286,11 +1287,11 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
@@ -1477,6 +1478,14 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
|
||||
typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
|
||||
widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
|
||||
// If the first param is const
|
||||
udafcParamIdx = 0;
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1488,6 +1497,13 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
||||
precisionAgg.push_back(precisionProj[colProj]);
|
||||
typeAgg.push_back(typeProj[colProj]);
|
||||
widthAgg.push_back(width[colProj]);
|
||||
// If the param is const
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -1676,7 +1692,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
// fOR udaf
|
||||
UDAFColumn* udafc = NULL;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
uint32_t projColsUDAFIndex = 0;
|
||||
uint32_t projColsUDAFIdx = 0;
|
||||
uint32_t udafcParamIdx = 0;
|
||||
|
||||
// for count column of average function
|
||||
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
|
||||
@@ -1840,12 +1857,12 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
|
||||
if (udafc)
|
||||
{
|
||||
@@ -2071,6 +2088,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
|
||||
widthAgg.push_back(sizeof(uint64_t));
|
||||
funct->fAuxColumnIndex = colAgg++;
|
||||
// If the first param is const
|
||||
udafcParamIdx = 0;
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -2083,6 +2108,13 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
typeAgg.push_back(typeProj[colProj]);
|
||||
widthAgg.push_back(widthProj[colProj]);
|
||||
++colAgg;
|
||||
// If the param is const
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -2133,7 +2165,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i));
|
||||
}
|
||||
|
||||
projColsUDAFIndex = 0;
|
||||
projColsUDAFIdx = 0;
|
||||
// locate the return column position in aggregated rowgroup
|
||||
for (uint64_t i = 0; i < returnedColVec.size(); i++)
|
||||
{
|
||||
@@ -2146,11 +2178,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
||||
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
@@ -2893,7 +2925,8 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
set<uint32_t> avgSet;
|
||||
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
|
||||
// For UDAF
|
||||
uint32_t projColsUDAFIndex = 0;
|
||||
uint32_t projColsUDAFIdx = 0;
|
||||
uint32_t udafcParamIdx = 0;
|
||||
UDAFColumn* udafc = NULL;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
|
||||
@@ -3073,11 +3106,11 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
@@ -3305,6 +3338,14 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
|
||||
widthAggPm.push_back(bigUintWidth);
|
||||
funct->fAuxColumnIndex = colAggPm++;
|
||||
// If the first param is const
|
||||
udafcParamIdx = 0;
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -3317,6 +3358,13 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
typeAggPm.push_back(typeProj[colProj]);
|
||||
widthAggPm.push_back(width[colProj]);
|
||||
colAggPm++;
|
||||
// If the param is const
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -3342,7 +3390,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
|
||||
AGG_MAP aggDupFuncMap;
|
||||
|
||||
projColsUDAFIndex = 0;
|
||||
projColsUDAFIdx = 0;
|
||||
// copy over the groupby vector
|
||||
// update the outputColumnIndex if returned
|
||||
for (uint64_t i = 0; i < groupByPm.size(); i++)
|
||||
@@ -3372,12 +3420,12 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
||||
udafc = NULL;
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
@@ -3703,7 +3751,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
set<uint32_t> avgSet, avgDistSet;
|
||||
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
|
||||
// For UDAF
|
||||
uint32_t projColsUDAFIndex = 0;
|
||||
uint32_t projColsUDAFIdx = 0;
|
||||
uint32_t udafcParamIdx = 0;
|
||||
UDAFColumn* udafc = NULL;
|
||||
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
|
||||
|
||||
@@ -3919,11 +3968,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
@@ -4147,6 +4196,14 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
|
||||
widthAggPm.push_back(sizeof(uint64_t));
|
||||
funct->fAuxColumnIndex = colAggPm++;
|
||||
// If the first param is const
|
||||
udafcParamIdx = 0;
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -4160,6 +4217,13 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
widthAggPm.push_back(width[colProj]);
|
||||
multiParmIndexes.push_back(colAggPm);
|
||||
colAggPm++;
|
||||
// If the param is const
|
||||
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
|
||||
if (cc)
|
||||
{
|
||||
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
|
||||
}
|
||||
++udafcParamIdx;
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -4251,7 +4315,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
// These will be skipped and the count needs to be subtracted
|
||||
// from where the aux column will be.
|
||||
int64_t multiParms = 0;
|
||||
projColsUDAFIndex = 0;
|
||||
projColsUDAFIdx = 0;
|
||||
// check if the count column for AVG is also a returned column,
|
||||
// if so, replace the "-1" to actual position in returned vec.
|
||||
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
|
||||
@@ -4286,11 +4350,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
||||
|
||||
if (aggOp == ROWAGG_UDAF)
|
||||
{
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
|
||||
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
|
||||
for (; it != jobInfo.projectionCols.end(); it++)
|
||||
{
|
||||
udafc = dynamic_cast<UDAFColumn*>((*it).get());
|
||||
projColsUDAFIndex++;
|
||||
projColsUDAFIdx++;
|
||||
if (udafc)
|
||||
{
|
||||
pUDAFFunc = udafc->getContext().getFunction();
|
||||
|
@@ -569,6 +569,7 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
|
||||
|
||||
for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
|
||||
{
|
||||
bool isUDAF = false;
|
||||
// window function type
|
||||
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
|
||||
uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index
|
||||
@@ -590,6 +591,7 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
|
||||
// if (boost::iequals(wc->functionName(),"UDAF_FUNC")
|
||||
if (wc->functionName() == "UDAF_FUNC")
|
||||
{
|
||||
isUDAF = true;
|
||||
++wfsUserFunctionCount;
|
||||
}
|
||||
|
||||
@@ -646,10 +648,13 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
|
||||
// column type for functor templates
|
||||
int ct = 0;
|
||||
|
||||
if (isUDAF)
|
||||
{
|
||||
ct = wc->getUDAFContext().getResultType();
|
||||
}
|
||||
// make sure index is in range
|
||||
if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
|
||||
else if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
|
||||
ct = types[fields[1]];
|
||||
|
||||
// workaround for functions using "within group (order by)" syntax
|
||||
string fn = boost::to_upper_copy(wc->functionName());
|
||||
|
||||
|
@@ -4206,8 +4206,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
|
||||
// treat as count(*)
|
||||
if (ac->aggOp() == AggregateColumn::COUNT)
|
||||
ac->aggOp(AggregateColumn::COUNT_ASTERISK);
|
||||
|
||||
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
|
||||
parm.reset(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError));
|
||||
ac->constCol(parm);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -4485,17 +4485,20 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
|
||||
// @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will
|
||||
// be applied in ExeMgr. When the ExeMgr fix is available, this checking
|
||||
// will be taken out.
|
||||
if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty())
|
||||
if (isp->sum_func() != Item_sum::UDF_SUM_FUNC)
|
||||
{
|
||||
gwi.fatalParseError = true;
|
||||
gwi.parseErrorText = "No project column found for aggregate function";
|
||||
if (ac)
|
||||
delete ac;
|
||||
return NULL;
|
||||
}
|
||||
else if (ac->constCol())
|
||||
{
|
||||
gwi.count_asterisk_list.push_back(ac);
|
||||
if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty())
|
||||
{
|
||||
gwi.fatalParseError = true;
|
||||
gwi.parseErrorText = "No project column found for aggregate function";
|
||||
if (ac)
|
||||
delete ac;
|
||||
return NULL;
|
||||
}
|
||||
else if (ac->constCol())
|
||||
{
|
||||
gwi.count_asterisk_list.push_back(ac);
|
||||
}
|
||||
}
|
||||
|
||||
// For UDAF, populate the context and call the UDAF init() function.
|
||||
@@ -7903,8 +7906,15 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
|
||||
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
(*coliter)->aggParms().push_back(minSc);
|
||||
// Replace the last (presumably constant) object with minSc
|
||||
if ((*coliter)->aggParms().empty())
|
||||
{
|
||||
(*coliter)->aggParms().push_back(minSc);
|
||||
}
|
||||
else
|
||||
{
|
||||
(*coliter)->aggParms()[0] = minSc;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<FunctionColumn*>::iterator funciter;
|
||||
|
Reference in New Issue
Block a user