/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2019-2020 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: tupleaggregatestep.cpp 9732 2013-08-02 15:56:15Z pleblanc $
// #define NDEBUG
// Cross engine needs to be at top due to MySQL includes
#define PREFER_MY_CONFIG_H
#include "crossenginestep.h"
#include <cassert>
#include <sstream>
#include <iomanip>
#include <algorithm>
using namespace std;
#include <boost/shared_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "boost/tuple/tuple.hpp"
using namespace boost;
#include "messagequeue.h"
using namespace messageqcpp;
#include "loggingid.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
using namespace logging;
#include "configcpp.h"
using namespace config;
#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "udafcolumn.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "constantcolumn.h"
using namespace execplan;
#include "rowgroup.h"
#include "rowaggregation.h"
using namespace rowgroup;
#include "querytele.h"
using namespace querytele;
#include "jlf_common.h"
#include "jobstep.h"
#include "primitivestep.h"
#include "subquerystep.h"
#include "tuplehashjoin.h"
#include "tupleaggregatestep.h"
// #include "stopwatch.cpp"
// Stopwatch timer;
namespace
{
typedef vector<std::pair<Row::Pointer, uint64_t>> RowBucket;
typedef vector<RowBucket> RowBucketVec;
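// Map an execution-plan aggregate function id (AggregateColumn::*) to the
// corresponding row-aggregation function type. All variance/stddev variants
// collapse to ROWAGG_STATS here; statsFuncIdMap() below recovers the exact
// statistic requested.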
inline RowAggFunctionType functionIdMap(int planFuncId)
{
switch (planFuncId)
{
case AggregateColumn::COUNT_ASTERISK: return ROWAGG_COUNT_ASTERISK;
case AggregateColumn::COUNT: return ROWAGG_COUNT_COL_NAME;
case AggregateColumn::SUM: return ROWAGG_SUM;
case AggregateColumn::AVG: return ROWAGG_AVG;
case AggregateColumn::MIN: return ROWAGG_MIN;
case AggregateColumn::MAX: return ROWAGG_MAX;
case AggregateColumn::DISTINCT_COUNT: return ROWAGG_COUNT_DISTINCT_COL_NAME;
case AggregateColumn::DISTINCT_SUM: return ROWAGG_DISTINCT_SUM;
case AggregateColumn::DISTINCT_AVG: return ROWAGG_DISTINCT_AVG;
case AggregateColumn::STDDEV_POP: return ROWAGG_STATS;
case AggregateColumn::STDDEV_SAMP: return ROWAGG_STATS;
case AggregateColumn::VAR_POP: return ROWAGG_STATS;
case AggregateColumn::VAR_SAMP: return ROWAGG_STATS;
case AggregateColumn::BIT_AND: return ROWAGG_BIT_AND;
case AggregateColumn::BIT_OR: return ROWAGG_BIT_OR;
case AggregateColumn::BIT_XOR: return ROWAGG_BIT_XOR;
case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT;
case AggregateColumn::JSON_ARRAYAGG: return ROWAGG_JSON_ARRAY;
case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT;
case AggregateColumn::UDAF: return ROWAGG_UDAF;
case AggregateColumn::MULTI_PARM: return ROWAGG_MULTI_PARM;
case AggregateColumn::SELECT_SOME: return ROWAGG_SELECT_SOME;
default: return ROWAGG_FUNCT_UNDEFINE;
}
}
inline RowAggFunctionType statsFuncIdMap(int planFuncId)
{
switch (planFuncId)
{
case AggregateColumn::STDDEV_POP: return ROWAGG_STDDEV_POP;
case AggregateColumn::STDDEV_SAMP: return ROWAGG_STDDEV_SAMP;
case AggregateColumn::VAR_POP: return ROWAGG_VAR_POP;
case AggregateColumn::VAR_SAMP: return ROWAGG_VAR_SAMP;
default: return ROWAGG_FUNCT_UNDEFINE;
}
}
inline string colTypeIdString(CalpontSystemCatalog::ColDataType type)
{
switch (type)
{
case CalpontSystemCatalog::BIT: return string("BIT");
case CalpontSystemCatalog::TINYINT: return string("TINYINT");
case CalpontSystemCatalog::CHAR: return string("CHAR");
case CalpontSystemCatalog::SMALLINT: return string("SMALLINT");
case CalpontSystemCatalog::DECIMAL: return string("DECIMAL");
case CalpontSystemCatalog::MEDINT: return string("MEDINT");
case CalpontSystemCatalog::INT: return string("INT");
case CalpontSystemCatalog::FLOAT: return string("FLOAT");
case CalpontSystemCatalog::DATE: return string("DATE");
case CalpontSystemCatalog::BIGINT: return string("BIGINT");
case CalpontSystemCatalog::DOUBLE: return string("DOUBLE");
case CalpontSystemCatalog::LONGDOUBLE: return string("LONGDOUBLE");
case CalpontSystemCatalog::DATETIME: return string("DATETIME");
case CalpontSystemCatalog::TIMESTAMP: return string("TIMESTAMP");
case CalpontSystemCatalog::TIME: return string("TIME");
case CalpontSystemCatalog::VARCHAR: return string("VARCHAR");
case CalpontSystemCatalog::CLOB: return string("CLOB");
case CalpontSystemCatalog::BLOB: return string("BLOB");
case CalpontSystemCatalog::TEXT: return string("TEXT");
case CalpontSystemCatalog::UTINYINT: return string("UTINYINT");
case CalpontSystemCatalog::USMALLINT: return string("USMALLINT");
case CalpontSystemCatalog::UDECIMAL: return string("UDECIMAL");
case CalpontSystemCatalog::UMEDINT: return string("UMEDINT");
case CalpontSystemCatalog::UINT: return string("UINT");
case CalpontSystemCatalog::UFLOAT: return string("UFLOAT");
case CalpontSystemCatalog::UBIGINT: return string("UBIGINT");
case CalpontSystemCatalog::UDOUBLE: return string("UDOUBLE");
default: return string("UNKNOWN");
}
}
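// Build a quoted display name for the i-th projected column, for use in error
// messages. An empty alias falls back to the tuple-key name; low pseudo-OIDs
// (fId < 100) appear to denote synthesized columns, so they are reported as
// "Expression/Function".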
string keyName(uint64_t i, uint32_t key, const joblist::JobInfo& jobInfo)
{
string name = jobInfo.projectionCols[i]->alias();
if (name.empty())
{
name = jobInfo.keyInfo->tupleKeyToName[key];
if (jobInfo.keyInfo->tupleKeyVec[key].fId < 100)
name = "Expression/Function";
}
return "'" + name + "'";
}
} // namespace
namespace joblist
{
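// Choose the accumulator type for SUM/AVG on the given projected column:
// a wide (16-byte) decimal keeps its type/scale/precision; narrower types
// that support a wide-decimal accumulator are promoted to DECIMAL with the
// maximum int128 precision; everything else falls back to long double.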
void wideDecimalOrLongDouble(const uint64_t colProj, const CalpontSystemCatalog::ColDataType type,
const vector<uint32_t>& precisionProj, const vector<uint32_t>& scaleProj,
const vector<uint32_t>& width,
vector<CalpontSystemCatalog::ColDataType>& typeAgg, vector<uint32_t>& scaleAgg,
vector<uint32_t>& precisionAgg, vector<uint32_t>& widthAgg)
{
if ((type == CalpontSystemCatalog::DECIMAL || type == CalpontSystemCatalog::UDECIMAL) &&
datatypes::Decimal::isWideDecimalTypeByPrecision(precisionProj[colProj]))
{
typeAgg.push_back(type);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
widthAgg.push_back(width[colProj]);
}
else if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(type))
{
typeAgg.push_back(CalpontSystemCatalog::DECIMAL);
scaleAgg.push_back(0);
precisionAgg.push_back(datatypes::INT128MAXPRECISION);
widthAgg.push_back(datatypes::MAXDECIMALWIDTH);
}
else
{
typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
scaleAgg.push_back(0);
precisionAgg.push_back(-1);
widthAgg.push_back(sizeof(long double));
}
}
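// The constructor wires the aggregator to its input/output rowgroups and
// decides whether to aggregate multi-threaded: any distinct aggregation, or a
// nonzero group-by key length, enables the threaded path. Bucket and thread
// counts come from the resource manager, trimmed to the available memory.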
TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup& rgOut, const RowGroup& rgIn,
const JobInfo& jobInfo)
: JobStep(jobInfo)
, fCatalog(jobInfo.csc)
, fRowsReturned(0)
, fDoneAggregate(false)
, fEndOfResult(false)
, fAggregator(agg)
, fRowGroupOut(rgOut)
, fRowGroupIn(rgIn)
, fRunner(0)
, fUmOnly(false)
, fRm(jobInfo.rm)
, fBucketNum(0)
, fInputIter(-1)
, fSessionMemLimit(jobInfo.umMemLimit)
{
fRowGroupData.reinit(fRowGroupOut);
fRowGroupOut.setData(&fRowGroupData);
fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
// decide if this needs to be multi-threaded
RowAggregationDistinct* multiAgg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
fIsMultiThread = (multiAgg || fAggregator->aggMapKeyLength() > 0);
// initialize multi-thread variables
fNumOfThreads = fRm->aggNumThreads();
fNumOfBuckets = fRm->aggNumBuckets();
fNumOfRowGroups = fRm->aggNumRowGroups();
auto memLimit = std::min(fRm->availableMemory(), *fSessionMemLimit);
fNumOfBuckets =
calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(),
fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation());
fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);
fMemUsage.reset(new uint64_t[fNumOfThreads]);
memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t));
fExtendedInfo = "TAS: ";
fQtc.stepParms().stepType = StepTeleStats::T_TAS;
fPrimitiveServerThreadPools = jobInfo.primitiveServerThreadPools;
}
TupleAggregateStep::~TupleAggregateStep()
{
for (uint32_t i = 0; i < fNumOfThreads; i++)
fRm->returnMemory(fMemUsage[i], fSessionMemLimit);
for (uint32_t i = 0; i < fAgg_mutex.size(); i++)
delete fAgg_mutex[i];
}
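// Set up the per-bucket state for threaded aggregation: one mutex and one
// output rowgroup (with its own RGData) per bucket, plus one input rowgroup
// per worker thread.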
void TupleAggregateStep::initializeMultiThread()
{
RowGroupDL* dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
uint32_t i;
if (dlIn == NULL)
throw logic_error("Input is not RowGroup data list in delivery step.");
if (fInputIter < 0)
fInputIter = dlIn->getIterator();
fRowGroupIns.resize(fNumOfThreads);
fRowGroupOuts.resize(fNumOfBuckets);
fRowGroupDatas.resize(fNumOfBuckets);
rowgroup::SP_ROWAGG_UM_t agg;
RGData rgData;
for (i = 0; i < fNumOfBuckets; i++)
{
boost::mutex* lock = new boost::mutex();
fAgg_mutex.push_back(lock);
fRowGroupOuts[i] = fRowGroupOut;
rgData.reinit(fRowGroupOut);
fRowGroupDatas[i] = rgData;
fRowGroupOuts[i].setData(&fRowGroupDatas[i]);
fRowGroupOuts[i].resetRowGroup(0);
}
}
void TupleAggregateStep::run()
{
if (fDelivery == false)
{
fRunner = jobstepThreadPool.invoke(Aggregator(this));
}
}
void TupleAggregateStep::join()
{
if (fRunner)
jobstepThreadPool.join(fRunner);
}
void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
{
if (threadID >= fNumOfBuckets)
return;
bool finishedSecondPhase = false;
bool diskAggAllowed = fRm->getAllowDiskAggregation();
const uint32_t maxRowsSize = 8192;
while (!finishedSecondPhase && !fEndOfResult)
{
scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
uint32_t hashlen = fAggregator->aggMapKeyLength();
bool outOfMemory = false;
size_t totalMemSizeConsumed = 0;
try
{
RowAggregationDistinct* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
RowAggregationMultiDistinct* multiDist =
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
Row rowIn;
RowGroup* rowGroupIn = nullptr;
rowGroupIn = (aggDist->aggregator()->getOutputRowGroup());
uint32_t bucketID;
std::vector<std::unique_ptr<RGData>> rgDataVec;
if (multiDist)
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(multiDist->subAggregators().size());
}
else
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(1);
}
// dispatch rows to bucket
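// Each row is hashed on the group-by prefix (hashlen - 1 columns) and the
// hash is taken modulo fNumOfBuckets, so all rows of a group land in the
// same bucket and each bucket can be aggregated independently.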
if (multiDist)
{
for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rowGroupIn->initRow(&rowIn);
auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());
while (subDistAgg->nextOutputRowGroup())
{
rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.
// uint8_t* hashMapKey = rowIn.getData() + 2;
// bucketID = hash.operator()(hashMapKey) & fBucketMask;
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
const auto rgSize = diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize)
: rowGroupIn->getSizeWithStrings();
totalMemSizeConsumed += rgSize;
if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
{
outOfMemory = true;
break;
}
}
}
}
else
{
rowGroupIn->initRow(&rowIn);
auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());
while (subAgg->nextOutputRowGroup())
{
rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
rgDataVec.emplace_back(subAgg->moveCurrentRGData());
rowGroupIn->getRow(0, &rowIn);
for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.
// uint8_t* hashMapKey = rowIn.getData() + 2;
// bucketID = hash.operator()(hashMapKey) & fBucketMask;
uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
const auto rgSize =
diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize) : rowGroupIn->getSizeWithStrings();
totalMemSizeConsumed += rgSize;
if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
{
outOfMemory = true;
break;
}
}
}
if (!outOfMemory)
finishedSecondPhase = true;
bool done = false;
// reset bucketDone[] to be false
// memset(bucketDone, 0, sizeof(bucketDone));
fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);
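// Work-stealing sweep over the buckets: a thread claims a bucket with
// try_lock(), runs the distinct aggregation on that bucket's row vector,
// and marks it done; buckets held by another thread are retried on the
// next pass until every bucket is processed or the query is cancelled.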
while (!done && !cancelled())
{
done = true;
for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
{
if (!bucketDone[c] && fAgg_mutex[c]->try_lock())
{
try
{
if (multiDist)
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
->doDistinctAggregation_rowVec(rowBucketVecs[c]);
else
dynamic_cast<RowAggregationDistinct*>(fAggregators[c].get())
->doDistinctAggregation_rowVec(rowBucketVecs[c][0]);
}
catch (...)
{
fAgg_mutex[c]->unlock();
throw;
}
fAgg_mutex[c]->unlock();
bucketDone[c] = true;
rowBucketVecs[c][0].clear();
}
else if (!bucketDone[c])
{
done = false;
}
}
}
fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
if (cancelled())
{
finishedSecondPhase = true;
fEndOfResult = true;
}
} // try
catch (...)
{
fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG,
"TupleAggregateStep::doThreadedSecondPhaseAggregate()");
fEndOfResult = true;
finishedSecondPhase = true;
}
}
fDoneAggregate = true;
if (traceOn())
{
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
}
}
uint32_t TupleAggregateStep::nextBand_singleThread(messageqcpp::ByteStream& bs)
{
uint32_t rowCount = 0;
try
{
if (!fDoneAggregate)
aggregateRowGroups();
if (fEndOfResult == false)
{
bs.restart();
// do the final aggregation and deliver the results
// at least one RowGroup for the aggregate results
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
{
dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
}
if (fAggregator->nextRowGroup())
{
fAggregator->finalize();
rowCount = fRowGroupOut.getRowCount();
fRowsReturned += rowCount;
fRowGroupDelivered.setData(fRowGroupOut.getRGData());
if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
pruneAuxColumns();
fRowGroupDelivered.serializeRGData(bs);
}
else
{
fEndOfResult = true;
}
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::nextBand_singleThread()");
fEndOfResult = true;
}
if (fEndOfResult)
{
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
// send an empty / error band
RGData rgData(fRowGroupOut, 0);
fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status());
fRowGroupOut.serializeRGData(bs);
rowCount = 0;
if (traceOn())
printCalTrace();
}
return rowCount;
}
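// Walk the buckets in order and deliver the next finalized output rowgroup;
// return false (and reset the bucket cursor) once every bucket is drained.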
bool TupleAggregateStep::nextDeliveredRowGroup()
{
for (; fBucketNum < fNumOfBuckets; fBucketNum++)
{
while (fAggregators[fBucketNum]->nextOutputRowGroup())
{
fAggregators[fBucketNum]->finalize();
fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
fRowGroupOut.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
return true;
}
}
fBucketNum = 0;
return false;
}
uint32_t TupleAggregateStep::nextBand(messageqcpp::ByteStream& bs)
{
// use the original single-thread model when there is no group by and no distinct.
// @bug4314. DO NOT access fAggregator before the first read of input,
// because hashjoin may not have finalized fAggregator.
if (!fIsMultiThread)
return nextBand_singleThread(bs);
return doThreadedAggregate(bs, 0);
}
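// If the feeding step is a TupleBPS, switch to the two-phase plan saved by
// savePmHJData(): the UM-side aggregator (carrying over the expressions and
// constant aggregates) becomes the active aggregator here, and the PM-side
// aggregator is pushed down into the BPS.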
bool TupleAggregateStep::setPmHJAggregation(JobStep* step)
{
TupleBPS* bps = dynamic_cast<TupleBPS*>(step);
if (bps != NULL)
{
fAggregatorUM->expression(fAggregator->expression());
fAggregatorUM->constantAggregate(fAggregator->constantAggregate());
fAggregator = fAggregatorUM;
fRowGroupIn = fRowGroupPMHJ;
fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
bps->setAggregateStep(fAggregatorPM, fRowGroupPMHJ);
}
return (bps != NULL);
}
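// Build the delivered rowgroup as the leading retColCount columns of the
// output rowgroup, patching in expression OIDs/keys and correcting the
// scale/precision of wide-decimal return columns.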
void TupleAggregateStep::configDeliveredRowGroup(const JobInfo& jobInfo)
{
// configure the oids and keys
vector<uint32_t> oids = fRowGroupOut.getOIDs();
vector<uint32_t> keys = fRowGroupOut.getKeys();
vector<pair<int, int>>::const_iterator begin = jobInfo.aggEidIndexList.begin();
vector<pair<int, int>>::const_iterator end = jobInfo.aggEidIndexList.end();
for (vector<pair<int, int>>::const_iterator i = begin; i != end; i++)
{
oids[i->second] = i->first;
keys[i->second] = getExpTupleKey(jobInfo, i->first);
}
// correct the scale
vector<uint32_t> scale = fRowGroupOut.getScale();
vector<uint32_t> precision = fRowGroupOut.getPrecision();
size_t retColCount = 0;
auto scaleIter = scale.begin();
auto precisionIter = precision.begin();
if (jobInfo.havingStep)
{
retColCount = jobInfo.returnedColVec.size();
idbassert(jobInfo.returnedColVec.size() == jobInfo.projectionCols.size());
for (size_t i = 0; i < jobInfo.projectionCols.size() && scaleIter != scale.end(); i++)
{
const auto& colType = jobInfo.projectionCols[i]->resultType();
if (colType.isWideDecimalType())
{
*scaleIter = colType.scale;
*precisionIter = colType.precision;
}
scaleIter++;
precisionIter++;
}
}
else
{
retColCount = jobInfo.nonConstDelCols.size();
for (size_t i = 0; i < jobInfo.nonConstDelCols.size() && scaleIter != scale.end(); i++)
{
const auto& colType = jobInfo.nonConstDelCols[i]->resultType();
if (colType.isWideDecimalType())
{
*scaleIter = colType.scale;
*precisionIter = colType.precision;
}
scaleIter++;
precisionIter++;
}
}
vector<uint32_t>::const_iterator offsets0 = fRowGroupOut.getOffsets().begin();
vector<CalpontSystemCatalog::ColDataType>::const_iterator types0 = fRowGroupOut.getColTypes().begin();
vector<uint32_t> csNums = fRowGroupOut.getCharsetNumbers();
vector<uint32_t>::const_iterator precision0 = precision.begin();
fRowGroupDelivered =
RowGroup(retColCount, vector<uint32_t>(offsets0, offsets0 + retColCount + 1),
vector<uint32_t>(oids.begin(), oids.begin() + retColCount),
vector<uint32_t>(keys.begin(), keys.begin() + retColCount),
vector<CalpontSystemCatalog::ColDataType>(types0, types0 + retColCount),
vector<uint32_t>(csNums.begin(), csNums.begin() + retColCount),
vector<uint32_t>(scale.begin(), scale.begin() + retColCount),
vector<uint32_t>(precision0, precision0 + retColCount), jobInfo.stringTableThreshold);
if (jobInfo.trace)
cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
}
void TupleAggregateStep::setOutputRowGroup(const RowGroup& rg)
{
fRowGroupOut = rg;
fRowGroupData.reinit(fRowGroupOut);
fRowGroupOut.setData(&fRowGroupData);
fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
}
const RowGroup& TupleAggregateStep::getOutputRowGroup() const
{
return fRowGroupOut;
}
const RowGroup& TupleAggregateStep::getDeliveredRowGroup() const
{
return fRowGroupDelivered;
}
void TupleAggregateStep::savePmHJData(SP_ROWAGG_t& um, SP_ROWAGG_t& pm, RowGroup& rg)
{
fAggregatorUM = dynamic_pointer_cast<RowAggregationUM>(um);
fAggregatorPM = pm;
fRowGroupPMHJ = rg;
}
void TupleAggregateStep::deliverStringTableRowGroup(bool b)
{
fRowGroupDelivered.setUseStringTable(b);
}
bool TupleAggregateStep::deliverStringTableRowGroup() const
{
return fRowGroupDelivered.usesStringTable();
}
const string TupleAggregateStep::toString() const
{
ostringstream oss;
oss << "AggregateStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
oss << " in:";
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
oss << fInputJobStepAssociation.outAt(i);
if (fOutputJobStepAssociation.outSize() > 0)
{
oss << " out:";
for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
oss << fOutputJobStepAssociation.outAt(i);
}
return oss.str();
}
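// Factory entry point called by the joblist factory: inspects the delivery
// step type (BPS, hash join, subquery adapter, cross engine), rewrites
// aggregate(constant) columns, chooses UM-only vs. two-phase UM/PM
// aggregation, creates the TupleAggregateStep, and wires the data list
// between the previous step and the new one.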
SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
{
SJSTEP spjs;
TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(step.get());
TupleBPS* tbps = dynamic_cast<TupleBPS*>(step.get());
TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(step.get());
SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(step.get());
CrossEngineStep* ces = dynamic_cast<CrossEngineStep*>(step.get());
vector<RowGroup> rgs; // 0-ProjRG, 1-UMRG, [2-PMRG -- if 2 phases]
vector<SP_ROWAGG_t> aggs;
SP_ROWAGG_UM_t aggUM;
bool distinctAgg = false;
int64_t constKey = -1;
vector<ConstantAggData> constAggDataVec;
vector<std::pair<uint32_t, int>> returnedColVecOrig = jobInfo.returnedColVec;
for (uint32_t idx = 0; idx < jobInfo.returnedColVec.size(); idx++)
{
if (jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_COUNT ||
jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_AVG ||
jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_SUM)
{
distinctAgg = true;
}
// Change COUNT_ASTERISK to CONSTANT if necessary.
// In joblistfactory, all aggregate(constant) columns are set to count(*) to simplify processing.
map<uint64_t, SRCP>::iterator it = jobInfo.constAggregate.find(idx);
if (it != jobInfo.constAggregate.end())
{
AggregateColumn* ac = dynamic_cast<AggregateColumn*>(it->second.get());
if (ac->aggOp() == AggregateColumn::COUNT_ASTERISK)
{
if (jobInfo.cntStarPos == -1)
jobInfo.cntStarPos = idx;
}
else
{
constKey = jobInfo.returnedColVec[idx].first;
CalpontSystemCatalog::ColType ct = ac->resultType();
TupleInfo ti = setExpTupleInfo(ct, ac->expressionId(), ac->alias(), jobInfo);
jobInfo.returnedColVec[idx].first = ti.key;
jobInfo.returnedColVec[idx].second = AggregateColumn::CONSTANT;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(ac->constCol().get());
idbassert(cc != NULL); // @bug5261
bool isNull = cc->isNull();
if (ac->aggOp() == AggregateColumn::UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
if (udafc)
{
constAggDataVec.push_back(ConstantAggData(cc->constval(), udafc->getContext().getName(),
functionIdMap(ac->aggOp()), isNull));
}
}
else
{
constAggDataVec.push_back(ConstantAggData(cc->constval(), functionIdMap(ac->aggOp()), isNull));
}
}
}
}
// If there are aggregate(constant) columns, but no count(*), add a count(*).
if (constAggDataVec.size() > 0 && jobInfo.cntStarPos < 0)
{
jobInfo.cntStarPos = jobInfo.returnedColVec.size();
jobInfo.returnedColVec.push_back(make_pair(constKey, AggregateColumn::COUNT_ASTERISK));
}
// preprocess the columns used by group_concat
jobInfo.groupConcatInfo.prepGroupConcat(jobInfo);
bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0 || sas || ces;
rgs.push_back(tds->getDeliveredRowGroup());
// get rowgroup and aggregator
// For TupleHashJoin, we prepare for both PM and UM only aggregation
if (doUMOnly || thjs)
{
if (distinctAgg == true)
prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
else
prep1PhaseAggregate(jobInfo, rgs, aggs);
// TODO: fix this
if (doUMOnly)
rgs.push_back(rgs[0]);
}
if (!doUMOnly)
{
if (distinctAgg == true)
prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
else
{
prep2PhasesAggregate(jobInfo, rgs, aggs);
}
}
if (tbps != NULL)
{
// create delivery step
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo));
if (doUMOnly)
dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
else
tbps->setAggregateStep(aggs[1], rgs[2]);
}
else if (thjs != NULL)
{
// create delivery step
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
if (doUMOnly)
dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
else
dynamic_cast<TupleAggregateStep*>(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]);
// set input side
thjs->deliveryStep(spjs);
}
else
{
aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
}
// Set up the input JobStepAssociation -- the mechanism
// whereby the previous step feeds data to this step.
// We create one here and hook it to the previous step's
// output as well as to this aggregate step's input.
spjs->stepId(step->stepId() + 1);
JobStepAssociation jsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
jsa.outAdd(spdl);
spjs->inputAssociation(jsa); // Aggregate input
// Previous step output
step->outputAssociation(jsa);
// add the aggregate on constants
if (constAggDataVec.size() > 0)
{
dynamic_cast<TupleAggregateStep*>(spjs.get())->addConstangAggregate(constAggDataVec);
jobInfo.returnedColVec.swap(returnedColVecOrig); // restore the original return columns
}
// fix the delivered rowgroup data
dynamic_cast<TupleAggregateStep*>(spjs.get())->configDeliveredRowGroup(jobInfo);
if (jobInfo.expressionVec.size() > 0)
dynamic_cast<TupleAggregateStep*>(spjs.get())->prepExpressionOnAggregate(aggUM, jobInfo);
return spjs;
}
void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
vector<SP_ROWAGG_t>& aggregators)
{
// check if there are any aggregate columns
vector<pair<uint32_t, int>> aggColVec;
vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
if (returnedColVec[i].second != 0)
aggColVec.push_back(returnedColVec[i]);
}
// populate the aggregate rowgroup: projectedRG -> aggregateRG
//
// Aggregate preparation by joblist factory:
// 1. get projected rowgroup (done by doAggProject) -- passed in
// 2. construct aggregate rowgroup -- output of UM
const RowGroup projRG = rowgroups[0];
const vector<uint32_t>& oidsProj = projRG.getOIDs();
const vector<uint32_t>& keysProj = projRG.getKeys();
const vector<uint32_t>& scaleProj = projRG.getScale();
const vector<uint32_t>& precisionProj = projRG.getPrecision();
const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
vector<uint32_t> posAgg;
vector<uint32_t> oidsAgg;
vector<uint32_t> keysAgg;
vector<uint32_t> scaleAgg;
vector<uint32_t> precisionAgg;
vector<CalpontSystemCatalog::ColDataType> typeAgg;
vector<uint32_t> csNumAgg;
vector<uint32_t> widthAgg;
vector<SP_ROWAGG_GRPBY_t> groupBy;
vector<SP_ROWAGG_FUNC_t> functionVec;
uint32_t bigIntWidth = sizeof(int64_t);
uint32_t bigUintWidth = sizeof(uint64_t);
// For UDAF
uint32_t projColsUDAFIdx = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
// for count column of average function
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
// collect the projected column info, prepare for aggregation
vector<uint32_t> width;
map<uint32_t, int> projColPosMap;
for (uint64_t i = 0; i < keysProj.size(); i++)
{
projColPosMap.insert(make_pair(keysProj[i], i));
width.push_back(projRG.getColumnWidth(i));
}
// for groupby column
map<uint32_t, int> groupbyMap;
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
int64_t colProj = projColPosMap[jobInfo.groupByColVec[i]];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1));
groupBy.push_back(groupby);
groupbyMap.insert(make_pair(jobInfo.groupByColVec[i], i));
}
// for distinct column
for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
{
//@bug6126, continue if already in group by
if (groupbyMap.find(jobInfo.distinctColVec[i]) != groupbyMap.end())
continue;
int64_t colProj = projColPosMap[jobInfo.distinctColVec[i]];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1));
groupBy.push_back(groupby);
groupbyMap.insert(make_pair(jobInfo.distinctColVec[i], i));
}
// populate the aggregate rowgroup
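// aggFuncMap keys are (column key, agg op, UDAF function ptr, UDAF param
// keys); it is used below to detect duplicate aggregate calls so a result
// is computed once and copied via a ROWAGG_DUP_* marker.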
AGG_MAP aggFuncMap;
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
uint32_t key = returnedColVec[i].first;
if (aggOp == ROWAGG_CONSTANT)
{
TupleInfo ti = getTupleInfo(key, jobInfo);
oidsAgg.push_back(ti.oid);
keysAgg.push_back(key);
scaleAgg.push_back(ti.scale);
precisionAgg.push_back(ti.precision);
typeAgg.push_back(ti.dtype);
csNumAgg.push_back(ti.csNum);
widthAgg.push_back(ti.width);
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, 0, outIdx, jobInfo.cntStarPos));
functionVec.push_back(funct);
++outIdx;
continue;
}
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(key, jobInfo);
uint32_t ptrSize = sizeof(GroupConcatAg*);
uint32_t width = (ti.width >= ptrSize) ? ti.width : ptrSize;
oidsAgg.push_back(ti.oid);
keysAgg.push_back(key);
scaleAgg.push_back(ti.scale);
precisionAgg.push_back(ti.precision);
typeAgg.push_back(ti.dtype);
csNumAgg.push_back(ti.csNum);
widthAgg.push_back(width);
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, 0, outIdx, -1));
functionVec.push_back(funct);
++outIdx;
continue;
}
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep1PhaseAggregate: " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
// make sure the colProj is correct
int64_t colProj = projColPosMap[key];
if (keysProj[colProj] != key)
{
ostringstream emsg;
emsg << "projection column map is out of sync.";
cerr << "prep1PhaseAggregate: " << emsg.str() << endl;
throw logic_error(emsg.str());
}
if (aggOp == ROWAGG_FUNCT_UNDEFINE)
{
// must be a groupby column or function on aggregation
// or used by group_concat
map<uint32_t, int>::iterator it = groupbyMap.find(key);
if (it != groupbyMap.end())
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(width[colProj]);
if (groupBy[it->second]->fOutputColumnIndex == (uint32_t)-1)
groupBy[it->second]->fOutputColumnIndex = outIdx;
else
functionVec.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, groupBy[it->second]->fOutputColumnIndex)));
++outIdx;
continue;
}
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) !=
jobInfo.expressionVec.end())
{
TupleInfo ti = getTupleInfo(key, jobInfo);
oidsAgg.push_back(ti.oid);
keysAgg.push_back(key);
scaleAgg.push_back(ti.scale);
precisionAgg.push_back(ti.precision);
typeAgg.push_back(ti.dtype);
csNumAgg.push_back(ti.csNum);
widthAgg.push_back(ti.width);
++outIdx;
continue;
}
else if (jobInfo.groupConcatInfo.columns().find(key) != jobInfo.groupConcatInfo.columns().end())
{
// TODO: columns used only by group_concat are not needed in the result set.
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(width[colProj]);
++outIdx;
continue;
}
else if (jobInfo.windowSet.find(key) != jobInfo.windowSet.end())
{
// skip window columns/expression, which are computed later
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(width[colProj]);
++outIdx;
continue;
}
else
{
uint32_t foundTupleKey{0};
if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, groupbyMap, key, foundTupleKey))
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(width[colProj]);
// Update key.
key = foundTupleKey;
++outIdx;
continue;
}
else
{
Message::Args args;
args.add(keyName(i, key, jobInfo));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep1PhaseAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable
<< ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView << ", function=" << (int)aggOp << endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
}
}
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(1)prep1PhaseAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, outIdx));
}
functionVec.push_back(funct);
switch (aggOp)
{
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SELECT_SOME:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(width[colProj]);
}
break;
case ROWAGG_AVG:
avgFuncMap.insert(make_pair(key, funct));
/* fall through */
case ROWAGG_SUM:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep1PhaseAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAgg,
scaleAgg, precisionAgg, widthAgg);
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
csNumAgg.push_back(csNumProj[colProj]);
}
break;
case ROWAGG_COUNT_COL_NAME:
case ROWAGG_COUNT_ASTERISK:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(0);
// work around count() in select subquery
precisionAgg.push_back(rowgroup::MagicPrecisionForCountAgg);
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(bigIntWidth);
}
break;
case ROWAGG_STATS:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("variance/standard deviation");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep1PhaseAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(0);
typeAgg.push_back(CalpontSystemCatalog::DOUBLE);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(sizeof(double));
}
break;
case ROWAGG_BIT_AND:
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(0);
precisionAgg.push_back(-16); // for connector to skip null check
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(bigIntWidth);
}
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error(
"(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
break;
}
case ROWAGG_MULTI_PARM:
{
}
break;
default:
{
ostringstream emsg;
emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
cerr << "prep1PhaseAggregate: " << emsg.str() << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
// find if this func is a duplicate
AGG_MAP::iterator iter = aggFuncMap.find(
boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM && iter != aggFuncMap.end())
{
if (funct->fAggFunction == ROWAGG_AVG)
funct->fAggFunction = ROWAGG_DUP_AVG;
else if (funct->fAggFunction == ROWAGG_STATS)
funct->fAggFunction = ROWAGG_DUP_STATS;
else if (funct->fAggFunction == ROWAGG_UDAF)
funct->fAggFunction = ROWAGG_DUP_UDAF;
else
funct->fAggFunction = ROWAGG_DUP_FUNCT;
funct->fAuxColumnIndex = iter->second;
}
else
{
aggFuncMap.insert(make_pair(
boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}
if (aggOp != ROWAGG_MULTI_PARM)
{
++outIdx;
}
}
// now fix the AVG function, locate the count(column) position
for (uint64_t i = 0; i < functionVec.size(); i++)
{
if (functionVec[i]->fAggFunction != ROWAGG_COUNT_COL_NAME)
continue;
// if the count(k) can be associated with an avg(k)
map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
avgFuncMap.find(keysAgg[functionVec[i]->fOutputColumnIndex]);
if (k != avgFuncMap.end())
{
k->second->fAuxColumnIndex = functionVec[i]->fOutputColumnIndex;
functionVec[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
}
}
// there is avg(k), but no count(k) in the select list
uint64_t lastCol = outIdx;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{
if (k->second->fAuxColumnIndex == (uint32_t)-1)
{
k->second->fAuxColumnIndex = lastCol++;
oidsAgg.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
keysAgg.push_back(k->first);
scaleAgg.push_back(0);
precisionAgg.push_back(19);
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
widthAgg.push_back(bigIntWidth);
}
}
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVec.size(); i++)
{
uint64_t j = functionVec[i]->fInputColumnIndex;
if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
{
// Column for index of UDAF UserData struct
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());
if (!udafFuncCol)
{
throw logic_error(
"(3)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVec[i]->fAuxColumnIndex = lastCol++;
oidsAgg.push_back(oidsProj[j]);
keysAgg.push_back(keysProj[j]);
scaleAgg.push_back(0);
precisionAgg.push_back(0);
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
csNumAgg.push_back(8);
widthAgg.push_back(bigUintWidth);
continue;
}
if (functionVec[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVec[i]->fAuxColumnIndex = lastCol;
// mean(x)
oidsAgg.push_back(oidsProj[j]);
keysAgg.push_back(keysProj[j]);
scaleAgg.push_back(0);
precisionAgg.push_back(-1);
typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAgg.push_back(8);
widthAgg.push_back(sizeof(long double));
++lastCol;
// sum(x_i - mean)^2
oidsAgg.push_back(oidsProj[j]);
keysAgg.push_back(keysProj[j]);
scaleAgg.push_back(0);
precisionAgg.push_back(-1);
typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAgg.push_back(8);
widthAgg.push_back(sizeof(long double));
++lastCol;
}
// calculate the offsets and create the row aggregation and rowgroup
posAgg.push_back(2);
for (uint64_t i = 0; i < oidsAgg.size(); i++)
posAgg.push_back(posAgg[i] + widthAgg[i]);
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAgg(
new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit, jobInfo.hasRollup));
rowAgg->timeZone(jobInfo.timeZone);
rowgroups.push_back(aggRG);
aggregators.push_back(rowAgg);
// mapping the group_concat columns, if any.
if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
{
jobInfo.groupConcatInfo.mapColumns(projRG);
rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
}
if (jobInfo.trace)
cout << "\n====== Aggregation RowGroups ======" << endl
<< "projected RG: " << projRG.toString() << endl
<< "aggregated RG: " << aggRG.toString() << endl;
}
void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
vector<SP_ROWAGG_t>& aggregators)
{
// check if there are any aggregate columns
vector<pair<uint32_t, int>> aggColVec;
vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
if (returnedColVec[i].second != 0)
aggColVec.push_back(returnedColVec[i]);
}
// populate the aggregate rowgroup: projectedRG -> aggregateRG
//
// Aggregate preparation by joblist factory:
// 1. get projected rowgroup (done by doAggProject) -- passed in
// 2. construct aggregate rowgroup -- output of UM
const RowGroup projRG = rowgroups[0];
const vector<uint32_t>& oidsProj = projRG.getOIDs();
const vector<uint32_t>& keysProj = projRG.getKeys();
const vector<uint32_t>& scaleProj = projRG.getScale();
const vector<uint32_t>& precisionProj = projRG.getPrecision();
const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
vector<uint32_t> posAgg, posAggDist;
vector<uint32_t> oidsAgg, oidsAggDist;
vector<uint32_t> keysAgg, keysAggDist;
vector<uint32_t> scaleAgg, scaleAggDist;
vector<uint32_t> precisionAgg, precisionAggDist;
vector<CalpontSystemCatalog::ColDataType> typeAgg, typeAggDist;
vector<uint32_t> csNumAgg, csNumAggDist;
vector<uint32_t> widthProj, widthAgg, widthAggDist;
vector<SP_ROWAGG_GRPBY_t> groupBy, groupByNoDist;
vector<SP_ROWAGG_FUNC_t> functionVec1, functionVec2, functionNoDistVec;
uint32_t bigIntWidth = sizeof(int64_t);
// map key = column key, operation (enum), and UDAF pointer if UDAF.
AGG_MAP aggFuncMap;
// set<uint32_t> avgSet;
list<uint32_t> multiParmIndexes;
// For UDAF
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
uint32_t projColsUDAFIdx = 0;
uint32_t udafcParamIdx = 0;
// for count column of average function
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
// collect the projected column info, prepare for aggregation
vector<uint32_t> width;
for (uint64_t i = 0; i < keysProj.size(); i++)
{
width.push_back(projRG.getColumnWidth(i));
}
// associate the columns between the projected RG and the aggregate RG on the UM
// populate the aggregate columns
// the groupby columns are put in front, even if not returned columns
// sum and count(column name) are omitted if avg is present
{
// project only unique oids, but they may be repeated in aggregation
// collect the projected column info, prepare for aggregation
map<uint32_t, int> projColPosMap;
for (uint64_t i = 0; i < keysProj.size(); i++)
{
projColPosMap.insert(make_pair(keysProj[i], i));
widthProj.push_back(projRG.getColumnWidth(i));
}
// column index for aggregate rowgroup
uint64_t colAgg = 0;
// for groupby column
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
uint32_t key = jobInfo.groupByColVec[i];
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep1PhaseDistinctAggregate: groupby " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
uint64_t colProj = projColPosMap[key];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg));
groupBy.push_back(groupby);
// copy down to aggregation rowgroup
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
aggFuncMap.insert(make_pair(
boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
colAgg));
colAgg++;
}
// for distinct column
for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
{
uint32_t key = jobInfo.distinctColVec[i];
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
// check for dup distinct column -- @bug6126
if (find(keysAgg.begin(), keysAgg.end(), key) != keysAgg.end())
continue;
uint64_t colProj = projColPosMap[key];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg));
groupBy.push_back(groupby);
// copy down to aggregation rowgroup
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
aggFuncMap.insert(make_pair(
boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
colAgg));
colAgg++;
}
// vectors for aggregate functions
RowAggFunctionType aggOp = ROWAGG_FUNCT_UNDEFINE;
RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
for (uint64_t i = 0; i < aggColVec.size(); i++)
{
pUDAFFunc = NULL;
uint32_t aggKey = aggColVec[i].first;
aggOp = functionIdMap(aggColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
// Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
if (aggOp != ROWAGG_MULTI_PARM)
prevAggOp = aggOp;
// skip if this is a constant
if (aggOp == ROWAGG_CONSTANT)
continue;
// skip if this is a group_concat
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(aggKey, jobInfo);
uint32_t width = sizeof(GroupConcatAg*);
oidsAgg.push_back(ti.oid);
keysAgg.push_back(aggKey);
scaleAgg.push_back(ti.scale);
precisionAgg.push_back(ti.precision);
typeAgg.push_back(ti.dtype);
csNumAgg.push_back(ti.csNum);
widthAgg.push_back(width);
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colAgg, colAgg, -1));
functionVec1.push_back(funct);
aggFuncMap.insert(make_pair(
boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
colAgg));
colAgg++;
continue;
}
if (projColPosMap.find(aggKey) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
cerr << "prep1PhaseDistinctAggregate: aggregate " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;
if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;
cerr << endl;
throw logic_error(emsg.str());
}
// We skip distinct aggs, including extra parms; these are handled by adding them to the
// group-by list above.
if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG ||
aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
continue;
if (aggOp == ROWAGG_MULTI_PARM && prevAggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
continue;
uint64_t colProj = projColPosMap[aggKey];
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough "
"UDAFColumns");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
}
// skip if this is a duplicate
if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM &&
aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL)) !=
aggFuncMap.end())
{
// skip if this is a duplicate
continue;
}
functionVec1.push_back(funct);
aggFuncMap.insert(make_pair(
boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
colAgg));
switch (aggOp)
{
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SELECT_SOME:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
colAgg++;
}
break;
case ROWAGG_SUM:
case ROWAGG_AVG:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
csNumAgg.push_back(8);
wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAgg,
scaleAgg, precisionAgg, widthAgg);
colAgg++;
// there is a distinct step, so put the count column for avg next to the sum;
// fall through to add a count column for the average function
if (aggOp == ROWAGG_AVG)
funct->fAuxColumnIndex = colAgg;
else
break;
}
/* fall through */
case ROWAGG_COUNT_ASTERISK:
case ROWAGG_COUNT_COL_NAME:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(0);
// work around count() in select subquery
precisionAgg.push_back(rowgroup::MagicPrecisionForCountAgg);
if (isUnsigned(typeProj[colProj]))
{
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
}
else
{
typeAgg.push_back(CalpontSystemCatalog::BIGINT);
}
csNumAgg.push_back(8);
widthAgg.push_back(bigIntWidth);
colAgg++;
}
break;
case ROWAGG_STATS:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("variance/standard deviation");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep1PhaseDistinctAggregate:: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
// count(x)
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(0);
typeAgg.push_back(CalpontSystemCatalog::DOUBLE);
csNumAgg.push_back(8);
widthAgg.push_back(sizeof(double));
funct->fAuxColumnIndex = ++colAgg;
// mean(x)
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(0);
precisionAgg.push_back(-1);
typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAgg.push_back(8);
widthAgg.push_back(sizeof(long double));
++colAgg;
// sum(x_i - mean)^2
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(0);
precisionAgg.push_back(-1);
typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAgg.push_back(8);
widthAgg.push_back(sizeof(long double));
++colAgg;
}
break;
case ROWAGG_BIT_AND:
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(0);
precisionAgg.push_back(-16); // for connector to skip null check
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
csNumAgg.push_back(8);
widthAgg.push_back(bigIntWidth);
colAgg++;
}
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error(
"(2)prep1PhaseDistinctAggregate A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
csNumAgg.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
++colAgg;
// Column for index of UDAF UserData struct
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(0);
precisionAgg.push_back(0);
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
csNumAgg.push_back(8);
widthAgg.push_back(sizeof(uint64_t));
funct->fAuxColumnIndex = colAgg++;
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
csNumAgg.push_back(csNumProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
multiParmIndexes.push_back(colAgg);
++colAgg;
// If the param is const
if (udafc)
{
if (udafcParamIdx > udafc->aggParms().size() - 1)
{
throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with too many parms",
aggregateFuncErr);
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
}
else if (prevAggOp != ROWAGG_COUNT_DISTINCT_COL_NAME)
{
throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with no parms",
aggregateFuncErr);
}
++udafcParamIdx;
}
break;
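// Example of how ROWAGG_MULTI_PARM is reached (a sketch, not from the source):
// for a two-argument UDAF such as covar_pop(a, b), 'a' drives the ROWAGG_UDAF
// case above and 'b' arrives here as a MULTI_PARM column copied through
// unchanged; COUNT(DISTINCT a, b) routes its extra keys here too, with no udafc.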
default:
{
ostringstream emsg;
emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
cerr << "prep1PhaseDistinctAggregate: " << emsg.str() << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
}
}
// populate the functionNoDistVec
{
// for (uint32_t idx = 0; idx < functionVec1.size(); idx++)
// {
// SP_ROWAGG_FUNC_t func1 = functionVec1[idx];
// SP_ROWAGG_FUNC_t funct(
// new RowAggFunctionCol(func1->fAggFunction,
// func1->fStatsFunction,
// func1->fOutputColumnIndex,
// func1->fOutputColumnIndex,
// func1->fAuxColumnIndex));
// functionNoDistVec.push_back(funct);
// }
functionNoDistVec = functionVec1;
}
// associate the columns between the non-distinct aggregator and the distinct aggregator
// populate the returned columns
// remove group by columns that are not returned
// add back sum or count(column name) if omitted due to an avg column
// put the count(column name) column at the end, if it is for avg only
{
// check if the count column for AVG is also a returned column,
// if so, replace the "-1" with its actual position in the returned vec.
AGG_MAP aggDupFuncMap;
projColsUDAFIdx = 0;
// copy over the groupby vector
// update the outputColumnIndex if returned
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
groupByNoDist.push_back(groupby);
aggFuncMap.insert(make_pair(
boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i));
}
// locate the return column position in aggregated rowgroup
uint64_t outIdx = 0;
RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
uint32_t prevRetKey = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
udafc = NULL;
pUDAFFunc = NULL;
uint32_t retKey = returnedColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
int colAgg = -1;
if (aggOp == ROWAGG_MULTI_PARM)
{
// Duplicate detection doesn't work for multi-parm aggregates.
// If this function was earlier detected as a duplicate, unduplicate it.
SP_ROWAGG_FUNC_t funct = functionVec2.back();
if (funct->fAggFunction == ROWAGG_DUP_FUNCT)
funct->fAggFunction = prevAggOp;
// Remove it from aggDupFuncMap if it's in there.
funct->hasMultiParm = true;
AGG_MAP::iterator it = aggDupFuncMap.find(boost::make_tuple(
prevRetKey, prevAggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggDupFuncMap.end())
{
aggDupFuncMap.erase(it);
}
// Skip on the final aggregation: extra parms for an aggregate have no work to do there.
continue;
}
else
{
// Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
prevAggOp = aggOp;
prevRetKey = returnedColVec[i].first;
}
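// Why the MULTI_PARM unduplication above is needed (illustrative example):
// dup detection keys on (retKey, aggOp, udaf, paramKeys), which only sees the
// first argument, so e.g. covar_pop(a, b) and covar_pop(a, c) would otherwise
// collide on key 'a'. Reverting the DUP mark and erasing the map entry keeps
// both evaluated.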
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
jobInfo.distinctColVec.end())
{
AGG_MAP::iterator it = aggFuncMap.find(
boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
colAgg = it->second;
}
else
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[retKey] << "' isn't in tuple.";
cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable;
if (jobInfo.keyInfo->tupleKeyVec[retKey].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView;
cerr << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(1)prep1PhaseDistinctAggregate: A UDAF function is called but there\'s not enough "
"UDAFColumns");
}
}
switch (aggOp)
{
case ROWAGG_DISTINCT_AVG:
case ROWAGG_DISTINCT_SUM:
{
if (typeAgg[colAgg] == CalpontSystemCatalog::CHAR ||
typeAgg[colAgg] == CalpontSystemCatalog::VARCHAR ||
typeAgg[colAgg] == CalpontSystemCatalog::BLOB ||
typeAgg[colAgg] == CalpontSystemCatalog::TEXT ||
typeAgg[colAgg] == CalpontSystemCatalog::DATE ||
typeAgg[colAgg] == CalpontSystemCatalog::DATETIME ||
typeAgg[colAgg] == CalpontSystemCatalog::TIMESTAMP ||
typeAgg[colAgg] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeAgg[colAgg]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
oidsAggDist.push_back(oidsAgg[colAgg]);
keysAggDist.push_back(retKey);
wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg, typeAggDist,
scaleAggDist, precisionAggDist, widthAggDist);
csNumAggDist.push_back(8);
}
break;
case ROWAGG_COUNT_DISTINCT_COL_NAME:
{
oidsAggDist.push_back(oidsAgg[colAgg]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
// work around count() in select subquery
precisionAggDist.push_back(rowgroup::MagicPrecisionForCountAgg);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
break;
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SUM:
case ROWAGG_AVG:
case ROWAGG_COUNT_ASTERISK:
case ROWAGG_COUNT_COL_NAME:
case ROWAGG_STATS:
case ROWAGG_BIT_AND:
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
case ROWAGG_SELECT_SOME:
default:
{
AGG_MAP::iterator it = aggFuncMap.find(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
colAgg = it->second;
oidsAggDist.push_back(oidsAgg[colAgg]);
keysAggDist.push_back(keysAgg[colAgg]);
scaleAggDist.push_back(scaleAgg[colAgg]);
precisionAggDist.push_back(precisionAgg[colAgg]);
typeAggDist.push_back(typeAgg[colAgg]);
csNumAggDist.push_back(csNumAgg[colAgg]);
uint32_t width = widthAgg[colAgg];
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
if (ti.width > width)
width = ti.width;
}
widthAggDist.push_back(width);
}
// not a direct hit -- a returned column is not already in the RG from PMs
else
{
uint32_t foundTupleKey{0};
if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
colAgg = it->second;
oidsAggDist.push_back(oidsAgg[colAgg]);
keysAggDist.push_back(keysAgg[colAgg]);
scaleAggDist.push_back(scaleAgg[colAgg]);
precisionAggDist.push_back(precisionAgg[colAgg]);
typeAggDist.push_back(typeAgg[colAgg]);
csNumAggDist.push_back(csNumAgg[colAgg]);
uint32_t width = widthAgg[colAgg];
if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
if (ti.width > width)
width = ti.width;
}
widthAggDist.push_back(width);
// Update the `retKey` to specify that this column is a duplicate.
retKey = foundTupleKey;
}
else
{
bool returnColMissing = true;
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
// false alarm
returnColMissing = false;
colAgg = it->second;
if (aggOp == ROWAGG_SUM)
{
oidsAggDist.push_back(oidsAgg[colAgg]);
keysAggDist.push_back(retKey);
csNumAggDist.push_back(8);
wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg,
typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
}
else
{
// leave the count() to avg
aggOp = ROWAGG_COUNT_NO_OP;
oidsAggDist.push_back(oidsAgg[colAgg]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
if (isUnsigned(typeAgg[colAgg]))
{
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
precisionAggDist.push_back(20);
}
else
{
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
precisionAggDist.push_back(19);
}
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
}
}
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
jobInfo.expressionVec.end())
{
// a function on aggregation
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
csNumAggDist.push_back(ti.csNum);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (aggOp == ROWAGG_CONSTANT)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
csNumAggDist.push_back(ti.csNum);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
#if 0
else if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
csNumAggDist.push_back(ti.csNum);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
#endif
else if (jobInfo.groupConcatInfo.columns().find(retKey) !=
jobInfo.groupConcatInfo.columns().end())
{
// TODO: columns used only by group_concat are not needed in the result set.
for (uint64_t k = 0; k < keysProj.size(); k++)
{
if (retKey == keysProj[k])
{
oidsAggDist.push_back(oidsProj[k]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(scaleProj[k] >> 8);
precisionAggDist.push_back(precisionProj[k]);
typeAggDist.push_back(typeProj[k]);
csNumAggDist.push_back(csNumProj[k]);
widthAggDist.push_back(widthProj[k]);
returnColMissing = false;
break;
}
}
}
else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
{
// skip window columns/expression, which are computed later
for (uint64_t k = 0; k < keysProj.size(); k++)
{
if (retKey == keysProj[k])
{
oidsAggDist.push_back(oidsProj[k]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(scaleProj[k] >> 8);
precisionAggDist.push_back(precisionProj[k]);
typeAggDist.push_back(typeProj[k]);
csNumAggDist.push_back(csNumProj[k]);
widthAggDist.push_back(widthProj[k]);
returnColMissing = false;
break;
}
}
}
if (returnColMissing)
{
Message::Args args;
args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep1PhaseDistinctAggregate: " << emsg
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
<< ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
<< endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
} // else
} // switch
}
}
// update groupby vector if the groupby column is a returned column
if (returnedColVec[i].second == 0)
{
int dupGroupbyIndex = -1;
for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
{
if (jobInfo.groupByColVec[j] == retKey)
{
if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t)-1)
groupByNoDist[j]->fOutputColumnIndex = outIdx;
else
dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
}
}
// a duplicate group by column
if (dupGroupbyIndex != -1)
functionVec2.push_back(SP_ROWAGG_FUNC_t(
new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
}
else
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx));
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, outIdx));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
funct->fAuxColumnIndex = colAgg;
else if (aggOp == ROWAGG_CONSTANT)
funct->fAuxColumnIndex = jobInfo.cntStarPos;
functionVec2.push_back(funct);
// find if this func is a duplicate
AGG_MAP::iterator iter = aggDupFuncMap.find(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggDupFuncMap.end())
{
if (funct->fAggFunction == ROWAGG_AVG)
funct->fAggFunction = ROWAGG_DUP_AVG;
else if (funct->fAggFunction == ROWAGG_STATS)
funct->fAggFunction = ROWAGG_DUP_STATS;
else if (funct->fAggFunction == ROWAGG_UDAF)
funct->fAggFunction = ROWAGG_DUP_UDAF;
else
funct->fAggFunction = ROWAGG_DUP_FUNCT;
funct->fAuxColumnIndex = iter->second;
}
else
{
aggDupFuncMap.insert(make_pair(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}
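// Illustration of the duplicate handling above: in SELECT avg(c1), avg(c1),
// the second instance becomes ROWAGG_DUP_AVG with fAuxColumnIndex pointing at
// the first one's output, so the value is computed once and copied.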
if (returnedColVec[i].second == AggregateColumn::AVG)
avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
}
++outIdx;
} // for (i
// now fix the AVG function, locate the count(column) position
for (uint64_t i = 0; i < functionVec2.size(); i++)
{
if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_NO_OP)
{
// if the count(k) can be associated with an avg(k)
map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
avgFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]);
if (k != avgFuncMap.end())
k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex;
}
}
// there is avg(k), but no count(k) in the select list
uint64_t lastCol = outIdx;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{
if (k->second->fAuxColumnIndex == (uint32_t)-1)
{
k->second->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
keysAggDist.push_back(k->first);
scaleAggDist.push_back(0);
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
}
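// E.g. for SELECT g, avg(c1) ... GROUP BY g with no count(c1) in the select
// list, the loop above appends a hidden UBIGINT count column after the last
// returned column (lastCol) so the final average can be computed as sum/count.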
// now fix the AVG distinct function, locate the count(distinct column) position
for (uint64_t i = 0; i < functionVec2.size(); i++)
{
if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME && !functionVec2[i]->hasMultiParm)
{
// if the count(distinct k) can be associated with an avg(distinct k)
map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
avgDistFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]);
if (k != avgDistFuncMap.end())
{
k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex;
functionVec2[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
}
}
}
// there is avg(distinct k), but no count(distinct k) in the select list
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++)
{
if (k->second->fAuxColumnIndex == (uint32_t)-1)
{
k->second->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
keysAggDist.push_back(k->first);
scaleAggDist.push_back(0);
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
}
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVec2.size(); i++)
{
uint64_t j = functionVec2[i]->fInputColumnIndex;
if (functionVec2[i]->fAggFunction == ROWAGG_UDAF)
{
// Column for index of UDAF UserData struct
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get());
if (!udafFuncCol)
{
throw logic_error(
"(4)prep1PhaseDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVec2[i]->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(oidsAgg[j]); // Dummy?
keysAggDist.push_back(keysAgg[j]); // Dummy?
scaleAggDist.push_back(0);
precisionAggDist.push_back(0);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(sizeof(uint64_t));
continue;
}
if (functionVec2[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVec2[i]->fAuxColumnIndex = lastCol;
// mean(x)
oidsAggDist.push_back(oidsAgg[j]);
keysAggDist.push_back(keysAgg[j]);
scaleAggDist.push_back(0);
precisionAggDist.push_back(0);
typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggDist.push_back(8);
widthAggDist.push_back(sizeof(long double));
++lastCol;
// sum(x_i - mean)^2
oidsAggDist.push_back(oidsAgg[j]);
keysAggDist.push_back(keysAgg[j]);
scaleAggDist.push_back(0);
precisionAggDist.push_back(-1);
typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggDist.push_back(8);
widthAggDist.push_back(sizeof(long double));
++lastCol;
}
}
// calculate the offsets and create the row aggregation and rowgroup
posAgg.push_back(2); // rid
for (uint64_t i = 0; i < oidsAgg.size(); i++)
posAgg.push_back(posAgg[i] + widthAgg[i]);
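// Worked example of the offset arithmetic (hypothetical widths): the first
// offset 2 reserves the row header/rid bytes, then each column starts where
// the previous one ends, e.g. widths {8, 8, 4} yield posAgg = {2, 10, 18, 22}.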
RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit, false));
rowAgg->timeZone(jobInfo.timeZone);
posAggDist.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggDist.size(); i++)
posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, csNumAggDist,
scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
SP_ROWAGG_DIST rowAggDist(
new RowAggregationDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit));
rowAggDist->timeZone(jobInfo.timeZone);
// mapping the group_concat columns, if any.
if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
{
jobInfo.groupConcatInfo.mapColumns(projRG);
rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());
}
// if the distinct keyword is applied to more than one aggregate column, reset rowAggDist
vector<RowGroup> subRgVec;
if (jobInfo.distinctColVec.size() > 1)
{
RowAggregationMultiDistinct* multiDistinctAggregator =
new RowAggregationMultiDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit);
multiDistinctAggregator->timeZone(jobInfo.timeZone);
rowAggDist.reset(multiDistinctAggregator);
rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());
// construct and add sub-aggregators to rowAggDist
vector<uint32_t> posAggGb, posAggSub;
vector<uint32_t> oidsAggGb, oidsAggSub;
vector<uint32_t> keysAggGb, keysAggSub;
vector<uint32_t> scaleAggGb, scaleAggSub;
vector<uint32_t> precisionAggGb, precisionAggSub;
vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub;
vector<uint32_t> csNumAggGb, csNumAggSub;
vector<uint32_t> widthAggGb, widthAggSub;
// populate groupby column info
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
oidsAggGb.push_back(oidsProj[i]);
keysAggGb.push_back(keysProj[i]);
scaleAggGb.push_back(scaleProj[i]);
precisionAggGb.push_back(precisionProj[i]);
typeAggGb.push_back(typeProj[i]);
csNumAggGb.push_back(csNumProj[i]);
widthAggGb.push_back(widthProj[i]);
}
// for distinct, each column requires a separate rowgroup
vector<SP_ROWAGG_DIST> rowAggSubDistVec;
uint32_t distinctColKey;
int64_t j;
uint64_t k;
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
if (returnedColVec[i].second == 0)
{
++outIdx;
continue;
}
j = -1;
distinctColKey = -1;
// Find the entry in distinctColVec, if any
for (k = 0; k < jobInfo.distinctColVec.size(); k++)
{
distinctColKey = jobInfo.distinctColVec[k];
if (returnedColVec[i].first == distinctColKey)
break;
}
// not a distinct column (note: the loop above overwrites distinctColKey on
// every iteration, so test the loop index rather than the -1 sentinel)
if (k == jobInfo.distinctColVec.size())
{
++outIdx;
continue;
}
// locate the distinct key in the row group
for (k = 0; k < keysAgg.size(); k++)
{
if (keysProj[k] == distinctColKey)
{
j = k;
break;
}
}
idbassert(j != -1);
oidsAggSub = oidsAggGb;
keysAggSub = keysAggGb;
scaleAggSub = scaleAggGb;
precisionAggSub = precisionAggGb;
typeAggSub = typeAggGb;
csNumAggSub = csNumAggGb;
widthAggSub = widthAggGb;
oidsAggSub.push_back(oidsProj[j]);
keysAggSub.push_back(keysProj[j]);
scaleAggSub.push_back(scaleProj[j]);
precisionAggSub.push_back(precisionProj[j]);
typeAggSub.push_back(typeProj[j]);
csNumAggSub.push_back(csNumProj[j]);
widthAggSub.push_back(widthProj[j]);
// construct groupby vector
vector<SP_ROWAGG_GRPBY_t> groupBySub;
k = 0;
while (k < jobInfo.groupByColVec.size())
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k));
groupBySub.push_back(groupby);
k++;
}
// add the distinct column as groupby
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
groupBySub.push_back(groupby);
// Add multi parm distinct
while ((i + 1) < returnedColVec.size() &&
functionIdMap(returnedColVec[i + 1].second) == ROWAGG_MULTI_PARM)
{
++i;
uint32_t dColKey = -1;
j = -1;
// Find the entry in distinctColVec, if any
for (k = 0; k < jobInfo.distinctColVec.size(); k++)
{
dColKey = jobInfo.distinctColVec[k];
if (returnedColVec[i].first == dColKey)
break;
}
idbassert(dColKey != (uint32_t)-1);
// locate the distinct key in the row group
for (k = 0; k < keysAgg.size(); k++)
{
if (keysProj[k] == dColKey)
{
j = k;
break;
}
}
idbassert(j != -1);
oidsAggSub.push_back(oidsProj[j]);
keysAggSub.push_back(keysProj[j]);
scaleAggSub.push_back(scaleProj[j]);
precisionAggSub.push_back(precisionProj[j]);
typeAggSub.push_back(typeProj[j]);
csNumAggSub.push_back(csNumProj[j]);
widthAggSub.push_back(widthProj[j]);
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
groupBySub.push_back(groupby);
}
// construct sub-rowgroup
posAggSub.clear();
posAggSub.push_back(2); // rid
for (k = 0; k < oidsAggSub.size(); k++)
posAggSub.push_back(posAggSub[k] + widthAggSub[k]);
RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub, csNumAggSub,
scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold);
subRgVec.push_back(subRg);
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
// tricky part : 2 function vectors
// -- dummy function vector for sub-aggregator, which does distinct only
// -- aggregate function on this distinct column for rowAggDist
vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2;
// search the function in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();
while (it != functionVec2.end())
{
SP_ROWAGG_FUNC_t f = *it++;
if ((f->fOutputColumnIndex == outIdx) &&
(f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || f->fAggFunction == ROWAGG_DISTINCT_SUM ||
f->fAggFunction == ROWAGG_DISTINCT_AVG))
{
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction,
groupBySub.size() - 1, f->fOutputColumnIndex,
f->fAuxColumnIndex - multiParms));
functionSub2.push_back(funct);
}
}
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
subAgg->timeZone(jobInfo.timeZone);
subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
// add to rowAggDist
multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);
++outIdx;
}
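// Net effect of the loop above (sketch): for SELECT count(distinct a),
// sum(distinct b) ... GROUP BY g, two sub-rowgroups are built -- {g, a} and
// {g, b} -- each deduplicated by its own RowAggregationSubDistinct before the
// per-column aggregates are merged back by the multi-distinct aggregator.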
// cover any non-distinct column functions
{
vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
vector<SP_ROWAGG_FUNC_t> functionSub2;
int64_t multiParms = 0;
for (uint64_t k = 0; k < returnedColVec.size(); k++)
{
// search non-distinct functions in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();
while (it != functionVec2.end())
{
SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t f = *it++;
if (f->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex - multiParms));
functionSub2.push_back(funct);
}
else if ((f->fOutputColumnIndex == k) &&
(f->fAggFunction == ROWAGG_COUNT_ASTERISK || f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
f->fAggFunction == ROWAGG_SUM || f->fAggFunction == ROWAGG_AVG ||
f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX ||
f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND ||
f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR ||
f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT ||
f->fAggFunction == ROWAGG_JSON_ARRAY || f->fAggFunction == ROWAGG_SELECT_SOME))
{
funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex,
f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms));
functionSub2.push_back(funct);
}
}
}
if (functionSub1.size() > 0)
{
// make sure the group by columns are available for next aggregate phase.
vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist;
for (uint64_t i = 0; i < groupByNoDist.size(); i++)
groupBySubNoDist.push_back(
SP_ROWAGG_GRPBY_t(new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i)));
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false));
subAgg->timeZone(jobInfo.timeZone);
subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
// add to rowAggDist
multiDistinctAggregator->addSubAggregator(subAgg, aggRG, functionSub2);
subRgVec.push_back(aggRG);
}
}
}
rowAggDist->addAggregator(rowAgg, aggRG);
rowgroups.push_back(aggRgDist);
aggregators.push_back(rowAggDist);
if (jobInfo.trace)
{
cout << "projected RG: " << projRG.toString() << endl << "aggregated RG: " << aggRG.toString() << endl;
for (uint64_t i = 0; i < subRgVec.size(); i++)
cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl;
cout << "aggregatedDist RG: " << aggRgDist.toString() << endl;
}
}
void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
vector<SP_ROWAGG_t>& aggregators)
{
// check if there are any aggregate columns
// a vector that has the aggregate function to be done by PM
vector<pair<uint32_t, int>> aggColVec;
set<uint32_t> avgSet;
vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;
// For UDAF
uint32_t projColsUDAFIdx = 0;
uint32_t udafcParamIdx = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
// skip if not an aggregation column
if (returnedColVec[i].second == 0)
continue;
aggColVec.push_back(returnedColVec[i]);
// remember if a column has an avg function;
// with avg, there is no need for a separate sum or count(column name)
if (returnedColVec[i].second == AggregateColumn::AVG)
avgSet.insert(returnedColVec[i].first);
}
// populate the aggregate rowgroup on PM and UM
// PM: projectedRG -> aggregateRGPM
// UM: aggregateRGPM -> aggregateRGUM
//
// Aggregate preparation by joblist factory:
// 1. get projected rowgroup (done by doAggProject) -- input to PM AGG
// 2. construct aggregate rowgroup -- output of PM, input of UM
// 3. construct aggregate rowgroup -- output of UM
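// Dataflow sketch for, e.g., SELECT g, avg(c) FROM t GROUP BY g:
// each PM pre-aggregates its rows into partial sum(c)/count(c) per g
// (aggregateRGPM); the UM then merges those partials across PMs and
// produces the final avg (aggregateRGUM).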
const RowGroup projRG = rowgroups[0];
const vector<uint32_t>& oidsProj = projRG.getOIDs();
const vector<uint32_t>& keysProj = projRG.getKeys();
const vector<uint32_t>& scaleProj = projRG.getScale();
const vector<uint32_t>& precisionProj = projRG.getPrecision();
const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
vector<uint32_t> posAggPm, posAggUm;
vector<uint32_t> oidsAggPm, oidsAggUm;
vector<uint32_t> keysAggPm, keysAggUm;
vector<uint32_t> scaleAggPm, scaleAggUm;
vector<uint32_t> precisionAggPm, precisionAggUm;
vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm;
vector<uint32_t> csNumAggPm, csNumAggUm;
vector<uint32_t> widthAggPm, widthAggUm;
vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm;
vector<SP_ROWAGG_FUNC_t> functionVecPm, functionVecUm;
uint32_t bigIntWidth = sizeof(int64_t);
uint32_t bigUintWidth = sizeof(uint64_t);
AGG_MAP aggFuncMap;
// associate the columns between the projected RG and the aggregate RG on the PM
// populate the aggregate columns
// the groupby columns are put in front, even if not returned
// sum and count(column name) are omitted if avg is present
{
// project only unique oids, but they may be repeated in aggregation
// collect the projected column info, prepare for aggregation
vector<uint32_t> width;
map<uint32_t, int> projColPosMap;
for (uint64_t i = 0; i < keysProj.size(); i++)
{
projColPosMap.insert(make_pair(keysProj[i], i));
width.push_back(projRG.getColumnWidth(i));
}
// column index for PM aggregate rowgroup
uint64_t colAggPm = 0;
// for groupby column
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
uint32_t key = jobInfo.groupByColVec[i];
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep2PhasesAggregate: groupby " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
uint64_t colProj = projColPosMap[key];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
groupByPm.push_back(groupby);
// PM: just copy down to aggregation rowgroup
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(key);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL),
colAggPm));
colAggPm++;
}
// for distinct column
for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
{
uint32_t key = jobInfo.distinctColVec[i];
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep2PhasesAggregate: distinct " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
uint64_t colProj = projColPosMap[key];
// check for dup distinct column -- @bug6126
if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end())
continue;
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
groupByPm.push_back(groupby);
// PM: just copy down to aggregation rowgroup
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(key);
scaleAggPm.push_back(scaleProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL),
colAggPm));
colAggPm++;
}
// vectors for aggregate functions
for (uint64_t i = 0; i < aggColVec.size(); i++)
{
pUDAFFunc = NULL;
uint32_t aggKey = aggColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);
// skip on PM if this is a constant
if (aggOp == ROWAGG_CONSTANT)
continue;
if (projColPosMap.find(aggKey) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
cerr << "prep2PhasesAggregate: aggregate " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;
if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;
cerr << endl;
throw logic_error(emsg.str());
}
if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && (avgSet.find(aggKey) != avgSet.end()))
// skip sum / count(column) if avg is also selected
continue;
uint64_t colProj = projColPosMap[aggKey];
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(1)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
}
// skip if this is a duplicate
if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM &&
aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL)) !=
aggFuncMap.end())
{
// skip if this is a duplicate
continue;
}
functionVecPm.push_back(funct);
aggFuncMap.insert(make_pair(
boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
colAggPm));
switch (aggOp)
{
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SELECT_SOME:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
colAggPm++;
}
break;
case ROWAGG_SUM:
case ROWAGG_AVG:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhasesAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAggPm,
scaleAggPm, precisionAggPm, widthAggPm);
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
csNumAggPm.push_back(8);
colAggPm++;
}
// PM: put the count column for avg next to the sum
// let fall through to add a count column for average function
if (aggOp != ROWAGG_AVG)
break;
// AVG gets special treatment everywhere because AVG(column) is
// SUM(column)/COUNT(column), and those aggregations, if present,
// can be reused by AVG.
/* fall through */
case ROWAGG_COUNT_ASTERISK:
case ROWAGG_COUNT_COL_NAME:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
// work around count() in select subquery
precisionAggPm.push_back(rowgroup::MagicPrecisionForCountAgg);
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggPm.push_back(8);
widthAggPm.push_back(bigIntWidth);
colAggPm++;
}
break;
case ROWAGG_STATS:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("variance/standard deviation");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhaseAggregate:: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
// count(x)
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(0);
typeAggPm.push_back(CalpontSystemCatalog::DOUBLE);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(double));
funct->fAuxColumnIndex = ++colAggPm;
// mean(x)
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(-1);
typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(long double));
++colAggPm;
// sum(x_i - mean)^2
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(-1);
typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(long double));
++colAggPm;
}
break;
case ROWAGG_BIT_AND:
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(-16); // for connector to skip null check
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggPm.push_back(8);
widthAggPm.push_back(bigIntWidth);
colAggPm++;
}
break;
case ROWAGG_UDAF:
{
// Return column
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error(
"(2)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
++colAggPm;
// Column for index of UDAF UserData struct
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(0);
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggPm.push_back(8);
widthAggPm.push_back(bigUintWidth);
funct->fAuxColumnIndex = colAggPm++;
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
colAggPm++;
// If the param is const
if (udafc)
{
if (udafcParamIdx > udafc->aggParms().size() - 1)
{
throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with too many parms",
aggregateFuncErr);
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
}
else
{
throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with no parms",
aggregateFuncErr);
}
++udafcParamIdx;
}
break;
default:
{
ostringstream emsg;
emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
cerr << "prep2PhasesAggregate: " << emsg.str() << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
}
}
// associate the columns between the aggregate RGs on PM and UM
// populate the returned columns
// remove group by columns that are not returned
// add back sum or count(column name) if omitted due to an avg column
// put the count(column name) column at the end, if it is for avg only
{
// check if the count column for AVG is also a returned column,
// if so, replace the "-1" with its actual position in the returned vec.
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
AGG_MAP aggDupFuncMap;
projColsUDAFIdx = 0;
// copy over the groupby vector
// update the outputColumnIndex if returned
for (uint64_t i = 0; i < groupByPm.size(); i++)
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(groupByPm[i]->fOutputColumnIndex, -1));
groupByUm.push_back(groupby);
}
// locate the return column position in the aggregated rowgroup from the PM;
// outIdx is i excluding the multi-parm columns
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
uint32_t retKey = returnedColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
int colPm = -1;
if (aggOp == ROWAGG_MULTI_PARM)
{
// Skip on the UM: extra parms for an aggregate have no work to do there
continue;
}
// Is this a UDAF? use the function as part of the key.
pUDAFFunc = NULL;
udafc = NULL;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
}
}
AGG_MAP::iterator it = aggFuncMap.find(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
colPm = it->second;
oidsAggUm.push_back(oidsAggPm[colPm]);
keysAggUm.push_back(retKey);
scaleAggUm.push_back(scaleAggPm[colPm]);
precisionAggUm.push_back(precisionAggPm[colPm]);
typeAggUm.push_back(typeAggPm[colPm]);
csNumAggUm.push_back(csNumAggPm[colPm]);
widthAggUm.push_back(widthAggPm[colPm]);
}
// not a direct hit -- a returned column is not already in the RG from PMs
else
{
// MCOL-5476.
uint32_t foundTupleKey{0};
if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
colPm = it->second;
oidsAggUm.push_back(oidsAggPm[colPm]);
keysAggUm.push_back(retKey);
scaleAggUm.push_back(scaleAggPm[colPm]);
precisionAggUm.push_back(precisionAggPm[colPm]);
typeAggUm.push_back(typeAggPm[colPm]);
csNumAggUm.push_back(csNumAggPm[colPm]);
widthAggUm.push_back(widthAggPm[colPm]);
// Update the `retKey` to specify that this column is a duplicate.
retKey = foundTupleKey;
}
else
{
bool returnColMissing = true;
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
// false alarm
returnColMissing = false;
colPm = it->second;
if (aggOp == ROWAGG_SUM)
{
wideDecimalOrLongDouble(colPm, typeAggPm[colPm], precisionAggPm, scaleAggPm, widthAggPm,
typeAggUm, scaleAggUm, precisionAggUm, widthAggUm);
oidsAggUm.push_back(oidsAggPm[colPm]);
keysAggUm.push_back(retKey);
csNumAggUm.push_back(8);
}
else
{
// leave the count() to avg
aggOp = ROWAGG_COUNT_NO_OP;
colPm++;
oidsAggUm.push_back(oidsAggPm[colPm]);
keysAggUm.push_back(retKey);
scaleAggUm.push_back(0);
precisionAggUm.push_back(19);
typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggUm.push_back(8);
widthAggUm.push_back(bigIntWidth);
}
}
}
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
jobInfo.expressionVec.end())
{
// a function on aggregation
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggUm.push_back(ti.oid);
keysAggUm.push_back(retKey);
scaleAggUm.push_back(ti.scale);
precisionAggUm.push_back(ti.precision);
typeAggUm.push_back(ti.dtype);
csNumAggUm.push_back(ti.csNum);
widthAggUm.push_back(ti.width);
returnColMissing = false;
}
else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
{
// a window function
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggUm.push_back(ti.oid);
keysAggUm.push_back(retKey);
scaleAggUm.push_back(ti.scale);
precisionAggUm.push_back(ti.precision);
typeAggUm.push_back(ti.dtype);
csNumAggUm.push_back(ti.csNum);
widthAggUm.push_back(ti.width);
returnColMissing = false;
}
else if (aggOp == ROWAGG_CONSTANT)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggUm.push_back(ti.oid);
keysAggUm.push_back(retKey);
scaleAggUm.push_back(ti.scale);
precisionAggUm.push_back(ti.precision);
typeAggUm.push_back(ti.dtype);
csNumAggUm.push_back(ti.csNum);
widthAggUm.push_back(ti.width);
returnColMissing = false;
}
if (returnColMissing)
{
Message::Args args;
args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesAggregate: " << emsg
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
<< ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
<< endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
}
}
// update groupby vector if the groupby column is a returned column
if (returnedColVec[i].second == 0)
{
int dupGroupbyIndex = -1;
for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
{
if (jobInfo.groupByColVec[j] == retKey)
{
if (groupByUm[j]->fOutputColumnIndex == (uint32_t)-1)
groupByUm[j]->fOutputColumnIndex = outIdx;
else
dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
}
}
for (uint64_t j = 0; j < jobInfo.distinctColVec.size(); j++)
{
if (jobInfo.distinctColVec[j] == retKey)
{
if (groupByUm[j]->fOutputColumnIndex == (uint32_t)-1)
groupByUm[j]->fOutputColumnIndex = outIdx;
else
dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
}
}
// a duplicate group by column
if (dupGroupbyIndex != -1)
{
functionVecUm.push_back(SP_ROWAGG_FUNC_t(
new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
}
}
else
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx));
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, outIdx));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
funct->fAuxColumnIndex = colPm;
else if (aggOp == ROWAGG_CONSTANT)
funct->fAuxColumnIndex = jobInfo.cntStarPos;
functionVecUm.push_back(funct);
// find if this func is a duplicate
AGG_MAP::iterator iter = aggDupFuncMap.find(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggDupFuncMap.end())
{
if (funct->fAggFunction == ROWAGG_AVG)
funct->fAggFunction = ROWAGG_DUP_AVG;
else if (funct->fAggFunction == ROWAGG_STATS)
funct->fAggFunction = ROWAGG_DUP_STATS;
else if (funct->fAggFunction == ROWAGG_UDAF)
funct->fAggFunction = ROWAGG_DUP_UDAF;
else
funct->fAggFunction = ROWAGG_DUP_FUNCT;
funct->fAuxColumnIndex = iter->second;
}
else
{
aggDupFuncMap.insert(make_pair(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}
if (returnedColVec[i].second == AggregateColumn::AVG)
avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
}
++outIdx;
}
// now fix the AVG function, locate the count(column) position
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
if (functionVecUm[i]->fAggFunction != ROWAGG_COUNT_NO_OP)
continue;
// if the count(k) can be associated with an avg(k)
map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
avgFuncMap.find(keysAggUm[functionVecUm[i]->fOutputColumnIndex]);
if (k != avgFuncMap.end())
k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
}
// there is avg(k), but no count(k) in the select list
uint64_t lastCol = outIdx;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{
if (k->second->fAuxColumnIndex == (uint32_t)-1)
{
k->second->fAuxColumnIndex = lastCol++;
oidsAggUm.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
keysAggUm.push_back(k->first);
scaleAggUm.push_back(0);
precisionAggUm.push_back(19);
typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggUm.push_back(8);
widthAggUm.push_back(bigIntWidth);
}
}
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
uint64_t j = functionVecUm[i]->fInputColumnIndex;
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
{
// Column for index of UDAF UserData struct
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
if (!udafFuncCol)
{
throw logic_error(
"(4)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVecUm[i]->fAuxColumnIndex = lastCol++;
oidsAggUm.push_back(oidsAggPm[j]); // Dummy?
keysAggUm.push_back(keysAggPm[j]); // Dummy?
scaleAggUm.push_back(0);
precisionAggUm.push_back(0);
typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggUm.push_back(8);
widthAggUm.push_back(bigUintWidth);
continue;
}
if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVecUm[i]->fAuxColumnIndex = lastCol;
// mean(x)
oidsAggUm.push_back(oidsAggPm[j]);
keysAggUm.push_back(keysAggPm[j]);
scaleAggUm.push_back(0);
precisionAggUm.push_back(-1);
typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggUm.push_back(8);
widthAggUm.push_back(sizeof(long double));
++lastCol;
// sum(x_i - mean)^2
oidsAggUm.push_back(oidsAggPm[j]);
keysAggUm.push_back(keysAggPm[j]);
scaleAggUm.push_back(0);
precisionAggUm.push_back(-1);
typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggUm.push_back(8);
widthAggUm.push_back(sizeof(long double));
++lastCol;
}
}
// calculate the offset and create the rowaggregations, rowgroups
posAggUm.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggUm.size(); i++)
posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
precisionAggUm, jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAggUm(
new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit, false));
rowAggUm->timeZone(jobInfo.timeZone);
rowgroups.push_back(aggRgUm);
aggregators.push_back(rowAggUm);
posAggPm.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggPm.size(); i++)
posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm,
precisionAggPm, jobInfo.stringTableThreshold);
SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm, nullptr, nullptr, jobInfo.hasRollup));
rowAggPm->timeZone(jobInfo.timeZone);
rowgroups.push_back(aggRgPm);
aggregators.push_back(rowAggPm);
if (jobInfo.trace)
cout << "\n====== Aggregation RowGroups ======" << endl
<< "projected RG: " << projRG.toString() << endl
<< "aggregated1 RG: " << aggRgPm.toString() << endl
<< "aggregated2 RG: " << aggRgUm.toString() << endl;
}
void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
vector<SP_ROWAGG_t>& aggregators)
{
// check if there are any aggregate columns
// a vector that has the aggregate function to be done by PM
vector<pair<uint32_t, int>> aggColVec, aggNoDistColVec;
set<uint32_t> avgSet, avgDistSet;
vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;
// For UDAF
uint32_t projColsUDAFIdx = 0;
uint32_t udafcParamIdx = 0;
UDAFColumn* udafc = NULL;
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
// col should be an aggregate or groupBy or window function
uint32_t rtcKey = returnedColVec[i].first;
uint32_t rtcOp = returnedColVec[i].second;
if (rtcOp == 0 &&
find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), rtcKey) !=
jobInfo.distinctColVec.end() &&
find(jobInfo.groupByColVec.begin(), jobInfo.groupByColVec.end(), rtcKey) ==
jobInfo.groupByColVec.end() &&
jobInfo.windowSet.find(rtcKey) != jobInfo.windowSet.end())
{
Message::Args args;
args.add(keyName(i, rtcKey, jobInfo));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[rtcKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[rtcKey].fTable
<< ", view=" << jobInfo.keyInfo->tupleKeyVec[rtcKey].fView << endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
// skip if not an aggregation column
if (returnedColVec[i].second == 0)
continue;
aggColVec.push_back(returnedColVec[i]);
// remember if a column has an avg function;
// with avg, there is no need for a separate sum or count(column name)
if (returnedColVec[i].second == AggregateColumn::AVG)
avgSet.insert(returnedColVec[i].first);
if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
avgDistSet.insert(returnedColVec[i].first);
}
// populate the aggregate rowgroup on PM and UM
// PM: projectedRG -> aggregateRGPM
// UM: aggregateRGPM -> aggregateRGUM
//
// Aggregate preparation by joblist factory:
// 1. get projected rowgroup (done by doAggProject) -- input to PM AGG
// 2. construct aggregate rowgroup -- output of PM, input of UM
// 3. construct aggregate rowgroup -- output of UM
// 4. construct aggregate rowgroup -- output of distinct aggregates
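// Dataflow sketch for, e.g., SELECT g, count(distinct a) FROM t GROUP BY g:
// the PM groups by (g, a) to shrink the stream, the UM re-groups the partial
// results, and the final distinct step (step 4) deduplicates on (g, a) and
// counts the survivors per g.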
const RowGroup projRG = rowgroups[0];
const vector<uint32_t>& oidsProj = projRG.getOIDs();
const vector<uint32_t>& keysProj = projRG.getKeys();
const vector<uint32_t>& scaleProj = projRG.getScale();
const vector<uint32_t>& precisionProj = projRG.getPrecision();
const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();
vector<uint32_t> posAggPm, posAggUm, posAggDist;
vector<uint32_t> oidsAggPm, oidsAggUm, oidsAggDist;
vector<uint32_t> keysAggPm, keysAggUm, keysAggDist;
vector<uint32_t> scaleAggPm, scaleAggUm, scaleAggDist;
vector<uint32_t> precisionAggPm, precisionAggUm, precisionAggDist;
vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm, typeAggDist;
vector<uint32_t> csNumAggPm, csNumAggUm, csNumAggDist;
vector<uint32_t> widthAggPm, widthAggUm, widthAggDist;
vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm, groupByNoDist;
vector<SP_ROWAGG_FUNC_t> functionVecPm, functionNoDistVec, functionVecUm;
list<uint32_t> multiParmIndexes;
uint32_t bigIntWidth = sizeof(int64_t);
map<pair<uint32_t, int>, uint64_t> avgFuncDistMap;
AGG_MAP aggFuncMap;
// associate the columns between the projected RG and the aggregate RG on the PM
// populate the aggregate columns
// the groupby columns are put in front, even if not returned
// sum and count(column name) are omitted if avg is present
{
// project only unique oids, but they may be repeated in aggregation
// collect the projected column info, prepare for aggregation
vector<uint32_t> width;
map<uint32_t, int> projColPosMap;
for (uint64_t i = 0; i < keysProj.size(); i++)
{
projColPosMap.insert(make_pair(keysProj[i], i));
width.push_back(projRG.getColumnWidth(i));
}
// column index for PM aggregate rowgroup
uint64_t colAggPm = 0;
uint64_t multiParm = 0;
// for groupby column
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
uint32_t key = jobInfo.groupByColVec[i];
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep2PhasesDistinctAggregate: group " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
uint64_t colProj = projColPosMap[key];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
groupByPm.push_back(groupby);
// PM: just copy down to aggregation rowgroup
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(key);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL),
colAggPm));
colAggPm++;
}
// for distinct column
for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
{
uint32_t key = jobInfo.distinctColVec[i];
if (projColPosMap.find(key) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
cerr << "prep2PhasesDistinctAggregate: distinct " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
cerr << endl;
throw logic_error(emsg.str());
}
// check for dup distinct column -- @bug6126
if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end())
continue;
uint64_t colProj = projColPosMap[key];
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
groupByPm.push_back(groupby);
// PM: just copy down to aggregation rowgroup
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(key);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL),
colAggPm));
colAggPm++;
}
// vectors for aggregate functions
RowAggFunctionType aggOp = ROWAGG_FUNCT_UNDEFINE;
RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
for (uint64_t i = 0; i < aggColVec.size(); i++)
{
aggOp = functionIdMap(aggColVec[i].second);
// Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
if (aggOp != ROWAGG_MULTI_PARM)
prevAggOp = aggOp;
// skip on PM if this is a constant
if (aggOp == ROWAGG_CONSTANT)
continue;
pUDAFFunc = NULL;
uint32_t aggKey = aggColVec[i].first;
if (projColPosMap.find(aggKey) == projColPosMap.end())
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
cerr << "prep2PhasesDistinctAggregate: aggregate " << emsg.str()
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;
if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;
cerr << endl;
throw logic_error(emsg.str());
}
RowAggFunctionType stats = statsFuncIdMap(aggOp);
// skip sum / count(column) if avg is also selected
if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && (avgSet.find(aggKey) != avgSet.end()))
continue;
// We skip distinct aggs, including extra parms. These are handled by adding them to group by list
// above.
if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG ||
aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
continue;
if (aggOp == ROWAGG_MULTI_PARM && prevAggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
continue;
uint64_t colProj = projColPosMap[aggKey];
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
}
}
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough "
"UDAFColumns");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
}
// skip if this is a duplicate
if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM &&
aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL)) !=
aggFuncMap.end())
{
// skip if this is a duplicate
continue;
}
functionVecPm.push_back(funct);
aggFuncMap.insert(make_pair(
boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
colAggPm - multiParm));
switch (aggOp)
{
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SELECT_SOME:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
colAggPm++;
}
break;
case ROWAGG_SUM:
case ROWAGG_AVG:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
csNumAggPm.push_back(8);
wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAggPm,
scaleAggPm, precisionAggPm, widthAggPm);
colAggPm++;
}
// PM: put the count column for avg next to the sum
// let fall through to add a count column for average function
if (aggOp == ROWAGG_AVG)
funct->fAuxColumnIndex = colAggPm;
else
break;
/* fall through */
case ROWAGG_COUNT_ASTERISK:
case ROWAGG_COUNT_COL_NAME:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
// work around count() in select subquery
precisionAggPm.push_back(rowgroup::MagicPrecisionForCountAgg);
if (isUnsigned(typeProj[colProj]))
{
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
}
else
{
typeAggPm.push_back(CalpontSystemCatalog::BIGINT);
}
csNumAggPm.push_back(8);
widthAggPm.push_back(bigIntWidth);
colAggPm++;
}
break;
case ROWAGG_STATS:
{
if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
typeProj[colProj] == CalpontSystemCatalog::BLOB ||
typeProj[colProj] == CalpontSystemCatalog::TEXT ||
typeProj[colProj] == CalpontSystemCatalog::DATE ||
typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
typeProj[colProj] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("variance/standard deviation");
args.add(colTypeIdString(typeProj[colProj]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhasesDistinctAggregate:: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
// count(x)
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(0);
typeAggPm.push_back(CalpontSystemCatalog::DOUBLE);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(double));
funct->fAuxColumnIndex = ++colAggPm;
// mean(x)
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(-1);
typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(long double));
++colAggPm;
// sum(x_i - mean)^2
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(-1);
typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(long double));
++colAggPm;
}
break;
case ROWAGG_BIT_AND:
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(-16); // for connector to skip null check
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggPm.push_back(8);
widthAggPm.push_back(bigIntWidth);
++colAggPm;
}
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());
if (!udafFuncCol)
{
throw logic_error(
"(2)prep2PhasesDistinctAggregate: A UDAF function is called but there's no "
"RowUDAFFunctionCol");
}
// Return column
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
++colAggPm;
// Column for index of UDAF UserData struct
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(0);
precisionAggPm.push_back(0);
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggPm.push_back(8);
widthAggPm.push_back(sizeof(uint64_t));
funct->fAuxColumnIndex = colAggPm++;
// If the first param is const
udafcParamIdx = 0;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
++udafcParamIdx;
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
csNumAggPm.push_back(csNumProj[colProj]);
widthAggPm.push_back(width[colProj]);
multiParmIndexes.push_back(colAggPm);
++colAggPm;
++multiParm;
// If the param is const
if (udafc)
{
if (udafcParamIdx > udafc->aggParms().size() - 1)
{
throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with too many parms",
aggregateFuncErr);
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
if (cc)
{
funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
}
}
else if (prevAggOp != ROWAGG_COUNT_DISTINCT_COL_NAME)
{
throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with no parms",
aggregateFuncErr);
}
++udafcParamIdx;
}
break;
default:
{
ostringstream emsg;
emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
cerr << "prep2PhasesDistinctAggregate: " << emsg.str() << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
}
}
// associate the columns between the aggregate RGs on PM and UM without distinct aggregator
// populate the returned columns
{
int64_t multiParms = 0;
for (uint32_t idx = 0; idx < groupByPm.size(); idx++)
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(idx, idx));
groupByUm.push_back(groupby);
}
for (uint32_t idx = 0; idx < functionVecPm.size(); idx++)
{
SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx];
if (funcPm->fAggFunction == ROWAGG_MULTI_PARM)
{
// Skip on UM: Extra parms for an aggregate have no work on the UM
++multiParms;
continue;
}
if (funcPm->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
if (!udafFuncCol)
{
throw logic_error(
"(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fOutputColumnIndex,
udafFuncCol->fOutputColumnIndex - multiParms,
udafFuncCol->fAuxColumnIndex - multiParms));
functionNoDistVec.push_back(funct);
pUDAFFunc = udafFuncCol->fUDAFContext.getFunction();
}
else
{
funct.reset(new RowAggFunctionCol(funcPm->fAggFunction, funcPm->fStatsFunction,
funcPm->fOutputColumnIndex, funcPm->fOutputColumnIndex - multiParms,
funcPm->fAuxColumnIndex - multiParms));
functionNoDistVec.push_back(funct);
pUDAFFunc = NULL;
}
}
// Copy over the PM arrays to the UM. Skip any that are a multi-parm entry.
for (uint32_t idx = 0; idx < oidsAggPm.size(); ++idx)
{
if (find(multiParmIndexes.begin(), multiParmIndexes.end(), idx) != multiParmIndexes.end())
{
continue;
}
oidsAggUm.push_back(oidsAggPm[idx]);
keysAggUm.push_back(keysAggPm[idx]);
scaleAggUm.push_back(scaleAggPm[idx]);
precisionAggUm.push_back(precisionAggPm[idx]);
widthAggUm.push_back(widthAggPm[idx]);
typeAggUm.push_back(typeAggPm[idx]);
csNumAggUm.push_back(csNumAggPm[idx]);
}
}
// associate the columns between the aggregate RGs on UM and Distinct
// populate the returned columns
// remove group by columns that are not returned
// add back sum or count(column name) if omitted due to an avg column
// put the count(column name) column at the end if it exists only for avg
{
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
projColsUDAFIdx = 0;
// check if the count column for AVG is also a returned column,
// if so, replace the "-1" to actual position in returned vec.
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
AGG_MAP aggDupFuncMap;
// copy over the groupby vector
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
groupByNoDist.push_back(groupby);
}
// locate the return column position in the aggregated rowgroup from the PM
// outIdx is i without the multi-parm columns
uint64_t outIdx = 0;
RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
uint32_t prevRetKey = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
pUDAFFunc = NULL;
udafc = NULL;
uint32_t retKey = returnedColVec[i].first;
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
int colUm = -1;
if (aggOp == ROWAGG_MULTI_PARM)
{
// Duplicate detection doesn't work for multi-parm
// If this function was earlier detected as a duplicate, unduplicate it.
SP_ROWAGG_FUNC_t funct = functionVecUm.back();
if (funct->fAggFunction == ROWAGG_DUP_FUNCT)
funct->fAggFunction = prevAggOp;
// Remove it from aggDupFuncMap if it's in there.
funct->hasMultiParm = true;
AGG_MAP::iterator it = aggDupFuncMap.find(boost::make_tuple(
prevRetKey, prevAggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggDupFuncMap.end())
{
aggDupFuncMap.erase(it);
}
// Skip further UM processing of the multi-parm: extra parms for an aggregate have no work on the UM
continue;
}
else
{
// Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
prevAggOp = aggOp;
prevRetKey = returnedColVec[i].first;
}
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
for (; it != jobInfo.projectionCols.end(); it++)
{
udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIdx++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Save the multi-parm keys for dup-detection.
if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
{
for (uint64_t k = i + 1;
k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
{
udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
}
}
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error(
"(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough "
"UDAFColumns");
}
}
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
jobInfo.distinctColVec.end())
{
AGG_MAP::iterator it = aggFuncMap.find(
boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
colUm = it->second;
}
}
if (colUm > -1) // Means we found a DISTINCT and have a column number
{
switch (aggOp)
{
case ROWAGG_DISTINCT_AVG:
// avgFuncMap.insert(make_pair(key, funct));
case ROWAGG_DISTINCT_SUM:
{
if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR ||
typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR ||
typeAggUm[colUm] == CalpontSystemCatalog::BLOB ||
typeAggUm[colUm] == CalpontSystemCatalog::TEXT ||
typeAggUm[colUm] == CalpontSystemCatalog::DATE ||
typeAggUm[colUm] == CalpontSystemCatalog::DATETIME ||
typeAggUm[colUm] == CalpontSystemCatalog::TIMESTAMP ||
typeAggUm[colUm] == CalpontSystemCatalog::TIME)
{
Message::Args args;
args.add("sum/average");
args.add(colTypeIdString(typeAggUm[colUm]));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
}
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
csNumAggDist.push_back(8);
wideDecimalOrLongDouble(colUm, typeAggPm[colUm], precisionAggPm, scaleAggPm, widthAggPm,
typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
}
// Unlike the PM phase, no count column is appended here; the count
// column for avg(distinct) is located or added later via avgDistFuncMap.
break;
case ROWAGG_COUNT_DISTINCT_COL_NAME:
{
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
// work around count() in select subquery
precisionAggDist.push_back(rowgroup::MagicPrecisionForCountAgg);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
break;
default:
// could happen if agg and agg distinct use same column.
colUm = -1;
break;
} // switch
}
// For non distinct aggregates
if (colUm == -1)
{
AGG_MAP::iterator it = aggFuncMap.find(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
colUm = it->second;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(keysAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
precisionAggDist.push_back(precisionAggUm[colUm]);
typeAggDist.push_back(typeAggUm[colUm]);
csNumAggDist.push_back(csNumAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
}
// not a direct hit -- a returned column is not already in the RG from PMs
else
{
// MCOL-5476.
uint32_t foundTupleKey{0};
if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
{
AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
colUm = it->second;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(keysAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
precisionAggDist.push_back(precisionAggUm[colUm]);
typeAggDist.push_back(typeAggUm[colUm]);
csNumAggDist.push_back(csNumAggUm[colUm]);
widthAggDist.push_back(widthAggUm[colUm]);
// Update the `retKey` to specify that this column is a duplicate.
retKey = foundTupleKey;
}
else
{
bool returnColMissing = true;
// check if a SUM or COUNT covered by AVG
if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
{
it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
udafc ? udafc->getContext().getParamKeys() : NULL));
if (it != aggFuncMap.end())
{
// false alarm
returnColMissing = false;
colUm = it->second;
if (aggOp == ROWAGG_SUM)
{
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
csNumAggDist.push_back(8);
wideDecimalOrLongDouble(colUm, typeAggUm[colUm], precisionAggUm, scaleAggUm, widthAggUm,
typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
}
else
{
// leave the count() to avg
aggOp = ROWAGG_COUNT_NO_OP;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(0);
if (isUnsigned(typeAggUm[colUm]))
{
precisionAggDist.push_back(20);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
}
else
{
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
}
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
}
}
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
jobInfo.expressionVec.end())
{
// a function on aggregation
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
csNumAggDist.push_back(ti.csNum);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
{
// a window function
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
csNumAggDist.push_back(ti.csNum);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
else if (aggOp == ROWAGG_CONSTANT)
{
TupleInfo ti = getTupleInfo(retKey, jobInfo);
oidsAggDist.push_back(ti.oid);
keysAggDist.push_back(retKey);
scaleAggDist.push_back(ti.scale);
precisionAggDist.push_back(ti.precision);
typeAggDist.push_back(ti.dtype);
csNumAggDist.push_back(ti.csNum);
widthAggDist.push_back(ti.width);
returnColMissing = false;
}
if (returnColMissing)
{
Message::Args args;
args.add(keyName(outIdx, retKey, jobInfo));
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
cerr << "prep2PhasesDistinctAggregate: " << emsg
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
<< ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
<< endl;
throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
}
} // else not a direct hit
}
} // else not a DISTINCT
// update groupby vector if the groupby column is a returned column
if (returnedColVec[i].second == 0)
{
int dupGroupbyIndex = -1;
for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
{
if (jobInfo.groupByColVec[j] == retKey)
{
if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t)-1)
groupByNoDist[j]->fOutputColumnIndex = outIdx;
else
dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
}
}
// a duplicate group by column
if (dupGroupbyIndex != -1)
functionVecUm.push_back(SP_ROWAGG_FUNC_t(
new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
}
else
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx));
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, outIdx));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
funct->fAuxColumnIndex = colUm;
else if (aggOp == ROWAGG_CONSTANT)
funct->fAuxColumnIndex = jobInfo.cntStarPos;
functionVecUm.push_back(funct);
// find if this func is a duplicate
AGG_MAP::iterator iter = aggDupFuncMap.find(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
if (iter != aggDupFuncMap.end())
{
if (funct->fAggFunction == ROWAGG_AVG)
funct->fAggFunction = ROWAGG_DUP_AVG;
else if (funct->fAggFunction == ROWAGG_STATS)
funct->fAggFunction = ROWAGG_DUP_STATS;
else if (funct->fAggFunction == ROWAGG_UDAF)
funct->fAggFunction = ROWAGG_DUP_UDAF;
else
funct->fAggFunction = ROWAGG_DUP_FUNCT;
funct->fAuxColumnIndex = iter->second;
}
else
{
aggDupFuncMap.insert(make_pair(
boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
funct->fOutputColumnIndex));
}
if (returnedColVec[i].second == AggregateColumn::AVG)
avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
}
++outIdx;
} // for (i
// now fix the AVG function, locate the count(column) position
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
// if the count(k) can be associated with an avg(k)
if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_NO_OP)
{
map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
avgFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]);
if (k != avgFuncMap.end())
k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
}
}
// there is avg(k), but no count(k) in the select list
uint64_t lastCol = outIdx;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{
if (k->second->fAuxColumnIndex == (uint32_t)-1)
{
k->second->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
keysAggDist.push_back(k->first);
scaleAggDist.push_back(0);
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
}
// distinct avg
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME && !functionVecUm[i]->hasMultiParm)
{
map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
avgDistFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]);
if (k != avgDistFuncMap.end())
{
k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
functionVecUm[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
}
}
}
// there is avg(distinct k), but no count(distinct k) in the select list
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++)
{
// find count(distinct k) or add it
if (k->second->fAuxColumnIndex == (uint32_t)-1)
{
k->second->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
keysAggDist.push_back(k->first);
scaleAggDist.push_back(0);
precisionAggDist.push_back(19);
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(bigIntWidth);
}
}
// add auxiliary fields for UDAF and statistics functions
for (uint64_t i = 0; i < functionVecUm.size(); i++)
{
uint64_t j = functionVecUm[i]->fInputColumnIndex;
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
{
// Column for index of UDAF UserData struct
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
if (!udafFuncCol)
{
throw logic_error(
"(5)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVecUm[i]->fAuxColumnIndex = lastCol++;
oidsAggDist.push_back(oidsAggPm[j]); // Dummy?
keysAggDist.push_back(keysAggPm[j]); // Dummy?
scaleAggDist.push_back(0);
precisionAggDist.push_back(0);
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
csNumAggDist.push_back(8);
widthAggDist.push_back(sizeof(uint64_t));
continue;
}
if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
continue;
functionVecUm[i]->fAuxColumnIndex = lastCol;
// mean(x)
oidsAggDist.push_back(oidsAggPm[j]);
keysAggDist.push_back(keysAggPm[j]);
scaleAggDist.push_back(0);
precisionAggDist.push_back(-1);
typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggDist.push_back(8);
widthAggDist.push_back(sizeof(long double));
++lastCol;
// sum(x_i - mean)^2
oidsAggDist.push_back(oidsAggPm[j]);
keysAggDist.push_back(keysAggPm[j]);
scaleAggDist.push_back(0);
precisionAggDist.push_back(-1);
typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
csNumAggDist.push_back(8);
widthAggDist.push_back(sizeof(long double));
++lastCol;
}
}
// calculate the offset and create the rowaggregations, rowgroups
posAggUm.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggUm.size(); i++)
posAggUm.push_back(posAggUm[i] + widthAggUm[i]);
RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
precisionAggUm, jobInfo.stringTableThreshold);
SP_ROWAGG_UM_t rowAggUm(
new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit, false));
rowAggUm->timeZone(jobInfo.timeZone);
posAggDist.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggDist.size(); i++)
posAggDist.push_back(posAggDist[i] + widthAggDist[i]);
RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, csNumAggDist,
scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
SP_ROWAGG_DIST rowAggDist(
new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
rowAggDist->timeZone(jobInfo.timeZone);
// if distinct key word applied to more than one aggregate column, reset rowAggDist
vector<RowGroup> subRgVec;
if (jobInfo.distinctColVec.size() > 1)
{
RowAggregationMultiDistinct* multiDistinctAggregator =
new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit);
multiDistinctAggregator->timeZone(jobInfo.timeZone);
rowAggDist.reset(multiDistinctAggregator);
// construct and add sub-aggregators to rowAggDist
vector<uint32_t> posAggGb, posAggSub;
vector<uint32_t> oidsAggGb, oidsAggSub;
vector<uint32_t> keysAggGb, keysAggSub;
vector<uint32_t> scaleAggGb, scaleAggSub;
vector<uint32_t> precisionAggGb, precisionAggSub;
vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub;
vector<uint32_t> csNumAggGb, csNumAggSub;
vector<uint32_t> widthAggGb, widthAggSub;
// populate groupby column info
for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
{
oidsAggGb.push_back(oidsAggUm[i]);
keysAggGb.push_back(keysAggUm[i]);
scaleAggGb.push_back(scaleAggUm[i]);
precisionAggGb.push_back(precisionAggUm[i]);
typeAggGb.push_back(typeAggUm[i]);
csNumAggGb.push_back(csNumAggUm[i]);
widthAggGb.push_back(widthAggUm[i]);
}
// for distinct, each column requires a separate rowgroup
vector<SP_ROWAGG_DIST> rowAggSubDistVec;
uint32_t distinctColKey;
int64_t j;
uint64_t k;
uint64_t outIdx = 0;
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
if (returnedColVec[i].second == 0)
{
++outIdx;
continue;
}
j = -1;
distinctColKey = -1;
// Find the entry in distinctColVec, if any
for (k = 0; k < jobInfo.distinctColVec.size(); k++)
{
distinctColKey = jobInfo.distinctColVec[k];
if (returnedColVec[i].first == distinctColKey)
break;
}
if (distinctColKey == (uint32_t)-1)
{
++outIdx;
continue;
}
// locate the distinct key in the row group
for (k = 0; k < keysAggUm.size(); k++)
{
if (keysAggUm[k] == distinctColKey)
{
j = k;
break;
}
}
idbassert(j != -1);
oidsAggSub = oidsAggGb;
keysAggSub = keysAggGb;
scaleAggSub = scaleAggGb;
precisionAggSub = precisionAggGb;
typeAggSub = typeAggGb;
csNumAggSub = csNumAggGb;
widthAggSub = widthAggGb;
oidsAggSub.push_back(oidsAggUm[j]);
keysAggSub.push_back(keysAggUm[j]);
scaleAggSub.push_back(scaleAggUm[j]);
precisionAggSub.push_back(precisionAggUm[j]);
typeAggSub.push_back(typeAggUm[j]);
csNumAggSub.push_back(csNumAggUm[j]);
widthAggSub.push_back(widthAggUm[j]);
// construct groupby vector
vector<SP_ROWAGG_GRPBY_t> groupBySub;
k = 0;
while (k < jobInfo.groupByColVec.size())
{
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k));
groupBySub.push_back(groupby);
k++;
}
// add the distinct column as groupby
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
groupBySub.push_back(groupby);
// Add multi parm distinct
while ((i + 1) < returnedColVec.size() &&
functionIdMap(returnedColVec[i + 1].second) == ROWAGG_MULTI_PARM)
{
++i;
uint32_t dColKey = -1;
j = -1;
// Find the entry in distinctColVec, if any
for (k = 0; k < jobInfo.distinctColVec.size(); k++)
{
dColKey = jobInfo.distinctColVec[k];
if (returnedColVec[i].first == dColKey)
break;
}
idbassert(dColKey != (uint32_t)-1);
// locate the distinct key in the row group
for (k = 0; k < keysAggUm.size(); k++)
{
if (keysAggUm[k] == dColKey)
{
j = k;
break;
}
}
idbassert(j != -1);
oidsAggSub.push_back(oidsAggUm[j]);
keysAggSub.push_back(keysAggUm[j]);
scaleAggSub.push_back(scaleAggUm[j]);
precisionAggSub.push_back(precisionAggUm[j]);
typeAggSub.push_back(typeAggUm[j]);
csNumAggSub.push_back(csNumAggUm[j]);
widthAggSub.push_back(widthAggUm[j]);
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
groupBySub.push_back(groupby);
}
// construct sub-rowgroup
posAggSub.clear();
posAggSub.push_back(2); // rid
for (k = 0; k < oidsAggSub.size(); k++)
posAggSub.push_back(posAggSub[k] + widthAggSub[k]);
RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub, csNumAggSub,
scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold);
subRgVec.push_back(subRg);
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
// tricky part: 2 function vectors
// -- dummy function vector for sub-aggregator, which does distinct only
// -- aggregate function on this distinct column for rowAggDist
vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2;
// search the function in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
while (it != functionVecUm.end())
{
SP_ROWAGG_FUNC_t f = *it++;
if ((f->fOutputColumnIndex == outIdx) &&
(f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || f->fAggFunction == ROWAGG_DISTINCT_SUM ||
f->fAggFunction == ROWAGG_DISTINCT_AVG))
{
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction,
groupBySub.size() - 1, f->fOutputColumnIndex,
f->fAuxColumnIndex - multiParms));
functionSub2.push_back(funct);
}
}
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
subAgg->timeZone(jobInfo.timeZone);
// add to rowAggDist
multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);
++outIdx;
}
// cover any non-distinct column functions
{
vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
vector<SP_ROWAGG_FUNC_t> functionSub2;
int64_t multiParms = 0;
for (uint64_t k = 0; k < returnedColVec.size(); k++)
{
// search non-distinct functions in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
while (it != functionVecUm.end())
{
SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t f = *it++;
if (f->fOutputColumnIndex == k)
{
if (f->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex - multiParms));
functionSub2.push_back(funct);
}
else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK || f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
f->fAggFunction == ROWAGG_SUM || f->fAggFunction == ROWAGG_AVG ||
f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX ||
f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND ||
f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR ||
f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_SELECT_SOME)
{
funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex,
f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms));
functionSub2.push_back(funct);
}
}
}
}
if (functionSub1.size() > 0)
{
// make sure the group by columns are available for next aggregate phase.
vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist;
for (uint64_t i = 0; i < groupByNoDist.size(); i++)
groupBySubNoDist.push_back(
SP_ROWAGG_GRPBY_t(new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i)));
// construct sub-aggregator
SP_ROWAGG_UM_t subAgg(
new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false));
subAgg->timeZone(jobInfo.timeZone);
// add to rowAggDist
multiDistinctAggregator->addSubAggregator(subAgg, aggRgUm, functionSub2);
subRgVec.push_back(aggRgUm);
}
}
}
rowAggDist->addAggregator(rowAggUm, aggRgUm);
rowgroups.push_back(aggRgDist);
aggregators.push_back(rowAggDist);
posAggPm.push_back(2); // rid
for (uint64_t i = 0; i < oidsAggPm.size(); i++)
posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm,
precisionAggPm, jobInfo.stringTableThreshold);
SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
rowAggPm->timeZone(jobInfo.timeZone);
rowgroups.push_back(aggRgPm);
aggregators.push_back(rowAggPm);
if (jobInfo.trace)
{
cout << "projected RG: " << projRG.toString() << endl
<< "aggregated1 RG: " << aggRgPm.toString() << endl
<< "aggregated2 RG: " << aggRgUm.toString() << endl;
for (uint64_t i = 0; i < subRgVec.size(); i++)
cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl;
cout << "aggregatedDist RG: " << aggRgDist.toString() << endl;
}
}
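/** @brief Bind post-aggregation expressions to the UM aggregator.
 * Collects the arithmetic and function columns that reference aggregate results
 * (but no window functions), resolves each expression's output index in the
 * output rowgroup via its tuple key, maps the input index of every referenced
 * simple column (switching to the dictionary token key where applicable), and
 * hands the expression vector to the UM aggregator for evaluation.
 */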
void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInfo& jobInfo)
{
map<uint32_t, uint32_t> keyToIndexMap;
for (uint64_t i = 0; i < fRowGroupOut.getKeys().size(); ++i)
{
if (keyToIndexMap.find(fRowGroupOut.getKeys()[i]) == keyToIndexMap.end())
keyToIndexMap.insert(make_pair(fRowGroupOut.getKeys()[i], i));
}
RetColsVector expressionVec;
ArithmeticColumn* ac = NULL;
FunctionColumn* fc = NULL;
RetColsVector& cols = jobInfo.nonConstCols;
vector<SimpleColumn*> simpleColumns;
for (RetColsVector::iterator it = cols.begin(); it != cols.end(); ++it)
{
uint64_t eid = -1;
if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) && (ac->aggColumnList().size() > 0) &&
(ac->windowfunctionColumnList().size() == 0))
{
const vector<SimpleColumn*>& scols = ac->simpleColumnList();
simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end());
eid = ac->expressionId();
expressionVec.push_back(*it);
}
else if (((fc = dynamic_cast<FunctionColumn*>(it->get())) != NULL) && (fc->aggColumnList().size() > 0) &&
(fc->windowfunctionColumnList().size() == 0))
{
const vector<SimpleColumn*>& sCols = fc->simpleColumnList();
simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end());
eid = fc->expressionId();
expressionVec.push_back(*it);
}
// update the output index
if (eid != (uint64_t)-1)
{
map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(getExpTupleKey(jobInfo, eid));
if (mit != keyToIndexMap.end())
{
it->get()->outputIndex(mit->second);
}
else
{
ostringstream emsg;
emsg << "expression " << eid << " cannot be found in tuple.";
cerr << "prepExpressionOnAggregate: " << emsg.str() << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
}
// map the input indices
for (vector<SimpleColumn*>::iterator i = simpleColumns.begin(); i != simpleColumns.end(); i++)
{
CalpontSystemCatalog::OID oid = (*i)->oid();
uint32_t key = getTupleKey(jobInfo, *i);
CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*i)->colType());
if (dictOid > 0)
{
oid = dictOid;
key = jobInfo.keyInfo->dictKeyMap[key];
}
map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(key);
if (mit != keyToIndexMap.end())
{
(*i)->inputIndex(mit->second);
}
else
{
ostringstream emsg;
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' cannot be found in tuple.";
cerr << "prepExpressionOnAggregate: " << emsg.str() << " simple column: oid(" << oid << "), alias("
<< extractTableAlias(*i) << ")." << endl;
throw QueryDataExcept(emsg.str(), aggregateFuncErr);
}
}
// add expression to UM aggregator
aggUM->expression(expressionVec);
}
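/** @brief Pass the constant aggregate descriptors down to fAggregator. */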
void TupleAggregateStep::addConstangAggregate(vector<ConstantAggData>& constAggDataVec)
{
fAggregator->constantAggregate(constAggDataVec);
}
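/** @brief Single-threaded input phase: read row groups from the input datalist
 * and feed each one to fAggregator, draining any remaining input when the
 * query is cancelled or an exception is raised.
 */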
void TupleAggregateStep::aggregateRowGroups()
{
RGData rgData;
bool more = true;
RowGroupDL* dlIn = NULL;
if (!fDoneAggregate)
{
if (fInputJobStepAssociation.outSize() == 0)
throw logic_error("No input data list for TupleAggregate step.");
dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
if (dlIn == NULL)
throw logic_error("Input is not RowGroup data list in TupleAggregate step.");
if (fInputIter < 0)
fInputIter = dlIn->getIterator();
more = dlIn->next(fInputIter, &rgData);
if (traceOn())
dlTimes.setFirstReadTime();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_START;
sts.total_units_of_work = 1;
postStepStartTele(sts);
try
{
// this check covers the no row case
if (!more && cancelled())
{
fDoneAggregate = true;
fEndOfResult = true;
}
while (more && !fEndOfResult)
{
fRowGroupIn.setData(&rgData);
fAggregator->addRowGroup(&fRowGroupIn);
more = dlIn->next(fInputIter, &rgData);
// error checking
if (cancelled())
{
fEndOfResult = true;
while (more)
more = dlIn->next(fInputIter, &rgData);
}
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::aggregateRowGroups()");
fEndOfResult = true;
}
}
fDoneAggregate = true;
while (more)
more = dlIn->next(fInputIter, &rgData);
if (traceOn())
{
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
}
}
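/** @brief Run the final aggregation pass over the per-bucket aggregators.
 * Every finalizer thread walks all buckets; try_lock spreads the buckets
 * across the concurrent threads instead of blocking them on a single mutex.
 */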
void TupleAggregateStep::threadedAggregateFinalize(uint32_t threadID)
{
for (uint32_t i = 0; i < fNumOfBuckets; ++i)
{
if (fAgg_mutex[i]->try_lock())
{
try
{
if (fAggregators[i])
fAggregators[i]->finalAggregation();
}
catch (...)
{
fAgg_mutex[i]->unlock();
throw;
}
fAgg_mutex[i]->unlock();
}
}
}
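/** @brief Phase-1 worker executed by each aggregator thread.
 * Reads batches of input row groups under fMutex and accounts their memory
 * with the resource manager, lazily clones fAggregator into one aggregator
 * per bucket on the first read, hashes each row's group-by key to pick a
 * bucket, and then feeds each bucket's rows to that bucket's aggregator
 * under the corresponding fAgg_mutex.
 */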
void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
{
RGData rgData;
scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
scoped_array<Row> distRow;
scoped_array<std::shared_ptr<uint8_t[]>> distRowData;
uint32_t bucketID;
scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
vector<uint32_t> hashLens;
bool locked = false;
bool more = true;
RowGroupDL* dlIn = nullptr;
uint32_t rgVecShift = float(fNumOfBuckets) / fNumOfThreads * threadID;
RowAggregationMultiDistinct* multiDist = nullptr;
if (!fDoneAggregate)
{
if (fInputJobStepAssociation.outSize() == 0)
throw logic_error("No input data list for delivery.");
dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
if (dlIn == nullptr)
throw logic_error("Input is not RowGroup data list in delivery step.");
vector<RGData> rgDatas;
try
{
// this check covers the no row case
if (!more && cancelled())
{
fDoneAggregate = true;
fEndOfResult = true;
}
bool firstRead = true;
Row rowIn;
while (more && !fEndOfResult)
{
fMutex.lock();
locked = true;
for (uint32_t c = 0; c < fNumOfRowGroups && !cancelled(); c++)
{
more = dlIn->next(fInputIter, &rgData);
if (firstRead)
{
if (threadID == 0)
{
if (traceOn())
dlTimes.setFirstReadTime();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_START;
sts.total_units_of_work = 1;
postStepStartTele(sts);
}
multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get());
if (multiDist)
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(multiDist->subAggregators().size());
distRow.reset(new Row[multiDist->subAggregators().size()]);
distRowData.reset(new std::shared_ptr<uint8_t[]>[multiDist->subAggregators().size()]);
for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
{
multiDist->subAggregators()[j]->getOutputRowGroup()->initRow(&distRow[j], true);
distRowData[j].reset(new uint8_t[distRow[j].getSize()]);
distRow[j].setData(rowgroup::Row::Pointer(distRowData[j].get()));
hashLens.push_back(multiDist->subAggregators()[j]->aggMapKeyLength());
}
}
else
{
for (uint32_t i = 0; i < fNumOfBuckets; i++)
rowBucketVecs[i].resize(1);
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()))
hashLens.push_back(dynamic_cast<RowAggregationDistinct*>(fAggregator.get())
->aggregator()
->aggMapKeyLength());
else
hashLens.push_back(fAggregator->aggMapKeyLength());
}
fRowGroupIns[threadID] = fRowGroupIn;
fRowGroupIns[threadID].initRow(&rowIn);
firstRead = false;
}
if (more)
{
fRowGroupIns[threadID].setData(&rgData);
bool diskAggAllowed = fRm->getAllowDiskAggregation();
int64_t memSize = fRowGroupIns[threadID].getSizeWithStrings();
if (!fRm->getMemory(memSize, fSessionMemLimit, !diskAggAllowed))
{
if (!diskAggAllowed)
{
rgDatas.clear(); // to short-cut the rest of processing
more = false;
fEndOfResult = true;
if (status() == 0)
{
errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATION_TOO_BIG));
status(ERR_AGGREGATION_TOO_BIG);
}
}
else
{
rgDatas.push_back(rgData);
}
break;
}
fMemUsage[threadID] += memSize;
rgDatas.push_back(rgData);
}
else
{
break;
}
}
// the input rowgroup and aggregator are finalized only right before the
// hashjoin starts, if there is one.
if (fAggregators.empty())
{
fAggregators.resize(fNumOfBuckets);
for (uint32_t i = 0; i < fNumOfBuckets; i++)
{
fAggregators[i].reset(fAggregator->clone());
fAggregators[i]->setInputOutput(fRowGroupIn, &fRowGroupOuts[i]);
}
}
fMutex.unlock();
locked = false;
multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get());
// dispatch rows to row buckets
if (multiDist)
{
for (uint32_t c = 0; c < rgDatas.size(); c++)
{
fRowGroupIns[threadID].setData(&rgDatas[c]);
for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
{
fRowGroupIns[threadID].getRow(0, &rowIn);
rowIn.setUserDataStore(rgDatas[c].getUserDataStore());
for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
{
for (uint64_t k = 0; k < multiDist->subAggregators()[j]->getGroupByCols().size(); ++k)
{
rowIn.copyField(
distRow[j], k,
multiDist->subAggregators()[j]->getGroupByCols()[k].get()->fInputColumnIndex);
}
// TBD: this approach could potentially put all values in one bucket.
uint64_t hash = rowgroup::hashRow(distRow[j], hashLens[j] - 1);
bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
}
}
else
{
for (uint32_t c = 0; c < rgDatas.size(); c++)
{
fRowGroupIns[threadID].setData(&rgDatas[c]);
fRowGroupIns[threadID].getRow(0, &rowIn);
rowIn.setUserDataStore(rgDatas[c].getUserDataStore());
for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
{
// The key is the groupby columns, which are the leading columns.
// TBD: this approach could potentially put all values in one bucket.
// fAggregator->hasRollup() is true when we perform one-phase
// aggregation and are also computing subtotals.
// Subtotals produce new keys whose hash values may not fall into
// the processing bucket. Consider the key tuples (1,2) and (1,3):
// their subtotal keys are both (1, NULL), but the rows stay in their
// original processing buckets and would never be aggregated properly.
// Because of this, we put all rows into the same bucket 0 when
// performing single-phase aggregation with subtotals.
// For all other cases (single-phase without subtotals and two-phase
// aggregation with or without subtotals) fAggregator->hasRollup() is
// false, and we get full parallel processing as expected.
uint64_t hash = fAggregator->hasRollup() ? 0 : rowgroup::hashRow(rowIn, hashLens[0] - 1);
int bucketID = hash % fNumOfBuckets;
rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
rowIn.nextRow();
}
}
}
// insert into the hashmaps owned by each aggregator
bool done = false;
fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);
while (!fEndOfResult && !done && !cancelled())
{
bool didWork = false;
done = true;
// each thread starts from its own bucket for better distribution
uint32_t shift = (rgVecShift++) % fNumOfBuckets;
for (uint32_t ci = 0; ci < fNumOfBuckets && !cancelled(); ci++)
{
uint32_t c = (ci + shift) % fNumOfBuckets;
if (!fEndOfResult && !bucketDone[c] && fAgg_mutex[c]->try_lock())
{
try
{
didWork = true;
if (multiDist)
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c]);
else
{
fAggregators[c]->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c][0]);
}
}
catch (...)
{
fAgg_mutex[c]->unlock();
throw;
}
rowBucketVecs[c][0].clear();
bucketDone[c] = true;
fAgg_mutex[c]->unlock();
}
else if (!bucketDone[c])
{
done = false;
}
}
if (!didWork)
usleep(1000); // avoid using all CPU during busy wait
}
rgDatas.clear();
fRm->returnMemory(fMemUsage[threadID], fSessionMemLimit);
fMemUsage[threadID] = 0;
if (cancelled())
{
fEndOfResult = true;
fMutex.lock();
while (more)
more = dlIn->next(fInputIter, &rgData);
fMutex.unlock();
}
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG,
"TupleAggregateStep::threadedAggregateRowGroups()[" + std::to_string(threadID) + "]");
fEndOfResult = true;
fDoneAggregate = true;
}
}
if (!locked)
fMutex.lock();
while (more)
more = dlIn->next(fInputIter, &rgData);
fMutex.unlock();
locked = false;
if (traceOn())
{
dlTimes.setLastReadTime();
dlTimes.setEndOfInputTime();
}
}
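/** @brief Single-threaded aggregation: consume the input, run the distinct
 * aggregation step if one is present, then finalize and deliver the result
 * row groups to the output datalist.
 */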
void TupleAggregateStep::doAggregate_singleThread()
{
AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
RowGroupDL* dlp = dl->rowGroupDL();
RGData rgData;
try
{
if (!fDoneAggregate)
aggregateRowGroups();
if (fEndOfResult == false)
{
// do the final aggregation and deliver the results
// at least one RowGroup for aggregate results
if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
{
dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
}
while (fAggregator->nextRowGroup())
{
fAggregator->finalize();
fRowsReturned += fRowGroupOut.getRowCount();
rgData = fRowGroupOut.duplicate();
fRowGroupDelivered.setData(&rgData);
if (fRowGroupOut.getColumnCount() > fRowGroupDelivered.getColumnCount())
pruneAuxColumns();
dlp->insert(rgData);
}
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doAggregate_singleThread()");
}
if (traceOn())
printCalTrace();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
// Bug 3136, let mini stats be formatted if traceOn.
fEndOfResult = true;
dlp->endOfInput();
}
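/** @brief Entry point: dispatch to the single-threaded path or to
 * doThreadedAggregate() depending on fIsMultiThread.
 */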
void TupleAggregateStep::doAggregate()
{
// @bug4314. DO NOT access fAggregator before the first read of input,
// because hashjoin may not have finalized fAggregator.
if (!fIsMultiThread)
return doAggregate_singleThread();
AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
RowGroupDL* dlp = dl->rowGroupDL();
ByteStream bs;
doThreadedAggregate(bs, dlp);
return;
}
/** @brief Aggregate input row groups in two-phase multi-threaded aggregation.
* In the second phase, handle three different aggregation cases:
* 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM (DISTINCT col1) AND at least one
* GROUP BY column
* 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
* 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
* DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp.
*/
uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp)
{
// initialize return value variable
uint64_t rowCount = 0;
try
{
/*
* Phase 1: Distribute the input rows to buckets based on the hash value of each row's group by columns.
* Then distribute the buckets equally across the aggregators in fAggregators. (Number of fAggregators ==
* fNumOfBuckets). Each previously created hash bucket is represented as one RowGroup in a fAggregator.
*/
if (!fDoneAggregate)
{
initializeMultiThread();
vector<uint64_t> runners; // thread pool handles
runners.reserve(fNumOfThreads); // to prevent a resize during use
// Start the aggregator threads
for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++)
{
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum)));
}
// Now wait for all those threads
jobstepThreadPool.join(runners);
}
if (!cancelled())
{
vector<uint64_t> runners;
// use half of the threads because finalizing requires twice as
// much memory on average
uint32_t threads = std::max(1U, fNumOfThreads / 2);
runners.reserve(threads);
for (uint32_t threadNum = 0; threadNum < threads; ++threadNum)
{
runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum)));
}
jobstepThreadPool.join(runners);
}
/*
* Phase 2: Depending on the query type (see below), aggregate each previously created RowGroup of rows
* that needs to be aggregated and output the results.
*/
auto* distinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0;
// Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY column
// e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2;
if (distinctAggregator && hasGroupByColumns)
{
if (!fEndOfResult)
{
// Do multi-threaded second phase aggregation (per row group created for GROUP BY statement)
if (!fDoneAggregate)
{
vector<uint64_t> runners; // thread pool handles
fRowGroupsDeliveredData.resize(fNumOfBuckets);
uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads;
uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1);
runners.reserve(numThreads);
for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++)
{
runners.push_back(jobstepThreadPool.invoke(
ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread)));
}
jobstepThreadPool.join(runners);
}
// Deliver results
fDoneAggregate = true;
bool done = true;
while (nextDeliveredRowGroup() && !cancelled())
{
done = false;
rowCount = fRowGroupOut.getRowCount();
if (rowCount != 0)
{
if (!cleanUpAndOutputRowGroup(bs, dlp))
break;
}
done = true;
}
if (done)
{
fEndOfResult = true;
}
}
}
// Case 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
// e.g. SELECT SUM(DISTINCT col1) FROM test;
else if (distinctAggregator)
{
if (!fEndOfResult)
{
if (!fDoneAggregate)
{
// Do aggregation over all row groups. As all row groups need to be aggregated together, there is
// no easy way of multi-threading this, so it's done in a single thread for now.
for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++)
{
if (fEndOfResult == false)
{
// The distinctAggregator accumulates the aggregation results of all row groups: it adopts each
// bucket aggregator's underlying aggregator in turn and runs a distinct aggregation step after
// each addition.
auto* bucketMultiDistinctAggregator =
dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[bucketNum].get());
auto* bucketDistinctAggregator =
dynamic_cast<RowAggregationDistinct*>(fAggregators[bucketNum].get());
distinctAggregator->aggregator(bucketDistinctAggregator->aggregator());
if (bucketMultiDistinctAggregator)
{
(dynamic_cast<RowAggregationMultiDistinct*>(distinctAggregator))
->subAggregators(bucketMultiDistinctAggregator->subAggregators());
}
distinctAggregator->aggregator()->finalAggregation();
distinctAggregator->doDistinctAggregation();
}
}
}
// Deliver results
fDoneAggregate = true;
bool done = true;
while (fAggregator->nextRowGroup() && !cancelled())
{
done = false;
fAggregator->finalize();
rowCount = fRowGroupOut.getRowCount();
fRowsReturned += rowCount;
fRowGroupDelivered.setData(fRowGroupOut.getRGData());
if (rowCount != 0)
{
if (!cleanUpAndOutputRowGroup(bs, dlp))
break;
}
done = true;
}
if (done)
fEndOfResult = true;
}
}
// CASE 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
// e.g. SELECT SUM(col1) FROM test GROUP BY col2;
// Do aggregation over all row groups. As all row groups need to be aggregated together, there is
// no easy way of multi-threading this, so it's done in a single thread for now.
else if (hasGroupByColumns)
{
if (!fEndOfResult && !fDoneAggregate)
{
for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum)
{
fAggregator->append(fAggregators[bucketNum].get());
}
}
fDoneAggregate = true;
bool done = true;
while (fAggregator->nextRowGroup() && !cancelled())
{
done = false;
fAggregator->finalize();
rowCount = fRowGroupOut.getRowCount();
fRowsReturned += rowCount;
fRowGroupDelivered.setData(fRowGroupOut.getRGData());
if (rowCount != 0)
{
if (!cleanUpAndOutputRowGroup(bs, dlp))
break;
}
done = true;
}
if (done)
{
fEndOfResult = true;
}
}
else
{
throw logic_error(
"TupleAggregateStep::doThreadedAggregate: No aggregation on DISTINCT columns and no "
"GROUP BY columns found. Should not reach here.");
}
}
catch (...)
{
handleException(std::current_exception(), logging::tupleAggregateStepErr,
logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doThreadedAggregate()");
fEndOfResult = true;
}
if (fEndOfResult)
{
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (dlp)
{
dlp->endOfInput();
}
else
{
// send an empty / error band
RGData rgData(fRowGroupOut, 0);
fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status());
fRowGroupOut.serializeRGData(bs);
rowCount = 0;
}
if (traceOn())
printCalTrace();
}
return rowCount;
}
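// Prunes auxiliary columns if the delivered layout differs from the output layout, then
// either inserts a copy of the row group into the delivery list (returns true: keep
// delivering) or serializes one band into the byte stream (returns false: stop and hand
// the band to the caller).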
bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp)
{
if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
pruneAuxColumns();
if (dlp)
{
RGData rgData = fRowGroupDelivered.duplicate();
dlp->insert(rgData);
return true;
}
bs.restart();
fRowGroupDelivered.serializeRGData(bs);
return false;
}
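// Called when fRowGroupDelivered has fewer columns than fRowGroupOut. Both views point
// at the same RGData, so each row is compacted in place from the wider output layout
// into the narrower delivered layout, carrying the null marks along.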
void TupleAggregateStep::pruneAuxColumns()
{
uint64_t rowCount = fRowGroupOut.getRowCount();
Row row1, row2;
fRowGroupOut.initRow(&row1);
fRowGroupOut.getRow(0, &row1);
fRowGroupDelivered.initRow(&row2);
fRowGroupDelivered.getRow(0, &row2);
for (uint64_t i = 1; i < rowCount; i++)
{
for (uint32_t j = 0; j < row2.getColumnCount(); j++)
{
row2.setNullMark(j, row1.getNullMark(j));
}
// the first row is already in place; advance both cursors to the next row
row1.nextRow();
row2.nextRow();
// bug4463: use memmove because the source and destination rows may overlap
memmove(row2.getData(), row1.getData(), row2.getSize());
}
for (uint32_t j = 0; j < row2.getColumnCount(); j++)
{
row2.setNullMark(j, row1.getNullMark(j));
}
}
void TupleAggregateStep::printCalTrace()
{
time_t t = time(0);
char timeString[50];
ctime_r(&t, timeString);
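// ctime_r() appends a trailing newline; strip it.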
timeString[strlen(timeString) - 1] = '\0';
ostringstream logStr;
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
<< "; total rows returned-" << fRowsReturned << endl
<< "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
<< "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
<< "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
<< "\tJob completion status " << status() << endl;
logEnd(logStr.str().c_str());
fExtendedInfo += logStr.str();
formatMiniStats();
}
void TupleAggregateStep::formatMiniStats()
{
ostringstream oss;
oss << "TAS "
<< "UM "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
<< fRowsReturned << " ";
fMiniInfo += oss.str();
}
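// Overloads that extract the tuple key from either a group-by descriptor tuple or a
// plain key, so tryToFindEqualFunctionColumnByTupleKey() can iterate over both map types.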
uint32_t TupleAggregateStep::getTupleKeyFromTuple(
const boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>& tuple)
{
return tuple.get<0>();
}
uint32_t TupleAggregateStep::getTupleKeyFromTuple(uint32_t key)
{
return key;
}
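// Looks for another entry in groupByMap whose function-column info (associated column
// OID and function name) matches that of tupleKey; on success stores that entry's key
// in foundKey and returns true.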
template <class GroupByMap>
bool TupleAggregateStep::tryToFindEqualFunctionColumnByTupleKey(JobInfo& jobInfo, GroupByMap& groupByMap,
const uint32_t tupleKey, uint32_t& foundKey)
{
auto funcMapIt = jobInfo.functionColumnMap.find(tupleKey);
if (funcMapIt != jobInfo.functionColumnMap.end())
{
const auto& rFunctionInfo = funcMapIt->second;
// Try to match given `tupleKey` in `groupByMap`.
for (const auto& groupByMapPair : groupByMap)
{
const auto currentTupleKey = getTupleKeyFromTuple(groupByMapPair.first);
auto currentFuncMapIt = jobInfo.functionColumnMap.find(currentTupleKey);
// Skip if the keys are the same.
if (currentFuncMapIt != jobInfo.functionColumnMap.end() && currentTupleKey != tupleKey)
{
const auto& lFunctionInfo = currentFuncMapIt->second;
// Oid and function name should be the same.
if (lFunctionInfo.associatedColumnOid == rFunctionInfo.associatedColumnOid &&
lFunctionInfo.functionName == rFunctionInfo.functionName)
{
foundKey = currentTupleKey;
return true;
}
}
}
}
return false;
}
} // namespace joblist