1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

MCOL-1348 Add multiply UDAF calls support.

This commit is contained in:
Roman Nozdrin
2018-04-24 15:14:08 +03:00
parent 3e8fed9170
commit 51715c76ee

View File

@@ -958,6 +958,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
vector<SP_ROWAGG_FUNC_t> functionVec; vector<SP_ROWAGG_FUNC_t> functionVec;
uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigIntWidth = sizeof(int64_t);
uint32_t bigUintWidth = sizeof(uint64_t); uint32_t bigUintWidth = sizeof(uint64_t);
uint32_t projColsUDAFIndex = 0;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// for count column of average function // for count column of average function
@@ -1135,18 +1136,27 @@ void TupleAggregateStep::prep1PhaseAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
if (udafc) for (; it != jobInfo.projectionCols.end(); it++)
{ {
// Create a RowAggFunctionCol (UDAF subtype) with the context. UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i)); projColsUDAFIndex++;
} if (udafc)
else {
{ pUDAFFunc = udafc->getContext().getFunction();
throw logic_error("prep1PhasesAggregate: A UDAF function is called but there's no UDAFColumn"); // Create a RowAggFunctionCol (UDAF subtype) with the context.
} funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i));
} break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else else
{ {
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, i)); funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, i));
@@ -1484,6 +1494,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
AGG_MAP aggFuncMap; AGG_MAP aggFuncMap;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
set<uint32_t> avgSet; set<uint32_t> avgSet;
uint32_t projColsUDAFIndex = 0;
// for count column of average function // for count column of average function
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap; map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
@@ -1636,19 +1647,27 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
if (udafc) for (; it != jobInfo.projectionCols.end(); it++)
{ {
pUDAFFunc = udafc->getContext().getFunction(); UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
// Create a RowAggFunctionCol (UDAF subtype) with the context. projColsUDAFIndex++;
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); if (udafc)
} {
else pUDAFFunc = udafc->getContext().getFunction();
{ // Create a RowAggFunctionCol (UDAF subtype) with the context.
throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no UDAFColumn"); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
} break;
} }
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else else
{ {
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg)); funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
@@ -2579,6 +2598,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
vector<pair<uint32_t, int> > aggColVec; vector<pair<uint32_t, int> > aggColVec;
set<uint32_t> avgSet; set<uint32_t> avgSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec; vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
uint32_t projColsUDAFIndex = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
// skip if not an aggregation column // skip if not an aggregation column
@@ -2746,18 +2766,26 @@ void TupleAggregateStep::prep2PhasesAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
if (udafc) for (; it != jobInfo.projectionCols.end(); it++)
{ {
pUDAFFunc = udafc->getContext().getFunction(); UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
// Create a RowAggFunctionCol (UDAF subtype) with the context. projColsUDAFIndex++;
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); if (udafc)
} {
else pUDAFFunc = udafc->getContext().getFunction();
{ // Create a RowAggFunctionCol (UDAF subtype) with the context.
throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no UDAFColumn"); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
} break;
} }
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else else
{ {
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
@@ -3301,6 +3329,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
vector<pair<uint32_t, int> > aggColVec, aggNoDistColVec; vector<pair<uint32_t, int> > aggColVec, aggNoDistColVec;
set<uint32_t> avgSet, avgDistSet; set<uint32_t> avgSet, avgDistSet;
vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec; vector<std::pair<uint32_t, int> >& returnedColVec = jobInfo.returnedColVec;
uint32_t projColsUDAFIndex = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++) for (uint64_t i = 0; i < returnedColVec.size(); i++)
{ {
// col should be an aggregate or groupBy or window function // col should be an aggregate or groupBy or window function
@@ -3498,18 +3527,25 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF) if (aggOp == ROWAGG_UDAF)
{ {
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get()); std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
if (udafc) for (; it != jobInfo.projectionCols.end(); it++)
{ {
pUDAFFunc = udafc->getContext().getFunction(); UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
// Create a RowAggFunctionCol (UDAF subtype) with the context. projColsUDAFIndex++;
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); if (udafc)
} {
else pUDAFFunc = udafc->getContext().getFunction();
{ // Create a RowAggFunctionCol (UDAF subtype) with the context.
throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no UDAFColumn"); funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
} break;
} }
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else else
{ {
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));