1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

Merge branch 'develop-1.2' into develop-merge-up-20190924-2

This commit is contained in:
Andrew Hutchings
2019-09-24 14:17:57 +01:00
25 changed files with 304 additions and 222 deletions

View File

@ -840,6 +840,7 @@ const string TupleAggregateStep::toString() const
SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
{
SJSTEP spjs;
TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(step.get());
TupleBPS* tbps = dynamic_cast<TupleBPS*>(step.get());
TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(step.get());
SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(step.get());
@ -917,171 +918,83 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
// preprocess the columns used by group_concat
jobInfo.groupConcatInfo.prepGroupConcat(jobInfo);
bool doGroupConcat = false;
bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0
// || jobInfo.windowSet.size() > 0
|| sas
|| ces;
rgs.push_back(tds->getDeliveredRowGroup());
// get rowgroup and aggregator
// For TupleHashJoin, we prepare for both PM and UM only aggregation
if (doUMOnly || thjs)
{
if (distinctAgg == true)
prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
else
prep1PhaseAggregate(jobInfo, rgs, aggs);
// TODO: fix this
if (doUMOnly)
rgs.push_back(rgs[0]);
}
if (!doUMOnly)
{
if (distinctAgg == true)
prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
else
prep2PhasesAggregate(jobInfo, rgs, aggs);
}
if (tbps != NULL)
{
// get rowgroup and aggregator
rgs.push_back(tbps->getDeliveredRowGroup());
if (jobInfo.groupConcatInfo.columns().size() == 0)
{
if (distinctAgg == true)
prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
else
prep2PhasesAggregate(jobInfo, rgs, aggs);
}
else
{
if (distinctAgg == true)
prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
else
prep1PhaseAggregate(jobInfo, rgs, aggs);
// TODO: fix this
rgs.push_back(rgs[0]);
doGroupConcat = true;
}
// make sure connected by a RowGroupDL
JobStepAssociation tbpsJsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
tbpsJsa.outAdd(spdl);
// create delivery step
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo));
spjs->inputAssociation(tbpsJsa);
// step id??
spjs->stepId(step->stepId() + 1);
// set the PM/UM side aggregate structs
tbps->outputAssociation(tbpsJsa);
if (doGroupConcat)
if (doUMOnly)
dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
else
tbps->setAggregateStep(aggs[1], rgs[2]);
}
else if (thjs != NULL)
{
// default to UM aggregation
rgs.push_back(thjs->getDeliveredRowGroup());
if (distinctAgg == true)
prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
else
prep1PhaseAggregate(jobInfo, rgs, aggs);
// also prepare for PM aggregation
// rowgroups -- 0-proj, 1-um, [2-phase case: 2-um, 3-pm]
// aggregators -- 0-um, [2-phase case: 1-um, 2-pm]
if (jobInfo.groupConcatInfo.columns().size() == 0)
{
if (distinctAgg == true)
prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
else
prep2PhasesAggregate(jobInfo, rgs, aggs);
}
else
{
// TODO: fix this
rgs.push_back(rgs[0]);
doGroupConcat = true;
}
// make sure connected by a RowGroupDL
JobStepAssociation thjsJsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
thjsJsa.outAdd(spdl);
// create delivery step
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
spjs->inputAssociation(thjsJsa);
if (doGroupConcat)
if (doUMOnly)
dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
else
dynamic_cast<TupleAggregateStep*>(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]);
// step id??
spjs->stepId(step->stepId() + 1);
// set input side
thjs->outputAssociation(thjsJsa);
thjs->deliveryStep(spjs);
}
else if (sas != NULL)
else
{
// UM aggregation
// rowgroups -- 0-proj, 1-um
// aggregators -- 0-um
rgs.push_back(sas->getDeliveredRowGroup());
if (distinctAgg == true)
prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
else
prep1PhaseAggregate(jobInfo, rgs, aggs);
// make sure connected by a RowGroupDL
JobStepAssociation sasJsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
sasJsa.outAdd(spdl);
// create delivery step
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
spjs->inputAssociation(sasJsa);
// step id??
spjs->stepId(step->stepId() + 1);
// set input side
sas->outputAssociation(sasJsa);
}
else if (ces != NULL)
{
// UM aggregation
// rowgroups -- 0-proj, 1-um
// aggregators -- 0-um
rgs.push_back(ces->getDeliveredRowGroup());
if (distinctAgg == true)
prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
else
prep1PhaseAggregate(jobInfo, rgs, aggs);
// Setup the input JobstepAssoctiation -- the mechanism
// whereby the previous step feeds data to this step.
// Otherwise, we need to create one and hook to the
// previous step as well as this aggregate step.
spjs->stepId(step->stepId() + 1);
// make sure connected by a RowGroupDL
JobStepAssociation cesJsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
cesJsa.outAdd(spdl);
JobStepAssociation jsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
jsa.outAdd(spdl);
// create delivery step
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
spjs->inputAssociation(cesJsa);
spjs->inputAssociation(jsa); // Aggregate input
// step id??
spjs->stepId(step->stepId() + 1);
// set input side
ces->outputAssociation(cesJsa);
}
//Previous step output
step->outputAssociation(jsa);
// add the aggregate on constants
if (constAggDataVec.size() > 0)