diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index b34215433..520e365ef 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -604,7 +604,8 @@ void checkAggregation(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) jobInfo.hasDistinct = csep->distinct(); - if (csep->distinct() == true) + // DISTINCT with window functions must be done in tupleannexstep + if (csep->distinct() == true && jobInfo.windowDels.size() == 0) { jobInfo.hasAggregation = true; } @@ -878,6 +879,10 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo const SimpleColumn* sc = dynamic_cast(srcp.get()); AggregateColumn* aggc = dynamic_cast(srcp.get()); bool doDistinct = (csep->distinct() && csep->groupByCols().empty()); + // Use this instead of the above line to mimic MariaDB's sql_mode = 'ONLY_FULL_GROUP_BY' + // bool doDistinct = (csep->distinct() && + // csep->groupByCols().empty() && + // !jobInfo.hasAggregation); uint32_t tupleKey = -1; string alias; string view; @@ -1126,9 +1131,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo // remember the columns to be returned jobInfo.returnedColVec.push_back(make_pair(tupleKey, op)); - // bug 1499 distinct processing, save unique distinct columns - if (doDistinct && - (jobInfo.distinctColVec.end() == + // bug 1499 distinct processing, save unique distinct columns that aren't Window columns + if (doDistinct + && (jobInfo.distinctColVec.end() == find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey))) { jobInfo.distinctColVec.push_back(tupleKey); @@ -1279,13 +1284,13 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo // remember the columns to be returned jobInfo.returnedColVec.push_back(make_pair(tupleKey, op)); - // bug 1499 distinct processing, save unique distinct columns - if (doDistinct && - (jobInfo.distinctColVec.end() == - find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey))) - { - jobInfo.distinctColVec.push_back(tupleKey); - } + // bug 1499 distinct processing, save unique distinct columns that aren't Window columns + if (doDistinct + && (jobInfo.distinctColVec.end() == + find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey))) + { + jobInfo.distinctColVec.push_back(tupleKey); + } } } diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 62f084412..c28de49c0 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -837,6 +837,7 @@ const string TupleAggregateStep::toString() const SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo) { SJSTEP spjs; + TupleDeliveryStep* tds = dynamic_cast(step.get()); TupleBPS* tbps = dynamic_cast(step.get()); TupleHashJoinStep* thjs = dynamic_cast(step.get()); SubAdapterStep* sas = dynamic_cast(step.get()); @@ -914,171 +915,83 @@ SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo) // preprocess the columns used by group_concat jobInfo.groupConcatInfo.prepGroupConcat(jobInfo); - bool doGroupConcat = false; + bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0 +// || jobInfo.windowSet.size() > 0 + || sas + || ces; + + rgs.push_back(tds->getDeliveredRowGroup()); + + // get rowgroup and aggregator + // For TupleHashJoin, we prepare for both PM and UM only aggregation + if (doUMOnly || thjs) + { + if (distinctAgg == true) + prep1PhaseDistinctAggregate(jobInfo, rgs, aggs); + else + prep1PhaseAggregate(jobInfo, rgs, aggs); + + // TODO: fix this + if (doUMOnly) + rgs.push_back(rgs[0]); + } + + if (!doUMOnly) + { + if (distinctAgg == true) + prep2PhasesDistinctAggregate(jobInfo, rgs, aggs); + else + prep2PhasesAggregate(jobInfo, rgs, aggs); + } if (tbps != NULL) { - // get rowgroup and aggregator - rgs.push_back(tbps->getDeliveredRowGroup()); - - if (jobInfo.groupConcatInfo.columns().size() == 0) - { - if (distinctAgg == true) - prep2PhasesDistinctAggregate(jobInfo, rgs, aggs); - else - prep2PhasesAggregate(jobInfo, rgs, aggs); - } - else - { - if (distinctAgg == true) - prep1PhaseDistinctAggregate(jobInfo, rgs, aggs); - else - prep1PhaseAggregate(jobInfo, rgs, aggs); - - // TODO: fix this - rgs.push_back(rgs[0]); - doGroupConcat = true; - } - - // make sure connected by a RowGroupDL - JobStepAssociation tbpsJsa; - AnyDataListSPtr spdl(new AnyDataList()); - RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); - dl->OID(execplan::CNX_VTABLE_ID); - spdl->rowGroupDL(dl); - tbpsJsa.outAdd(spdl); - // create delivery step aggUM = dynamic_pointer_cast(aggs[0]); spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo)); - spjs->inputAssociation(tbpsJsa); - // step id?? - spjs->stepId(step->stepId() + 1); - - // set the PM/UM side aggregate structs - tbps->outputAssociation(tbpsJsa); - - if (doGroupConcat) + if (doUMOnly) dynamic_cast(spjs.get())->umOnly(true); else tbps->setAggregateStep(aggs[1], rgs[2]); } else if (thjs != NULL) { - // default to UM aggregation - rgs.push_back(thjs->getDeliveredRowGroup()); - - if (distinctAgg == true) - prep1PhaseDistinctAggregate(jobInfo, rgs, aggs); - else - prep1PhaseAggregate(jobInfo, rgs, aggs); - - // also prepare for PM aggregation - // rowgroups -- 0-proj, 1-um, [2-phase case: 2-um, 3-pm] - // aggregators -- 0-um, [2-phase case: 1-um, 2-pm] - if (jobInfo.groupConcatInfo.columns().size() == 0) - { - if (distinctAgg == true) - prep2PhasesDistinctAggregate(jobInfo, rgs, aggs); - else - prep2PhasesAggregate(jobInfo, rgs, aggs); - } - else - { - // TODO: fix this - rgs.push_back(rgs[0]); - doGroupConcat = true; - } - - // make sure connected by a RowGroupDL - JobStepAssociation thjsJsa; - AnyDataListSPtr spdl(new AnyDataList()); - RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); - dl->OID(execplan::CNX_VTABLE_ID); - spdl->rowGroupDL(dl); - thjsJsa.outAdd(spdl); - // create delivery step aggUM = dynamic_pointer_cast(aggs[0]); spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo)); - spjs->inputAssociation(thjsJsa); - if (doGroupConcat) + if (doUMOnly) dynamic_cast(spjs.get())->umOnly(true); else dynamic_cast(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]); - - // step id?? - spjs->stepId(step->stepId() + 1); - // set input side - thjs->outputAssociation(thjsJsa); thjs->deliveryStep(spjs); } - else if (sas != NULL) + else { - // UM aggregation - // rowgroups -- 0-proj, 1-um - // aggregators -- 0-um - rgs.push_back(sas->getDeliveredRowGroup()); - - if (distinctAgg == true) - prep1PhaseDistinctAggregate(jobInfo, rgs, aggs); - else - prep1PhaseAggregate(jobInfo, rgs, aggs); - - // make sure connected by a RowGroupDL - JobStepAssociation sasJsa; - AnyDataListSPtr spdl(new AnyDataList()); - RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); - dl->OID(execplan::CNX_VTABLE_ID); - spdl->rowGroupDL(dl); - sasJsa.outAdd(spdl); - - // create delivery step aggUM = dynamic_pointer_cast(aggs[0]); spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo)); - spjs->inputAssociation(sasJsa); - - // step id?? - spjs->stepId(step->stepId() + 1); - - // set input side - sas->outputAssociation(sasJsa); } - else if (ces != NULL) - { - // UM aggregation - // rowgroups -- 0-proj, 1-um - // aggregators -- 0-um - rgs.push_back(ces->getDeliveredRowGroup()); - if (distinctAgg == true) - prep1PhaseDistinctAggregate(jobInfo, rgs, aggs); - else - prep1PhaseAggregate(jobInfo, rgs, aggs); + // Setup the input JobstepAssoctiation -- the mechanism + // whereby the previous step feeds data to this step. + // Otherwise, we need to create one and hook to the + // previous step as well as this aggregate step. + spjs->stepId(step->stepId() + 1); - // make sure connected by a RowGroupDL - JobStepAssociation cesJsa; - AnyDataListSPtr spdl(new AnyDataList()); - RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); - dl->OID(execplan::CNX_VTABLE_ID); - spdl->rowGroupDL(dl); - cesJsa.outAdd(spdl); + JobStepAssociation jsa; + AnyDataListSPtr spdl(new AnyDataList()); + RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); + dl->OID(execplan::CNX_VTABLE_ID); + spdl->rowGroupDL(dl); + jsa.outAdd(spdl); - // create delivery step - aggUM = dynamic_pointer_cast(aggs[0]); - spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo)); - spjs->inputAssociation(cesJsa); + spjs->inputAssociation(jsa); // Aggregate input - // step id?? - spjs->stepId(step->stepId() + 1); - - // set input side - ces->outputAssociation(cesJsa); - } + //Previous step output + step->outputAssociation(jsa); // add the aggregate on constants if (constAggDataVec.size() > 0)