/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2019-2020 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */

//  $Id: tupleaggregatestep.cpp 9732 2013-08-02 15:56:15Z pleblanc $

// #define NDEBUG
// Cross engine needs to be at top due to MySQL includes
#define PREFER_MY_CONFIG_H
#include "crossenginestep.h"
// NOTE: the original header names were stripped to bare "#include"
// directives; the std/boost headers below are assumptions inferred from
// usage in this file (cout/cerr, ostringstream, fill/find, unique_ptr,
// shared_ptr, scoped_array).
#include <iostream>
#include <sstream>
#include <algorithm>
#include <memory>
using namespace std;

#include <boost/shared_ptr.hpp>
#include <boost/shared_array.hpp>
#include <boost/scoped_array.hpp>
#include "boost/tuple/tuple.hpp"
using namespace boost;

#include "messagequeue.h"
using namespace messageqcpp;

#include "loggingid.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
using namespace logging;

#include "configcpp.h"
using namespace config;

#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "udafcolumn.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "constantcolumn.h"
using namespace execplan;

#include "rowgroup.h"
#include "rowaggregation.h"
using namespace rowgroup;

#include "querytele.h"
using namespace querytele;

#include "jlf_common.h"
#include "jobstep.h"
#include "primitivestep.h"
#include "subquerystep.h"
#include "tuplehashjoin.h"
#include "tupleaggregatestep.h"

// #include "stopwatch.cpp"
// Stopwatch timer;

namespace
{
// element = (row pointer, hash of the group-by prefix); the template
// arguments were lost in extraction and are restored from the
// emplace_back(rowIn.getPointer(), hash) calls below.
typedef vector<pair<Row::Pointer, uint64_t>> RowBucket;
typedef vector<RowBucket> RowBucketVec;

inline RowAggFunctionType functionIdMap(int planFuncId)
{
  switch (planFuncId)
  {
    case AggregateColumn::COUNT_ASTERISK: return ROWAGG_COUNT_ASTERISK;
    case AggregateColumn::COUNT: return ROWAGG_COUNT_COL_NAME;
    case AggregateColumn::SUM: return ROWAGG_SUM;
    case AggregateColumn::AVG: return ROWAGG_AVG;
    case AggregateColumn::MIN: return ROWAGG_MIN;
    case AggregateColumn::MAX: return ROWAGG_MAX;
    case AggregateColumn::DISTINCT_COUNT: return ROWAGG_COUNT_DISTINCT_COL_NAME;
    case AggregateColumn::DISTINCT_SUM: return ROWAGG_DISTINCT_SUM;
    case AggregateColumn::DISTINCT_AVG: return ROWAGG_DISTINCT_AVG;
    case AggregateColumn::STDDEV_POP: return ROWAGG_STATS;
    case AggregateColumn::STDDEV_SAMP: return ROWAGG_STATS;
    case AggregateColumn::VAR_POP: return ROWAGG_STATS;
    case AggregateColumn::VAR_SAMP: return ROWAGG_STATS;
    case AggregateColumn::BIT_AND: return ROWAGG_BIT_AND;
    case AggregateColumn::BIT_OR: return ROWAGG_BIT_OR;
    case AggregateColumn::BIT_XOR: return ROWAGG_BIT_XOR;
    case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT;
    case AggregateColumn::JSON_ARRAYAGG: return ROWAGG_JSON_ARRAY;
    case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT;
    case AggregateColumn::UDAF: return ROWAGG_UDAF;
    case AggregateColumn::MULTI_PARM: return ROWAGG_MULTI_PARM;
    case AggregateColumn::SELECT_SOME: return ROWAGG_SELECT_SOME;
    default: return ROWAGG_FUNCT_UNDEFINE;
  }
}

inline RowAggFunctionType statsFuncIdMap(int planFuncId)
{
  switch (planFuncId)
  {
    case AggregateColumn::STDDEV_POP: return ROWAGG_STDDEV_POP;
    case AggregateColumn::STDDEV_SAMP: return ROWAGG_STDDEV_SAMP;
    case AggregateColumn::VAR_POP: return ROWAGG_VAR_POP;
    case AggregateColumn::VAR_SAMP: return ROWAGG_VAR_SAMP;
    default: return ROWAGG_FUNCT_UNDEFINE;
  }
}

inline string colTypeIdString(CalpontSystemCatalog::ColDataType type)
{
  switch (type)
  {
    case CalpontSystemCatalog::BIT: return string("BIT");
    case CalpontSystemCatalog::TINYINT: return string("TINYINT");
    case CalpontSystemCatalog::CHAR: return string("CHAR");
    case CalpontSystemCatalog::SMALLINT: return string("SMALLINT");
    case CalpontSystemCatalog::DECIMAL: return string("DECIMAL");
    case CalpontSystemCatalog::MEDINT: return string("MEDINT");
    case CalpontSystemCatalog::INT: return string("INT");
    case CalpontSystemCatalog::FLOAT: return string("FLOAT");
    case CalpontSystemCatalog::DATE: return string("DATE");
    case CalpontSystemCatalog::BIGINT: return string("BIGINT");
    case CalpontSystemCatalog::DOUBLE: return string("DOUBLE");
    case CalpontSystemCatalog::LONGDOUBLE: return string("LONGDOUBLE");
    case CalpontSystemCatalog::DATETIME: return string("DATETIME");
    case CalpontSystemCatalog::TIMESTAMP: return string("TIMESTAMP");
    case CalpontSystemCatalog::TIME: return string("TIME");
    case CalpontSystemCatalog::VARCHAR: return string("VARCHAR");
    case CalpontSystemCatalog::CLOB: return string("CLOB");
    case CalpontSystemCatalog::BLOB: return string("BLOB");
    case CalpontSystemCatalog::TEXT: return string("TEXT");
    case CalpontSystemCatalog::UTINYINT: return string("UTINYINT");
    case CalpontSystemCatalog::USMALLINT: return string("USMALLINT");
    case CalpontSystemCatalog::UDECIMAL: return string("UDECIMAL");
    case CalpontSystemCatalog::UMEDINT: return string("UMEDINT");
    case CalpontSystemCatalog::UINT: return string("UINT");
    case CalpontSystemCatalog::UFLOAT: return string("UFLOAT");
    case CalpontSystemCatalog::UBIGINT: return string("UBIGINT");
    case CalpontSystemCatalog::UDOUBLE: return string("UDOUBLE");
    default: return string("UNKNOWN");
  }
}

string keyName(uint64_t i, uint32_t key, const joblist::JobInfo& jobInfo)
{
  string name = jobInfo.projectionCols[i]->alias();

  if (name.empty())
  {
    name = jobInfo.keyInfo->tupleKeyToName[key];

    if (jobInfo.keyInfo->tupleKeyVec[key].fId < 100)
      name = "Expression/Function";
  }

  return name = "'" + name + "'";
}

}  // namespace

namespace joblist
{
void wideDecimalOrLongDouble(const uint64_t colProj, const CalpontSystemCatalog::ColDataType type,
                             const vector<uint32_t>& precisionProj, const vector<uint32_t>& scaleProj,
                             const vector<uint32_t>& width,
                             vector<CalpontSystemCatalog::ColDataType>& typeAgg,
                             vector<uint32_t>& scaleAgg, vector<uint32_t>& precisionAgg,
                             vector<uint32_t>& widthAgg)
{
  if ((type == CalpontSystemCatalog::DECIMAL || type == CalpontSystemCatalog::UDECIMAL) &&
      datatypes::Decimal::isWideDecimalTypeByPrecision(precisionProj[colProj]))
  {
    typeAgg.push_back(type);
    scaleAgg.push_back(scaleProj[colProj]);
    precisionAgg.push_back(precisionProj[colProj]);
    widthAgg.push_back(width[colProj]);
  }
  else if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(type))
  {
    typeAgg.push_back(CalpontSystemCatalog::DECIMAL);
    scaleAgg.push_back(0);
    precisionAgg.push_back(datatypes::INT128MAXPRECISION);
    widthAgg.push_back(datatypes::MAXDECIMALWIDTH);
  }
  else
  {
    typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
    scaleAgg.push_back(0);
    precisionAgg.push_back(-1);
    widthAgg.push_back(sizeof(long double));
  }
}

TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup& rgOut,
                                       const RowGroup& rgIn, const JobInfo& jobInfo)
 : JobStep(jobInfo)
 , fCatalog(jobInfo.csc)
 , fRowsReturned(0)
 , fDoneAggregate(false)
 , fEndOfResult(false)
 , fAggregator(agg)
 , fRowGroupOut(rgOut)
 , fRowGroupIn(rgIn)
 , fRunner(0)
 , fUmOnly(false)
 , fRm(jobInfo.rm)
 , fBucketNum(0)
 , fInputIter(-1)
 , fSessionMemLimit(jobInfo.umMemLimit)
{
  fRowGroupData.reinit(fRowGroupOut);
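  // Sizing note (illustrative sketch only; calcNumberOfBuckets() below is
  // the authoritative logic): the configured bucket count is reduced when
  // the per-bucket working set cannot fit in the memory limit and disk
  // aggregation is not allowed, conceptually something like
  //
  //   // hypothetical simplification
  //   while (buckets > 1 && !allowDiskAgg &&
  //          buckets * numRowGroups * (inRowSize + outRowSize) * 8192 > memLimit)
  //     buckets /= 2;
  //
  // The thread count is then clamped to the bucket count, because a worker
  // thread never operates on less than a whole bucket.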
  fRowGroupOut.setData(&fRowGroupData);
  fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);

  // decide if this needs to be multi-threaded
  RowAggregationDistinct* multiAgg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
  fIsMultiThread = (multiAgg || fAggregator->aggMapKeyLength() > 0);

  // initialize multi-thread variables
  fNumOfThreads = fRm->aggNumThreads();
  fNumOfBuckets = fRm->aggNumBuckets();
  fNumOfRowGroups = fRm->aggNumRowGroups();

  auto memLimit = std::min(fRm->availableMemory(), *fSessionMemLimit);
  fNumOfBuckets =
      calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups,
                          fRowGroupIn.getRowSize(), fRowGroupOut.getRowSize(),
                          fRm->getAllowDiskAggregation());
  fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);

  fMemUsage.reset(new uint64_t[fNumOfThreads]);
  memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t));

  fExtendedInfo = "TAS: ";
  fQtc.stepParms().stepType = StepTeleStats::T_TAS;
  fPrimitiveServerThreadPools = jobInfo.primitiveServerThreadPools;
}

TupleAggregateStep::~TupleAggregateStep()
{
  for (uint32_t i = 0; i < fNumOfThreads; i++)
    fRm->returnMemory(fMemUsage[i], fSessionMemLimit);

  for (uint32_t i = 0; i < fAgg_mutex.size(); i++)
    delete fAgg_mutex[i];
}

void TupleAggregateStep::initializeMultiThread()
{
  RowGroupDL* dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
  uint32_t i;

  if (dlIn == NULL)
    throw logic_error("Input is not RowGroup data list in delivery step.");

  if (fInputIter < 0)
    fInputIter = dlIn->getIterator();

  fRowGroupIns.resize(fNumOfThreads);
  fRowGroupOuts.resize(fNumOfBuckets);
  fRowGroupDatas.resize(fNumOfBuckets);

  rowgroup::SP_ROWAGG_UM_t agg;
  RGData rgData;

  for (i = 0; i < fNumOfBuckets; i++)
  {
    boost::mutex* lock = new boost::mutex();
    fAgg_mutex.push_back(lock);
    fRowGroupOuts[i] = fRowGroupOut;
    rgData.reinit(fRowGroupOut);
    fRowGroupDatas[i] = rgData;
    fRowGroupOuts[i].setData(&fRowGroupDatas[i]);
    fRowGroupOuts[i].resetRowGroup(0);
  }
}

void TupleAggregateStep::run()
{
  if (fDelivery == false)
  {
    fRunner = jobstepThreadPool.invoke(Aggregator(this));
  }
}

void TupleAggregateStep::join()
{
  if (fRunner)
    jobstepThreadPool.join(fRunner);
}

void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
{
  if (threadID >= fNumOfBuckets)
    return;

  bool finishedSecondPhase = false;
  bool diskAggAllowed = fRm->getAllowDiskAggregation();
  const uint32_t maxRowsSize = 8192;

  while (!finishedSecondPhase && !fEndOfResult)
  {
    scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
    scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
    uint32_t hashlen = fAggregator->aggMapKeyLength();
    bool outOfMemory = false;
    size_t totalMemSizeConsumed = 0;

    try
    {
      RowAggregationDistinct* aggDist =
          dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
      RowAggregationMultiDistinct* multiDist =
          dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
      Row rowIn;
      RowGroup* rowGroupIn = nullptr;
      rowGroupIn = (aggDist->aggregator()->getOutputRowGroup());
      uint32_t bucketID;
      // element type is an assumption; the nested template arguments were
      // lost in extraction, and moveCurrentRGData() transfers ownership.
      std::vector<std::unique_ptr<RGData>> rgDataVec;

      if (multiDist)
      {
        for (uint32_t i = 0; i < fNumOfBuckets; i++)
          rowBucketVecs[i].resize(multiDist->subAggregators().size());
      }
      else
      {
        for (uint32_t i = 0; i < fNumOfBuckets; i++)
          rowBucketVecs[i].resize(1);
      }

      // dispatch rows to bucket
      if (multiDist)
      {
        for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
        {
          rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
          rowGroupIn->initRow(&rowIn);
          auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());

          while (subDistAgg->nextOutputRowGroup())
          {
            rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
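            // Scatter step (what the loop body below does): each row of the
            // sub-aggregator's output RowGroup is hashed on its leading
            // group-by columns and appended to one of fNumOfBuckets
            // per-bucket vectors, so every second-phase thread later owns a
            // disjoint slice of the key space:
            //
            //   uint64_t h = rowgroup::hashRow(row, hashlen - 1);  // group-by prefix
            //   rowBucketVecs[h % fNumOfBuckets][j].emplace_back(row.getPointer(), h);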
            rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
            rowGroupIn->getRow(0, &rowIn);

            for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
            {
              // The key is the groupby columns, which are the leading columns.
              // uint8_t* hashMapKey = rowIn.getData() + 2;
              // bucketID = hash.operator()(hashMapKey) & fBucketMask;
              uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
              bucketID = hash % fNumOfBuckets;
              rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
              rowIn.nextRow();
            }

            const auto rgSize = diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize)
                                               : rowGroupIn->getSizeWithStrings();
            totalMemSizeConsumed += rgSize;

            if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
            {
              outOfMemory = true;
              break;
            }
          }
        }
      }
      else
      {
        rowGroupIn->initRow(&rowIn);
        auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());

        while (subAgg->nextOutputRowGroup())
        {
          rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
          rgDataVec.emplace_back(subAgg->moveCurrentRGData());
          rowGroupIn->getRow(0, &rowIn);

          for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
          {
            // The key is the groupby columns, which are the leading columns.
            // uint8_t* hashMapKey = rowIn.getData() + 2;
            // bucketID = hash.operator()(hashMapKey) & fBucketMask;
            uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
            bucketID = hash % fNumOfBuckets;
            rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
            rowIn.nextRow();
          }

          const auto rgSize = diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize)
                                             : rowGroupIn->getSizeWithStrings();
          totalMemSizeConsumed += rgSize;

          if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
          {
            outOfMemory = true;
            break;
          }
        }
      }

      if (!outOfMemory)
        finishedSecondPhase = true;

      bool done = false;
      // reset bucketDone[] to be false
      // memset(bucketDone, 0, sizeof(bucketDone));
      fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);

      while (!done && !cancelled())
      {
        done = true;

        for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
        {
          if (!bucketDone[c] && fAgg_mutex[c]->try_lock())
          {
            try
            {
              if (multiDist)
                dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
                    ->doDistinctAggregation_rowVec(rowBucketVecs[c]);
              else
                dynamic_cast<RowAggregationDistinct*>(fAggregators[c].get())
                    ->doDistinctAggregation_rowVec(rowBucketVecs[c][0]);
            }
            catch (...)
            {
              fAgg_mutex[c]->unlock();
              throw;
            }

            fAgg_mutex[c]->unlock();
            bucketDone[c] = true;
            rowBucketVecs[c][0].clear();
          }
          else if (!bucketDone[c])
          {
            done = false;
          }
        }
      }

      fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);

      if (cancelled())
      {
        finishedSecondPhase = true;
        fEndOfResult = true;
      }
    }  // try
    catch (...)
    {
      fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
      handleException(std::current_exception(), logging::tupleAggregateStepErr,
                      logging::ERR_AGGREGATION_TOO_BIG,
                      "TupleAggregateStep::doThreadedSecondPhaseAggregate()");
      fEndOfResult = true;
      finishedSecondPhase = true;
    }
  }

  fDoneAggregate = true;

  if (traceOn())
  {
    dlTimes.setLastReadTime();
    dlTimes.setEndOfInputTime();
  }
}

uint32_t TupleAggregateStep::nextBand_singleThread(messageqcpp::ByteStream& bs)
{
  uint32_t rowCount = 0;

  try
  {
    if (!fDoneAggregate)
      aggregateRowGroups();

    if (fEndOfResult == false)
    {
      bs.restart();

      // do the final aggregation and deliver the results
      // at least one RowGroup for aggregate results
      if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
      {
        dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
      }

      if (fAggregator->nextRowGroup())
      {
        fAggregator->finalize();
        rowCount = fRowGroupOut.getRowCount();
        fRowsReturned += rowCount;
        fRowGroupDelivered.setData(fRowGroupOut.getRGData());

        if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
          pruneAuxColumns();

        fRowGroupDelivered.serializeRGData(bs);
      }
      else
      {
        fEndOfResult = true;
      }
    }
  }  // try
  catch (...)
  {
    handleException(std::current_exception(), logging::tupleAggregateStepErr,
                    logging::ERR_AGGREGATION_TOO_BIG,
                    "TupleAggregateStep::nextBand_singleThread()");
    fEndOfResult = true;
  }

  if (fEndOfResult)
  {
    StepTeleStats sts;
    sts.query_uuid = fQueryUuid;
    sts.step_uuid = fStepUuid;
    sts.msg_type = StepTeleStats::ST_SUMMARY;
    sts.total_units_of_work = sts.units_of_work_completed = 1;
    sts.rows = fRowsReturned;
    postStepSummaryTele(sts);

    // send an empty / error band
    RGData rgData(fRowGroupOut, 0);
    fRowGroupOut.setData(&rgData);
    fRowGroupOut.resetRowGroup(0);
    fRowGroupOut.setStatus(status());
    fRowGroupOut.serializeRGData(bs);
    rowCount = 0;

    if (traceOn())
      printCalTrace();
  }

  return rowCount;
}

bool TupleAggregateStep::nextDeliveredRowGroup()
{
  for (; fBucketNum < fNumOfBuckets; fBucketNum++)
  {
    while (fAggregators[fBucketNum]->nextOutputRowGroup())
    {
      fAggregators[fBucketNum]->finalize();
      fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
      fRowGroupOut.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
      return true;
    }
  }

  fBucketNum = 0;
  return false;
}

uint32_t TupleAggregateStep::nextBand(messageqcpp::ByteStream& bs)
{
  // use the original single-thread model when there is no group by and no distinct.
  // @bug4314. DO NOT access fAggregator before the first read of input,
  // because hashjoin may not have finalized fAggregator.
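  // fIsMultiThread was fixed in the constructor: it is set when the
  // aggregation carries a hash-map key (GROUP BY) or a distinct aggregator.
  // For example, "SELECT COUNT(*) FROM t" takes the single-thread band
  // below, while "SELECT a, SUM(b) FROM t GROUP BY a" takes the bucketed,
  // threaded path.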
  if (!fIsMultiThread)
    return nextBand_singleThread(bs);

  return doThreadedAggregate(bs, 0);
}

bool TupleAggregateStep::setPmHJAggregation(JobStep* step)
{
  TupleBPS* bps = dynamic_cast<TupleBPS*>(step);

  if (bps != NULL)
  {
    fAggregatorUM->expression(fAggregator->expression());
    fAggregatorUM->constantAggregate(fAggregator->constantAggregate());
    fAggregator = fAggregatorUM;
    fRowGroupIn = fRowGroupPMHJ;
    fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
    bps->setAggregateStep(fAggregatorPM, fRowGroupPMHJ);
  }

  return (bps != NULL);
}

void TupleAggregateStep::configDeliveredRowGroup(const JobInfo& jobInfo)
{
  // configure the oids and keys
  vector<uint32_t> oids = fRowGroupOut.getOIDs();
  vector<uint32_t> keys = fRowGroupOut.getKeys();
  // pair element type restored as an assumption: first = expression OID,
  // second = column index, per the loop body below.
  vector<pair<uint32_t, uint32_t>>::const_iterator begin = jobInfo.aggEidIndexList.begin();
  vector<pair<uint32_t, uint32_t>>::const_iterator end = jobInfo.aggEidIndexList.end();

  for (vector<pair<uint32_t, uint32_t>>::const_iterator i = begin; i != end; i++)
  {
    oids[i->second] = i->first;
    keys[i->second] = getExpTupleKey(jobInfo, i->first);
  }

  // correct the scale
  vector<uint32_t> scale = fRowGroupOut.getScale();
  vector<uint32_t> precision = fRowGroupOut.getPrecision();
  size_t retColCount = 0;
  auto scaleIter = scale.begin();
  auto precisionIter = precision.begin();

  if (jobInfo.havingStep)
  {
    retColCount = jobInfo.returnedColVec.size();
    idbassert(jobInfo.returnedColVec.size() == jobInfo.projectionCols.size());

    for (size_t i = 0; i < jobInfo.projectionCols.size() && scaleIter != scale.end(); i++)
    {
      const auto& colType = jobInfo.projectionCols[i]->resultType();

      if (colType.isWideDecimalType())
      {
        *scaleIter = colType.scale;
        *precisionIter = colType.precision;
      }

      scaleIter++;
      precisionIter++;
    }
  }
  else
  {
    retColCount = jobInfo.nonConstDelCols.size();

    for (size_t i = 0; i < jobInfo.nonConstDelCols.size() && scaleIter != scale.end(); i++)
    {
      const auto& colType = jobInfo.nonConstDelCols[i]->resultType();

      if (colType.isWideDecimalType())
      {
        *scaleIter = colType.scale;
        *precisionIter = colType.precision;
      }

      scaleIter++;
      precisionIter++;
    }
  }

  vector<uint32_t>::const_iterator offsets0 = fRowGroupOut.getOffsets().begin();
  vector<CalpontSystemCatalog::ColDataType>::const_iterator types0 = fRowGroupOut.getColTypes().begin();
  vector<uint32_t> csNums = fRowGroupOut.getCharsetNumbers();
  vector<uint32_t>::const_iterator precision0 = precision.begin();
  fRowGroupDelivered =
      RowGroup(retColCount, vector<uint32_t>(offsets0, offsets0 + retColCount + 1),
               vector<uint32_t>(oids.begin(), oids.begin() + retColCount),
               vector<uint32_t>(keys.begin(), keys.begin() + retColCount),
               vector<CalpontSystemCatalog::ColDataType>(types0, types0 + retColCount),
               vector<uint32_t>(csNums.begin(), csNums.begin() + retColCount),
               vector<uint32_t>(scale.begin(), scale.begin() + retColCount),
               vector<uint32_t>(precision0, precision0 + retColCount), jobInfo.stringTableThreshold);

  if (jobInfo.trace)
    cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
}

void TupleAggregateStep::setOutputRowGroup(const RowGroup& rg)
{
  fRowGroupOut = rg;
  fRowGroupData.reinit(fRowGroupOut);
  fRowGroupOut.setData(&fRowGroupData);
  fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
}

const RowGroup& TupleAggregateStep::getOutputRowGroup() const
{
  return fRowGroupOut;
}

const RowGroup& TupleAggregateStep::getDeliveredRowGroup() const
{
  return fRowGroupDelivered;
}

void TupleAggregateStep::savePmHJData(SP_ROWAGG_t& um, SP_ROWAGG_t& pm, RowGroup& rg)
{
  fAggregatorUM = dynamic_pointer_cast<RowAggregationUM>(um);
  fAggregatorPM = pm;
  fRowGroupPMHJ = rg;
}

void TupleAggregateStep::deliverStringTableRowGroup(bool b)
{
  fRowGroupDelivered.setUseStringTable(b);
}

bool TupleAggregateStep::deliverStringTableRowGroup() const
{
  return fRowGroupDelivered.usesStringTable();
}

const string TupleAggregateStep::toString() const
{
  ostringstream
oss; oss << "AggregateStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; oss << " in:"; for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++) oss << fInputJobStepAssociation.outAt(i); if (fOutputJobStepAssociation.outSize() > 0) { oss << " out:"; for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++) oss << fOutputJobStepAssociation.outAt(i); } return oss.str(); } SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo) { SJSTEP spjs; TupleDeliveryStep* tds = dynamic_cast(step.get()); TupleBPS* tbps = dynamic_cast(step.get()); TupleHashJoinStep* thjs = dynamic_cast(step.get()); SubAdapterStep* sas = dynamic_cast(step.get()); CrossEngineStep* ces = dynamic_cast(step.get()); vector rgs; // 0-ProjRG, 1-UMRG, [2-PMRG -- if 2 phases] vector aggs; SP_ROWAGG_UM_t aggUM; bool distinctAgg = false; int64_t constKey = -1; vector constAggDataVec; vector> returnedColVecOrig = jobInfo.returnedColVec; for (uint32_t idx = 0; idx < jobInfo.returnedColVec.size(); idx++) { if (jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_COUNT || jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_AVG || jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_SUM) { distinctAgg = true; } // Change COUNT_ASTERISK to CONSTANT if necessary. // In joblistfactory, all aggregate(constant) are set to count(*) for easy process. map::iterator it = jobInfo.constAggregate.find(idx); if (it != jobInfo.constAggregate.end()) { AggregateColumn* ac = dynamic_cast(it->second.get()); if (ac->aggOp() == AggregateColumn::COUNT_ASTERISK) { if (jobInfo.cntStarPos == -1) jobInfo.cntStarPos = idx; } else { constKey = jobInfo.returnedColVec[idx].first; CalpontSystemCatalog::ColType ct = ac->resultType(); TupleInfo ti = setExpTupleInfo(ct, ac->expressionId(), ac->alias(), jobInfo); jobInfo.returnedColVec[idx].first = ti.key; jobInfo.returnedColVec[idx].second = AggregateColumn::CONSTANT; ConstantColumn* cc = dynamic_cast(ac->constCol().get()); idbassert(cc != NULL); // @bug5261 bool isNull = cc->isNull(); if (ac->aggOp() == AggregateColumn::UDAF) { UDAFColumn* udafc = dynamic_cast(ac); if (udafc) { constAggDataVec.push_back(ConstantAggData(cc->constval(), udafc->getContext().getName(), functionIdMap(ac->aggOp()), isNull)); } } else { constAggDataVec.push_back(ConstantAggData(cc->constval(), functionIdMap(ac->aggOp()), isNull)); } } } } // If there are aggregate(constant) columns, but no count(*), add a count(*). 
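// For example (illustrative): "SELECT MAX(1) FROM t" carries no real
// aggregate input column, so a hidden COUNT(*) is appended to
// jobInfo.returnedColVec and jobInfo.cntStarPos records its position; the
// constant aggregate is evaluated later against that row count.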
if (constAggDataVec.size() > 0 && jobInfo.cntStarPos < 0) { jobInfo.cntStarPos = jobInfo.returnedColVec.size(); jobInfo.returnedColVec.push_back(make_pair(constKey, AggregateColumn::COUNT_ASTERISK)); } // preprocess the columns used by group_concat jobInfo.groupConcatInfo.prepGroupConcat(jobInfo); bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0 || sas || ces; rgs.push_back(tds->getDeliveredRowGroup()); // get rowgroup and aggregator // For TupleHashJoin, we prepare for both PM and UM only aggregation if (doUMOnly || thjs) { if (distinctAgg == true) prep1PhaseDistinctAggregate(jobInfo, rgs, aggs); else prep1PhaseAggregate(jobInfo, rgs, aggs); // TODO: fix this if (doUMOnly) rgs.push_back(rgs[0]); } if (!doUMOnly) { if (distinctAgg == true) prep2PhasesDistinctAggregate(jobInfo, rgs, aggs); else { prep2PhasesAggregate(jobInfo, rgs, aggs); } } if (tbps != NULL) { // create delivery step aggUM = dynamic_pointer_cast(aggs[0]); spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo)); if (doUMOnly) dynamic_cast(spjs.get())->umOnly(true); else tbps->setAggregateStep(aggs[1], rgs[2]); } else if (thjs != NULL) { // create delivery step aggUM = dynamic_pointer_cast(aggs[0]); spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo)); if (doUMOnly) dynamic_cast(spjs.get())->umOnly(true); else dynamic_cast(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]); // set input side thjs->deliveryStep(spjs); } else { aggUM = dynamic_pointer_cast(aggs[0]); spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo)); } // Setup the input JobstepAssoctiation -- the mechanism // whereby the previous step feeds data to this step. // Otherwise, we need to create one and hook to the // previous step as well as this aggregate step. spjs->stepId(step->stepId() + 1); JobStepAssociation jsa; AnyDataListSPtr spdl(new AnyDataList()); RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); dl->OID(execplan::CNX_VTABLE_ID); spdl->rowGroupDL(dl); jsa.outAdd(spdl); spjs->inputAssociation(jsa); // Aggregate input // Previous step output step->outputAssociation(jsa); // add the aggregate on constants if (constAggDataVec.size() > 0) { dynamic_cast(spjs.get())->addConstangAggregate(constAggDataVec); jobInfo.returnedColVec.swap(returnedColVecOrig); // restore the original return columns } // fix the delivered rowgroup data dynamic_cast(spjs.get())->configDeliveredRowGroup(jobInfo); if (jobInfo.expressionVec.size() > 0) dynamic_cast(spjs.get())->prepExpressionOnAggregate(aggUM, jobInfo); return spjs; } void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector& rowgroups, vector& aggregators) { // check if there are any aggregate columns vector> aggColVec; vector>& returnedColVec = jobInfo.returnedColVec; for (uint64_t i = 0; i < returnedColVec.size(); i++) { if (returnedColVec[i].second != 0) aggColVec.push_back(returnedColVec[i]); } // populate the aggregate rowgroup: projectedRG -> aggregateRG // // Aggregate preparation by joblist factory: // 1. get projected rowgroup (done by doAggProject) -- passed in // 2. 
construct aggregate rowgroup -- output of UM const RowGroup projRG = rowgroups[0]; const vector& oidsProj = projRG.getOIDs(); const vector& keysProj = projRG.getKeys(); const vector& scaleProj = projRG.getScale(); const vector& precisionProj = projRG.getPrecision(); const vector& typeProj = projRG.getColTypes(); const vector& csNumProj = projRG.getCharsetNumbers(); vector posAgg; vector oidsAgg; vector keysAgg; vector scaleAgg; vector precisionAgg; vector typeAgg; vector csNumAgg; vector widthAgg; vector groupBy; vector functionVec; uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); // For UDAF uint32_t projColsUDAFIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // for count column of average function map avgFuncMap; // collect the projected column info, prepare for aggregation vector width; map projColPosMap; for (uint64_t i = 0; i < keysProj.size(); i++) { projColPosMap.insert(make_pair(keysProj[i], i)); width.push_back(projRG.getColumnWidth(i)); } // for groupby column map groupbyMap; for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { int64_t colProj = projColPosMap[jobInfo.groupByColVec[i]]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1)); groupBy.push_back(groupby); groupbyMap.insert(make_pair(jobInfo.groupByColVec[i], i)); } // for distinct column for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++) { //@bug6126, continue if already in group by if (groupbyMap.find(jobInfo.distinctColVec[i]) != groupbyMap.end()) continue; int64_t colProj = projColPosMap[jobInfo.distinctColVec[i]]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1)); groupBy.push_back(groupby); groupbyMap.insert(make_pair(jobInfo.distinctColVec[i], i)); } // populate the aggregate rowgroup AGG_MAP aggFuncMap; uint64_t outIdx = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); uint32_t key = returnedColVec[i].first; if (aggOp == ROWAGG_CONSTANT) { TupleInfo ti = getTupleInfo(key, jobInfo); oidsAgg.push_back(ti.oid); keysAgg.push_back(key); scaleAgg.push_back(ti.scale); precisionAgg.push_back(ti.precision); typeAgg.push_back(ti.dtype); csNumAgg.push_back(ti.csNum); widthAgg.push_back(ti.width); SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, 0, outIdx, jobInfo.cntStarPos)); functionVec.push_back(funct); ++outIdx; continue; } if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY) { TupleInfo ti = getTupleInfo(key, jobInfo); uint32_t ptrSize = sizeof(GroupConcatAg*); uint32_t width = (ti.width >= ptrSize) ? 
ti.width : ptrSize; oidsAgg.push_back(ti.oid); keysAgg.push_back(key); scaleAgg.push_back(ti.scale); precisionAgg.push_back(ti.precision); typeAgg.push_back(ti.dtype); csNumAgg.push_back(ti.csNum); widthAgg.push_back(width); SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, 0, outIdx, -1)); functionVec.push_back(funct); ++outIdx; continue; } if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep1PhaseAggregate: " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } // make sure the colProj is correct int64_t colProj = projColPosMap[key]; if (keysProj[colProj] != key) { ostringstream emsg; emsg << "projection column map is out of sync."; cerr << "prep1PhaseAggregate: " << emsg.str() << endl; throw logic_error(emsg.str()); } if (aggOp == ROWAGG_FUNCT_UNDEFINE) { // must be a groupby column or function on aggregation // or used by group_concat map::iterator it = groupbyMap.find(key); if (it != groupbyMap.end()) { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(width[colProj]); if (groupBy[it->second]->fOutputColumnIndex == (uint32_t)-1) groupBy[it->second]->fOutputColumnIndex = outIdx; else functionVec.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol( ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, groupBy[it->second]->fOutputColumnIndex))); ++outIdx; continue; } else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) != jobInfo.expressionVec.end()) { TupleInfo ti = getTupleInfo(key, jobInfo); oidsAgg.push_back(ti.oid); keysAgg.push_back(key); scaleAgg.push_back(ti.scale); precisionAgg.push_back(ti.precision); typeAgg.push_back(ti.dtype); csNumAgg.push_back(ti.csNum); widthAgg.push_back(ti.width); ++outIdx; continue; } else if (jobInfo.groupConcatInfo.columns().find(key) != jobInfo.groupConcatInfo.columns().end()) { // TODO: columns only for group_concat do not needed in result set. oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(width[colProj]); ++outIdx; continue; } else if (jobInfo.windowSet.find(key) != jobInfo.windowSet.end()) { // skip window columns/expression, which are computed later oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(width[colProj]); ++outIdx; continue; } else { uint32_t foundTupleKey{0}; if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, groupbyMap, key, foundTupleKey)) { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(width[colProj]); // Update key. 
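// (sketch) tryToFindEqualFunctionColumnByTupleKey() matched this select
// expression to an equivalent GROUP BY expression registered under another
// tuple key; adopting that key here lets the duplicate detection below map
// both to the same output column instead of raising
// ERR_NOT_GROUPBY_EXPRESSION.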
key = foundTupleKey; ++outIdx; continue; } else { Message::Args args; args.add(keyName(i, key, jobInfo)); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); cerr << "prep1PhaseAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView << ", function=" << (int)aggOp << endl; throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); } } } SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); } } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx)); break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(1)prep1PhaseAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); } } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, outIdx)); } functionVec.push_back(funct); switch (aggOp) { case ROWAGG_MIN: case ROWAGG_MAX: case ROWAGG_SELECT_SOME: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(width[colProj]); } break; case ROWAGG_AVG: avgFuncMap.insert(make_pair(key, funct)); /* fall through */ case ROWAGG_SUM: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("sum/average"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep1PhaseAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAgg, scaleAgg, precisionAgg, widthAgg); oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); csNumAgg.push_back(csNumProj[colProj]); } break; case ROWAGG_COUNT_COL_NAME: case ROWAGG_COUNT_ASTERISK: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(0); // work around count() in select subquery precisionAgg.push_back(rowgroup::MagicPrecisionForCountAgg); typeAgg.push_back(CalpontSystemCatalog::UBIGINT); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(bigIntWidth); } break; case ROWAGG_STATS: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == 
CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("variance/standard deviation"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep1PhaseAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(0); typeAgg.push_back(CalpontSystemCatalog::DOUBLE); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(sizeof(double)); } break; case ROWAGG_BIT_AND: case ROWAGG_BIT_OR: case ROWAGG_BIT_XOR: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(0); precisionAgg.push_back(-16); // for connector to skip null check typeAgg.push_back(CalpontSystemCatalog::UBIGINT); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(bigIntWidth); } break; case ROWAGG_UDAF: { RowUDAFFunctionCol* udafFuncCol = dynamic_cast(funct.get()); if (!udafFuncCol) { throw logic_error( "(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } // Return column oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale()); precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType()); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth()); break; } case ROWAGG_MULTI_PARM: { } break; default: { ostringstream emsg; emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported"; cerr << "prep1PhaseAggregate: " << emsg.str() << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } // find if this func is a duplicate AGG_MAP::iterator iter = aggFuncMap.find( boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM && iter != aggFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; else if (funct->fAggFunction == ROWAGG_UDAF) funct->fAggFunction = ROWAGG_DUP_UDAF; else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { aggFuncMap.insert(make_pair( boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL),
          funct->fOutputColumnIndex));
    }

    if (aggOp != ROWAGG_MULTI_PARM)
    {
      ++outIdx;
    }
  }

  // now fix the AVG function, locate the count(column) position
  for (uint64_t i = 0; i < functionVec.size(); i++)
  {
    if (functionVec[i]->fAggFunction != ROWAGG_COUNT_COL_NAME)
      continue;

    // if the count(k) can be associated with an avg(k)
    map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
        avgFuncMap.find(keysAgg[functionVec[i]->fOutputColumnIndex]);

    if (k != avgFuncMap.end())
    {
      k->second->fAuxColumnIndex = functionVec[i]->fOutputColumnIndex;
      functionVec[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
    }
  }

  // there is avg(k), but no count(k) in the select list
  uint64_t lastCol = outIdx;

  for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
  {
    if (k->second->fAuxColumnIndex == (uint32_t)-1)
    {
      k->second->fAuxColumnIndex = lastCol++;
      oidsAgg.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
      keysAgg.push_back(k->first);
      scaleAgg.push_back(0);
      precisionAgg.push_back(19);
      typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
      widthAgg.push_back(bigIntWidth);
    }
  }

  // add auxiliary fields for UDAF and statistics functions
  for (uint64_t i = 0; i < functionVec.size(); i++)
  {
    uint64_t j = functionVec[i]->fInputColumnIndex;

    if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
    {
      // Column for index of UDAF UserData struct
      RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());

      if (!udafFuncCol)
      {
        throw logic_error(
            "(3)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
      }

      functionVec[i]->fAuxColumnIndex = lastCol++;
      oidsAgg.push_back(oidsProj[j]);
      keysAgg.push_back(keysProj[j]);
      scaleAgg.push_back(0);
      precisionAgg.push_back(0);
      typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
      csNumAgg.push_back(8);
      widthAgg.push_back(bigUintWidth);
      continue;
    }

    if (functionVec[i]->fAggFunction != ROWAGG_STATS)
      continue;

    functionVec[i]->fAuxColumnIndex = lastCol;

    // mean(x)
    oidsAgg.push_back(oidsProj[j]);
    keysAgg.push_back(keysProj[j]);
    scaleAgg.push_back(0);
    precisionAgg.push_back(-1);
    typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
    csNumAgg.push_back(8);
    widthAgg.push_back(sizeof(long double));
    ++lastCol;

    // sum(x_i - mean)^2
    oidsAgg.push_back(oidsProj[j]);
    keysAgg.push_back(keysProj[j]);
    scaleAgg.push_back(0);
    precisionAgg.push_back(-1);
    typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
    csNumAgg.push_back(8);
    widthAgg.push_back(sizeof(long double));
    ++lastCol;
  }

  // calculate the offset and create the rowaggregation, rowgroup
  posAgg.push_back(2);

  for (uint64_t i = 0; i < oidsAgg.size(); i++)
    posAgg.push_back(posAgg[i] + widthAgg[i]);

  RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
                 jobInfo.stringTableThreshold);
  SP_ROWAGG_UM_t rowAgg(
      new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit, jobInfo.hasRollup));
  rowAgg->timeZone(jobInfo.timeZone);
  rowgroups.push_back(aggRG);
  aggregators.push_back(rowAgg);

  // mapping the group_concat columns, if any.
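  // (sketch) A GROUP_CONCAT/JSON_ARRAYAGG output slot only needs to hold a
  // GroupConcatAg pointer -- see the "width >= sizeof(GroupConcatAg*)"
  // adjustment earlier -- while the columns being concatenated are resolved
  // against the projected RowGroup by mapColumns() below.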
if (jobInfo.groupConcatInfo.groupConcat().size() > 0) { jobInfo.groupConcatInfo.mapColumns(projRG); rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat()); } if (jobInfo.trace) cout << "\n====== Aggregation RowGroups ======" << endl << "projected RG: " << projRG.toString() << endl << "aggregated RG: " << aggRG.toString() << endl; } void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector& rowgroups, vector& aggregators) { // check if there are any aggregate columns vector> aggColVec; vector>& returnedColVec = jobInfo.returnedColVec; for (uint64_t i = 0; i < returnedColVec.size(); i++) { if (returnedColVec[i].second != 0) aggColVec.push_back(returnedColVec[i]); } // populate the aggregate rowgroup: projectedRG -> aggregateRG // // Aggregate preparation by joblist factory: // 1. get projected rowgroup (done by doAggProject) -- passed in // 2. construct aggregate rowgroup -- output of UM const RowGroup projRG = rowgroups[0]; const vector& oidsProj = projRG.getOIDs(); const vector& keysProj = projRG.getKeys(); const vector& scaleProj = projRG.getScale(); const vector& precisionProj = projRG.getPrecision(); const vector& typeProj = projRG.getColTypes(); const vector& csNumProj = projRG.getCharsetNumbers(); vector posAgg, posAggDist; vector oidsAgg, oidsAggDist; vector keysAgg, keysAggDist; vector scaleAgg, scaleAggDist; vector precisionAgg, precisionAggDist; vector typeAgg, typeAggDist; vector csNumAgg, csNumAggDist; vector widthProj, widthAgg, widthAggDist; vector groupBy, groupByNoDist; vector functionVec1, functionVec2, functionNoDistVec; uint32_t bigIntWidth = sizeof(int64_t); // map key = column key, operation (enum), and UDAF pointer if UDAF. AGG_MAP aggFuncMap; // set avgSet; list multiParmIndexes; // fOR udaf UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; uint32_t projColsUDAFIdx = 0; uint32_t udafcParamIdx = 0; // for count column of average function map avgFuncMap, avgDistFuncMap; // collect the projected column info, prepare for aggregation vector width; for (uint64_t i = 0; i < keysProj.size(); i++) { width.push_back(projRG.getColumnWidth(i)); } // associate the columns between projected RG and aggregate RG on UM // populated the aggregate columns // the groupby columns are put in front, even not a returned column // sum and count(column name) are omitted, if avg present { // project only unique oids, but they may be repeated in aggregation // collect the projected column info, prepare for aggregation map projColPosMap; for (uint64_t i = 0; i < keysProj.size(); i++) { projColPosMap.insert(make_pair(keysProj[i], i)); widthProj.push_back(projRG.getColumnWidth(i)); } // column index for aggregate rowgroup uint64_t colAgg = 0; // for groupby column for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { uint32_t key = jobInfo.groupByColVec[i]; if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep1PhaseDistinctAggregate: groupby " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } uint64_t colProj = projColPosMap[key]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg)); groupBy.push_back(groupby); // copy down to aggregation rowgroup oidsAgg.push_back(oidsProj[colProj]); 
keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(widthProj[colProj]); aggFuncMap.insert(make_pair( boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); colAgg++; } // for distinct column for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++) { uint32_t key = jobInfo.distinctColVec[i]; if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } // check for dup distinct column -- @bug6126 if (find(keysAgg.begin(), keysAgg.end(), key) != keysAgg.end()) continue; uint64_t colProj = projColPosMap[key]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg)); groupBy.push_back(groupby); // copy down to aggregation rowgroup oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(key); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(widthProj[colProj]); aggFuncMap.insert(make_pair( boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); colAgg++; } // vectors for aggregate functions RowAggFunctionType aggOp = ROWAGG_FUNCT_UNDEFINE; RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE; for (uint64_t i = 0; i < aggColVec.size(); i++) { pUDAFFunc = NULL; uint32_t aggKey = aggColVec[i].first; aggOp = functionIdMap(aggColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second); // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT) if (aggOp != ROWAGG_MULTI_PARM) prevAggOp = aggOp; // skip if this is a constant if (aggOp == ROWAGG_CONSTANT) continue; // skip if this is a group_concat if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY) { TupleInfo ti = getTupleInfo(aggKey, jobInfo); uint32_t width = sizeof(GroupConcatAg*); oidsAgg.push_back(ti.oid); keysAgg.push_back(aggKey); scaleAgg.push_back(ti.scale); precisionAgg.push_back(ti.precision); typeAgg.push_back(ti.dtype); csNumAgg.push_back(ti.csNum); widthAgg.push_back(width); SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colAgg, colAgg, -1)); functionVec1.push_back(funct); aggFuncMap.insert(make_pair( boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); colAgg++; continue; } if (projColPosMap.find(aggKey) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple."; cerr << "prep1PhaseDistinctAggregate: aggregate " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable; if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView; cerr << endl; throw logic_error(emsg.str()); } // We skip distinct aggs, including extra parms. These are handled by adding them to group by list // above. 
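// For example (illustrative): in "SELECT a, COUNT(DISTINCT b) FROM t GROUP BY a",
// column b was appended to the group-by list above, so the first-phase hash
// map already de-duplicates (a, b) pairs; the COUNT itself is evaluated only
// in the distinct (second phase) aggregator.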
if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG || aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME) continue; if (aggOp == ROWAGG_MULTI_PARM && prevAggOp == ROWAGG_COUNT_DISTINCT_COL_NAME) continue; uint64_t colProj = projColPosMap[aggKey]; SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(aggColVec[k].first); } } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough " "UDAFColumns"); } } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg)); } // skip if this is a duplicate if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM && aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end()) { // skip if this is a duplicate continue; } functionVec1.push_back(funct); aggFuncMap.insert(make_pair( boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAgg)); switch (aggOp) { case ROWAGG_MIN: case ROWAGG_MAX: case ROWAGG_SELECT_SOME: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(widthProj[colProj]); colAgg++; } break; case ROWAGG_SUM: case ROWAGG_AVG: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("sum/average"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep1PhaseDistinctAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); csNumAgg.push_back(8); wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAgg, scaleAgg, precisionAgg, widthAgg); colAgg++; // has distinct step, put the count column for avg next to the sum // let fall through to add a count column for average function if (aggOp == ROWAGG_AVG) funct->fAuxColumnIndex = colAgg; else break; } /* fall through */ case ROWAGG_COUNT_ASTERISK: case ROWAGG_COUNT_COL_NAME: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); // work around count() in select subquery precisionAgg.push_back(rowgroup::MagicPrecisionForCountAgg); if (isUnsigned(typeProj[colProj])) { 
typeAgg.push_back(CalpontSystemCatalog::UBIGINT); } else { typeAgg.push_back(CalpontSystemCatalog::BIGINT); } csNumAgg.push_back(8); widthAgg.push_back(bigIntWidth); colAgg++; } break; case ROWAGG_STATS: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("variance/standard deviation"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep1PhaseDistinctAggregate:: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } // count(x) oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(0); typeAgg.push_back(CalpontSystemCatalog::DOUBLE); csNumAgg.push_back(8); widthAgg.push_back(sizeof(double)); funct->fAuxColumnIndex = ++colAgg; // mean(x) oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); precisionAgg.push_back(-1); typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAgg.push_back(8); widthAgg.push_back(sizeof(long double)); ++colAgg; // sum(x_i - mean)^2 oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); precisionAgg.push_back(-1); typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAgg.push_back(8); widthAgg.push_back(sizeof(long double)); ++colAgg; } break; case ROWAGG_BIT_AND: case ROWAGG_BIT_OR: case ROWAGG_BIT_XOR: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); precisionAgg.push_back(-16); // for connector to skip null check typeAgg.push_back(CalpontSystemCatalog::UBIGINT); csNumAgg.push_back(8); widthAgg.push_back(bigIntWidth); colAgg++; } break; case ROWAGG_UDAF: { RowUDAFFunctionCol* udafFuncCol = dynamic_cast(funct.get()); if (!udafFuncCol) { throw logic_error( "(2)prep1PhaseDistinctAggregate A UDAF function is called but there's no RowUDAFFunctionCol"); } // Return column oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale()); precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType()); csNumAgg.push_back(udafFuncCol->fUDAFContext.getCharsetNumber()); widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth()); ++colAgg; // Column for index of UDAF UserData struct oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(0); precisionAgg.push_back(0); typeAgg.push_back(CalpontSystemCatalog::UBIGINT); csNumAgg.push_back(8); widthAgg.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAgg++; // If the first param is const udafcParamIdx = 0; ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } ++udafcParamIdx; break; } case ROWAGG_MULTI_PARM: { oidsAgg.push_back(oidsProj[colProj]); keysAgg.push_back(aggKey); scaleAgg.push_back(scaleProj[colProj]); precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); csNumAgg.push_back(csNumProj[colProj]); widthAgg.push_back(widthProj[colProj]); 
multiParmIndexes.push_back(colAgg); ++colAgg; // If the param is const if (udafc) { if (udafcParamIdx > udafc->aggParms().size() - 1) { throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr); } ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } } else if (prevAggOp != ROWAGG_COUNT_DISTINCT_COL_NAME) { throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr); } ++udafcParamIdx; } break; default: { ostringstream emsg; emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported"; cerr << "prep1PhaseDistinctAggregate: " << emsg.str() << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } } } // populated the functionNoDistVec { // for (uint32_t idx = 0; idx < functionVec1.size(); idx++) // { // SP_ROWAGG_FUNC_t func1 = functionVec1[idx]; // SP_ROWAGG_FUNC_t funct( // new RowAggFunctionCol(func1->fAggFunction, // func1->fStatsFunction, // func1->fOutputColumnIndex, // func1->fOutputColumnIndex, // func1->fAuxColumnIndex)); // functionNoDistVec.push_back(funct); // } functionNoDistVec = functionVec1; } // associate the columns between the non-distinct aggregator and distinct aggregator // populated the returned columns // remove not returned groupby column // add back sum or count(column name) if omitted due to avg column // put count(column name) column to the end, if it is for avg only { // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. AGG_MAP aggDupFuncMap; projColsUDAFIdx = 0; // copy over the groupby vector // update the outputColumnIndex if returned for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1)); groupByNoDist.push_back(groupby); aggFuncMap.insert(make_pair( boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i)); } // locate the return column position in aggregated rowgroup uint64_t outIdx = 0; RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE; uint32_t prevRetKey = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { udafc = NULL; pUDAFFunc = NULL; uint32_t retKey = returnedColVec[i].first; RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); int colAgg = -1; if (aggOp == ROWAGG_MULTI_PARM) { // Duplicate detection doesn't work for multi-parm` // If this function was earlier detected as a duplicate, unduplicate it. SP_ROWAGG_FUNC_t funct = functionVec2.back(); if (funct->fAggFunction == ROWAGG_DUP_FUNCT) funct->fAggFunction = prevAggOp; // Remove it from aggDupFuncMap if it's in there. funct->hasMultiParm = true; AGG_MAP::iterator it = aggDupFuncMap.find(boost::make_tuple( prevRetKey, prevAggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggDupFuncMap.end()) { aggDupFuncMap.erase(it); } // Skip on final agg.: Extra parms for an aggregate have no work there. continue; } else { // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT) prevAggOp = aggOp; prevRetKey = returnedColVec[i].first; } if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != jobInfo.distinctColVec.end()) { AGG_MAP::iterator it = aggFuncMap.find( boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { colAgg = it->second; } else { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[retKey] << "' isn't in tuple."; cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable; if (jobInfo.keyInfo->tupleKeyVec[retKey].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView; cerr << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } if (aggOp == ROWAGG_UDAF) { std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast<UDAFColumn*>((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); } } break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough " "UDAFColumns"); } } switch (aggOp) { case ROWAGG_DISTINCT_AVG: case ROWAGG_DISTINCT_SUM: { if (typeAgg[colAgg] == CalpontSystemCatalog::CHAR || typeAgg[colAgg] == CalpontSystemCatalog::VARCHAR || typeAgg[colAgg] == CalpontSystemCatalog::BLOB || typeAgg[colAgg] == CalpontSystemCatalog::TEXT || typeAgg[colAgg] == CalpontSystemCatalog::DATE || typeAgg[colAgg] == CalpontSystemCatalog::DATETIME || typeAgg[colAgg] == CalpontSystemCatalog::TIMESTAMP || typeAgg[colAgg] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("sum/average"); args.add(colTypeIdString(typeAgg[colAgg])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep1PhaseDistinctAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(retKey); wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg, typeAggDist, scaleAggDist, precisionAggDist, widthAggDist); csNumAggDist.push_back(8); } break; case ROWAGG_COUNT_DISTINCT_COL_NAME: { oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(retKey); scaleAggDist.push_back(0); // work around count() in select subquery precisionAggDist.push_back(rowgroup::MagicPrecisionForCountAgg); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } break; case ROWAGG_MIN: case ROWAGG_MAX: case ROWAGG_SUM: case ROWAGG_AVG: case ROWAGG_COUNT_ASTERISK: case ROWAGG_COUNT_COL_NAME: case ROWAGG_STATS: case ROWAGG_BIT_AND: case ROWAGG_BIT_OR: case ROWAGG_BIT_XOR: case ROWAGG_SELECT_SOME: default: { AGG_MAP::iterator it = aggFuncMap.find( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { colAgg = it->second; oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(keysAgg[colAgg]); scaleAggDist.push_back(scaleAgg[colAgg]); precisionAggDist.push_back(precisionAgg[colAgg]); typeAggDist.push_back(typeAgg[colAgg]); csNumAggDist.push_back(csNumAgg[colAgg]); uint32_t width = widthAgg[colAgg]; if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY) { TupleInfo ti = getTupleInfo(retKey, jobInfo); if (ti.width > width) width = ti.width; } widthAggDist.push_back(width); } // not a direct hit -- a returned column is not already in the RG from PMs else { uint32_t foundTupleKey{0}; if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey)) { AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple( foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); colAgg = it->second; oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(keysAgg[colAgg]); scaleAggDist.push_back(scaleAgg[colAgg]); precisionAggDist.push_back(precisionAgg[colAgg]); typeAggDist.push_back(typeAgg[colAgg]); csNumAggDist.push_back(csNumAgg[colAgg]); uint32_t width = widthAgg[colAgg]; if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY) { TupleInfo ti = getTupleInfo(retKey, jobInfo); if (ti.width > width) width = ti.width; } widthAggDist.push_back(width); // Update the `retKey` to specify that this column is a duplicate. retKey = foundTupleKey; } else { bool returnColMissing = true; // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { // false alarm returnColMissing = false; colAgg = it->second; if (aggOp == ROWAGG_SUM) { oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(retKey); csNumAggDist.push_back(8); wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg, typeAggDist, scaleAggDist, precisionAggDist, widthAggDist); } else { // leave the count() to avg aggOp = ROWAGG_COUNT_NO_OP; oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(retKey); scaleAggDist.push_back(0); if (isUnsigned(typeAgg[colAgg])) { typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); precisionAggDist.push_back(20); } else { typeAggDist.push_back(CalpontSystemCatalog::BIGINT); precisionAggDist.push_back(19); } csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } } } else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) != jobInfo.expressionVec.end()) { // a function on aggregation TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggDist.push_back(ti.oid); keysAggDist.push_back(retKey); scaleAggDist.push_back(ti.scale); precisionAggDist.push_back(ti.precision); typeAggDist.push_back(ti.dtype); csNumAggDist.push_back(ti.csNum); widthAggDist.push_back(ti.width); returnColMissing = false; } else if (aggOp == ROWAGG_CONSTANT) { TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggDist.push_back(ti.oid); keysAggDist.push_back(retKey); scaleAggDist.push_back(ti.scale); precisionAggDist.push_back(ti.precision); typeAggDist.push_back(ti.dtype); csNumAggDist.push_back(ti.csNum); widthAggDist.push_back(ti.width); returnColMissing = false; } #if 0 else if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY) { TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggDist.push_back(ti.oid); 
keysAggDist.push_back(retKey); scaleAggDist.push_back(ti.scale); precisionAggDist.push_back(ti.precision); typeAggDist.push_back(ti.dtype); csNumAggDist.push_back(ti.csNum); widthAggDist.push_back(ti.width); returnColMissing = false; } #endif else if (jobInfo.groupConcatInfo.columns().find(retKey) != jobInfo.groupConcatInfo.columns().end()) { // TODO: columns only for group_concat do not needed in result set. for (uint64_t k = 0; k < keysProj.size(); k++) { if (retKey == keysProj[k]) { oidsAggDist.push_back(oidsProj[k]); keysAggDist.push_back(retKey); scaleAggDist.push_back(scaleProj[k] >> 8); precisionAggDist.push_back(precisionProj[k]); typeAggDist.push_back(typeProj[k]); csNumAggDist.push_back(csNumProj[k]); widthAggDist.push_back(widthProj[k]); returnColMissing = false; break; } } } else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end()) { // skip window columns/expression, which are computed later for (uint64_t k = 0; k < keysProj.size(); k++) { if (retKey == keysProj[k]) { oidsAggDist.push_back(oidsProj[k]); keysAggDist.push_back(retKey); scaleAggDist.push_back(scaleProj[k] >> 8); precisionAggDist.push_back(precisionProj[k]); typeAggDist.push_back(typeProj[k]); csNumAggDist.push_back(csNumProj[k]); widthAggDist.push_back(widthProj[k]); returnColMissing = false; break; } } } if (returnColMissing) { Message::Args args; args.add(keyName(outIdx, retKey, jobInfo)); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); cerr << "prep1PhaseDistinctAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp << endl; throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); } } // else } // switch } } // update groupby vector if the groupby column is a returned column if (returnedColVec[i].second == 0) { int dupGroupbyIndex = -1; for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++) { if (jobInfo.groupByColVec[j] == retKey) { if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t)-1) groupByNoDist[j]->fOutputColumnIndex = outIdx; else dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex; } } // a duplicate group by column if (dupGroupbyIndex != -1) functionVec2.push_back(SP_ROWAGG_FUNC_t( new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex))); } else { // update the aggregate function vector SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx)); } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, outIdx)); } if (aggOp == ROWAGG_COUNT_NO_OP) funct->fAuxColumnIndex = colAgg; else if (aggOp == ROWAGG_CONSTANT) funct->fAuxColumnIndex = jobInfo.cntStarPos; functionVec2.push_back(funct); // find if this func is a duplicate AGG_MAP::iterator iter = aggDupFuncMap.find( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggDupFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; else if (funct->fAggFunction == ROWAGG_UDAF) funct->fAggFunction = ROWAGG_DUP_UDAF; else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { aggDupFuncMap.insert(make_pair( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } if (returnedColVec[i].second == AggregateColumn::AVG) avgFuncMap.insert(make_pair(returnedColVec[i].first, funct)); else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } ++outIdx; } // for (i // now fix the AVG function, locate the count(column) position for (uint64_t i = 0; i < functionVec2.size(); i++) { if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_NO_OP) { // if the count(k) can be associated with an avg(k) map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]); if (k != avgFuncMap.end()) k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex; } } // there is avg(k), but no count(k) in the select list uint64_t lastCol = outIdx; for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++) { if (k->second->fAuxColumnIndex == (uint32_t)-1) { k->second->fAuxColumnIndex = lastCol++; oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId); keysAggDist.push_back(k->first); scaleAggDist.push_back(0); precisionAggDist.push_back(19); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } } // now fix the AVG distinct function, locate the count(distinct column) position for (uint64_t i = 0; i < functionVec2.size(); i++) { if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME && !functionVec2[i]->hasMultiParm) { // if the count(distinct k) can be associated with an avg(distinct k) map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]); if (k != avgDistFuncMap.end()) { k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex; functionVec2[i]->fAggFunction = ROWAGG_COUNT_NO_OP; } } } // there is avg(distinct k), but no count(distinct k) in the select list for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++) { if (k->second->fAuxColumnIndex == (uint32_t)-1) { k->second->fAuxColumnIndex = lastCol++; oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId); keysAggDist.push_back(k->first); scaleAggDist.push_back(0); precisionAggDist.push_back(19); typeAggDist.push_back(CalpontSystemCatalog::BIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } } // add auxiliary fields for UDAF and statistics functions for (uint64_t i = 0; i < functionVec2.size(); i++) { uint64_t j = functionVec2[i]->fInputColumnIndex; if (functionVec2[i]->fAggFunction == ROWAGG_UDAF) { // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get()); if (!udafFuncCol) { throw logic_error( "(4)prep1PhaseDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } functionVec2[i]->fAuxColumnIndex = lastCol++; oidsAggDist.push_back(oidsAgg[j]); // Dummy? keysAggDist.push_back(keysAgg[j]); // Dummy? 
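// The UBIGINT aux column being built here holds no data from the source column;
// it stores a 64-bit handle used to locate the per-group mcsv1sdk UserData
// object for the UDAF. The oid/key pushed above are copied from the input
// column only to keep the parallel vectors aligned (hence the "Dummy?" notes).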
scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(sizeof(uint64_t)); continue; } if (functionVec2[i]->fAggFunction != ROWAGG_STATS) continue; functionVec2[i]->fAuxColumnIndex = lastCol; // mean(x) oidsAggDist.push_back(oidsAgg[j]); keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggDist.push_back(8); widthAggDist.push_back(sizeof(long double)); ++lastCol; // sum(x_i - mean)^2 oidsAggDist.push_back(oidsAgg[j]); keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(-1); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggDist.push_back(8); widthAggDist.push_back(sizeof(long double)); ++lastCol; } } // calculate the offset and create the rowaggregation, rowgroup posAgg.push_back(2); for (uint64_t i = 0; i < oidsAgg.size(); i++) posAgg.push_back(posAgg[i] + widthAgg[i]); RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg, jobInfo.stringTableThreshold); SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit, false)); rowAgg->timeZone(jobInfo.timeZone); posAggDist.push_back(2); // rid for (uint64_t i = 0; i < oidsAggDist.size(); i++) posAggDist.push_back(posAggDist[i] + widthAggDist[i]); RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, csNumAggDist, scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold); SP_ROWAGG_DIST rowAggDist( new RowAggregationDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit)); rowAggDist->timeZone(jobInfo.timeZone); // mapping the group_concat columns, if any. 
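// An illustrative query (not from the source) for why this mapping is needed:
// for SELECT g, GROUP_CONCAT(c) FROM t GROUP BY g, the concatenator must read
// c out of the projected rowgroup, so its column descriptors are resolved
// against projRG below and then shared by both the non-distinct and the
// distinct aggregator.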
if (jobInfo.groupConcatInfo.groupConcat().size() > 0) { jobInfo.groupConcatInfo.mapColumns(projRG); rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat()); rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat()); } // if distinct keyword applied to more than one aggregate column, reset rowAggDist vector<RowGroup> subRgVec; if (jobInfo.distinctColVec.size() > 1) { RowAggregationMultiDistinct* multiDistinctAggregator = new RowAggregationMultiDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit); multiDistinctAggregator->timeZone(jobInfo.timeZone); rowAggDist.reset(multiDistinctAggregator); rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat()); // construct and add sub-aggregators to rowAggDist vector<uint32_t> posAggGb, posAggSub; vector<uint32_t> oidsAggGb, oidsAggSub; vector<uint32_t> keysAggGb, keysAggSub; vector<uint32_t> scaleAggGb, scaleAggSub; vector<uint32_t> precisionAggGb, precisionAggSub; vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub; vector<uint32_t> csNumAggGb, csNumAggSub; vector<uint32_t> widthAggGb, widthAggSub; // populate groupby column info for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { oidsAggGb.push_back(oidsProj[i]); keysAggGb.push_back(keysProj[i]); scaleAggGb.push_back(scaleProj[i]); precisionAggGb.push_back(precisionProj[i]); typeAggGb.push_back(typeProj[i]); csNumAggGb.push_back(csNumProj[i]); widthAggGb.push_back(widthProj[i]); } // for distinct, each column requires separate rowgroup vector<SP_ROWAGG_UM_t> rowAggSubDistVec; uint32_t distinctColKey; int64_t j; uint64_t k; uint64_t outIdx = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { if (returnedColVec[i].second == 0) { ++outIdx; continue; } j = -1; distinctColKey = -1; // Find the entry in distinctColVec, if any for (k = 0; k < jobInfo.distinctColVec.size(); k++) { distinctColKey = jobInfo.distinctColVec[k]; if (returnedColVec[i].first == distinctColKey) break; } if (distinctColKey == (uint32_t)-1) { ++outIdx; continue; } // locate the distinct key in the row group for (k = 0; k < keysAgg.size(); k++) { if (keysProj[k] == distinctColKey) { j = k; break; } } idbassert(j != -1); oidsAggSub = oidsAggGb; keysAggSub = keysAggGb; scaleAggSub = scaleAggGb; precisionAggSub = precisionAggGb; typeAggSub = typeAggGb; csNumAggSub = csNumAggGb; widthAggSub = widthAggGb; oidsAggSub.push_back(oidsProj[j]); keysAggSub.push_back(keysProj[j]); scaleAggSub.push_back(scaleProj[j]); precisionAggSub.push_back(precisionProj[j]); typeAggSub.push_back(typeProj[j]); csNumAggSub.push_back(csNumProj[j]); widthAggSub.push_back(widthProj[j]); // construct groupby vector vector<SP_ROWAGG_GRPBY_t> groupBySub; k = 0; while (k < jobInfo.groupByColVec.size()) { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k)); groupBySub.push_back(groupby); k++; } // add the distinct column as groupby SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k)); groupBySub.push_back(groupby); // Add multi parm distinct while ((i + 1) < returnedColVec.size() && functionIdMap(returnedColVec[i + 1].second) == ROWAGG_MULTI_PARM) { ++i; uint32_t dColKey = -1; j = -1; // Find the entry in distinctColVec, if any for (k = 0; k < jobInfo.distinctColVec.size(); k++) { dColKey = jobInfo.distinctColVec[k]; if (returnedColVec[i].first == dColKey) break; } idbassert(dColKey != (uint32_t)-1); // locate the distinct key in the row group for (k = 0; k < keysAgg.size(); k++) { if (keysProj[k] == dColKey) { j = k; break; } } idbassert(j != -1); oidsAggSub.push_back(oidsProj[j]); keysAggSub.push_back(keysProj[j]); scaleAggSub.push_back(scaleProj[j]); precisionAggSub.push_back(precisionProj[j]); typeAggSub.push_back(typeProj[j]); 
csNumAggSub.push_back(csNumProj[j]); widthAggSub.push_back(widthProj[j]); SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k)); groupBySub.push_back(groupby); } // construct sub-rowgroup posAggSub.clear(); posAggSub.push_back(2); // rid for (k = 0; k < oidsAggSub.size(); k++) posAggSub.push_back(posAggSub[k] + widthAggSub[k]); RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub, csNumAggSub, scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold); subRgVec.push_back(subRg); // Keep a count of the parms after the first for any aggregate. // These will be skipped and the count needs to be subtracted // from where the aux column will be. int64_t multiParms = 0; // tricky part : 2 function vectors // -- dummy function vector for sub-aggregator, which does distinct only // -- aggregate function on this distinct column for rowAggDist vector functionSub1, functionSub2; // search the function in functionVec vector::iterator it = functionVec2.begin(); while (it != functionVec2.end()) { SP_ROWAGG_FUNC_t f = *it++; if ((f->fOutputColumnIndex == outIdx) && (f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || f->fAggFunction == ROWAGG_DISTINCT_SUM || f->fAggFunction == ROWAGG_DISTINCT_AVG)) { SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, groupBySub.size() - 1, f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms)); functionSub2.push_back(funct); } } // construct sub-aggregator SP_ROWAGG_UM_t subAgg( new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit)); subAgg->timeZone(jobInfo.timeZone); subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat()); // add to rowAggDist multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2); ++outIdx; } // cover any non-distinct column functions { vector functionSub1 = functionNoDistVec; vector functionSub2; int64_t multiParms = 0; for (uint64_t k = 0; k < returnedColVec.size(); k++) { // search non-distinct functions in functionVec vector::iterator it = functionVec2.begin(); while (it != functionVec2.end()) { SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t f = *it++; if (f->fAggFunction == ROWAGG_UDAF) { RowUDAFFunctionCol* udafFuncCol = dynamic_cast(f.get()); funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex, udafFuncCol->fOutputColumnIndex, udafFuncCol->fAuxColumnIndex - multiParms)); functionSub2.push_back(funct); } else if ((f->fOutputColumnIndex == k) && (f->fAggFunction == ROWAGG_COUNT_ASTERISK || f->fAggFunction == ROWAGG_COUNT_COL_NAME || f->fAggFunction == ROWAGG_SUM || f->fAggFunction == ROWAGG_AVG || f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX || f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND || f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR || f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT || f->fAggFunction == ROWAGG_JSON_ARRAY || f->fAggFunction == ROWAGG_SELECT_SOME)) { funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex, f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms)); functionSub2.push_back(funct); } } } if (functionSub1.size() > 0) { // make sure the group by columns are available for next aggregate phase. 
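// (Each sub-aggregator feeds the multi-distinct parent, so the group-by keys
// are re-based to dense output positions: input slot
// groupByNoDist[i]->fInputColumnIndex maps to output slot i via
// RowAggGroupByCol(in, out) below.)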
vector groupBySubNoDist; for (uint64_t i = 0; i < groupByNoDist.size(); i++) groupBySubNoDist.push_back( SP_ROWAGG_GRPBY_t(new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i))); // construct sub-aggregator SP_ROWAGG_UM_t subAgg( new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false)); subAgg->timeZone(jobInfo.timeZone); subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat()); // add to rowAggDist multiDistinctAggregator->addSubAggregator(subAgg, aggRG, functionSub2); subRgVec.push_back(aggRG); } } } rowAggDist->addAggregator(rowAgg, aggRG); rowgroups.push_back(aggRgDist); aggregators.push_back(rowAggDist); if (jobInfo.trace) { cout << "projected RG: " << projRG.toString() << endl << "aggregated RG: " << aggRG.toString() << endl; for (uint64_t i = 0; i < subRgVec.size(); i++) cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl; cout << "aggregatedDist RG: " << aggRgDist.toString() << endl; } } void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector& rowgroups, vector& aggregators) { // check if there are any aggregate columns // a vector that has the aggregate function to be done by PM vector> aggColVec; set avgSet; vector>& returnedColVec = jobInfo.returnedColVec; // For UDAF uint32_t projColsUDAFIdx = 0; uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; for (uint64_t i = 0; i < returnedColVec.size(); i++) { // skip if not an aggregation column if (returnedColVec[i].second == 0) continue; aggColVec.push_back(returnedColVec[i]); // remember if a column has an average function, // with avg function, no need for separate sum or count_column_name if (returnedColVec[i].second == AggregateColumn::AVG) avgSet.insert(returnedColVec[i].first); } // populate the aggregate rowgroup on PM and UM // PM: projectedRG -> aggregateRGPM // UM: aggregateRGPM -> aggregateRGUM // // Aggregate preparation by joblist factory: // 1. get projected rowgroup (done by doAggProject) -- input to PM AGG // 2. construct aggregate rowgroup -- output of PM, input of UM // 3. 
construct aggregate rowgroup -- output of UM const RowGroup projRG = rowgroups[0]; const vector<uint32_t>& oidsProj = projRG.getOIDs(); const vector<uint32_t>& keysProj = projRG.getKeys(); const vector<uint32_t>& scaleProj = projRG.getScale(); const vector<uint32_t>& precisionProj = projRG.getPrecision(); const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes(); const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers(); vector<uint32_t> posAggPm, posAggUm; vector<uint32_t> oidsAggPm, oidsAggUm; vector<uint32_t> keysAggPm, keysAggUm; vector<uint32_t> scaleAggPm, scaleAggUm; vector<uint32_t> precisionAggPm, precisionAggUm; vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm; vector<uint32_t> csNumAggPm, csNumAggUm; vector<uint32_t> widthAggPm, widthAggUm; vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm; vector<SP_ROWAGG_FUNC_t> functionVecPm, functionVecUm; uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); AGG_MAP aggFuncMap; // associate the columns between projected RG and aggregate RG on PM // populate the aggregate columns // the groupby columns are put in front, even if not a returned column // sum and count(column name) are omitted, if avg present { // project only unique oids, but they may be repeated in aggregation // collect the projected column info, prepare for aggregation vector<uint32_t> width; map<uint32_t, uint64_t> projColPosMap; for (uint64_t i = 0; i < keysProj.size(); i++) { projColPosMap.insert(make_pair(keysProj[i], i)); width.push_back(projRG.getColumnWidth(i)); } // column index for PM aggregate rowgroup uint64_t colAggPm = 0; // for groupby column for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { uint32_t key = jobInfo.groupByColVec[i]; if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep2PhasesAggregate: groupby " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } uint64_t colProj = projColPosMap[key]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm)); groupByPm.push_back(groupby); // PM: just copy down to aggregation rowgroup oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(key); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } // for distinct column for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++) { uint32_t key = jobInfo.distinctColVec[i]; if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep2PhasesAggregate: distinct " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } uint64_t colProj = projColPosMap[key]; // check for dup distinct column -- @bug6126 if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end()) continue; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm)); groupByPm.push_back(groupby); // PM: just copy down to aggregation rowgroup oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(key); scaleAggPm.push_back(scaleProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); precisionAggPm.push_back(precisionProj[colProj]); aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } // vectors for aggregate functions for (uint64_t i = 0; i < aggColVec.size(); i++) { pUDAFFunc = NULL; uint32_t aggKey = aggColVec[i].first; RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second); // skip on PM if this is a constant if (aggOp == ROWAGG_CONSTANT) continue; if (projColPosMap.find(aggKey) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple."; cerr << "prep2PhasesAggregate: aggregate " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable; if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView; cerr << endl; throw logic_error(emsg.str()); } if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && (avgSet.find(aggKey) != avgSet.end())) // skip sum / count(column) if avg is also selected continue; uint64_t colProj = projColPosMap[aggKey]; SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(aggColVec[k].first); } } // Create a RowAggFunctionCol (UDAF subtype) with the context. 
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(1)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); } } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); } // skip if this is a duplicate if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM && aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end()) { // skip if this is a duplicate continue; } functionVecPm.push_back(funct); aggFuncMap.insert(make_pair( boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); switch (aggOp) { case ROWAGG_MIN: case ROWAGG_MAX: case ROWAGG_SELECT_SOME: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); colAggPm++; } break; case ROWAGG_SUM: case ROWAGG_AVG: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("sum/average"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep2PhasesAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAggPm, scaleAggPm, precisionAggPm, widthAggPm); oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); csNumAggPm.push_back(8); colAggPm++; } // PM: put the count column for avg next to the sum // let fall through to add a count column for average function if (aggOp != ROWAGG_AVG) break; // The AVG aggregation has a special treatment everywhere. // This is so because AVG(column) is a SUM(column)/COUNT(column) // and these aggregations can be utilized by AVG, if present. 
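// A hedged sketch of the resulting PM layout for, say,
//   SELECT g, AVG(c), SUM(c) FROM t GROUP BY g;   -- illustrative, not from the source
// AVG(c) lands as a SUM slot immediately followed by a COUNT slot (added by the
// fall-through below), and the separately selected SUM(c) is later served from
// that same SUM slot because avgSet screened it out of the PM function vector.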
/* fall through */ case ROWAGG_COUNT_ASTERISK: case ROWAGG_COUNT_COL_NAME: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); // work around count() in select subquery precisionAggPm.push_back(rowgroup::MagicPrecisionForCountAgg); typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggPm.push_back(8); widthAggPm.push_back(bigIntWidth); colAggPm++; } break; case ROWAGG_STATS: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("variance/standard deviation"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep2PhasesAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } // count(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(0); typeAggPm.push_back(CalpontSystemCatalog::DOUBLE); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(double)); funct->fAuxColumnIndex = ++colAggPm; // mean(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(-1); typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(long double)); ++colAggPm; // sum(x_i - mean)^2 oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(-1); typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(long double)); ++colAggPm; } break; case ROWAGG_BIT_AND: case ROWAGG_BIT_OR: case ROWAGG_BIT_XOR: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(-16); // for connector to skip null check typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggPm.push_back(8); widthAggPm.push_back(bigIntWidth); colAggPm++; } break; case ROWAGG_UDAF: { // Return column RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get()); if (!udafFuncCol) { throw logic_error( "(2)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale()); precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType()); csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber()); widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth()); ++colAggPm; // Column for index of UDAF UserData struct oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(0); typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggPm.push_back(8); widthAggPm.push_back(bigUintWidth); funct->fAuxColumnIndex = colAggPm++; // If the first param is const udafcParamIdx = 0; ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get()); if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } ++udafcParamIdx; break; } case 
ROWAGG_MULTI_PARM: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); colAggPm++; // If the param is const if (udafc) { if (udafcParamIdx > udafc->aggParms().size() - 1) { throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with too many parms", aggregateFuncErr); } ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } } else { throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with no parms", aggregateFuncErr); } ++udafcParamIdx; } break; default: { ostringstream emsg; emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported"; cerr << "prep2PhasesAggregate: " << emsg.str() << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } } } // associate the columns between the aggregate RGs on PM and UM // populated the returned columns // remove not returned groupby column // add back sum or count(column name) if omitted due to avg column // put count(column name) column to the end, if it is for avg only { // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. map avgFuncMap; AGG_MAP aggDupFuncMap; projColsUDAFIdx = 0; // copy over the groupby vector // update the outputColumnIndex if returned for (uint64_t i = 0; i < groupByPm.size(); i++) { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(groupByPm[i]->fOutputColumnIndex, -1)); groupByUm.push_back(groupby); } // locate the return column position in aggregated rowgroup from PM // outIdx is i without the multi-columns, uint64_t outIdx = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { uint32_t retKey = returnedColVec[i].first; RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); int colPm = -1; if (aggOp == ROWAGG_MULTI_PARM) { // Skip on UM: Extra parms for an aggregate have no work on the UM continue; } // Is this a UDAF? use the function as part of the key. pUDAFFunc = NULL; udafc = NULL; if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); } } break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns"); } } AGG_MAP::iterator it = aggFuncMap.find( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { colPm = it->second; oidsAggUm.push_back(oidsAggPm[colPm]); keysAggUm.push_back(retKey); scaleAggUm.push_back(scaleAggPm[colPm]); precisionAggUm.push_back(precisionAggPm[colPm]); typeAggUm.push_back(typeAggPm[colPm]); csNumAggUm.push_back(csNumAggPm[colPm]); widthAggUm.push_back(widthAggPm[colPm]); } // not a direct hit -- a returned column is not already in the RG from PMs else { // MCOL-5476. uint32_t foundTupleKey{0}; if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey)) { AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple( foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); colPm = it->second; oidsAggUm.push_back(oidsAggPm[colPm]); keysAggUm.push_back(retKey); scaleAggUm.push_back(scaleAggPm[colPm]); precisionAggUm.push_back(precisionAggPm[colPm]); typeAggUm.push_back(typeAggPm[colPm]); csNumAggUm.push_back(csNumAggPm[colPm]); widthAggUm.push_back(widthAggPm[colPm]); // Update the `retKey` to specify that this column is a duplicate. retKey = foundTupleKey; } else { bool returnColMissing = true; // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { // false alarm returnColMissing = false; colPm = it->second; if (aggOp == ROWAGG_SUM) { wideDecimalOrLongDouble(colPm, typeAggPm[colPm], precisionAggPm, scaleAggPm, widthAggPm, typeAggUm, scaleAggUm, precisionAggUm, widthAggUm); oidsAggUm.push_back(oidsAggPm[colPm]); keysAggUm.push_back(retKey); csNumAggUm.push_back(8); } else { // leave the count() to avg aggOp = ROWAGG_COUNT_NO_OP; colPm++; oidsAggUm.push_back(oidsAggPm[colPm]); keysAggUm.push_back(retKey); scaleAggUm.push_back(0); precisionAggUm.push_back(19); typeAggUm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggUm.push_back(8); widthAggUm.push_back(bigIntWidth); } } } else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) != jobInfo.expressionVec.end()) { // a function on aggregation TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggUm.push_back(ti.oid); keysAggUm.push_back(retKey); scaleAggUm.push_back(ti.scale); precisionAggUm.push_back(ti.precision); typeAggUm.push_back(ti.dtype); csNumAggUm.push_back(ti.csNum); widthAggUm.push_back(ti.width); returnColMissing = false; } else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end()) { // an window function TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggUm.push_back(ti.oid); keysAggUm.push_back(retKey); scaleAggUm.push_back(ti.scale); precisionAggUm.push_back(ti.precision); typeAggUm.push_back(ti.dtype); csNumAggUm.push_back(ti.csNum); widthAggUm.push_back(ti.width); returnColMissing = false; } else if (aggOp == ROWAGG_CONSTANT) { TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggUm.push_back(ti.oid); keysAggUm.push_back(retKey); scaleAggUm.push_back(ti.scale); precisionAggUm.push_back(ti.precision); typeAggUm.push_back(ti.dtype); csNumAggUm.push_back(ti.csNum); widthAggUm.push_back(ti.width); returnColMissing = false; } if (returnColMissing) { Message::Args args; args.add(keyName(outIdx, retKey, jobInfo)); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); cerr << "prep2PhasesAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias=" << 
jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp << endl; throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); } } } // update groupby vector if the groupby column is a returned column if (returnedColVec[i].second == 0) { int dupGroupbyIndex = -1; for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++) { if (jobInfo.groupByColVec[j] == retKey) { if (groupByUm[j]->fOutputColumnIndex == (uint32_t)-1) groupByUm[j]->fOutputColumnIndex = outIdx; else dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex; } } for (uint64_t j = 0; j < jobInfo.distinctColVec.size(); j++) { if (jobInfo.distinctColVec[j] == retKey) { if (groupByUm[j]->fOutputColumnIndex == (uint32_t)-1) groupByUm[j]->fOutputColumnIndex = outIdx; else dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex; } } // a duplicate group by column if (dupGroupbyIndex != -1) { functionVecUm.push_back(SP_ROWAGG_FUNC_t( new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex))); } } else { // update the aggregate function vector SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx)); } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, outIdx)); } if (aggOp == ROWAGG_COUNT_NO_OP) funct->fAuxColumnIndex = colPm; else if (aggOp == ROWAGG_CONSTANT) funct->fAuxColumnIndex = jobInfo.cntStarPos; functionVecUm.push_back(funct); // find if this func is a duplicate AGG_MAP::iterator iter = aggDupFuncMap.find( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggDupFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; else if (funct->fAggFunction == ROWAGG_UDAF) funct->fAggFunction = ROWAGG_DUP_UDAF; else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { aggDupFuncMap.insert(make_pair( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } if (returnedColVec[i].second == AggregateColumn::AVG) avgFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } ++outIdx; } // now fix the AVG function, locate the count(column) position for (uint64_t i = 0; i < functionVecUm.size(); i++) { if (functionVecUm[i]->fAggFunction != ROWAGG_COUNT_NO_OP) continue; // if the count(k) can be associated with an avg(k) map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.find(keysAggUm[functionVecUm[i]->fOutputColumnIndex]); if (k != avgFuncMap.end()) k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex; } // there is avg(k), but no count(k) in the select list uint64_t lastCol = outIdx; for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++) { if (k->second->fAuxColumnIndex == (uint32_t)-1) { k->second->fAuxColumnIndex = lastCol++; oidsAggUm.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId); keysAggUm.push_back(k->first); scaleAggUm.push_back(0); precisionAggUm.push_back(19); typeAggUm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggUm.push_back(8); widthAggUm.push_back(bigIntWidth); } } // add auxiliary fields for UDAF and statistics functions for (uint64_t i = 0; i < functionVecUm.size(); i++) { uint64_t j = functionVecUm[i]->fInputColumnIndex; if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF) { // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get()); if (!udafFuncCol) { throw logic_error( "(4)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } functionVecUm[i]->fAuxColumnIndex = lastCol++; oidsAggUm.push_back(oidsAggPm[j]); // Dummy? keysAggUm.push_back(keysAggPm[j]); // Dummy? scaleAggUm.push_back(0); precisionAggUm.push_back(0); typeAggUm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggUm.push_back(8); widthAggUm.push_back(bigUintWidth); continue; } if (functionVecUm[i]->fAggFunction != ROWAGG_STATS) continue; functionVecUm[i]->fAuxColumnIndex = lastCol; // mean(x) oidsAggUm.push_back(oidsAggPm[j]); keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); precisionAggUm.push_back(-1); typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggUm.push_back(8); widthAggUm.push_back(sizeof(long double)); ++lastCol; // sum(x_i - mean)^2 oidsAggUm.push_back(oidsAggPm[j]); keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); precisionAggUm.push_back(-1); typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggUm.push_back(8); widthAggUm.push_back(sizeof(long double)); ++lastCol; } } // calculate the offset and create the rowaggregations, rowgroups posAggUm.push_back(2); // rid for (uint64_t i = 0; i < oidsAggUm.size(); i++) posAggUm.push_back(posAggUm[i] + widthAggUm[i]); RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold); SP_ROWAGG_UM_t rowAggUm( new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit, false)); rowAggUm->timeZone(jobInfo.timeZone); rowgroups.push_back(aggRgUm); aggregators.push_back(rowAggUm); posAggPm.push_back(2); // rid for (uint64_t i = 0; i < oidsAggPm.size(); i++) posAggPm.push_back(posAggPm[i] + widthAggPm[i]); RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm, precisionAggPm, jobInfo.stringTableThreshold); SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm, nullptr, nullptr, jobInfo.hasRollup)); rowAggPm->timeZone(jobInfo.timeZone); 
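// A minimal sketch of the offset math used for both rowgroups above, assuming
// the 2-byte rid header that seeds every pos vector:
//   pos[0] = 2;                     // rid
//   pos[i + 1] = pos[i] + width[i]; // column i occupies bytes [pos[i], pos[i+1])
// so each RowGroup describes a packed record layout derived purely from the
// accumulated column widths.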
rowgroups.push_back(aggRgPm); aggregators.push_back(rowAggPm); if (jobInfo.trace) cout << "\n====== Aggregation RowGroups ======" << endl << "projected RG: " << projRG.toString() << endl << "aggregated1 RG: " << aggRgPm.toString() << endl << "aggregated2 RG: " << aggRgUm.toString() << endl; } void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector& rowgroups, vector& aggregators) { // check if there are any aggregate columns // a vector that has the aggregate function to be done by PM vector> aggColVec, aggNoDistColVec; set avgSet, avgDistSet; vector>& returnedColVec = jobInfo.returnedColVec; // For UDAF uint32_t projColsUDAFIdx = 0; uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; for (uint64_t i = 0; i < returnedColVec.size(); i++) { // col should be an aggregate or groupBy or window function uint32_t rtcKey = returnedColVec[i].first; uint32_t rtcOp = returnedColVec[i].second; if (rtcOp == 0 && find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), rtcKey) != jobInfo.distinctColVec.end() && find(jobInfo.groupByColVec.begin(), jobInfo.groupByColVec.end(), rtcKey) == jobInfo.groupByColVec.end() && jobInfo.windowSet.find(rtcKey) != jobInfo.windowSet.end()) { Message::Args args; args.add(keyName(i, rtcKey, jobInfo)); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[rtcKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[rtcKey].fTable << ", view=" << jobInfo.keyInfo->tupleKeyVec[rtcKey].fView << endl; throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); } // skip if not an aggregation column if (returnedColVec[i].second == 0) continue; aggColVec.push_back(returnedColVec[i]); // remember if a column has an average function, // with avg function, no need for separate sum or count_column_name if (returnedColVec[i].second == AggregateColumn::AVG) avgSet.insert(returnedColVec[i].first); if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) avgDistSet.insert(returnedColVec[i].first); } // populate the aggregate rowgroup on PM and UM // PM: projectedRG -> aggregateRGPM // UM: aggregateRGPM -> aggregateRGUM // // Aggregate preparation by joblist factory: // 1. get projected rowgroup (done by doAggProject) -- input to PM AGG // 2. construct aggregate rowgroup -- output of PM, input of UM // 3. construct aggregate rowgroup -- output of UM // 4. 
construct aggregate rowgroup -- output of distinct aggregates const RowGroup projRG = rowgroups[0]; const vector<uint32_t>& oidsProj = projRG.getOIDs(); const vector<uint32_t>& keysProj = projRG.getKeys(); const vector<uint32_t>& scaleProj = projRG.getScale(); const vector<uint32_t>& precisionProj = projRG.getPrecision(); const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes(); const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers(); vector<uint32_t> posAggPm, posAggUm, posAggDist; vector<uint32_t> oidsAggPm, oidsAggUm, oidsAggDist; vector<uint32_t> keysAggPm, keysAggUm, keysAggDist; vector<uint32_t> scaleAggPm, scaleAggUm, scaleAggDist; vector<uint32_t> precisionAggPm, precisionAggUm, precisionAggDist; vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm, typeAggDist; vector<uint32_t> csNumAggPm, csNumAggUm, csNumAggDist; vector<uint32_t> widthAggPm, widthAggUm, widthAggDist; vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm, groupByNoDist; vector<SP_ROWAGG_FUNC_t> functionVecPm, functionNoDistVec, functionVecUm; list<uint32_t> multiParmIndexes; uint32_t bigIntWidth = sizeof(int64_t); map<pair<uint32_t, int>, uint64_t> avgFuncDistMap; AGG_MAP aggFuncMap; // associate the columns between projected RG and aggregate RG on PM // populate the aggregate columns // the groupby columns are put in front, even if not a returned column // sum and count(column name) are omitted, if avg present { // project only unique oids, but they may be repeated in aggregation // collect the projected column info, prepare for aggregation vector<uint32_t> width; map<uint32_t, uint64_t> projColPosMap; for (uint64_t i = 0; i < keysProj.size(); i++) { projColPosMap.insert(make_pair(keysProj[i], i)); width.push_back(projRG.getColumnWidth(i)); } // column index for PM aggregate rowgroup uint64_t colAggPm = 0; uint64_t multiParm = 0; // for groupby column for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { uint32_t key = jobInfo.groupByColVec[i]; if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep2PhasesDistinctAggregate: group " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } uint64_t colProj = projColPosMap[key]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm)); groupByPm.push_back(groupby); // PM: just copy down to aggregation rowgroup oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(key); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } // for distinct column for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++) { uint32_t key = jobInfo.distinctColVec[i]; if (projColPosMap.find(key) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple."; cerr << "prep2PhasesDistinctAggregate: distinct " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable; if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView; cerr << endl; throw logic_error(emsg.str()); } // check for dup distinct column -- @bug6126 if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end()) continue; uint64_t colProj = projColPosMap[key]; SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm)); groupByPm.push_back(groupby); // PM: just copy down to aggregation rowgroup oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(key); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm)); colAggPm++; } // vectors for aggregate functions RowAggFunctionType aggOp = ROWAGG_FUNCT_UNDEFINE; RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE; for (uint64_t i = 0; i < aggColVec.size(); i++) { aggOp = functionIdMap(aggColVec[i].second); // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT) if (aggOp != ROWAGG_MULTI_PARM) prevAggOp = aggOp; // skip on PM if this is a constant if (aggOp == ROWAGG_CONSTANT) continue; pUDAFFunc = NULL; uint32_t aggKey = aggColVec[i].first; if (projColPosMap.find(aggKey) == projColPosMap.end()) { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple."; cerr << "prep2PhasesDistinctAggregate: aggregate " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable; if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0) cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView; cerr << endl; throw logic_error(emsg.str()); } RowAggFunctionType stats = statsFuncIdMap(aggOp); // skip sum / count(column) if avg is also selected if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && (avgSet.find(aggKey) != avgSet.end())) continue; // We skip distinct aggs, including extra parms. These are handled by adding them to group by list // above. if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG || aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME) continue; if (aggOp == ROWAGG_MULTI_PARM && prevAggOp == ROWAGG_COUNT_DISTINCT_COL_NAME) continue; uint64_t colProj = projColPosMap[aggKey]; SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. 
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(aggColVec[k].first); } } // Create a RowAggFunctionCol (UDAF subtype) with the context. funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough " "UDAFColumns"); } } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); } // skip if this is a duplicate if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM && aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)) != aggFuncMap.end()) { // skip if this is a duplicate continue; } functionVecPm.push_back(funct); aggFuncMap.insert(make_pair( boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), colAggPm - multiParm)); switch (aggOp) { case ROWAGG_MIN: case ROWAGG_MAX: case ROWAGG_SELECT_SOME: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); colAggPm++; } break; case ROWAGG_SUM: case ROWAGG_AVG: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("sum/average"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep2PhasesDistinctAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); csNumAggPm.push_back(8); wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAggPm, scaleAggPm, precisionAggPm, widthAggPm); colAggPm++; } // PM: put the count column for avg next to the sum // let fall through to add a count column for average function if (aggOp == ROWAGG_AVG) funct->fAuxColumnIndex = colAggPm; else break; /* fall through */ case ROWAGG_COUNT_ASTERISK: case ROWAGG_COUNT_COL_NAME: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); // work around count() in select subquery precisionAggPm.push_back(rowgroup::MagicPrecisionForCountAgg); if (isUnsigned(typeProj[colProj])) { typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); } else { typeAggPm.push_back(CalpontSystemCatalog::BIGINT); } csNumAggPm.push_back(8); widthAggPm.push_back(bigIntWidth); colAggPm++; } break; case ROWAGG_STATS: { if (typeProj[colProj] == CalpontSystemCatalog::CHAR || typeProj[colProj] == CalpontSystemCatalog::VARCHAR || typeProj[colProj] == CalpontSystemCatalog::BLOB || typeProj[colProj] == CalpontSystemCatalog::TEXT || typeProj[colProj] == CalpontSystemCatalog::DATE || typeProj[colProj] == CalpontSystemCatalog::DATETIME || typeProj[colProj] == 
CalpontSystemCatalog::TIMESTAMP || typeProj[colProj] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("variance/standard deviation"); args.add(colTypeIdString(typeProj[colProj])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep2PhasesDistinctAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } // count(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(0); typeAggPm.push_back(CalpontSystemCatalog::DOUBLE); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(double)); funct->fAuxColumnIndex = ++colAggPm; // mean(x) oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(-1); typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(long double)); ++colAggPm; // sum(x_i - mean)^2 oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(-1); typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(long double)); ++colAggPm; } break; case ROWAGG_BIT_AND: case ROWAGG_BIT_OR: case ROWAGG_BIT_XOR: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(-16); // for connector to skip null check typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggPm.push_back(8); widthAggPm.push_back(bigIntWidth); ++colAggPm; } break; case ROWAGG_UDAF: { RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get()); if (!udafFuncCol) { throw logic_error( "(2)prep2PhasesDistinctAggregate: A UDAF function is called but there's no " "RowUDAFFunctionCol"); } // Return column oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale()); precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType()); csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber()); widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth()); ++colAggPm; // Column for index of UDAF UserData struct oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(0); typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); csNumAggPm.push_back(8); widthAggPm.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAggPm++; // If the first param is const udafcParamIdx = 0; ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get()); if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } ++udafcParamIdx; break; } case ROWAGG_MULTI_PARM: { oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(scaleProj[colProj]); precisionAggPm.push_back(precisionProj[colProj]); typeAggPm.push_back(typeProj[colProj]); csNumAggPm.push_back(csNumProj[colProj]); widthAggPm.push_back(width[colProj]); multiParmIndexes.push_back(colAggPm); ++colAggPm; ++multiParm; // If the param is const if (udafc) { if (udafcParamIdx > udafc->aggParms().size() - 1) { throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with too many parms", aggregateFuncErr); } ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get()); if (cc) { funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; } } else if (prevAggOp !=
ROWAGG_COUNT_DISTINCT_COL_NAME) { throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with no parms", aggregateFuncErr); } ++udafcParamIdx; } break; default: { ostringstream emsg; emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported"; cerr << "prep2PhasesDistinctAggregate: " << emsg.str() << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } } } // associate the columns between the aggregate RGs on PM and UM without distinct aggregator // populate the returned columns { int64_t multiParms = 0; for (uint32_t idx = 0; idx < groupByPm.size(); idx++) { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(idx, idx)); groupByUm.push_back(groupby); } for (uint32_t idx = 0; idx < functionVecPm.size(); idx++) { SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx]; if (funcPm->fAggFunction == ROWAGG_MULTI_PARM) { // Skip on UM: Extra parms for an aggregate have no work on the UM ++multiParms; continue; } if (funcPm->fAggFunction == ROWAGG_UDAF) { RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get()); if (!udafFuncCol) { throw logic_error( "(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fOutputColumnIndex, udafFuncCol->fOutputColumnIndex - multiParms, udafFuncCol->fAuxColumnIndex - multiParms)); functionNoDistVec.push_back(funct); pUDAFFunc = udafFuncCol->fUDAFContext.getFunction(); } else { funct.reset(new RowAggFunctionCol(funcPm->fAggFunction, funcPm->fStatsFunction, funcPm->fOutputColumnIndex, funcPm->fOutputColumnIndex - multiParms, funcPm->fAuxColumnIndex - multiParms)); functionNoDistVec.push_back(funct); pUDAFFunc = NULL; } } // Copy over the PM arrays to the UM. Skip any that are a multi-parm entry. for (uint32_t idx = 0; idx < oidsAggPm.size(); ++idx) { if (find(multiParmIndexes.begin(), multiParmIndexes.end(), idx) != multiParmIndexes.end()) { continue; } oidsAggUm.push_back(oidsAggPm[idx]); keysAggUm.push_back(keysAggPm[idx]); scaleAggUm.push_back(scaleAggPm[idx]); precisionAggUm.push_back(precisionAggPm[idx]); widthAggUm.push_back(widthAggPm[idx]); typeAggUm.push_back(typeAggPm[idx]); csNumAggUm.push_back(csNumAggPm[idx]); } } // associate the columns between the aggregate RGs on UM and Distinct // populate the returned columns // remove group by columns that are not returned // add back sum or count(column name) if omitted due to avg column // put count(column name) column to the end, if it is for avg only { // Keep a count of the parms after the first for any aggregate. // These will be skipped and the count needs to be subtracted // from where the aux column will be. projColsUDAFIdx = 0; // check if the count column for AVG is also a returned column, // if so, replace the "-1" with the actual position in returned vec.
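// (Illustrative example, not from the original source.) AVG(col1) is
// finalized as SUM(col1) / COUNT(col1). For
//   SELECT col2, AVG(col1), COUNT(col1) FROM t GROUP BY col2;
// the COUNT's output slot doubles as the AVG's auxiliary column; if COUNT
// were absent, a hidden count column would be appended after the returned
// columns by the avgFuncMap fix-up further below.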
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap; AGG_MAP aggDupFuncMap; // copy over the groupby vector for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1)); groupByNoDist.push_back(groupby); } // locate the return column position in aggregated rowgroup from PM // outIdx is i without the multi-parm columns uint64_t outIdx = 0; RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE; uint32_t prevRetKey = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { pUDAFFunc = NULL; udafc = NULL; uint32_t retKey = returnedColVec[i].first; RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second); RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second); int colUm = -1; if (aggOp == ROWAGG_MULTI_PARM) { // Duplicate detection doesn't work for multi-parm. // If this function was earlier detected as a duplicate, unduplicate it. SP_ROWAGG_FUNC_t funct = functionVecUm.back(); if (funct->fAggFunction == ROWAGG_DUP_FUNCT) funct->fAggFunction = prevAggOp; // Remove it from aggDupFuncMap if it's in there. funct->hasMultiParm = true; AGG_MAP::iterator it = aggDupFuncMap.find(boost::make_tuple( prevRetKey, prevAggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggDupFuncMap.end()) { aggDupFuncMap.erase(it); } // Skip further UM processing of the multi-parm: Extra parms for an aggregate have no work on the UM continue; } else { // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT) prevAggOp = aggOp; prevRetKey = returnedColVec[i].first; } if (aggOp == ROWAGG_UDAF) { std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast<UDAFColumn*>((*it).get()); projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); // Save the multi-parm keys for dup-detection. if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0) { for (uint64_t k = i + 1; k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k) { udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first); } } break; } } if (it == jobInfo.projectionCols.end()) { throw logic_error( "(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough " "UDAFColumns"); } } if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) != jobInfo.distinctColVec.end()) { AGG_MAP::iterator it = aggFuncMap.find( boost::make_tuple(retKey, 0, pUDAFFunc, udafc ?
udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { colUm = it->second; } } if (colUm > -1) // Means we found a DISTINCT and have a column number { switch (aggOp) { case ROWAGG_DISTINCT_AVG: // avgFuncMap.insert(make_pair(key, funct)); case ROWAGG_DISTINCT_SUM: { if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR || typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR || typeAggUm[colUm] == CalpontSystemCatalog::BLOB || typeAggUm[colUm] == CalpontSystemCatalog::TEXT || typeAggUm[colUm] == CalpontSystemCatalog::DATE || typeAggUm[colUm] == CalpontSystemCatalog::DATETIME || typeAggUm[colUm] == CalpontSystemCatalog::TIMESTAMP || typeAggUm[colUm] == CalpontSystemCatalog::TIME) { Message::Args args; args.add("sum/average"); args.add(colTypeIdString(typeAggUm[colUm])); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args); cerr << "prep2PhasesDistinctAggregate: " << emsg << endl; throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT); } oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(retKey); csNumAggDist.push_back(8); wideDecimalOrLongDouble(colUm, typeAggPm[colUm], precisionAggPm, scaleAggPm, widthAggPm, typeAggDist, scaleAggDist, precisionAggDist, widthAggDist); } // PM: put the count column for avg next to the sum // let fall through to add a count column for average function // if (aggOp != ROWAGG_DISTINCT_AVG) break; case ROWAGG_COUNT_DISTINCT_COL_NAME: { oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(retKey); scaleAggDist.push_back(0); // work around count() in select subquery precisionAggDist.push_back(rowgroup::MagicPrecisionForCountAgg); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } break; default: // could happen if agg and agg distinct use same column. colUm = -1; break; } // switch } // For non distinct aggregates if (colUm == -1) { AGG_MAP::iterator it = aggFuncMap.find( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { colUm = it->second; oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(keysAggUm[colUm]); scaleAggDist.push_back(scaleAggUm[colUm]); precisionAggDist.push_back(precisionAggUm[colUm]); typeAggDist.push_back(typeAggUm[colUm]); csNumAggDist.push_back(csNumAggUm[colUm]); widthAggDist.push_back(widthAggUm[colUm]); } // not a direct hit -- a returned column is not already in the RG from PMs else { // MCOL-5476. uint32_t foundTupleKey{0}; if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey)) { AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple( foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); colUm = it->second; oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(keysAggUm[colUm]); scaleAggDist.push_back(scaleAggUm[colUm]); precisionAggDist.push_back(precisionAggUm[colUm]); typeAggDist.push_back(typeAggUm[colUm]); csNumAggDist.push_back(csNumAggUm[colUm]); widthAggDist.push_back(widthAggUm[colUm]); // Update the `retKey` to specify that this column is a duplicate. retKey = foundTupleKey; } else { // here bool returnColMissing = true; // check if a SUM or COUNT covered by AVG if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) { it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc, udafc ? 
udafc->getContext().getParamKeys() : NULL)); if (it != aggFuncMap.end()) { // false alarm returnColMissing = false; colUm = it->second; if (aggOp == ROWAGG_SUM) { oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(retKey); csNumAggDist.push_back(8); wideDecimalOrLongDouble(colUm, typeAggUm[colUm], precisionAggUm, scaleAggUm, widthAggUm, typeAggDist, scaleAggDist, precisionAggDist, widthAggDist); } else { // leave the count() to avg aggOp = ROWAGG_COUNT_NO_OP; oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(retKey); scaleAggDist.push_back(0); if (isUnsigned(typeAggUm[colUm])) { precisionAggDist.push_back(20); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); } else { precisionAggDist.push_back(19); typeAggDist.push_back(CalpontSystemCatalog::BIGINT); } csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } } } else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) != jobInfo.expressionVec.end()) { // a function on aggregation TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggDist.push_back(ti.oid); keysAggDist.push_back(retKey); scaleAggDist.push_back(ti.scale); precisionAggDist.push_back(ti.precision); typeAggDist.push_back(ti.dtype); csNumAggDist.push_back(ti.csNum); widthAggDist.push_back(ti.width); returnColMissing = false; } else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end()) { // a window function TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggDist.push_back(ti.oid); keysAggDist.push_back(retKey); scaleAggDist.push_back(ti.scale); precisionAggDist.push_back(ti.precision); typeAggDist.push_back(ti.dtype); csNumAggDist.push_back(ti.csNum); widthAggDist.push_back(ti.width); returnColMissing = false; } else if (aggOp == ROWAGG_CONSTANT) { TupleInfo ti = getTupleInfo(retKey, jobInfo); oidsAggDist.push_back(ti.oid); keysAggDist.push_back(retKey); scaleAggDist.push_back(ti.scale); precisionAggDist.push_back(ti.precision); typeAggDist.push_back(ti.dtype); csNumAggDist.push_back(ti.csNum); widthAggDist.push_back(ti.width); returnColMissing = false; } if (returnColMissing) { Message::Args args; args.add(keyName(outIdx, retKey, jobInfo)); string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args); cerr << "prep2PhasesDistinctAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp << endl; throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION); } } // else not a direct hit } } // else not a DISTINCT // update groupby vector if the groupby column is a returned column if (returnedColVec[i].second == 0) { int dupGroupbyIndex = -1; for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++) { if (jobInfo.groupByColVec[j] == retKey) { if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t)-1) groupByNoDist[j]->fOutputColumnIndex = outIdx; else dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex; } } // a duplicate group by column if (dupGroupbyIndex != -1) functionVecUm.push_back(SP_ROWAGG_FUNC_t( new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex))); } else { // update the aggregate function vector SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx)); } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, outIdx)); } if (aggOp == ROWAGG_COUNT_NO_OP) funct->fAuxColumnIndex = colUm; else if (aggOp 
== ROWAGG_CONSTANT) funct->fAuxColumnIndex = jobInfo.cntStarPos; functionVecUm.push_back(funct); // find if this func is a duplicate AGG_MAP::iterator iter = aggDupFuncMap.find( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL)); if (iter != aggDupFuncMap.end()) { if (funct->fAggFunction == ROWAGG_AVG) funct->fAggFunction = ROWAGG_DUP_AVG; else if (funct->fAggFunction == ROWAGG_STATS) funct->fAggFunction = ROWAGG_DUP_STATS; else if (funct->fAggFunction == ROWAGG_UDAF) funct->fAggFunction = ROWAGG_DUP_UDAF; else funct->fAggFunction = ROWAGG_DUP_FUNCT; funct->fAuxColumnIndex = iter->second; } else { aggDupFuncMap.insert(make_pair( boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), funct->fOutputColumnIndex)); } if (returnedColVec[i].second == AggregateColumn::AVG) avgFuncMap.insert(make_pair(returnedColVec[i].first, funct)); else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG) avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct)); } ++outIdx; } // for (i // now fix the AVG function, locate the count(column) position for (uint64_t i = 0; i < functionVecUm.size(); i++) { // if the count(k) can be associated with an avg(k) if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_NO_OP) { map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]); if (k != avgFuncMap.end()) k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex; } } // there is avg(k), but no count(k) in the select list uint64_t lastCol = outIdx; for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++) { if (k->second->fAuxColumnIndex == (uint32_t)-1) { k->second->fAuxColumnIndex = lastCol++; oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId); keysAggDist.push_back(k->first); scaleAggDist.push_back(0); precisionAggDist.push_back(19); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } } // distinct avg for (uint64_t i = 0; i < functionVecUm.size(); i++) { if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME && !functionVecUm[i]->hasMultiParm) { map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]); if (k != avgDistFuncMap.end()) { k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex; functionVecUm[i]->fAggFunction = ROWAGG_COUNT_NO_OP; } } } // there is avg(distinct k), but no count(distinct k) in the select list for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++) { // find count(distinct k) or add it if (k->second->fAuxColumnIndex == (uint32_t)-1) { k->second->fAuxColumnIndex = lastCol++; oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId); keysAggDist.push_back(k->first); scaleAggDist.push_back(0); precisionAggDist.push_back(19); typeAggDist.push_back(CalpontSystemCatalog::BIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(bigIntWidth); } } // add auxiliary fields for UDAF and statistics functions for (uint64_t i = 0; i < functionVecUm.size(); i++) { uint64_t j = functionVecUm[i]->fInputColumnIndex; if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF) { // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get()); if (!udafFuncCol) { throw logic_error( "(5)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol"); } functionVecUm[i]->fAuxColumnIndex = lastCol++;
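// (Illustrative note, not from the original source.) The pushes below only
// reserve a UBIGINT slot holding the index of the UDAF's UserData struct
// during intermediate aggregation; the oid/key values are copied from the PM
// rowgroup purely as placeholders, which is what the "Dummy?" remarks mean.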
oidsAggDist.push_back(oidsAggPm[j]); // Dummy? keysAggDist.push_back(keysAggPm[j]); // Dummy? scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); csNumAggDist.push_back(8); widthAggDist.push_back(sizeof(uint64_t)); continue; } if (functionVecUm[i]->fAggFunction != ROWAGG_STATS) continue; functionVecUm[i]->fAuxColumnIndex = lastCol; // mean(x) oidsAggDist.push_back(oidsAggPm[j]); keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(-1); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggDist.push_back(8); widthAggDist.push_back(sizeof(long double)); ++lastCol; // sum(x_i - mean)^2 oidsAggDist.push_back(oidsAggPm[j]); keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(-1); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); csNumAggDist.push_back(8); widthAggDist.push_back(sizeof(long double)); ++lastCol; } } // calculate the offset and create the rowaggregations, rowgroups posAggUm.push_back(2); // rid for (uint64_t i = 0; i < oidsAggUm.size(); i++) posAggUm.push_back(posAggUm[i] + widthAggUm[i]); RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm, precisionAggUm, jobInfo.stringTableThreshold); SP_ROWAGG_UM_t rowAggUm( new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit, false)); rowAggUm->timeZone(jobInfo.timeZone); posAggDist.push_back(2); // rid for (uint64_t i = 0; i < oidsAggDist.size(); i++) posAggDist.push_back(posAggDist[i] + widthAggDist[i]); RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, csNumAggDist, scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold); SP_ROWAGG_DIST rowAggDist( new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit)); rowAggDist->timeZone(jobInfo.timeZone); // if the distinct keyword is applied to more than one aggregate column, reset rowAggDist vector<RowGroup> subRgVec; if (jobInfo.distinctColVec.size() > 1) { RowAggregationMultiDistinct* multiDistinctAggregator = new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit); multiDistinctAggregator->timeZone(jobInfo.timeZone); rowAggDist.reset(multiDistinctAggregator); // construct and add sub-aggregators to rowAggDist vector<uint32_t> posAggGb, posAggSub; vector<uint32_t> oidsAggGb, oidsAggSub; vector<uint32_t> keysAggGb, keysAggSub; vector<uint32_t> scaleAggGb, scaleAggSub; vector<uint32_t> precisionAggGb, precisionAggSub; vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub; vector<uint32_t> csNumAggGb, csNumAggSub; vector<uint32_t> widthAggGb, widthAggSub; // populate groupby column info for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++) { oidsAggGb.push_back(oidsAggUm[i]); keysAggGb.push_back(keysAggUm[i]); scaleAggGb.push_back(scaleAggUm[i]); precisionAggGb.push_back(precisionAggUm[i]); typeAggGb.push_back(typeAggUm[i]); csNumAggGb.push_back(csNumAggUm[i]); widthAggGb.push_back(widthAggUm[i]); } // for distinct, each column requires a separate rowgroup vector<SP_ROWAGG_UM_t> rowAggSubDistVec; uint32_t distinctColKey; int64_t j; uint64_t k; uint64_t outIdx = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { if (returnedColVec[i].second == 0) { ++outIdx; continue; } j = -1; distinctColKey = -1; // Find the entry in distinctColVec, if any for (k = 0; k < jobInfo.distinctColVec.size(); k++) { distinctColKey = jobInfo.distinctColVec[k]; if (returnedColVec[i].first == distinctColKey) break; } if (distinctColKey == (uint32_t)-1) { ++outIdx; continue; } // locate the
distinct key in the row group for (k = 0; k < keysAggUm.size(); k++) { if (keysAggUm[k] == distinctColKey) { j = k; break; } } idbassert(j != -1); oidsAggSub = oidsAggGb; keysAggSub = keysAggGb; scaleAggSub = scaleAggGb; precisionAggSub = precisionAggGb; typeAggSub = typeAggGb; csNumAggSub = csNumAggGb; widthAggSub = widthAggGb; oidsAggSub.push_back(oidsAggUm[j]); keysAggSub.push_back(keysAggUm[j]); scaleAggSub.push_back(scaleAggUm[j]); precisionAggSub.push_back(precisionAggUm[j]); typeAggSub.push_back(typeAggUm[j]); csNumAggSub.push_back(csNumAggUm[j]); widthAggSub.push_back(widthAggUm[j]); // construct groupby vector vector groupBySub; k = 0; while (k < jobInfo.groupByColVec.size()) { SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k)); groupBySub.push_back(groupby); k++; } // add the distinct column as groupby SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k)); groupBySub.push_back(groupby); // Add multi parm distinct while ((i + 1) < returnedColVec.size() && functionIdMap(returnedColVec[i + 1].second) == ROWAGG_MULTI_PARM) { ++i; uint32_t dColKey = -1; j = -1; // Find the entry in distinctColVec, if any for (k = 0; k < jobInfo.distinctColVec.size(); k++) { dColKey = jobInfo.distinctColVec[k]; if (returnedColVec[i].first == dColKey) break; } idbassert(dColKey != (uint32_t)-1); // locate the distinct key in the row group for (k = 0; k < keysAggUm.size(); k++) { if (keysAggUm[k] == dColKey) { j = k; break; } } idbassert(j != -1); oidsAggSub.push_back(oidsAggUm[j]); keysAggSub.push_back(keysAggUm[j]); scaleAggSub.push_back(scaleAggUm[j]); precisionAggSub.push_back(precisionAggUm[j]); typeAggSub.push_back(typeAggUm[j]); csNumAggSub.push_back(csNumAggUm[j]); widthAggSub.push_back(widthAggUm[j]); SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k)); groupBySub.push_back(groupby); } // construct sub-rowgroup posAggSub.clear(); posAggSub.push_back(2); // rid for (k = 0; k < oidsAggSub.size(); k++) posAggSub.push_back(posAggSub[k] + widthAggSub[k]); RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub, csNumAggSub, scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold); subRgVec.push_back(subRg); // Keep a count of the parms after the first for any aggregate. // These will be skipped and the count needs to be subtracted // from where the aux column will be. 
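// (Illustrative example, not from the original source.) For
//   SELECT col2, COUNT(DISTINCT col1) FROM t GROUP BY col2;
// the sub-rowgroup built above carries {col2, col1} with both columns marked
// as group-by keys, so the sub-aggregator merely de-duplicates (col2, col1)
// pairs; the actual COUNT is applied afterwards by rowAggDist via
// functionSub2 below.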
int64_t multiParms = 0; // tricky part : 2 function vectors // -- dummy function vector for sub-aggregator, which does distinct only // -- aggregate function on this distinct column for rowAggDist vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2; // search the function in functionVec vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin(); while (it != functionVecUm.end()) { SP_ROWAGG_FUNC_t f = *it++; if ((f->fOutputColumnIndex == outIdx) && (f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || f->fAggFunction == ROWAGG_DISTINCT_SUM || f->fAggFunction == ROWAGG_DISTINCT_AVG)) { SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, groupBySub.size() - 1, f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms)); functionSub2.push_back(funct); } } // construct sub-aggregator SP_ROWAGG_UM_t subAgg( new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit)); subAgg->timeZone(jobInfo.timeZone); // add to rowAggDist multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2); ++outIdx; } // cover any non-distinct column functions { vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec; vector<SP_ROWAGG_FUNC_t> functionSub2; int64_t multiParms = 0; for (uint64_t k = 0; k < returnedColVec.size(); k++) { // search non-distinct functions in functionVec vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin(); while (it != functionVecUm.end()) { SP_ROWAGG_FUNC_t funct; SP_ROWAGG_FUNC_t f = *it++; if (f->fOutputColumnIndex == k) { if (f->fAggFunction == ROWAGG_UDAF) { RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get()); funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex, udafFuncCol->fOutputColumnIndex, udafFuncCol->fAuxColumnIndex - multiParms)); functionSub2.push_back(funct); } else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK || f->fAggFunction == ROWAGG_COUNT_COL_NAME || f->fAggFunction == ROWAGG_SUM || f->fAggFunction == ROWAGG_AVG || f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX || f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND || f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR || f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_SELECT_SOME) { funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex, f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms)); functionSub2.push_back(funct); } } } } if (functionSub1.size() > 0) { // make sure the group by columns are available for next aggregate phase.
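// (Illustrative sketch, not from the original source.) The renumbering set
// up below keeps each group-by column's input position but assigns a compact
// output position, e.g. in=5 -> out=0, in=7 -> out=1, so the non-distinct
// sub-aggregator emits rows the next aggregation phase can consume
// positionally.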
vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist; for (uint64_t i = 0; i < groupByNoDist.size(); i++) groupBySubNoDist.push_back( SP_ROWAGG_GRPBY_t(new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i))); // construct sub-aggregator SP_ROWAGG_UM_t subAgg( new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false)); subAgg->timeZone(jobInfo.timeZone); // add to rowAggDist multiDistinctAggregator->addSubAggregator(subAgg, aggRgUm, functionSub2); subRgVec.push_back(aggRgUm); } } } rowAggDist->addAggregator(rowAggUm, aggRgUm); rowgroups.push_back(aggRgDist); aggregators.push_back(rowAggDist); posAggPm.push_back(2); // rid for (uint64_t i = 0; i < oidsAggPm.size(); i++) posAggPm.push_back(posAggPm[i] + widthAggPm[i]); RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm, precisionAggPm, jobInfo.stringTableThreshold); SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm)); rowAggPm->timeZone(jobInfo.timeZone); rowgroups.push_back(aggRgPm); aggregators.push_back(rowAggPm); if (jobInfo.trace) { cout << "projected RG: " << projRG.toString() << endl << "aggregated1 RG: " << aggRgPm.toString() << endl << "aggregated2 RG: " << aggRgUm.toString() << endl; for (uint64_t i = 0; i < subRgVec.size(); i++) cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl; cout << "aggregatedDist RG: " << aggRgDist.toString() << endl; } } void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInfo& jobInfo) { map<uint32_t, uint32_t> keyToIndexMap; for (uint64_t i = 0; i < fRowGroupOut.getKeys().size(); ++i) { if (keyToIndexMap.find(fRowGroupOut.getKeys()[i]) == keyToIndexMap.end()) keyToIndexMap.insert(make_pair(fRowGroupOut.getKeys()[i], i)); } RetColsVector expressionVec; ArithmeticColumn* ac = NULL; FunctionColumn* fc = NULL; RetColsVector& cols = jobInfo.nonConstCols; vector<SimpleColumn*> simpleColumns; for (RetColsVector::iterator it = cols.begin(); it != cols.end(); ++it) { uint64_t eid = -1; if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) && (ac->aggColumnList().size() > 0) && (ac->windowfunctionColumnList().size() == 0)) { const vector<SimpleColumn*>& scols = ac->simpleColumnList(); simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end()); eid = ac->expressionId(); expressionVec.push_back(*it); } else if (((fc = dynamic_cast<FunctionColumn*>(it->get())) != NULL) && (fc->aggColumnList().size() > 0) && (fc->windowfunctionColumnList().size() == 0)) { const vector<SimpleColumn*>& sCols = fc->simpleColumnList(); simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end()); eid = fc->expressionId(); expressionVec.push_back(*it); } // update the output index if (eid != (uint64_t)-1) { map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(getExpTupleKey(jobInfo, eid)); if (mit != keyToIndexMap.end()) { it->get()->outputIndex(mit->second); } else { ostringstream emsg; emsg << "expression " << eid << " cannot be found in tuple."; cerr << "prepExpressionOnAggregate: " << emsg.str() << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } } // map the input indices for (vector<SimpleColumn*>::iterator i = simpleColumns.begin(); i != simpleColumns.end(); i++) { CalpontSystemCatalog::OID oid = (*i)->oid(); uint32_t key = getTupleKey(jobInfo, *i); CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*i)->colType()); if (dictOid > 0) { oid = dictOid; key = jobInfo.keyInfo->dictKeyMap[key]; } map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(key); if (mit != keyToIndexMap.end()) { (*i)->inputIndex(mit->second); } else { ostringstream emsg; emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "'
cannot be found in tuple."; cerr << "prepExpressionOnAggregate: " << emsg.str() << " simple column: oid(" << oid << "), alias(" << extractTableAlias(*i) << ")." << endl; throw QueryDataExcept(emsg.str(), aggregateFuncErr); } } // add expression to UM aggregator aggUM->expression(expressionVec); } void TupleAggregateStep::addConstangAggregate(vector& constAggDataVec) { fAggregator->constantAggregate(constAggDataVec); } void TupleAggregateStep::aggregateRowGroups() { RGData rgData; bool more = true; RowGroupDL* dlIn = NULL; if (!fDoneAggregate) { if (fInputJobStepAssociation.outSize() == 0) throw logic_error("No input data list for TupleAggregate step."); dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL(); if (dlIn == NULL) throw logic_error("Input is not RowGroup data list in TupleAggregate step."); if (fInputIter < 0) fInputIter = dlIn->getIterator(); more = dlIn->next(fInputIter, &rgData); if (traceOn()) dlTimes.setFirstReadTime(); StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; sts.msg_type = StepTeleStats::ST_START; sts.total_units_of_work = 1; postStepStartTele(sts); try { // this check covers the no row case if (!more && cancelled()) { fDoneAggregate = true; fEndOfResult = true; } while (more && !fEndOfResult) { fRowGroupIn.setData(&rgData); fAggregator->addRowGroup(&fRowGroupIn); more = dlIn->next(fInputIter, &rgData); // error checking if (cancelled()) { fEndOfResult = true; while (more) more = dlIn->next(fInputIter, &rgData); } } } // try catch (...) { handleException(std::current_exception(), logging::tupleAggregateStepErr, logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::aggregateRowGroups()"); fEndOfResult = true; } } fDoneAggregate = true; while (more) more = dlIn->next(fInputIter, &rgData); if (traceOn()) { dlTimes.setLastReadTime(); dlTimes.setEndOfInputTime(); } } void TupleAggregateStep::threadedAggregateFinalize(uint32_t threadID) { for (uint32_t i = 0; i < fNumOfBuckets; ++i) { if (fAgg_mutex[i]->try_lock()) { try { if (fAggregators[i]) fAggregators[i]->finalAggregation(); } catch (...) 
{ fAgg_mutex[i]->unlock(); throw; } fAgg_mutex[i]->unlock(); } } } void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID) { RGData rgData; scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]); scoped_array<Row> distRow; scoped_array<std::shared_ptr<uint8_t[]>> distRowData; uint32_t bucketID; scoped_array<bool> bucketDone(new bool[fNumOfBuckets]); vector<uint32_t> hashLens; bool locked = false; bool more = true; RowGroupDL* dlIn = nullptr; uint32_t rgVecShift = float(fNumOfBuckets) / fNumOfThreads * threadID; RowAggregationMultiDistinct* multiDist = nullptr; if (!fDoneAggregate) { if (fInputJobStepAssociation.outSize() == 0) throw logic_error("No input data list for delivery."); dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL(); if (dlIn == nullptr) throw logic_error("Input is not RowGroup data list in delivery step."); vector<RGData> rgDatas; try { // this check covers the no row case if (!more && cancelled()) { fDoneAggregate = true; fEndOfResult = true; } bool firstRead = true; Row rowIn; while (more && !fEndOfResult) { fMutex.lock(); locked = true; for (uint32_t c = 0; c < fNumOfRowGroups && !cancelled(); c++) { more = dlIn->next(fInputIter, &rgData); if (firstRead) { if (threadID == 0) { if (traceOn()) dlTimes.setFirstReadTime(); StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; sts.msg_type = StepTeleStats::ST_START; sts.total_units_of_work = 1; postStepStartTele(sts); } multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get()); if (multiDist) { for (uint32_t i = 0; i < fNumOfBuckets; i++) rowBucketVecs[i].resize(multiDist->subAggregators().size()); distRow.reset(new Row[multiDist->subAggregators().size()]); distRowData.reset(new std::shared_ptr<uint8_t[]>[multiDist->subAggregators().size()]); for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++) { multiDist->subAggregators()[j]->getOutputRowGroup()->initRow(&distRow[j], true); distRowData[j].reset(new uint8_t[distRow[j].getSize()]); distRow[j].setData(rowgroup::Row::Pointer(distRowData[j].get())); hashLens.push_back(multiDist->subAggregators()[j]->aggMapKeyLength()); } } else { for (uint32_t i = 0; i < fNumOfBuckets; i++) rowBucketVecs[i].resize(1); if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get())) hashLens.push_back(dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) ->aggregator() ->aggMapKeyLength()); else hashLens.push_back(fAggregator->aggMapKeyLength()); } fRowGroupIns[threadID] = fRowGroupIn; fRowGroupIns[threadID].initRow(&rowIn); firstRead = false; } if (more) { fRowGroupIns[threadID].setData(&rgData); bool diskAggAllowed = fRm->getAllowDiskAggregation(); int64_t memSize = fRowGroupIns[threadID].getSizeWithStrings(); if (!fRm->getMemory(memSize, fSessionMemLimit, !diskAggAllowed)) { if (!diskAggAllowed) { rgDatas.clear(); // to short-cut the rest of processing more = false; fEndOfResult = true; if (status() == 0) { errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATION_TOO_BIG)); status(ERR_AGGREGATION_TOO_BIG); } } else { rgDatas.push_back(rgData); } break; } fMemUsage[threadID] += memSize; rgDatas.push_back(rgData); } else { break; } } // input rowgroup and aggregator are finalized only right before hashjoin starts, // if there is one.
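// (Illustrative note, not from the original source.) Each bucket gets its
// own clone of fAggregator below; once rows are hashed into per-bucket
// vectors, the buckets can be aggregated concurrently, each clone guarded by
// its fAgg_mutex[i].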
if (fAggregators.empty()) { fAggregators.resize(fNumOfBuckets); for (uint32_t i = 0; i < fNumOfBuckets; i++) { fAggregators[i].reset(fAggregator->clone()); fAggregators[i]->setInputOutput(fRowGroupIn, &fRowGroupOuts[i]); } } fMutex.unlock(); locked = false; multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get()); // dispatch rows to row buckets if (multiDist) { for (uint32_t c = 0; c < rgDatas.size(); c++) { fRowGroupIns[threadID].setData(&rgDatas[c]); for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++) { fRowGroupIns[threadID].getRow(0, &rowIn); rowIn.setUserDataStore(rgDatas[c].getUserDataStore()); for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i) { for (uint64_t k = 0; k < multiDist->subAggregators()[j]->getGroupByCols().size(); ++k) { rowIn.copyField( distRow[j], k, multiDist->subAggregators()[j]->getGroupByCols()[k].get()->fInputColumnIndex); } // TBD This approach could potentially // put all values in one bucket. uint64_t hash = rowgroup::hashRow(distRow[j], hashLens[j] - 1); bucketID = hash % fNumOfBuckets; rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash); rowIn.nextRow(); } } } } else { for (uint32_t c = 0; c < rgDatas.size(); c++) { fRowGroupIns[threadID].setData(&rgDatas[c]); fRowGroupIns[threadID].getRow(0, &rowIn); rowIn.setUserDataStore(rgDatas[c].getUserDataStore()); for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i) { // The key is the groupby columns, which are the leading columns. // TBD This approach could potentially // put all values in one bucket. // The fAggregator->hasRollup() is true when we perform one-phase // aggregation and also are doing subtotals' computations. // Subtotals produce new keys whose hash values may not be in // the processing bucket. Consider the case for key tuples (1,2) and (1,3). // Their subtotals' keys will be (1, NULL) and (1, NULL) // but they will be left in their processing buckets and never // get aggregated properly. // Due to this, we put all rows into the same bucket 0 when performing // single-phase aggregation with subtotals. // For all other cases (single-phase without subtotals and two-phase // aggregation with and without subtotals) fAggregator->hasRollup() is false. // In these cases we have full parallel processing as expected. uint64_t hash = fAggregator->hasRollup() ? 0 : rowgroup::hashRow(rowIn, hashLens[0] - 1); int bucketID = hash % fNumOfBuckets; rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash); rowIn.nextRow(); } } } // insert to the hashmaps owned by each aggregator bool done = false; fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false); while (!fEndOfResult && !done && !cancelled()) { bool didWork = false; done = true; // each thread starts from its own bucket for better distribution uint32_t shift = (rgVecShift++) % fNumOfBuckets; for (uint32_t ci = 0; ci < fNumOfBuckets && !cancelled(); ci++) { uint32_t c = (ci + shift) % fNumOfBuckets; if (!fEndOfResult && !bucketDone[c] && fAgg_mutex[c]->try_lock()) { try { didWork = true; if (multiDist) dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get()) ->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c]); else { fAggregators[c]->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c][0]); } } catch (...)
{ fAgg_mutex[c]->unlock(); throw; } rowBucketVecs[c][0].clear(); bucketDone[c] = true; fAgg_mutex[c]->unlock(); } else if (!bucketDone[c]) { done = false; } } if (!didWork) usleep(1000); // avoid using all CPU during busy wait } rgDatas.clear(); fRm->returnMemory(fMemUsage[threadID], fSessionMemLimit); fMemUsage[threadID] = 0; if (cancelled()) { fEndOfResult = true; fMutex.lock(); while (more) more = dlIn->next(fInputIter, &rgData); fMutex.unlock(); } } } // try catch (...) { handleException(std::current_exception(), logging::tupleAggregateStepErr, logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::threadedAggregateRowGroups()[" + std::to_string(threadID) + "]"); fEndOfResult = true; fDoneAggregate = true; } } if (!locked) fMutex.lock(); while (more) more = dlIn->next(fInputIter, &rgData); fMutex.unlock(); locked = false; if (traceOn()) { dlTimes.setLastReadTime(); dlTimes.setEndOfInputTime(); } } void TupleAggregateStep::doAggregate_singleThread() { AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); RowGroupDL* dlp = dl->rowGroupDL(); RGData rgData; try { if (!fDoneAggregate) aggregateRowGroups(); if (fEndOfResult == false) { // do the final aggregation and deliver the results // at least one RowGroup for aggregate results if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL) { dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation(); } while (fAggregator->nextRowGroup()) { fAggregator->finalize(); fRowsReturned += fRowGroupOut.getRowCount(); rgData = fRowGroupOut.duplicate(); fRowGroupDelivered.setData(&rgData); if (fRowGroupOut.getColumnCount() > fRowGroupDelivered.getColumnCount()) pruneAuxColumns(); dlp->insert(rgData); } } } // try catch (...) { handleException(std::current_exception(), logging::tupleAggregateStepErr, logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doAggregate_singleThread()"); } if (traceOn()) printCalTrace(); StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; sts.msg_type = StepTeleStats::ST_SUMMARY; sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); // Bug 3136, let mini stats be formatted if traceOn. fEndOfResult = true; dlp->endOfInput(); } void TupleAggregateStep::doAggregate() { // @bug4314. DO NOT access fAggregator before the first read of input, // because hashjoin may not have finalized fAggregator. if (!fIsMultiThread) return doAggregate_singleThread(); AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0); RowGroupDL* dlp = dl->rowGroupDL(); ByteStream bs; doThreadedAggregate(bs, dlp); return; } /** @brief Aggregate input row groups in two-phase multi-threaded aggregation. * In the second phase handle three different aggregation cases differently: * 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM (DISTINCT col1) AND at least one * GROUP BY column * 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column * 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column * DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp. */ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp) { // initialize return value variable uint64_t rowCount = 0; try { /* * Phase 1: Distribute input rows to different buckets depending on the hash value of the group by columns * per row. Then distribute buckets equally on aggregators in fAggregators. (Number of fAggregators == * fNumOfBuckets).
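* Example (illustrative): with fNumOfBuckets == 4, a row whose group-by key hashes to 10 lands in bucket
* 10 % 4 == 2 and is only ever touched by the aggregator owning that bucket.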
Each previously created hash bucket is represented as one RowGroup in a fAggregator. */ if (!fDoneAggregate) { initializeMultiThread(); vector<uint64_t> runners; // thread pool handles runners.reserve(fNumOfThreads); // to prevent a resize during use // Start the aggregator threads for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++) { runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum))); } // Now wait for all those threads jobstepThreadPool.join(runners); } if (!cancelled()) { vector<uint64_t> runners; // use half of the threads because finalizing requires twice as // much memory on average uint32_t threads = std::max(1U, fNumOfThreads / 2); runners.reserve(threads); for (uint32_t threadNum = 0; threadNum < threads; ++threadNum) { runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum))); } jobstepThreadPool.join(runners); } /* * Phase 2: Depending on query type (see below) do aggregation per previously created RowGroup of rows * that need to be aggregated and output results. */ auto* distinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregator.get()); const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0; // Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY column // e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2; if (distinctAggregator && hasGroupByColumns) { if (!fEndOfResult) { // Do multi-threaded second phase aggregation (per row group created for GROUP BY statement) if (!fDoneAggregate) { vector<uint64_t> runners; // thread pool handles fRowGroupsDeliveredData.resize(fNumOfBuckets); uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads; uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1); runners.reserve(numThreads); for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++) { runners.push_back(jobstepThreadPool.invoke( ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread))); } jobstepThreadPool.join(runners); } // Deliver results fDoneAggregate = true; bool done = true; while (nextDeliveredRowGroup() && !cancelled()) { done = false; rowCount = fRowGroupOut.getRowCount(); if (rowCount != 0) { if (!cleanUpAndOutputRowGroup(bs, dlp)) break; } done = true; } if (done) { fEndOfResult = true; } } } // Case 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column // e.g. SELECT SUM(DISTINCT col1) FROM test; else if (distinctAggregator) { if (!fEndOfResult) { if (!fDoneAggregate) { // Do aggregation over all row groups. As all row groups need to be aggregated together there is no // easy way of multi-threading this and it's done in a single thread for now. for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++) { if (fEndOfResult == false) { // The distinctAggregator accumulates the aggregation results of all row groups by being handed // all row groups of each bucket aggregator and doing an aggregation step after each addition.
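// (Illustrative sketch, not from the original source.) The loop below drains
// every bucket into the single distinctAggregator, roughly:
//   for each bucket b:
//     distinctAggregator->aggregator(b->aggregator());
//     distinctAggregator->aggregator()->finalAggregation();
//     distinctAggregator->doDistinctAggregation();
// which is why this case runs single-threaded.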
auto* bucketMultiDistinctAggregator = dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[bucketNum].get()); auto* bucketDistinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregators[bucketNum].get()); distinctAggregator->aggregator(bucketDistinctAggregator->aggregator()); if (bucketMultiDistinctAggregator) { (dynamic_cast<RowAggregationMultiDistinct*>(distinctAggregator)) ->subAggregators(bucketMultiDistinctAggregator->subAggregators()); } distinctAggregator->aggregator()->finalAggregation(); distinctAggregator->doDistinctAggregation(); } } } // Deliver results fDoneAggregate = true; bool done = true; while (fAggregator->nextRowGroup() && !cancelled()) { done = false; fAggregator->finalize(); rowCount = fRowGroupOut.getRowCount(); fRowsReturned += rowCount; fRowGroupDelivered.setData(fRowGroupOut.getRGData()); if (rowCount != 0) { if (!cleanUpAndOutputRowGroup(bs, dlp)) break; } done = true; } if (done) fEndOfResult = true; } } // CASE 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column // e.g. SELECT SUM(col1) FROM test GROUP BY col2; // Do aggregation over all row groups. As all row groups need to be aggregated together there is no // easy way of multi-threading this and it's done in a single thread for now. else if (hasGroupByColumns) { if (!fEndOfResult && !fDoneAggregate) { for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum) { fAggregator->append(fAggregators[bucketNum].get()); } } fDoneAggregate = true; bool done = true; while (fAggregator->nextRowGroup() && !cancelled()) { done = false; fAggregator->finalize(); rowCount = fRowGroupOut.getRowCount(); fRowsReturned += rowCount; fRowGroupDelivered.setData(fRowGroupOut.getRGData()); if (rowCount != 0) { if (!cleanUpAndOutputRowGroup(bs, dlp)) break; } done = true; } if (done) { fEndOfResult = true; } } else { throw logic_error( "TupleAggregateStep::doThreadedAggregate: No DISTINCT columns nested into aggregation function " "or " "GROUP BY columns found. Should not reach here."); } } catch (...)
{ handleException(std::current_exception(), logging::tupleAggregateStepErr, logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doThreadedAggregate()"); fEndOfResult = true; } if (fEndOfResult) { StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; sts.msg_type = StepTeleStats::ST_SUMMARY; sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); if (dlp) { dlp->endOfInput(); } else { // send an empty / error band RGData rgData(fRowGroupOut, 0); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setStatus(status()); fRowGroupOut.serializeRGData(bs); rowCount = 0; } if (traceOn()) printCalTrace(); } return rowCount; } bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp) { if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount()) pruneAuxColumns(); if (dlp) { RGData rgData = fRowGroupDelivered.duplicate(); dlp->insert(rgData); return true; } bs.restart(); fRowGroupDelivered.serializeRGData(bs); return false; } void TupleAggregateStep::pruneAuxColumns() { uint64_t rowCount = fRowGroupOut.getRowCount(); Row row1, row2; fRowGroupOut.initRow(&row1); fRowGroupOut.getRow(0, &row1); fRowGroupDelivered.initRow(&row2); fRowGroupDelivered.getRow(0, &row2); for (uint64_t i = 1; i < rowCount; i++) { for (uint32_t j = 0; j < row2.getColumnCount(); j++) { row2.setNullMark(j, row1.getNullMark(j)); } // skip the first row row1.nextRow(); row2.nextRow(); // bug4463, memmove for src, dest overlap memmove(row2.getData(), row1.getData(), row2.getSize()); } for (uint32_t j = 0; j < row2.getColumnCount(); j++) { row2.setNullMark(j, row1.getNullMark(j)); } } void TupleAggregateStep::printCalTrace() { time_t t = time(0); char timeString[50]; ctime_r(&t, timeString); timeString[strlen(timeString) - 1] = '\0'; ostringstream logStr; logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString << "; total rows returned-" << fRowsReturned << endl << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl << "\tJob completion status " << status() << endl; logEnd(logStr.str().c_str()); fExtendedInfo += logStr.str(); formatMiniStats(); } void TupleAggregateStep::formatMiniStats() { ostringstream oss; oss << "TAS " << "UM " << "- " << "- " << "- " << "- " << "- " << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << fRowsReturned << " "; fMiniInfo += oss.str(); } uint32_t TupleAggregateStep::getTupleKeyFromTuple( const boost::tuple<uint32_t, RowAggFunctionType, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>& tuple) { return tuple.get<0>(); } uint32_t TupleAggregateStep::getTupleKeyFromTuple(uint32_t key) { return key; } template <typename GroupByMap> bool TupleAggregateStep::tryToFindEqualFunctionColumnByTupleKey(JobInfo& jobInfo, GroupByMap& groupByMap, const uint32_t tupleKey, uint32_t& foundKey) { auto funcMapIt = jobInfo.functionColumnMap.find(tupleKey); if (funcMapIt != jobInfo.functionColumnMap.end()) { const auto& rFunctionInfo = funcMapIt->second; // Try to match given `tupleKey` in `groupByMap`. for (const auto& groupByMapPair : groupByMap) { const auto currentTupleKey = getTupleKeyFromTuple(groupByMapPair.first); auto currentFuncMapIt = jobInfo.functionColumnMap.find(currentTupleKey); // Skip if the keys are the same.
if (currentFuncMapIt != jobInfo.functionColumnMap.end() && currentTupleKey != tupleKey) { const auto& lFunctionInfo = currentFuncMapIt->second; // Oid and function name should be the same. if (lFunctionInfo.associatedColumnOid == rFunctionInfo.associatedColumnOid && lFunctionInfo.functionName == rFunctionInfo.functionName) { foundKey = currentTupleKey; return true; } } } } return false; } } // namespace joblist