1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

MCOL-1201 some fixes from testing

This commit is contained in:
David Hall
2018-05-14 17:28:24 -05:00
parent c67ac7699e
commit 06e9772310
4 changed files with 106 additions and 130 deletions

View File

@@ -852,7 +852,6 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
if (ac->aggOp() == ROWAGG_UDAF) if (ac->aggOp() == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac); UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
if (udafc) if (udafc)
{ {
constAggDataVec.push_back( constAggDataVec.push_back(
@@ -1097,8 +1096,9 @@ void TupleAggregateStep::prep1PhaseAggregate(
vector<SP_ROWAGG_FUNC_t> functionVec; vector<SP_ROWAGG_FUNC_t> functionVec;
uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigIntWidth = sizeof(int64_t);
uint32_t bigUintWidth = sizeof(uint64_t); uint32_t bigUintWidth = sizeof(uint64_t);
// For UDAF
uint32_t projColsUDAFIndex = 0; uint32_t projColsUDAFIndex = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// for count column of average function // for count column of average function
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap; map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
@@ -1287,12 +1287,10 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++) for (; it != jobInfo.projectionCols.end(); it++)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get()); udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++; projColsUDAFIndex++;
if (udafc) if (udafc)
{ {
pUDAFFunc = udafc->getContext().getFunction(); pUDAFFunc = udafc->getContext().getFunction();
@@ -1300,12 +1298,10 @@ void TupleAggregateStep::prep1PhaseAggregate(
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i)); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i));
break; break;
} }
} }
if (it == jobInfo.projectionCols.end()) if (it == jobInfo.projectionCols.end())
{ {
throw logic_error("(1)prep1PhaseAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); throw logic_error("(1)prep1PhaseAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
} }
} }
else else
@@ -1474,8 +1470,6 @@ void TupleAggregateStep::prep1PhaseAggregate(
throw logic_error("(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); throw logic_error("(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
} }
pUDAFFunc = udafFuncCol->fUDAFContext.getFunction();
// Return column // Return column
oidsAgg.push_back(oidsProj[colProj]); oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key); keysAgg.push_back(key);
@@ -1677,8 +1671,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigIntWidth = sizeof(int64_t);
// map key = column key, operation (enum), and UDAF pointer if UDAF. // map key = column key, operation (enum), and UDAF pointer if UDAF.
AGG_MAP aggFuncMap; AGG_MAP aggFuncMap;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
set<uint32_t> avgSet; set<uint32_t> avgSet;
// For UDAF
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
uint32_t projColsUDAFIndex = 0; uint32_t projColsUDAFIndex = 0;
// for count column of average function // for count column of average function
@@ -1847,7 +1844,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
for (; it != jobInfo.projectionCols.end(); it++) for (; it != jobInfo.projectionCols.end(); it++)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get()); udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++; projColsUDAFIndex++;
if (udafc) if (udafc)
@@ -1857,12 +1854,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
break; break;
} }
} }
if (it == jobInfo.projectionCols.end()) if (it == jobInfo.projectionCols.end())
{ {
throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
} }
} }
else else
@@ -2142,6 +2137,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// locate the return column position in aggregated rowgroup // locate the return column position in aggregated rowgroup
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
udafc = NULL;
pUDAFFunc = NULL; pUDAFFunc = NULL;
uint32_t retKey = returnedColVec[i].first; uint32_t retKey = returnedColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
@@ -2150,10 +2146,21 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc) if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction(); pUDAFFunc = udafc->getContext().getFunction();
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
}
} }
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
@@ -2473,26 +2480,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, i)); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, i));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
} }
else else
{ {
@@ -2904,7 +2892,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
vector<pair<uint32_t, int> > aggColVec; vector<pair<uint32_t, int> > aggColVec;
set<uint32_t> avgSet; set<uint32_t> avgSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec; vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
// For UDAF
uint32_t projColsUDAFIndex = 0; uint32_t projColsUDAFIndex = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
@@ -2947,7 +2938,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigIntWidth = sizeof(int64_t);
uint32_t bigUintWidth = sizeof(uint64_t); uint32_t bigUintWidth = sizeof(uint64_t);
AGG_MAP aggFuncMap; AGG_MAP aggFuncMap;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// associate the columns between projected RG and aggregate RG on PM // associate the columns between projected RG and aggregate RG on PM
// populated the aggregate columns // populated the aggregate columns
@@ -3084,12 +3074,10 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++) for (; it != jobInfo.projectionCols.end(); it++)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get()); udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++; projColsUDAFIndex++;
if (udafc) if (udafc)
{ {
pUDAFFunc = udafc->getContext().getFunction(); pUDAFFunc = udafc->getContext().getFunction();
@@ -3098,10 +3086,9 @@ void TupleAggregateStep::prep2PhasesAggregate(
break; break;
} }
} }
if (it == jobInfo.projectionCols.end()) if (it == jobInfo.projectionCols.end())
{ {
throw logic_error("(1)prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); throw logic_error("(1)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
} }
} }
else else
@@ -3350,10 +3337,6 @@ void TupleAggregateStep::prep2PhasesAggregate(
// add back sum or count(column name) if omitted due to avg column // add back sum or count(column name) if omitted due to avg column
// put count(column name) column to the end, if it is for avg only // put count(column name) column to the end, if it is for avg only
{ {
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
// check if the count column for AVG is also a returned column, // check if the count column for AVG is also a returned column,
// if so, replace the "-1" to actual position in returned vec. // if so, replace the "-1" to actual position in returned vec.
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap; map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
@@ -3369,6 +3352,8 @@ void TupleAggregateStep::prep2PhasesAggregate(
} }
// locate the return column position in aggregated rowgroup from PM // locate the return column position in aggregated rowgroup from PM
// outIdx is i minus the number of ROWAGG_MULTI_PARM entries skipped so far.
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
uint32_t retKey = returnedColVec[i].first; uint32_t retKey = returnedColVec[i].first;
@@ -3379,19 +3364,30 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (aggOp == ROWAGG_MULTI_PARM) if (aggOp == ROWAGG_MULTI_PARM)
{ {
// Skip on UM: Extra parms for an aggregate have no work on the UM // Skip on UM: Extra parms for an aggregate have no work on the UM
++multiParms;
continue; continue;
} }
// Is this a UDAF? use the function as part of the key. // Is this a UDAF? use the function as part of the key.
pUDAFFunc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; udafc = NULL;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc) if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction(); pUDAFFunc = udafc->getContext().getFunction();
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
}
} }
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc)); AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(retKey, aggOp, pUDAFFunc));
@@ -3492,7 +3488,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (returnColMissing) if (returnColMissing)
{ {
Message::Args args; Message::Args args;
args.add(keyName(i, retKey, jobInfo)); args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()-> string emsg = IDBErrorInfo::instance()->
errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesAggregate: " << emsg << " oid=" cerr << "prep2PhasesAggregate: " << emsg << " oid="
@@ -3514,7 +3510,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (jobInfo.groupByColVec[j] == retKey) if (jobInfo.groupByColVec[j] == retKey)
{ {
if (groupByUm[j]->fOutputColumnIndex == (uint32_t) - 1) if (groupByUm[j]->fOutputColumnIndex == (uint32_t) - 1)
groupByUm[j]->fOutputColumnIndex = i; groupByUm[j]->fOutputColumnIndex = outIdx;
else else
dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex; dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
} }
@@ -3525,7 +3521,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (jobInfo.distinctColVec[j] == retKey) if (jobInfo.distinctColVec[j] == retKey)
{ {
if (groupByUm[j]->fOutputColumnIndex == (uint32_t) - 1) if (groupByUm[j]->fOutputColumnIndex == (uint32_t) - 1)
groupByUm[j]->fOutputColumnIndex = i; groupByUm[j]->fOutputColumnIndex = outIdx;
else else
dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex; dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
} }
@@ -3534,7 +3530,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
// a duplicate group by column // a duplicate group by column
if (dupGroupbyIndex != -1) if (dupGroupbyIndex != -1)
functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol( functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, i, dupGroupbyIndex))); ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
} }
else else
{ {
@@ -3542,30 +3538,11 @@ void TupleAggregateStep::prep2PhasesAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx));
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, i-multiParms));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
} }
else else
{ {
funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, i-multiParms)); funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, outIdx));
} }
if (aggOp == ROWAGG_COUNT_NO_OP) if (aggOp == ROWAGG_COUNT_NO_OP)
@@ -3600,6 +3577,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (returnedColVec[i].second == AggregateColumn::AVG) if (returnedColVec[i].second == AggregateColumn::AVG)
avgFuncMap.insert(make_pair(returnedColVec[i].first, funct)); avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
} }
++outIdx;
} }
// now fix the AVG function, locate the count(column) position // now fix the AVG function, locate the count(column) position
@@ -3617,7 +3595,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
} }
// there is avg(k), but no count(k) in the select list // there is avg(k), but no count(k) in the select list
uint64_t lastCol = returnedColVec.size() - multiParms; uint64_t lastCol = outIdx;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++) for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{ {
@@ -3724,7 +3702,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
vector<pair<uint32_t, int> > aggColVec, aggNoDistColVec; vector<pair<uint32_t, int> > aggColVec, aggNoDistColVec;
set<uint32_t> avgSet, avgDistSet; set<uint32_t> avgSet, avgDistSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec; vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
// For UDAF
uint32_t projColsUDAFIndex = 0; uint32_t projColsUDAFIndex = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
@@ -3796,7 +3777,6 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigIntWidth = sizeof(int64_t);
map<pair<uint32_t, int>, uint64_t> avgFuncDistMap; map<pair<uint32_t, int>, uint64_t> avgFuncDistMap;
AGG_MAP aggFuncMap; AGG_MAP aggFuncMap;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// associate the columns between projected RG and aggregate RG on PM // associate the columns between projected RG and aggregate RG on PM
// populated the aggregate columns // populated the aggregate columns
@@ -3940,12 +3920,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++) for (; it != jobInfo.projectionCols.end(); it++)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get()); udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++; projColsUDAFIndex++;
if (udafc) if (udafc)
{ {
pUDAFFunc = udafc->getContext().getFunction(); pUDAFFunc = udafc->getContext().getFunction();
@@ -3954,10 +3932,9 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
break; break;
} }
} }
if (it == jobInfo.projectionCols.end()) if (it == jobInfo.projectionCols.end())
{ {
throw logic_error("(1)prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); throw logic_error("(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
} }
} }
else else
@@ -4201,32 +4178,33 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// associate the columns between the aggregate RGs on PM and UM without distinct aggregator // associate the columns between the aggregate RGs on PM and UM without distinct aggregator
// populated the returned columns // populated the returned columns
{ {
int64_t multiParms = 0;
for (uint32_t idx = 0; idx < groupByPm.size(); idx++) for (uint32_t idx = 0; idx < groupByPm.size(); idx++)
{ {
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(idx, idx)); SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(idx, idx));
groupByUm.push_back(groupby); groupByUm.push_back(groupby);
} }
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
for (uint32_t idx = 0; idx < functionVecPm.size(); idx++) for (uint32_t idx = 0; idx < functionVecPm.size(); idx++)
{ {
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx]; SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx];
// UDAF support
if (funcPm->fAggFunction == ROWAGG_MULTI_PARM) if (funcPm->fAggFunction == ROWAGG_MULTI_PARM)
{ {
// Multi-Parm is not used on the UM // Skip on UM: Extra parms for an aggregate have no work on the UM
++multiParms; ++multiParms;
continue; continue;
} }
if (funcPm->fAggFunction == ROWAGG_UDAF) if (funcPm->fAggFunction == ROWAGG_UDAF)
{ {
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get()); RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
if (!udafFuncCol)
{
throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
funct.reset(new RowUDAFFunctionCol( funct.reset(new RowUDAFFunctionCol(
udafFuncCol->fUDAFContext, udafFuncCol->fUDAFContext,
udafFuncCol->fOutputColumnIndex, udafFuncCol->fOutputColumnIndex,
@@ -4273,6 +4251,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// These will be skipped and the count needs to be subtracted // These will be skipped and the count needs to be subtracted
// from where the aux column will be. // from where the aux column will be.
int64_t multiParms = 0; int64_t multiParms = 0;
projColsUDAFIndex = 0;
// check if the count column for AVG is also a returned column, // check if the count column for AVG is also a returned column,
// if so, replace the "-1" to actual position in returned vec. // if so, replace the "-1" to actual position in returned vec.
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap; map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
@@ -4286,9 +4265,12 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
} }
// locate the return column position in aggregated rowgroup from PM // locate the return column position in aggregated rowgroup from PM
// outIdx is i minus the number of ROWAGG_MULTI_PARM entries skipped so far.
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
pUDAFFunc = NULL; pUDAFFunc = NULL;
udafc = NULL;
uint32_t retKey = returnedColVec[i].first; uint32_t retKey = returnedColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
@@ -4304,10 +4286,21 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc) if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction(); pUDAFFunc = udafc->getContext().getFunction();
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
}
} }
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
@@ -4436,7 +4429,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (it != aggFuncMap.end()) if (it != aggFuncMap.end())
{ {
colUm = it->second - multiParms; colUm = it->second;
oidsAggDist.push_back(oidsAggUm[colUm]); oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(keysAggUm[colUm]); keysAggDist.push_back(keysAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]); scaleAggDist.push_back(scaleAggUm[colUm]);
@@ -4460,7 +4453,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// false alarm // false alarm
returnColMissing = false; returnColMissing = false;
colUm = it->second - multiParms; colUm = it->second;
if (aggOp == ROWAGG_SUM) if (aggOp == ROWAGG_SUM)
{ {
@@ -4528,7 +4521,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (returnColMissing) if (returnColMissing)
{ {
Message::Args args; Message::Args args;
args.add(keyName(i, retKey, jobInfo)); args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()-> string emsg = IDBErrorInfo::instance()->
errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid=" cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid="
@@ -4552,7 +4545,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (jobInfo.groupByColVec[j] == retKey) if (jobInfo.groupByColVec[j] == retKey)
{ {
if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t) - 1) if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t) - 1)
groupByNoDist[j]->fOutputColumnIndex = i; groupByNoDist[j]->fOutputColumnIndex = outIdx;
else else
dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex; dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
} }
@@ -4561,7 +4554,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// a duplicate group by column // a duplicate group by column
if (dupGroupbyIndex != -1) if (dupGroupbyIndex != -1)
functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol( functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, i, dupGroupbyIndex))); ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
} }
else else
{ {
@@ -4569,30 +4562,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx));
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, i-multiParms));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
} }
else else
{ {
funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, i-multiParms)); funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, outIdx));
} }
if (aggOp == ROWAGG_COUNT_NO_OP) if (aggOp == ROWAGG_COUNT_NO_OP)
@@ -4629,6 +4603,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct)); avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
} }
++outIdx;
} // for (i } // for (i
// now fix the AVG function, locate the count(column) position // now fix the AVG function, locate the count(column) position
@@ -4646,7 +4621,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
} }
// there is avg(k), but no count(k) in the select list // there is avg(k), but no count(k) in the select list
uint64_t lastCol = returnedColVec.size() - multiParms; uint64_t lastCol = outIdx;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++) for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{ {
@@ -4706,7 +4681,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (!udafFuncCol) if (!udafFuncCol)
{ {
throw logic_error("(4)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); throw logic_error("(5)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
} }
functionVecUm[i]->fAuxColumnIndex = lastCol++; functionVecUm[i]->fAuxColumnIndex = lastCol++;

View File

@@ -4573,7 +4573,6 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
udafc->resultType(ct); udafc->resultType(ct);
} }
} }
} }
catch (std::logic_error e) catch (std::logic_error e)
{ {

View File

@@ -200,6 +200,7 @@
<Folder <Folder
Name="Source Files" Name="Source Files"
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d"> Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
<F N="crashtrace.cpp"/>
<F N="fixedallocator.cpp"/> <F N="fixedallocator.cpp"/>
<F N="MonitorProcMem.cpp"/> <F N="MonitorProcMem.cpp"/>
<F N="poolallocator.cpp"/> <F N="poolallocator.cpp"/>
@@ -208,6 +209,7 @@
Name="Header Files" Name="Header Files"
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if"> Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
<F N="any.hpp"/> <F N="any.hpp"/>
<F N="crashtrace.h"/>
<F N="fixedallocator.h"/> <F N="fixedallocator.h"/>
<F N="hasher.h"/> <F N="hasher.h"/>
<F N="MonitorProcMem.h"/> <F N="MonitorProcMem.h"/>

View File

@@ -2015,13 +2015,13 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx) RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx)
{ {
int32_t paramCount = fRGContext.getParameterCount(); uint32_t paramCount = fRGContext.getParameterCount();
// The vector of parameters to be sent to the UDAF // The vector of parameters to be sent to the UDAF
mcsv1sdk::ColumnDatum valsIn[paramCount]; mcsv1sdk::ColumnDatum valsIn[paramCount];
uint32_t dataFlags[paramCount]; uint32_t dataFlags[paramCount];
execplan::CalpontSystemCatalog::ColDataType colDataType; execplan::CalpontSystemCatalog::ColDataType colDataType;
for (uint32_t i = 0; i < fRGContext.getParameterCount(); ++i) for (uint32_t i = 0; i < paramCount; ++i)
{ {
mcsv1sdk::ColumnDatum& datum = valsIn[i]; mcsv1sdk::ColumnDatum& datum = valsIn[i];
// Turn on NULL flags // Turn on NULL flags