mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git

MCOL-3492 Don't do DISTINCT as aggregate in the presence of Window Functions

David Hall committed 2019-09-11 12:28:07 -05:00
parent 5c8ff4a1eb
commit eae773d122
2 changed files with 62 additions and 144 deletions
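The core of the change is a gating rule in the joblist builder: SELECT DISTINCT is turned into an aggregation step only when the query has no window functions; otherwise the DISTINCT is left for TupleAnnexStep to apply after the window functions have been evaluated. Below is a minimal standalone sketch of that rule; the struct and function names are illustrative stand-ins, not ColumnStore's actual types (the real code reads csep->distinct() and jobInfo.windowDels).

#include <cstddef>
#include <iostream>

// Illustrative stand-in for the execution-plan state consulted by checkAggregation().
struct PlanSummary
{
    bool   distinct;         // query uses SELECT DISTINCT
    std::size_t windowFunctions;  // number of window-function expressions in the plan
};

// DISTINCT is executed as an aggregation only when no window functions are
// present; with window functions it must run later, in the annex step.
bool distinctHandledAsAggregation(const PlanSummary& plan)
{
    return plan.distinct && plan.windowFunctions == 0;
}

int main()
{
    PlanSummary plain{true, 0};    // SELECT DISTINCT ... (no window functions)
    PlanSummary windowed{true, 1}; // SELECT DISTINCT ..., RANK() OVER (...) ...

    std::cout << distinctHandledAsAggregation(plain)      // 1: aggregate DISTINCT
              << distinctHandledAsAggregation(windowed)   // 0: defer to TupleAnnexStep
              << '\n';
    return 0;
}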

Changed file 1 of 2:

@@ -604,7 +604,8 @@ void checkAggregation(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
     jobInfo.hasDistinct = csep->distinct();

-    if (csep->distinct() == true)
+    // DISTINCT with window functions must be done in tupleannexstep
+    if (csep->distinct() == true && jobInfo.windowDels.size() == 0)
     {
         jobInfo.hasAggregation = true;
     }
@@ -878,6 +879,10 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
         const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
         AggregateColumn* aggc = dynamic_cast<AggregateColumn*>(srcp.get());
         bool doDistinct = (csep->distinct() && csep->groupByCols().empty());
+        // Use this instead of the above line to mimic MariaDB's sql_mode = 'ONLY_FULL_GROUP_BY'
+        // bool doDistinct = (csep->distinct() &&
+        //                    csep->groupByCols().empty() &&
+        //                    !jobInfo.hasAggregation);
         uint32_t tupleKey = -1;
         string alias;
         string view;
@@ -1126,9 +1131,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
         // remember the columns to be returned
         jobInfo.returnedColVec.push_back(make_pair(tupleKey, op));

-        // bug 1499 distinct processing, save unique distinct columns
-        if (doDistinct &&
-            (jobInfo.distinctColVec.end() ==
+        // bug 1499 distinct processing, save unique distinct columns that aren't Window columns
+        if (doDistinct
+            && (jobInfo.distinctColVec.end() ==
                 find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey)))
         {
             jobInfo.distinctColVec.push_back(tupleKey);
@@ -1279,13 +1284,13 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
         // remember the columns to be returned
         jobInfo.returnedColVec.push_back(make_pair(tupleKey, op));

-        // bug 1499 distinct processing, save unique distinct columns
-        if (doDistinct &&
-            (jobInfo.distinctColVec.end() ==
+        // bug 1499 distinct processing, save unique distinct columns that aren't Window columns
+        if (doDistinct
+            && (jobInfo.distinctColVec.end() ==
                 find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey)))
         {
             jobInfo.distinctColVec.push_back(tupleKey);
         }
     }
 }

Changed file 2 of 2:

@@ -837,6 +837,7 @@ const string TupleAggregateStep::toString() const
 SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
 {
     SJSTEP spjs;
+    TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(step.get());
     TupleBPS* tbps = dynamic_cast<TupleBPS*>(step.get());
     TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(step.get());
     SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(step.get());
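The added cast reflects that every step type handled below delivers rows through the same interface, so the rewritten code can fetch the projected RowGroup once through the common base instead of repeating the call in each branch. A rough sketch of that pattern follows, using assumed, simplified class names rather than ColumnStore's real hierarchy:

#include <iostream>
#include <memory>

// Assumed stand-ins for the job-step hierarchy.
struct RowGroup { const char* name; };

struct JobStep { virtual ~JobStep() = default; };

// Delivery-capable steps expose their output row layout through this mixin.
struct TupleDeliveryStep
{
    virtual ~TupleDeliveryStep() = default;
    virtual RowGroup getDeliveredRowGroup() const = 0;
};

struct TupleBPS : JobStep, TupleDeliveryStep
{
    RowGroup getDeliveredRowGroup() const override { return {"tbps rowgroup"}; }
};

struct TupleHashJoinStep : JobStep, TupleDeliveryStep
{
    RowGroup getDeliveredRowGroup() const override { return {"thjs rowgroup"}; }
};

int main()
{
    std::shared_ptr<JobStep> step = std::make_shared<TupleHashJoinStep>();

    // Mirrors the new line in prepAggregate(): cast once to the common
    // delivery interface, then read the projected RowGroup uniformly.
    if (auto* tds = dynamic_cast<TupleDeliveryStep*>(step.get()))
        std::cout << tds->getDeliveredRowGroup().name << '\n';

    return 0;
}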
@@ -914,171 +915,83 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
     // preprocess the columns used by group_concat
     jobInfo.groupConcatInfo.prepGroupConcat(jobInfo);
-    bool doGroupConcat = false;
+    bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0
+                    // || jobInfo.windowSet.size() > 0
+                    || sas
+                    || ces;
+
+    rgs.push_back(tds->getDeliveredRowGroup());
+
+    // get rowgroup and aggregator
+    // For TupleHashJoin, we prepare for both PM and UM only aggregation
+    if (doUMOnly || thjs)
+    {
+        if (distinctAgg == true)
+            prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
+        else
+            prep1PhaseAggregate(jobInfo, rgs, aggs);
+
+        // TODO: fix this
+        if (doUMOnly)
+            rgs.push_back(rgs[0]);
+    }
+
+    if (!doUMOnly)
+    {
+        if (distinctAgg == true)
+            prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
+        else
+            prep2PhasesAggregate(jobInfo, rgs, aggs);
+    }

     if (tbps != NULL)
     {
-        // get rowgroup and aggregator
-        rgs.push_back(tbps->getDeliveredRowGroup());
-
-        if (jobInfo.groupConcatInfo.columns().size() == 0)
-        {
-            if (distinctAgg == true)
-                prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
-            else
-                prep2PhasesAggregate(jobInfo, rgs, aggs);
-        }
-        else
-        {
-            if (distinctAgg == true)
-                prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
-            else
-                prep1PhaseAggregate(jobInfo, rgs, aggs);
-
-            // TODO: fix this
-            rgs.push_back(rgs[0]);
-            doGroupConcat = true;
-        }
-
-        // make sure connected by a RowGroupDL
-        JobStepAssociation tbpsJsa;
-        AnyDataListSPtr spdl(new AnyDataList());
-        RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
-        dl->OID(execplan::CNX_VTABLE_ID);
-        spdl->rowGroupDL(dl);
-        tbpsJsa.outAdd(spdl);
-
         // create delivery step
         aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
         spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo));
-        spjs->inputAssociation(tbpsJsa);
-
-        // step id??
-        spjs->stepId(step->stepId() + 1);
-
-        // set the PM/UM side aggregate structs
-        tbps->outputAssociation(tbpsJsa);
-
-        if (doGroupConcat)
+        if (doUMOnly)
             dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
         else
             tbps->setAggregateStep(aggs[1], rgs[2]);
     }
     else if (thjs != NULL)
     {
-        // default to UM aggregation
-        rgs.push_back(thjs->getDeliveredRowGroup());
-
-        if (distinctAgg == true)
-            prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
-        else
-            prep1PhaseAggregate(jobInfo, rgs, aggs);
-
-        // also prepare for PM aggregation
-        // rowgroups -- 0-proj, 1-um, [2-phase case: 2-um, 3-pm]
-        // aggregators -- 0-um, [2-phase case: 1-um, 2-pm]
-        if (jobInfo.groupConcatInfo.columns().size() == 0)
-        {
-            if (distinctAgg == true)
-                prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
-            else
-                prep2PhasesAggregate(jobInfo, rgs, aggs);
-        }
-        else
-        {
-            // TODO: fix this
-            rgs.push_back(rgs[0]);
-            doGroupConcat = true;
-        }
-
-        // make sure connected by a RowGroupDL
-        JobStepAssociation thjsJsa;
-        AnyDataListSPtr spdl(new AnyDataList());
-        RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
-        dl->OID(execplan::CNX_VTABLE_ID);
-        spdl->rowGroupDL(dl);
-        thjsJsa.outAdd(spdl);
-
         // create delivery step
         aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
         spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
-        spjs->inputAssociation(thjsJsa);
-
-        if (doGroupConcat)
+        if (doUMOnly)
             dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
         else
             dynamic_cast<TupleAggregateStep*>(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]);

-        // step id??
-        spjs->stepId(step->stepId() + 1);
-
         // set input side
-        thjs->outputAssociation(thjsJsa);
         thjs->deliveryStep(spjs);
     }
-    else if (sas != NULL)
+    else
     {
-        // UM aggregation
-        // rowgroups -- 0-proj, 1-um
-        // aggregators -- 0-um
-        rgs.push_back(sas->getDeliveredRowGroup());
-
-        if (distinctAgg == true)
-            prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
-        else
-            prep1PhaseAggregate(jobInfo, rgs, aggs);
-
-        // make sure connected by a RowGroupDL
-        JobStepAssociation sasJsa;
-        AnyDataListSPtr spdl(new AnyDataList());
-        RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
-        dl->OID(execplan::CNX_VTABLE_ID);
-        spdl->rowGroupDL(dl);
-        sasJsa.outAdd(spdl);
-
-        // create delivery step
         aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
         spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
-        spjs->inputAssociation(sasJsa);
-
-        // step id??
-        spjs->stepId(step->stepId() + 1);
-
-        // set input side
-        sas->outputAssociation(sasJsa);
     }
-    else if (ces != NULL)
-    {
-        // UM aggregation
-        // rowgroups -- 0-proj, 1-um
-        // aggregators -- 0-um
-        rgs.push_back(ces->getDeliveredRowGroup());
-
-        if (distinctAgg == true)
-            prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
-        else
-            prep1PhaseAggregate(jobInfo, rgs, aggs);
-
-        // make sure connected by a RowGroupDL
-        JobStepAssociation cesJsa;
-        AnyDataListSPtr spdl(new AnyDataList());
-        RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
-        dl->OID(execplan::CNX_VTABLE_ID);
-        spdl->rowGroupDL(dl);
-        cesJsa.outAdd(spdl);
-
-        // create delivery step
-        aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
-        spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
-        spjs->inputAssociation(cesJsa);
-
-        // step id??
-        spjs->stepId(step->stepId() + 1);
-
-        // set input side
-        ces->outputAssociation(cesJsa);
-    }
+
+    // Setup the input JobstepAssoctiation -- the mechanism
+    // whereby the previous step feeds data to this step.
+    // Otherwise, we need to create one and hook to the
+    // previous step as well as this aggregate step.
+    spjs->stepId(step->stepId() + 1);
+
+    JobStepAssociation jsa;
+    AnyDataListSPtr spdl(new AnyDataList());
+    RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
+    dl->OID(execplan::CNX_VTABLE_ID);
+    spdl->rowGroupDL(dl);
+    jsa.outAdd(spdl);
+
+    spjs->inputAssociation(jsa); // Aggregate input
+
+    // Previous step output
+    step->outputAssociation(jsa);

     // add the aggregate on constants
     if (constAggDataVec.size() > 0)
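Net effect of the refactor in prepAggregate(): the per-branch copies of the RowGroupDL / JobStepAssociation plumbing collapse into one shared block that runs after whichever branch builds the aggregator, so every path wires the previous step's output and the aggregate step's input to the same datalist. The following condensed sketch shows that shared-wiring pattern with illustrative names, not the real JobStepAssociation/AnyDataList API:

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Illustrative stand-ins: a "datalist" shared by two steps, and steps that
// record which datalists feed them (inputs) and which they fill (outputs).
struct DataList { std::string oid; };
using DataListPtr = std::shared_ptr<DataList>;

struct Step
{
    std::string name;
    std::vector<DataListPtr> inputs;
    std::vector<DataListPtr> outputs;
};

// The pattern the refactor factors out of each branch: create one datalist,
// attach it as the previous step's output and as the aggregate step's input.
void connect(Step& previous, Step& aggregate, const std::string& oid)
{
    auto dl = std::make_shared<DataList>(DataList{oid});
    previous.outputs.push_back(dl);   // analogous to step->outputAssociation(jsa)
    aggregate.inputs.push_back(dl);   // analogous to spjs->inputAssociation(jsa)
}

int main()
{
    Step scan{"TupleBPS"};
    Step agg{"TupleAggregateStep"};

    connect(scan, agg, "CNX_VTABLE_ID");

    std::cout << scan.name << " -> " << agg.name
              << " via datalist " << scan.outputs[0]->oid << '\n';
    return 0;
}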