/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2019-2020 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: tupleaggregatestep.cpp 9732 2013-08-02 15:56:15Z pleblanc $

// #define NDEBUG
// Cross engine needs to be at top due to MySQL includes
#define PREFER_MY_CONFIG_H
#include "crossenginestep.h"

#include <cassert>
#include <sstream>
#include <iomanip>
#include <algorithm>
using namespace std;

#include <boost/shared_ptr.hpp>

#include <boost/scoped_array.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "boost/tuple/tuple.hpp"
using namespace boost;

#include "messagequeue.h"
using namespace messageqcpp;

#include "loggingid.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
using namespace logging;

#include "configcpp.h"
using namespace config;

#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "udafcolumn.h"
#include "arithmeticcolumn.h"
#include "functioncolumn.h"
#include "constantcolumn.h"
using namespace execplan;

#include "rowgroup.h"
#include "rowaggregation.h"
using namespace rowgroup;

#include "querytele.h"
using namespace querytele;

#include "jlf_common.h"
#include "jobstep.h"
#include "primitivestep.h"
#include "subquerystep.h"
#include "tuplehashjoin.h"
#include "tupleaggregatestep.h"

// #include "stopwatch.cpp"

// Stopwatch timer;

namespace
{
typedef vector<std::pair<Row::Pointer, uint64_t>> RowBucket;
typedef vector<RowBucket> RowBucketVec;

inline RowAggFunctionType functionIdMap(int planFuncId)
{
  switch (planFuncId)
  {
    case AggregateColumn::COUNT_ASTERISK: return ROWAGG_COUNT_ASTERISK;

    case AggregateColumn::COUNT: return ROWAGG_COUNT_COL_NAME;

    case AggregateColumn::SUM: return ROWAGG_SUM;

    case AggregateColumn::AVG: return ROWAGG_AVG;

    case AggregateColumn::MIN: return ROWAGG_MIN;

    case AggregateColumn::MAX: return ROWAGG_MAX;

    case AggregateColumn::DISTINCT_COUNT: return ROWAGG_COUNT_DISTINCT_COL_NAME;

    case AggregateColumn::DISTINCT_SUM: return ROWAGG_DISTINCT_SUM;

    case AggregateColumn::DISTINCT_AVG: return ROWAGG_DISTINCT_AVG;

    case AggregateColumn::STDDEV_POP: return ROWAGG_STATS;

    case AggregateColumn::STDDEV_SAMP: return ROWAGG_STATS;

    case AggregateColumn::VAR_POP: return ROWAGG_STATS;

    case AggregateColumn::VAR_SAMP: return ROWAGG_STATS;

    case AggregateColumn::BIT_AND: return ROWAGG_BIT_AND;

    case AggregateColumn::BIT_OR: return ROWAGG_BIT_OR;

    case AggregateColumn::BIT_XOR: return ROWAGG_BIT_XOR;

    case AggregateColumn::GROUP_CONCAT: return ROWAGG_GROUP_CONCAT;

    case AggregateColumn::JSON_ARRAYAGG: return ROWAGG_JSON_ARRAY;

    case AggregateColumn::CONSTANT: return ROWAGG_CONSTANT;

    case AggregateColumn::UDAF: return ROWAGG_UDAF;

    case AggregateColumn::MULTI_PARM: return ROWAGG_MULTI_PARM;

    case AggregateColumn::SELECT_SOME: return ROWAGG_SELECT_SOME;

    default: return ROWAGG_FUNCT_UNDEFINE;
  }
}

inline RowAggFunctionType statsFuncIdMap(int planFuncId)
{
  switch (planFuncId)
  {
    case AggregateColumn::STDDEV_POP: return ROWAGG_STDDEV_POP;

    case AggregateColumn::STDDEV_SAMP: return ROWAGG_STDDEV_SAMP;

    case AggregateColumn::VAR_POP: return ROWAGG_VAR_POP;

    case AggregateColumn::VAR_SAMP: return ROWAGG_VAR_SAMP;

    default: return ROWAGG_FUNCT_UNDEFINE;
  }
}

inline string colTypeIdString(CalpontSystemCatalog::ColDataType type)
{
  switch (type)
  {
    case CalpontSystemCatalog::BIT: return string("BIT");

    case CalpontSystemCatalog::TINYINT: return string("TINYINT");

    case CalpontSystemCatalog::CHAR: return string("CHAR");

    case CalpontSystemCatalog::SMALLINT: return string("SMALLINT");

    case CalpontSystemCatalog::DECIMAL: return string("DECIMAL");

    case CalpontSystemCatalog::MEDINT: return string("MEDINT");

    case CalpontSystemCatalog::INT: return string("INT");

    case CalpontSystemCatalog::FLOAT: return string("FLOAT");

    case CalpontSystemCatalog::DATE: return string("DATE");

    case CalpontSystemCatalog::BIGINT: return string("BIGINT");

    case CalpontSystemCatalog::DOUBLE: return string("DOUBLE");

    case CalpontSystemCatalog::LONGDOUBLE: return string("LONGDOUBLE");

    case CalpontSystemCatalog::DATETIME: return string("DATETIME");

    case CalpontSystemCatalog::TIMESTAMP: return string("TIMESTAMP");

    case CalpontSystemCatalog::TIME: return string("TIME");

    case CalpontSystemCatalog::VARCHAR: return string("VARCHAR");

    case CalpontSystemCatalog::CLOB: return string("CLOB");

    case CalpontSystemCatalog::BLOB: return string("BLOB");

    case CalpontSystemCatalog::TEXT: return string("TEXT");

    case CalpontSystemCatalog::UTINYINT: return string("UTINYINT");

    case CalpontSystemCatalog::USMALLINT: return string("USMALLINT");

    case CalpontSystemCatalog::UDECIMAL: return string("UDECIMAL");

    case CalpontSystemCatalog::UMEDINT: return string("UMEDINT");

    case CalpontSystemCatalog::UINT: return string("UINT");

    case CalpontSystemCatalog::UFLOAT: return string("UFLOAT");

    case CalpontSystemCatalog::UBIGINT: return string("UBIGINT");

    case CalpontSystemCatalog::UDOUBLE: return string("UDOUBLE");

    default: return string("UNKNOWN");
  }
}

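// Returns the display name for the i-th projected column, quoted for use in
// error messages; expression/function columns (small fIds) get a generic label.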
string keyName(uint64_t i, uint32_t key, const joblist::JobInfo& jobInfo)
{
  string name = jobInfo.projectionCols[i]->alias();

  if (name.empty())
  {
    name = jobInfo.keyInfo->tupleKeyToName[key];

    if (jobInfo.keyInfo->tupleKeyVec[key].fId < 100)
      name = "Expression/Function";
  }

  return "'" + name + "'";
}

}  // namespace

namespace joblist
{
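// Appends the intermediate result type for a SUM/AVG over the projected column:
// wide decimals keep their scale/precision, integer types that aggregate into a
// wide decimal are widened to a 128-bit DECIMAL, and all other types fall back
// to long double.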
void wideDecimalOrLongDouble(const uint64_t colProj, const CalpontSystemCatalog::ColDataType type,
                             const vector<uint32_t>& precisionProj, const vector<uint32_t>& scaleProj,
                             const vector<uint32_t>& width,
                             vector<CalpontSystemCatalog::ColDataType>& typeAgg, vector<uint32_t>& scaleAgg,
                             vector<uint32_t>& precisionAgg, vector<uint32_t>& widthAgg)
{
  if ((type == CalpontSystemCatalog::DECIMAL || type == CalpontSystemCatalog::UDECIMAL) &&
      datatypes::Decimal::isWideDecimalTypeByPrecision(precisionProj[colProj]))
  {
    typeAgg.push_back(type);
    scaleAgg.push_back(scaleProj[colProj]);
    precisionAgg.push_back(precisionProj[colProj]);
    widthAgg.push_back(width[colProj]);
  }
  else if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(type))
  {
    typeAgg.push_back(CalpontSystemCatalog::DECIMAL);
    scaleAgg.push_back(0);
    precisionAgg.push_back(datatypes::INT128MAXPRECISION);
    widthAgg.push_back(datatypes::MAXDECIMALWIDTH);
  }
  else
  {
    typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
    scaleAgg.push_back(0);
    precisionAgg.push_back(-1);
    widthAgg.push_back(sizeof(long double));
  }
}

TupleAggregateStep::TupleAggregateStep(const SP_ROWAGG_UM_t& agg, const RowGroup& rgOut, const RowGroup& rgIn,
                                       const JobInfo& jobInfo)
 : JobStep(jobInfo)
 , fCatalog(jobInfo.csc)
 , fRowsReturned(0)
 , fDoneAggregate(false)
 , fEndOfResult(false)
 , fAggregator(agg)
 , fRowGroupOut(rgOut)
 , fRowGroupIn(rgIn)
 , fRunner(0)
 , fUmOnly(false)
 , fRm(jobInfo.rm)
 , fBucketNum(0)
 , fInputIter(-1)
 , fSessionMemLimit(jobInfo.umMemLimit)
{
  fRowGroupData.reinit(fRowGroupOut);
  fRowGroupOut.setData(&fRowGroupData);
  fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);

  // decide if this needs to be multi-threaded
  RowAggregationDistinct* multiAgg = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
  fIsMultiThread = (multiAgg || fAggregator->aggMapKeyLength() > 0);

  // initialize multi-thread variables
  fNumOfThreads = fRm->aggNumThreads();
  fNumOfBuckets = fRm->aggNumBuckets();
  fNumOfRowGroups = fRm->aggNumRowGroups();

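  // Cap the bucket count by the memory actually available to this session;
  // the thread count is then clamped so no thread is left without a bucket.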
  auto memLimit = std::min(fRm->availableMemory(), *fSessionMemLimit);
  fNumOfBuckets =
      calcNumberOfBuckets(memLimit, fNumOfThreads, fNumOfBuckets, fNumOfRowGroups, fRowGroupIn.getRowSize(),
                          fRowGroupOut.getRowSize(), fRm->getAllowDiskAggregation());

  fNumOfThreads = std::min(fNumOfThreads, fNumOfBuckets);

  fMemUsage.reset(new uint64_t[fNumOfThreads]);
  memset(fMemUsage.get(), 0, fNumOfThreads * sizeof(uint64_t));

  fExtendedInfo = "TAS: ";
  fQtc.stepParms().stepType = StepTeleStats::T_TAS;
  fPrimitiveServerThreadPools = jobInfo.primitiveServerThreadPools;
}

TupleAggregateStep::~TupleAggregateStep()
{
  for (uint32_t i = 0; i < fNumOfThreads; i++)
    fRm->returnMemory(fMemUsage[i], fSessionMemLimit);

  for (uint32_t i = 0; i < fAgg_mutex.size(); i++)
    delete fAgg_mutex[i];
}

void TupleAggregateStep::initializeMultiThread()
{
  RowGroupDL* dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();
  uint32_t i;

  if (dlIn == NULL)
    throw logic_error("Input is not RowGroup data list in delivery step.");

  if (fInputIter < 0)
    fInputIter = dlIn->getIterator();

  fRowGroupIns.resize(fNumOfThreads);
  fRowGroupOuts.resize(fNumOfBuckets);
  fRowGroupDatas.resize(fNumOfBuckets);

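  // One mutex, output RowGroup, and data buffer per bucket, so worker threads
  // can merge into different buckets concurrently.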
  rowgroup::SP_ROWAGG_UM_t agg;
  RGData rgData;

  for (i = 0; i < fNumOfBuckets; i++)
  {
    boost::mutex* lock = new boost::mutex();
    fAgg_mutex.push_back(lock);
    fRowGroupOuts[i] = fRowGroupOut;
    rgData.reinit(fRowGroupOut);
    fRowGroupDatas[i] = rgData;
    fRowGroupOuts[i].setData(&fRowGroupDatas[i]);
    fRowGroupOuts[i].resetRowGroup(0);
  }
}

void TupleAggregateStep::run()
{
  if (fDelivery == false)
  {
    fRunner = jobstepThreadPool.invoke(Aggregator(this));
  }
}

void TupleAggregateStep::join()
{
  if (fRunner)
    jobstepThreadPool.join(fRunner);
}

void TupleAggregateStep::doThreadedSecondPhaseAggregate(uint32_t threadID)
{
  if (threadID >= fNumOfBuckets)
    return;

  bool finishedSecondPhase = false;
  bool diskAggAllowed = fRm->getAllowDiskAggregation();
  const uint32_t maxRowsSize = 8192;

  while (!finishedSecondPhase && !fEndOfResult)
  {
    scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
    scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
    uint32_t hashlen = fAggregator->aggMapKeyLength();
    bool outOfMemory = false;
    size_t totalMemSizeConsumed = 0;

    try
    {
      RowAggregationDistinct* aggDist = dynamic_cast<RowAggregationDistinct*>(fAggregators[threadID].get());
      RowAggregationMultiDistinct* multiDist =
          dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[threadID].get());
      Row rowIn;
      RowGroup* rowGroupIn = aggDist->aggregator()->getOutputRowGroup();
      uint32_t bucketID;
      std::vector<std::unique_ptr<RGData>> rgDataVec;

      if (multiDist)
      {
        for (uint32_t i = 0; i < fNumOfBuckets; i++)
          rowBucketVecs[i].resize(multiDist->subAggregators().size());
      }
      else
      {
        for (uint32_t i = 0; i < fNumOfBuckets; i++)
          rowBucketVecs[i].resize(1);
      }

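      // Rows are routed to buckets by hashing the group-by key columns, so all
      // rows of a group land in the same bucket and buckets can be aggregated
      // independently.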
      // dispatch rows to bucket
      if (multiDist)
      {
        for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
        {
          rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
          rowGroupIn->initRow(&rowIn);
          auto* subDistAgg = dynamic_cast<RowAggregationUM*>(multiDist->subAggregators()[j].get());

          while (subDistAgg->nextOutputRowGroup())
          {
            rowGroupIn = (multiDist->subAggregators()[j]->getOutputRowGroup());
            rgDataVec.emplace_back(subDistAgg->moveCurrentRGData());
            rowGroupIn->getRow(0, &rowIn);

            for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
            {
              // The key is the groupby columns, which are the leading columns.
              // uint8_t* hashMapKey = rowIn.getData() + 2;
              // bucketID = hash.operator()(hashMapKey) & fBucketMask;
              uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
              bucketID = hash % fNumOfBuckets;
              rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
              rowIn.nextRow();
            }

            const auto rgSize = diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize)
                                               : rowGroupIn->getSizeWithStrings();
            totalMemSizeConsumed += rgSize;
            if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
            {
              outOfMemory = true;
              break;
            }
          }
        }
      }
      else
      {
        rowGroupIn->initRow(&rowIn);
        auto* subAgg = dynamic_cast<RowAggregationUM*>(aggDist->aggregator().get());

        while (subAgg->nextOutputRowGroup())
        {
          rowGroupIn->setData(aggDist->aggregator()->getOutputRowGroup()->getRGData());
          rgDataVec.emplace_back(subAgg->moveCurrentRGData());
          rowGroupIn->getRow(0, &rowIn);

          for (uint64_t i = 0; i < rowGroupIn->getRowCount(); ++i)
          {
            // The key is the groupby columns, which are the leading columns.
            // uint8_t* hashMapKey = rowIn.getData() + 2;
            // bucketID = hash.operator()(hashMapKey) & fBucketMask;
            uint64_t hash = rowgroup::hashRow(rowIn, hashlen - 1);
            bucketID = hash % fNumOfBuckets;
            rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
            rowIn.nextRow();
          }

          const auto rgSize =
              diskAggAllowed ? rowGroupIn->getSizeWithStrings(maxRowsSize) : rowGroupIn->getSizeWithStrings();
          totalMemSizeConsumed += rgSize;
          if (!fRm->getMemory(rgSize, fSessionMemLimit, !diskAggAllowed))
          {
            outOfMemory = true;
            break;
          }
        }
      }

      if (!outOfMemory)
        finishedSecondPhase = true;

      bool done = false;
      // reset bucketDone[] to be false
      // memset(bucketDone, 0, sizeof(bucketDone));
      fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);

      while (!done && !cancelled())
      {
        done = true;

        for (uint32_t c = 0; c < fNumOfBuckets && !cancelled(); c++)
        {
          if (!bucketDone[c] && fAgg_mutex[c]->try_lock())
          {
            try
            {
              if (multiDist)
                dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
                    ->doDistinctAggregation_rowVec(rowBucketVecs[c]);
              else
                dynamic_cast<RowAggregationDistinct*>(fAggregators[c].get())
                    ->doDistinctAggregation_rowVec(rowBucketVecs[c][0]);
            }
            catch (...)
            {
              fAgg_mutex[c]->unlock();
              throw;
            }

            fAgg_mutex[c]->unlock();
            bucketDone[c] = true;
            rowBucketVecs[c][0].clear();
          }
          else if (!bucketDone[c])
          {
            done = false;
          }
        }
      }

      fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
      if (cancelled())
      {
        finishedSecondPhase = true;
        fEndOfResult = true;
      }
    }  // try
    catch (...)
    {
      fRm->returnMemory(totalMemSizeConsumed, fSessionMemLimit);
      handleException(std::current_exception(), logging::tupleAggregateStepErr,
                      logging::ERR_AGGREGATION_TOO_BIG,
                      "TupleAggregateStep::doThreadedSecondPhaseAggregate()");
      fEndOfResult = true;
      finishedSecondPhase = true;
    }
  }

  fDoneAggregate = true;

  if (traceOn())
  {
    dlTimes.setLastReadTime();
    dlTimes.setEndOfInputTime();
  }
}

uint32_t TupleAggregateStep::nextBand_singleThread(messageqcpp::ByteStream& bs)
{
  uint32_t rowCount = 0;

  try
  {
    if (!fDoneAggregate)
      aggregateRowGroups();

    if (fEndOfResult == false)
    {
      bs.restart();

      // do the final aggregation and deliver the results
      // at least one RowGroup for aggregate results
      if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
      {
        dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
      }

      if (fAggregator->nextRowGroup())
      {
        fAggregator->finalize();
        rowCount = fRowGroupOut.getRowCount();
        fRowsReturned += rowCount;
        fRowGroupDelivered.setData(fRowGroupOut.getRGData());

        if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
          pruneAuxColumns();

        fRowGroupDelivered.serializeRGData(bs);
      }
      else
      {
        fEndOfResult = true;
      }
    }
  }  // try
  catch (...)
  {
    handleException(std::current_exception(), logging::tupleAggregateStepErr,
                    logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::nextBand_singleThread()");
    fEndOfResult = true;
  }

  if (fEndOfResult)
  {
    StepTeleStats sts;
    sts.query_uuid = fQueryUuid;
    sts.step_uuid = fStepUuid;
    sts.msg_type = StepTeleStats::ST_SUMMARY;
    sts.total_units_of_work = sts.units_of_work_completed = 1;
    sts.rows = fRowsReturned;
    postStepSummaryTele(sts);

    // send an empty / error band
    RGData rgData(fRowGroupOut, 0);
    fRowGroupOut.setData(&rgData);
    fRowGroupOut.resetRowGroup(0);
    fRowGroupOut.setStatus(status());
    fRowGroupOut.serializeRGData(bs);
    rowCount = 0;

    if (traceOn())
      printCalTrace();
  }

  return rowCount;
}

bool TupleAggregateStep::nextDeliveredRowGroup()
{
  for (; fBucketNum < fNumOfBuckets; fBucketNum++)
  {
    while (fAggregators[fBucketNum]->nextOutputRowGroup())
    {
      fAggregators[fBucketNum]->finalize();
      fRowGroupDelivered.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
      fRowGroupOut.setData(fAggregators[fBucketNum]->getOutputRowGroup()->getRGData());
      return true;
    }
  }

  fBucketNum = 0;
  return false;
}

uint32_t TupleAggregateStep::nextBand(messageqcpp::ByteStream& bs)
{
  // use the original single-thread model when there is no group by and no distinct.
  // @bug4314. DO NOT access fAggregator before the first read of input,
  // because hashjoin may not have finalized fAggregator.
  if (!fIsMultiThread)
    return nextBand_singleThread(bs);

  return doThreadedAggregate(bs, 0);
}

bool TupleAggregateStep::setPmHJAggregation(JobStep* step)
{
  TupleBPS* bps = dynamic_cast<TupleBPS*>(step);

  if (bps != NULL)
  {
    fAggregatorUM->expression(fAggregator->expression());
    fAggregatorUM->constantAggregate(fAggregator->constantAggregate());
    fAggregator = fAggregatorUM;
    fRowGroupIn = fRowGroupPMHJ;
    fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
    bps->setAggregateStep(fAggregatorPM, fRowGroupPMHJ);
  }

  return (bps != NULL);
}

void TupleAggregateStep::configDeliveredRowGroup(const JobInfo& jobInfo)
{
  // configure the oids and keys
  vector<uint32_t> oids = fRowGroupOut.getOIDs();
  vector<uint32_t> keys = fRowGroupOut.getKeys();
  vector<pair<int, int>>::const_iterator begin = jobInfo.aggEidIndexList.begin();
  vector<pair<int, int>>::const_iterator end = jobInfo.aggEidIndexList.end();

  for (vector<pair<int, int>>::const_iterator i = begin; i != end; i++)
  {
    oids[i->second] = i->first;
    keys[i->second] = getExpTupleKey(jobInfo, i->first);
  }

  // correct the scale
  vector<uint32_t> scale = fRowGroupOut.getScale();
  vector<uint32_t> precision = fRowGroupOut.getPrecision();

  size_t retColCount = 0;
  auto scaleIter = scale.begin();
  auto precisionIter = precision.begin();

  if (jobInfo.havingStep)
  {
    retColCount = jobInfo.returnedColVec.size();

    idbassert(jobInfo.returnedColVec.size() == jobInfo.projectionCols.size());

    for (size_t i = 0; i < jobInfo.projectionCols.size() && scaleIter != scale.end(); i++)
    {
      const auto& colType = jobInfo.projectionCols[i]->resultType();

      if (colType.isWideDecimalType())
      {
        *scaleIter = colType.scale;
        *precisionIter = colType.precision;
      }

      scaleIter++;
      precisionIter++;
    }
  }
  else
  {
    retColCount = jobInfo.nonConstDelCols.size();

    for (size_t i = 0; i < jobInfo.nonConstDelCols.size() && scaleIter != scale.end(); i++)
    {
      const auto& colType = jobInfo.nonConstDelCols[i]->resultType();

      if (colType.isWideDecimalType())
      {
        *scaleIter = colType.scale;
        *precisionIter = colType.precision;
      }

      scaleIter++;
      precisionIter++;
    }
  }

  vector<uint32_t>::const_iterator offsets0 = fRowGroupOut.getOffsets().begin();
  vector<CalpontSystemCatalog::ColDataType>::const_iterator types0 = fRowGroupOut.getColTypes().begin();
  vector<uint32_t> csNums = fRowGroupOut.getCharsetNumbers();
  vector<uint32_t>::const_iterator precision0 = precision.begin();
  fRowGroupDelivered =
      RowGroup(retColCount, vector<uint32_t>(offsets0, offsets0 + retColCount + 1),
               vector<uint32_t>(oids.begin(), oids.begin() + retColCount),
               vector<uint32_t>(keys.begin(), keys.begin() + retColCount),
               vector<CalpontSystemCatalog::ColDataType>(types0, types0 + retColCount),
               vector<uint32_t>(csNums.begin(), csNums.begin() + retColCount),
               vector<uint32_t>(scale.begin(), scale.begin() + retColCount),
               vector<uint32_t>(precision0, precision0 + retColCount), jobInfo.stringTableThreshold);

  if (jobInfo.trace)
    cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
}

void TupleAggregateStep::setOutputRowGroup(const RowGroup& rg)
{
  fRowGroupOut = rg;
  fRowGroupData.reinit(fRowGroupOut);
  fRowGroupOut.setData(&fRowGroupData);
  fAggregator->setInputOutput(fRowGroupIn, &fRowGroupOut);
}

const RowGroup& TupleAggregateStep::getOutputRowGroup() const
{
  return fRowGroupOut;
}

const RowGroup& TupleAggregateStep::getDeliveredRowGroup() const
{
  return fRowGroupDelivered;
}

void TupleAggregateStep::savePmHJData(SP_ROWAGG_t& um, SP_ROWAGG_t& pm, RowGroup& rg)
{
  fAggregatorUM = dynamic_pointer_cast<RowAggregationUM>(um);
  fAggregatorPM = pm;
  fRowGroupPMHJ = rg;
}

void TupleAggregateStep::deliverStringTableRowGroup(bool b)
{
  fRowGroupDelivered.setUseStringTable(b);
}

bool TupleAggregateStep::deliverStringTableRowGroup() const
{
  return fRowGroupDelivered.usesStringTable();
}

const string TupleAggregateStep::toString() const
{
  ostringstream oss;
  oss << "AggregateStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;

  oss << " in:";

  for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
    oss << fInputJobStepAssociation.outAt(i);

  if (fOutputJobStepAssociation.outSize() > 0)
  {
    oss << " out:";

    for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
      oss << fOutputJobStepAssociation.outAt(i);
  }

  return oss.str();
}

SJSTEP TupleAggregateStep::prepAggregate(SJSTEP& step, JobInfo& jobInfo)
{
  SJSTEP spjs;
  TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(step.get());
  TupleBPS* tbps = dynamic_cast<TupleBPS*>(step.get());
  TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(step.get());
  SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(step.get());
  CrossEngineStep* ces = dynamic_cast<CrossEngineStep*>(step.get());
  vector<RowGroup> rgs;  // 0-ProjRG, 1-UMRG, [2-PMRG -- if 2 phases]
  vector<SP_ROWAGG_t> aggs;
  SP_ROWAGG_UM_t aggUM;
  bool distinctAgg = false;
  int64_t constKey = -1;
  vector<ConstantAggData> constAggDataVec;

  vector<std::pair<uint32_t, int>> returnedColVecOrig = jobInfo.returnedColVec;

  for (uint32_t idx = 0; idx < jobInfo.returnedColVec.size(); idx++)
  {
    if (jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_COUNT ||
        jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_AVG ||
        jobInfo.returnedColVec[idx].second == AggregateColumn::DISTINCT_SUM)
    {
      distinctAgg = true;
    }

    // Change COUNT_ASTERISK to CONSTANT if necessary.
    // In joblistfactory, all aggregate(constant) are set to count(*) for easy processing.
    map<uint64_t, SRCP>::iterator it = jobInfo.constAggregate.find(idx);

    if (it != jobInfo.constAggregate.end())
    {
      AggregateColumn* ac = dynamic_cast<AggregateColumn*>(it->second.get());

      if (ac->aggOp() == AggregateColumn::COUNT_ASTERISK)
      {
        if (jobInfo.cntStarPos == -1)
          jobInfo.cntStarPos = idx;
      }
      else
      {
        constKey = jobInfo.returnedColVec[idx].first;
        CalpontSystemCatalog::ColType ct = ac->resultType();
        TupleInfo ti = setExpTupleInfo(ct, ac->expressionId(), ac->alias(), jobInfo);
        jobInfo.returnedColVec[idx].first = ti.key;
        jobInfo.returnedColVec[idx].second = AggregateColumn::CONSTANT;

        ConstantColumn* cc = dynamic_cast<ConstantColumn*>(ac->constCol().get());
        idbassert(cc != NULL);  // @bug5261
        bool isNull = cc->isNull();

        if (ac->aggOp() == AggregateColumn::UDAF)
        {
          UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
          if (udafc)
          {
            constAggDataVec.push_back(ConstantAggData(cc->constval(), udafc->getContext().getName(),
                                                      functionIdMap(ac->aggOp()), isNull));
          }
        }
        else
        {
          constAggDataVec.push_back(ConstantAggData(cc->constval(), functionIdMap(ac->aggOp()), isNull));
        }
      }
    }
  }

  // If there are aggregate(constant) columns, but no count(*), add a count(*).
  if (constAggDataVec.size() > 0 && jobInfo.cntStarPos < 0)
  {
    jobInfo.cntStarPos = jobInfo.returnedColVec.size();
    jobInfo.returnedColVec.push_back(make_pair(constKey, AggregateColumn::COUNT_ASTERISK));
  }

  // preprocess the columns used by group_concat
  jobInfo.groupConcatInfo.prepGroupConcat(jobInfo);
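  // group_concat, subquery adapter, and cross-engine inputs can only be
  // aggregated on the UM, so they force a single-phase (UM-only) plan.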
  bool doUMOnly = jobInfo.groupConcatInfo.columns().size() > 0 || sas || ces;

  rgs.push_back(tds->getDeliveredRowGroup());

  // get rowgroup and aggregator
  // For TupleHashJoin, we prepare for both PM and UM only aggregation
  if (doUMOnly || thjs)
  {
    if (distinctAgg == true)
      prep1PhaseDistinctAggregate(jobInfo, rgs, aggs);
    else
      prep1PhaseAggregate(jobInfo, rgs, aggs);

    // TODO: fix this
    if (doUMOnly)
      rgs.push_back(rgs[0]);
  }

  if (!doUMOnly)
  {
    if (distinctAgg == true)
      prep2PhasesDistinctAggregate(jobInfo, rgs, aggs);
    else
    {
      prep2PhasesAggregate(jobInfo, rgs, aggs);
    }
  }

  if (tbps != NULL)
  {
    // create delivery step
    aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
    spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[2], jobInfo));

    if (doUMOnly)
      dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
    else
      tbps->setAggregateStep(aggs[1], rgs[2]);
  }
  else if (thjs != NULL)
  {
    // create delivery step
    aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
    spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));

    if (doUMOnly)
      dynamic_cast<TupleAggregateStep*>(spjs.get())->umOnly(true);
    else
      dynamic_cast<TupleAggregateStep*>(spjs.get())->savePmHJData(aggs[1], aggs[2], rgs[3]);

    // set input side
    thjs->deliveryStep(spjs);
  }
  else
  {
    aggUM = dynamic_pointer_cast<RowAggregationUM>(aggs[0]);
    spjs.reset(new TupleAggregateStep(aggUM, rgs[1], rgs[0], jobInfo));
  }

  // Set up the input JobStepAssociation -- the mechanism
  // whereby the previous step feeds data to this step.
  // We create a new one here and hook it to the
  // previous step as well as this aggregate step.
  spjs->stepId(step->stepId() + 1);

  JobStepAssociation jsa;
  AnyDataListSPtr spdl(new AnyDataList());
  RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
  dl->OID(execplan::CNX_VTABLE_ID);
  spdl->rowGroupDL(dl);
  jsa.outAdd(spdl);

  spjs->inputAssociation(jsa);  // Aggregate input

  // Previous step output
  step->outputAssociation(jsa);

  // add the aggregate on constants
  if (constAggDataVec.size() > 0)
  {
    dynamic_cast<TupleAggregateStep*>(spjs.get())->addConstangAggregate(constAggDataVec);
    jobInfo.returnedColVec.swap(returnedColVecOrig);  // restore the original return columns
  }

  // fix the delivered rowgroup data
  dynamic_cast<TupleAggregateStep*>(spjs.get())->configDeliveredRowGroup(jobInfo);

  if (jobInfo.expressionVec.size() > 0)
    dynamic_cast<TupleAggregateStep*>(spjs.get())->prepExpressionOnAggregate(aggUM, jobInfo);

  return spjs;
}

void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
                                             vector<SP_ROWAGG_t>& aggregators)
{
  // check if there are any aggregate columns
  vector<pair<uint32_t, int>> aggColVec;
  vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;

  for (uint64_t i = 0; i < returnedColVec.size(); i++)
  {
    if (returnedColVec[i].second != 0)
      aggColVec.push_back(returnedColVec[i]);
  }

  // populate the aggregate rowgroup: projectedRG -> aggregateRG
  //
  // Aggregate preparation by joblist factory:
  // 1. get projected rowgroup (done by doAggProject) -- passed in
  // 2. construct aggregate rowgroup -- output of UM
  const RowGroup projRG = rowgroups[0];
  const vector<uint32_t>& oidsProj = projRG.getOIDs();
  const vector<uint32_t>& keysProj = projRG.getKeys();
  const vector<uint32_t>& scaleProj = projRG.getScale();
  const vector<uint32_t>& precisionProj = projRG.getPrecision();
  const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
  const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();

  vector<uint32_t> posAgg;
  vector<uint32_t> oidsAgg;
  vector<uint32_t> keysAgg;
  vector<uint32_t> scaleAgg;
  vector<uint32_t> precisionAgg;
  vector<CalpontSystemCatalog::ColDataType> typeAgg;
  vector<uint32_t> csNumAgg;
  vector<uint32_t> widthAgg;
  vector<SP_ROWAGG_GRPBY_t> groupBy;
  vector<SP_ROWAGG_FUNC_t> functionVec;
  uint32_t bigIntWidth = sizeof(int64_t);
  uint32_t bigUintWidth = sizeof(uint64_t);
  // For UDAF
  uint32_t projColsUDAFIdx = 0;
  UDAFColumn* udafc = NULL;
  mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
  // for count column of average function
  map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;

  // collect the projected column info, prepare for aggregation
  vector<uint32_t> width;
  map<uint32_t, int> projColPosMap;

  for (uint64_t i = 0; i < keysProj.size(); i++)
  {
    projColPosMap.insert(make_pair(keysProj[i], i));
    width.push_back(projRG.getColumnWidth(i));
  }

  // for groupby column
  map<uint32_t, int> groupbyMap;

  for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
  {
    int64_t colProj = projColPosMap[jobInfo.groupByColVec[i]];
    SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1));
    groupBy.push_back(groupby);
    groupbyMap.insert(make_pair(jobInfo.groupByColVec[i], i));
  }

  // for distinct column
  for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
  {
    //@bug6126, continue if already in group by
    if (groupbyMap.find(jobInfo.distinctColVec[i]) != groupbyMap.end())
      continue;

    int64_t colProj = projColPosMap[jobInfo.distinctColVec[i]];
    SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, -1));
    groupBy.push_back(groupby);
    groupbyMap.insert(make_pair(jobInfo.distinctColVec[i], i));
  }

  // populate the aggregate rowgroup
  AGG_MAP aggFuncMap;
  uint64_t outIdx = 0;

  for (uint64_t i = 0; i < returnedColVec.size(); i++)
  {
    RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
    RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
    uint32_t key = returnedColVec[i].first;

    if (aggOp == ROWAGG_CONSTANT)
    {
      TupleInfo ti = getTupleInfo(key, jobInfo);
      oidsAgg.push_back(ti.oid);
      keysAgg.push_back(key);
      scaleAgg.push_back(ti.scale);
      precisionAgg.push_back(ti.precision);
      typeAgg.push_back(ti.dtype);
      csNumAgg.push_back(ti.csNum);
      widthAgg.push_back(ti.width);
      SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, 0, outIdx, jobInfo.cntStarPos));
      functionVec.push_back(funct);
      ++outIdx;
      continue;
    }

    if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
    {
      TupleInfo ti = getTupleInfo(key, jobInfo);
      uint32_t ptrSize = sizeof(GroupConcatAg*);
      uint32_t width = (ti.width >= ptrSize) ? ti.width : ptrSize;
      oidsAgg.push_back(ti.oid);
      keysAgg.push_back(key);
      scaleAgg.push_back(ti.scale);
      precisionAgg.push_back(ti.precision);
      typeAgg.push_back(ti.dtype);
      csNumAgg.push_back(ti.csNum);
      widthAgg.push_back(width);
      SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, 0, outIdx, -1));
      functionVec.push_back(funct);

      ++outIdx;
      continue;
    }

    if (projColPosMap.find(key) == projColPosMap.end())
    {
      ostringstream emsg;
      emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
      cerr << "prep1PhaseAggregate: " << emsg.str() << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
           << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;

      if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
        cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;

      cerr << endl;
      throw logic_error(emsg.str());
    }

    // make sure the colProj is correct
    int64_t colProj = projColPosMap[key];

    if (keysProj[colProj] != key)
    {
      ostringstream emsg;
      emsg << "projection column map is out of sync.";
      cerr << "prep1PhaseAggregate: " << emsg.str() << endl;
      throw logic_error(emsg.str());
    }

    if (aggOp == ROWAGG_FUNCT_UNDEFINE)
    {
      // must be a groupby column or function on aggregation
      // or used by group_concat
      map<uint32_t, int>::iterator it = groupbyMap.find(key);

      if (it != groupbyMap.end())
      {
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(scaleProj[colProj]);
        precisionAgg.push_back(precisionProj[colProj]);
        typeAgg.push_back(typeProj[colProj]);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(width[colProj]);

        if (groupBy[it->second]->fOutputColumnIndex == (uint32_t)-1)
          groupBy[it->second]->fOutputColumnIndex = outIdx;
        else
          functionVec.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
              ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, groupBy[it->second]->fOutputColumnIndex)));

        ++outIdx;
        continue;
      }
      else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) !=
               jobInfo.expressionVec.end())
      {
        TupleInfo ti = getTupleInfo(key, jobInfo);
        oidsAgg.push_back(ti.oid);
        keysAgg.push_back(key);
        scaleAgg.push_back(ti.scale);
        precisionAgg.push_back(ti.precision);
        typeAgg.push_back(ti.dtype);
        csNumAgg.push_back(ti.csNum);
        widthAgg.push_back(ti.width);
        ++outIdx;
        continue;
      }
      else if (jobInfo.groupConcatInfo.columns().find(key) != jobInfo.groupConcatInfo.columns().end())
      {
        // TODO: columns used only by group_concat are not needed in the result set.
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(scaleProj[colProj]);
        precisionAgg.push_back(precisionProj[colProj]);
        typeAgg.push_back(typeProj[colProj]);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(width[colProj]);
        ++outIdx;
        continue;
      }
      else if (jobInfo.windowSet.find(key) != jobInfo.windowSet.end())
      {
        // skip window columns/expression, which are computed later
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(scaleProj[colProj]);
        precisionAgg.push_back(precisionProj[colProj]);
        typeAgg.push_back(typeProj[colProj]);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(width[colProj]);
        ++outIdx;
        continue;
      }
      else
      {
        uint32_t foundTupleKey{0};
        if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, groupbyMap, key, foundTupleKey))
        {
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(key);
          scaleAgg.push_back(scaleProj[colProj]);
          precisionAgg.push_back(precisionProj[colProj]);
          typeAgg.push_back(typeProj[colProj]);
          csNumAgg.push_back(csNumProj[colProj]);
          widthAgg.push_back(width[colProj]);
          // Update key.
          key = foundTupleKey;
          ++outIdx;
          continue;
        }
        else
        {
          Message::Args args;
          args.add(keyName(i, key, jobInfo));
          string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
          cerr << "prep1PhaseAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
               << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable
               << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView << ", function=" << (int)aggOp << endl;
          throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
        }
      }
    }

    SP_ROWAGG_FUNC_t funct;

    if (aggOp == ROWAGG_UDAF)
    {
      std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
      for (; it != jobInfo.projectionCols.end(); it++)
      {
        udafc = dynamic_cast<UDAFColumn*>((*it).get());
        projColsUDAFIdx++;
        if (udafc)
        {
          pUDAFFunc = udafc->getContext().getFunction();
          // Save the multi-parm keys for dup-detection.
          if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
          {
            for (uint64_t k = i + 1;
                 k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
            {
              udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
            }
          }
          // Create a RowAggFunctionCol (UDAF subtype) with the context.
          funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, outIdx));
          break;
        }
      }
      if (it == jobInfo.projectionCols.end())
      {
        throw logic_error(
            "(1)prep1PhaseAggregate: A UDAF function is called but there's not enough UDAFColumns");
      }
    }
    else
    {
      funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, outIdx));
    }

    functionVec.push_back(funct);

    switch (aggOp)
    {
      case ROWAGG_MIN:
      case ROWAGG_MAX:
      case ROWAGG_SELECT_SOME:
      {
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(scaleProj[colProj]);
        precisionAgg.push_back(precisionProj[colProj]);
        typeAgg.push_back(typeProj[colProj]);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(width[colProj]);
      }
      break;

      case ROWAGG_AVG:
        avgFuncMap.insert(make_pair(key, funct));
        /* fall through */
      case ROWAGG_SUM:
      {
        if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
            typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
            typeProj[colProj] == CalpontSystemCatalog::BLOB ||
            typeProj[colProj] == CalpontSystemCatalog::TEXT ||
            typeProj[colProj] == CalpontSystemCatalog::DATE ||
            typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
            typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
            typeProj[colProj] == CalpontSystemCatalog::TIME)
        {
          Message::Args args;
          args.add("sum/average");
          args.add(colTypeIdString(typeProj[colProj]));
          string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
          cerr << "prep1PhaseAggregate: " << emsg << endl;
          throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
        }

        wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAgg,
                                scaleAgg, precisionAgg, widthAgg);

        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        csNumAgg.push_back(csNumProj[colProj]);
      }
      break;

      case ROWAGG_COUNT_COL_NAME:
      case ROWAGG_COUNT_ASTERISK:
      {
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(0);
        // work around count() in select subquery
        precisionAgg.push_back(rowgroup::MagicPrecisionForCountAgg);
        typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(bigIntWidth);
      }
      break;

      case ROWAGG_STATS:
      {
        if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
            typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
            typeProj[colProj] == CalpontSystemCatalog::TEXT ||
            typeProj[colProj] == CalpontSystemCatalog::BLOB ||
            typeProj[colProj] == CalpontSystemCatalog::DATE ||
            typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
            typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
            typeProj[colProj] == CalpontSystemCatalog::TIME)
        {
          Message::Args args;
          args.add("variance/standard deviation");
          args.add(colTypeIdString(typeProj[colProj]));
          string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
          cerr << "prep1PhaseAggregate: " << emsg << endl;
          throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
        }

        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(scaleProj[colProj]);
        precisionAgg.push_back(0);
        typeAgg.push_back(CalpontSystemCatalog::DOUBLE);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(sizeof(double));
      }
      break;

      case ROWAGG_BIT_AND:
      case ROWAGG_BIT_OR:
      case ROWAGG_BIT_XOR:
      {
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(0);
        precisionAgg.push_back(-16);  // for connector to skip null check
        typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(bigIntWidth);
      }
      break;

      case ROWAGG_UDAF:
      {
        RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());

        if (!udafFuncCol)
        {
          throw logic_error(
              "(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
        }

        // Return column
        oidsAgg.push_back(oidsProj[colProj]);
        keysAgg.push_back(key);
        scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
        precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
        typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
        csNumAgg.push_back(csNumProj[colProj]);
        widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
        break;
      }

      case ROWAGG_MULTI_PARM:
      {
      }
      break;

      default:
      {
        ostringstream emsg;
        emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
        cerr << "prep1PhaseAggregate: " << emsg.str() << endl;
        throw QueryDataExcept(emsg.str(), aggregateFuncErr);
      }
    }

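    // A repeated aggregate call (same column, same function) is computed only
    // once; later occurrences become ROWAGG_DUP_* entries that copy the value
    // from the first occurrence via fAuxColumnIndex.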
    // find if this func is a duplicate
    AGG_MAP::iterator iter = aggFuncMap.find(
        boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

    if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM && iter != aggFuncMap.end())
    {
      if (funct->fAggFunction == ROWAGG_AVG)
        funct->fAggFunction = ROWAGG_DUP_AVG;
      else if (funct->fAggFunction == ROWAGG_STATS)
        funct->fAggFunction = ROWAGG_DUP_STATS;
      else if (funct->fAggFunction == ROWAGG_UDAF)
        funct->fAggFunction = ROWAGG_DUP_UDAF;
      else
        funct->fAggFunction = ROWAGG_DUP_FUNCT;

      funct->fAuxColumnIndex = iter->second;
    }
    else
    {
      aggFuncMap.insert(make_pair(
          boost::make_tuple(key, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
          funct->fOutputColumnIndex));
    }

    if (aggOp != ROWAGG_MULTI_PARM)
    {
      ++outIdx;
    }
  }

  // now fix the AVG function, locate the count(column) position
  for (uint64_t i = 0; i < functionVec.size(); i++)
  {
    if (functionVec[i]->fAggFunction != ROWAGG_COUNT_COL_NAME)
      continue;

    // if the count(k) can be associated with an avg(k)
    map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
        avgFuncMap.find(keysAgg[functionVec[i]->fOutputColumnIndex]);

    if (k != avgFuncMap.end())
    {
      k->second->fAuxColumnIndex = functionVec[i]->fOutputColumnIndex;
      functionVec[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
    }
  }

  // there is avg(k), but no count(k) in the select list
  uint64_t lastCol = outIdx;

  for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
  {
    if (k->second->fAuxColumnIndex == (uint32_t)-1)
    {
      k->second->fAuxColumnIndex = lastCol++;
      oidsAgg.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
      keysAgg.push_back(k->first);
      scaleAgg.push_back(0);
      precisionAgg.push_back(19);
      typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
      widthAgg.push_back(bigIntWidth);
    }
  }

  // add auxiliary fields for UDAF and statistics functions
  for (uint64_t i = 0; i < functionVec.size(); i++)
  {
    uint64_t j = functionVec[i]->fInputColumnIndex;

    if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
    {
      // Column for index of UDAF UserData struct
      RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());

      if (!udafFuncCol)
      {
        throw logic_error(
            "(3)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
      }

      functionVec[i]->fAuxColumnIndex = lastCol++;
      oidsAgg.push_back(oidsProj[j]);
      keysAgg.push_back(keysProj[j]);
      scaleAgg.push_back(0);
      precisionAgg.push_back(0);
      typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
      csNumAgg.push_back(8);
      widthAgg.push_back(bigUintWidth);
      continue;
    }

    if (functionVec[i]->fAggFunction != ROWAGG_STATS)
      continue;

    functionVec[i]->fAuxColumnIndex = lastCol;

    // mean(x)
    oidsAgg.push_back(oidsProj[j]);
    keysAgg.push_back(keysProj[j]);
    scaleAgg.push_back(0);
    precisionAgg.push_back(-1);
    typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
    csNumAgg.push_back(8);
    widthAgg.push_back(sizeof(long double));
    ++lastCol;

    // sum(x_i - mean)^2
    oidsAgg.push_back(oidsProj[j]);
    keysAgg.push_back(keysProj[j]);
    scaleAgg.push_back(0);
    precisionAgg.push_back(-1);
    typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
    csNumAgg.push_back(8);
    widthAgg.push_back(sizeof(long double));
    ++lastCol;
  }

  // calculate the offset and create the rowaggregation, rowgroup
  posAgg.push_back(2);

  for (uint64_t i = 0; i < oidsAgg.size(); i++)
    posAgg.push_back(posAgg[i] + widthAgg[i]);

  RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
                 jobInfo.stringTableThreshold);
  SP_ROWAGG_UM_t rowAgg(
      new RowAggregationUM(groupBy, functionVec, jobInfo.rm, jobInfo.umMemLimit, jobInfo.hasRollup));
  rowAgg->timeZone(jobInfo.timeZone);
  rowgroups.push_back(aggRG);
  aggregators.push_back(rowAgg);

  // mapping the group_concat columns, if any.
  if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
  {
    jobInfo.groupConcatInfo.mapColumns(projRG);
    rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
  }

  if (jobInfo.trace)
    cout << "\n====== Aggregation RowGroups ======" << endl
         << "projected RG: " << projRG.toString() << endl
         << "aggregated RG: " << aggRG.toString() << endl;
}

void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
                                                     vector<SP_ROWAGG_t>& aggregators)
{
  // check if there are any aggregate columns
  vector<pair<uint32_t, int>> aggColVec;
  vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;

  for (uint64_t i = 0; i < returnedColVec.size(); i++)
  {
    if (returnedColVec[i].second != 0)
      aggColVec.push_back(returnedColVec[i]);
  }

  // populate the aggregate rowgroup: projectedRG -> aggregateRG
  //
  // Aggregate preparation by joblist factory:
  // 1. get projected rowgroup (done by doAggProject) -- passed in
  // 2. construct aggregate rowgroup -- output of UM
  const RowGroup projRG = rowgroups[0];
  const vector<uint32_t>& oidsProj = projRG.getOIDs();
  const vector<uint32_t>& keysProj = projRG.getKeys();
  const vector<uint32_t>& scaleProj = projRG.getScale();
  const vector<uint32_t>& precisionProj = projRG.getPrecision();
  const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
  const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();

  vector<uint32_t> posAgg, posAggDist;
  vector<uint32_t> oidsAgg, oidsAggDist;
  vector<uint32_t> keysAgg, keysAggDist;
  vector<uint32_t> scaleAgg, scaleAggDist;
  vector<uint32_t> precisionAgg, precisionAggDist;
  vector<CalpontSystemCatalog::ColDataType> typeAgg, typeAggDist;
  vector<uint32_t> csNumAgg, csNumAggDist;
  vector<uint32_t> widthProj, widthAgg, widthAggDist;
  vector<SP_ROWAGG_GRPBY_t> groupBy, groupByNoDist;
  vector<SP_ROWAGG_FUNC_t> functionVec1, functionVec2, functionNoDistVec;
  uint32_t bigIntWidth = sizeof(int64_t);
  // map key = column key, operation (enum), and UDAF pointer if UDAF.
  AGG_MAP aggFuncMap;
  // set<uint32_t> avgSet;
  list<uint32_t> multiParmIndexes;

  // For UDAF
  UDAFColumn* udafc = NULL;
  mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
  uint32_t projColsUDAFIdx = 0;
  uint32_t udafcParamIdx = 0;

  // for count column of average function
  map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;

  // collect the projected column info, prepare for aggregation
  vector<uint32_t> width;
  for (uint64_t i = 0; i < keysProj.size(); i++)
  {
    width.push_back(projRG.getColumnWidth(i));
  }

  // associate the columns between projected RG and aggregate RG on UM
  // populate the aggregate columns
  // the groupby columns are put in front, even if not a returned column
  // sum and count(column name) are omitted, if avg is present
  {
    // project only unique oids, but they may be repeated in aggregation
    // collect the projected column info, prepare for aggregation
    map<uint32_t, int> projColPosMap;

    for (uint64_t i = 0; i < keysProj.size(); i++)
    {
      projColPosMap.insert(make_pair(keysProj[i], i));
      widthProj.push_back(projRG.getColumnWidth(i));
    }

    // column index for aggregate rowgroup
    uint64_t colAgg = 0;

    // for groupby column
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      uint32_t key = jobInfo.groupByColVec[i];

      if (projColPosMap.find(key) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
        cerr << "prep1PhaseDistinctAggregate: groupby " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      uint64_t colProj = projColPosMap[key];

      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg));
      groupBy.push_back(groupby);

      // copy down to aggregation rowgroup
      oidsAgg.push_back(oidsProj[colProj]);
      keysAgg.push_back(key);
      scaleAgg.push_back(scaleProj[colProj]);
      precisionAgg.push_back(precisionProj[colProj]);
      typeAgg.push_back(typeProj[colProj]);
      csNumAgg.push_back(csNumProj[colProj]);
      widthAgg.push_back(widthProj[colProj]);

      aggFuncMap.insert(make_pair(
          boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
          colAgg));
      colAgg++;
    }

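    // DISTINCT columns are added to the group-by list so the first pass
    // de-duplicates their values; the distinct aggregates themselves are
    // evaluated in the second (distinct) pass.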
// for distinct column
|
|
for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
|
|
{
|
|
uint32_t key = jobInfo.distinctColVec[i];
|
|
|
|
if (projColPosMap.find(key) == projColPosMap.end())
|
|
{
|
|
ostringstream emsg;
|
|
emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
|
|
cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str()
|
|
<< " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
|
|
<< ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;
|
|
|
|
if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
|
|
cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;
|
|
|
|
cerr << endl;
|
|
throw logic_error(emsg.str());
|
|
}
|
|
|
|
// check for dup distinct column -- @bug6126
|
|
if (find(keysAgg.begin(), keysAgg.end(), key) != keysAgg.end())
|
|
continue;
|
|
|
|
uint64_t colProj = projColPosMap[key];
|
|
|
|
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAgg));
|
|
groupBy.push_back(groupby);
|
|
|
|
// copy down to aggregation rowgroup
|
|
oidsAgg.push_back(oidsProj[colProj]);
|
|
keysAgg.push_back(key);
|
|
scaleAgg.push_back(scaleProj[colProj]);
|
|
precisionAgg.push_back(precisionProj[colProj]);
|
|
typeAgg.push_back(typeProj[colProj]);
|
|
csNumAgg.push_back(csNumProj[colProj]);
|
|
widthAgg.push_back(widthProj[colProj]);
|
|
|
|
aggFuncMap.insert(make_pair(
|
|
boost::make_tuple(keysAgg[colAgg], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
|
|
colAgg));
|
|
colAgg++;
|
|
}

    // vectors for aggregate functions
    RowAggFunctionType aggOp = ROWAGG_FUNCT_UNDEFINE;
    RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
    for (uint64_t i = 0; i < aggColVec.size(); i++)
    {
      pUDAFFunc = NULL;
      uint32_t aggKey = aggColVec[i].first;
      aggOp = functionIdMap(aggColVec[i].second);
      RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);

      // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
      if (aggOp != ROWAGG_MULTI_PARM)
        prevAggOp = aggOp;

      // skip if this is a constant
      if (aggOp == ROWAGG_CONSTANT)
        continue;

      // skip if this is a group_concat
      if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
      {
        TupleInfo ti = getTupleInfo(aggKey, jobInfo);
        uint32_t width = sizeof(GroupConcatAg*);
        oidsAgg.push_back(ti.oid);
        keysAgg.push_back(aggKey);
        scaleAgg.push_back(ti.scale);
        precisionAgg.push_back(ti.precision);
        typeAgg.push_back(ti.dtype);
        csNumAgg.push_back(ti.csNum);
        widthAgg.push_back(width);
        SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colAgg, colAgg, -1));
        functionVec1.push_back(funct);
        aggFuncMap.insert(make_pair(
            boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
            colAgg));
        colAgg++;

        continue;
      }

      if (projColPosMap.find(aggKey) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
        cerr << "prep1PhaseDistinctAggregate: aggregate " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      // We skip distinct aggs, including extra parms; these are handled by adding
      // them to the group by list above.
      if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG ||
          aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
        continue;
      if (aggOp == ROWAGG_MULTI_PARM && prevAggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
        continue;

      uint64_t colProj = projColPosMap[aggKey];

      SP_ROWAGG_FUNC_t funct;

      if (aggOp == ROWAGG_UDAF)
      {
        std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;

        for (; it != jobInfo.projectionCols.end(); it++)
        {
          udafc = dynamic_cast<UDAFColumn*>((*it).get());
          projColsUDAFIdx++;

          if (udafc)
          {
            pUDAFFunc = udafc->getContext().getFunction();
            // Save the multi-parm keys for dup-detection.
            if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
            {
              for (uint64_t k = i + 1;
                   k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
              {
                udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
              }
            }
            // Create a RowAggFunctionCol (UDAF subtype) with the context.
            funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg));
            break;
          }
        }
        if (it == jobInfo.projectionCols.end())
        {
          throw logic_error(
              "(1)prep1PhaseDistinctAggregate: A UDAF function is called but there's not enough "
              "UDAFColumns");
        }
      }
      else
      {
        funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg));
      }

      // skip if this is a duplicate
      if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM &&
          aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc,
                                            udafc ? udafc->getContext().getParamKeys() : NULL)) !=
              aggFuncMap.end())
      {
        continue;
      }
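      // The map is keyed by the whole (column key, agg op, UDAF function, UDAF parm
      // keys) tuple, so MIN(c1) and MAX(c1) on the same column stay separate entries
      // while a second, identical MIN(c1) is folded into the first one above.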

      functionVec1.push_back(funct);
      aggFuncMap.insert(make_pair(
          boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
          colAgg));

      switch (aggOp)
      {
        case ROWAGG_MIN:
        case ROWAGG_MAX:
        case ROWAGG_SELECT_SOME:
        {
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(scaleProj[colProj]);
          precisionAgg.push_back(precisionProj[colProj]);
          typeAgg.push_back(typeProj[colProj]);
          csNumAgg.push_back(csNumProj[colProj]);
          widthAgg.push_back(widthProj[colProj]);
          colAgg++;
        }
        break;

        case ROWAGG_SUM:
        case ROWAGG_AVG:
        {
          if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
              typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
              typeProj[colProj] == CalpontSystemCatalog::BLOB ||
              typeProj[colProj] == CalpontSystemCatalog::TEXT ||
              typeProj[colProj] == CalpontSystemCatalog::DATE ||
              typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
              typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
              typeProj[colProj] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("sum/average");
            args.add(colTypeIdString(typeProj[colProj]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          csNumAgg.push_back(8);
          wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAgg,
                                  scaleAgg, precisionAgg, widthAgg);
          colAgg++;

          // this prep has a distinct step; put the count column for avg next to the sum
          // and let it fall through to add a count column for the average function
          if (aggOp == ROWAGG_AVG)
            funct->fAuxColumnIndex = colAgg;
          else
            break;
        }
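        // For AVG the code falls through so the auxiliary COUNT column is laid out
        // right after the SUM column; AVG(c) is carried as SUM(c) plus a COUNT(c)
        // at fAuxColumnIndex until the final division is applied.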
        /* fall through */

        case ROWAGG_COUNT_ASTERISK:
        case ROWAGG_COUNT_COL_NAME:
        {
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(0);
          // work around count() in select subquery
          precisionAgg.push_back(rowgroup::MagicPrecisionForCountAgg);

          if (isUnsigned(typeProj[colProj]))
          {
            typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
          }
          else
          {
            typeAgg.push_back(CalpontSystemCatalog::BIGINT);
          }

          csNumAgg.push_back(8);
          widthAgg.push_back(bigIntWidth);
          colAgg++;
        }
        break;

        case ROWAGG_STATS:
        {
          if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
              typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
              typeProj[colProj] == CalpontSystemCatalog::BLOB ||
              typeProj[colProj] == CalpontSystemCatalog::TEXT ||
              typeProj[colProj] == CalpontSystemCatalog::DATE ||
              typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
              typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
              typeProj[colProj] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("variance/standard deviation");
            args.add(colTypeIdString(typeProj[colProj]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          // count(x)
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(scaleProj[colProj]);
          precisionAgg.push_back(0);
          typeAgg.push_back(CalpontSystemCatalog::DOUBLE);
          csNumAgg.push_back(8);
          widthAgg.push_back(sizeof(double));
          funct->fAuxColumnIndex = ++colAgg;

          // mean(x)
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(0);
          precisionAgg.push_back(-1);
          typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
          csNumAgg.push_back(8);
          widthAgg.push_back(sizeof(long double));
          ++colAgg;

          // sum(x_i - mean)^2
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(0);
          precisionAgg.push_back(-1);
          typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE);
          csNumAgg.push_back(8);
          widthAgg.push_back(sizeof(long double));
          ++colAgg;
        }
        break;
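        // The three auxiliary columns above carry the running count, mean, and
        // M2 = sum((x_i - mean)^2); the requested statistic follows from the
        // standard identities var_pop = M2 / n, var_samp = M2 / (n - 1), and
        // stddev = sqrt(var).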

        case ROWAGG_BIT_AND:
        case ROWAGG_BIT_OR:
        case ROWAGG_BIT_XOR:
        {
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(0);
          precisionAgg.push_back(-16);  // for connector to skip null check
          typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAgg.push_back(8);
          widthAgg.push_back(bigIntWidth);
          colAgg++;
        }
        break;

        case ROWAGG_UDAF:
        {
          RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());

          if (!udafFuncCol)
          {
            throw logic_error(
                "(2)prep1PhaseDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
          }

          // Return column
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(udafFuncCol->fUDAFContext.getScale());
          precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision());
          typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType());
          csNumAgg.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
          widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth());
          ++colAgg;
          // Column for index of UDAF UserData struct
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(0);
          precisionAgg.push_back(0);
          typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAgg.push_back(8);
          widthAgg.push_back(sizeof(uint64_t));
          funct->fAuxColumnIndex = colAgg++;
          // If the first param is const
          udafcParamIdx = 0;
          ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
          if (cc)
          {
            funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
          }
          ++udafcParamIdx;
          break;
        }
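        // A UDAF thus takes two slots in the aggregation rowgroup: the user-visible
        // result column, plus (via fAuxColumnIndex) a UBIGINT slot that indexes the
        // per-group UserData state kept for the UDAF.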

        case ROWAGG_MULTI_PARM:
        {
          oidsAgg.push_back(oidsProj[colProj]);
          keysAgg.push_back(aggKey);
          scaleAgg.push_back(scaleProj[colProj]);
          precisionAgg.push_back(precisionProj[colProj]);
          typeAgg.push_back(typeProj[colProj]);
          csNumAgg.push_back(csNumProj[colProj]);
          widthAgg.push_back(widthProj[colProj]);
          multiParmIndexes.push_back(colAgg);
          ++colAgg;
          // If the param is const
          if (udafc)
          {
            if (udafcParamIdx > udafc->aggParms().size() - 1)
            {
              throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with too many parms",
                                    aggregateFuncErr);
            }
            ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
            if (cc)
            {
              funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
            }
          }
          else if (prevAggOp != ROWAGG_COUNT_DISTINCT_COL_NAME)
          {
            throw QueryDataExcept("prep1PhaseDistinctAggregate: UDAF multi function with no parms",
                                  aggregateFuncErr);
          }
          ++udafcParamIdx;
        }
        break;

        default:
        {
          ostringstream emsg;
          emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
          cerr << "prep1PhaseDistinctAggregate: " << emsg.str() << endl;
          throw QueryDataExcept(emsg.str(), aggregateFuncErr);
        }
      }
    }
  }

  // populate the functionNoDistVec
  {
    // for (uint32_t idx = 0; idx < functionVec1.size(); idx++)
    // {
    //   SP_ROWAGG_FUNC_t func1 = functionVec1[idx];
    //   SP_ROWAGG_FUNC_t funct(
    //       new RowAggFunctionCol(func1->fAggFunction,
    //                             func1->fStatsFunction,
    //                             func1->fOutputColumnIndex,
    //                             func1->fOutputColumnIndex,
    //                             func1->fAuxColumnIndex));
    //   functionNoDistVec.push_back(funct);
    // }
    functionNoDistVec = functionVec1;
  }

  // associate the columns between the non-distinct aggregator and distinct aggregator
  // populate the returned columns
  // drop group-by columns that are not returned
  // add back sum or count(column name) if omitted due to an avg column
  // put the count(column name) column at the end, if it is for avg only
  {
    // check if the count column for AVG is also a returned column,
    // if so, replace the "-1" with the actual position in the returned vec.
    AGG_MAP aggDupFuncMap;
    projColsUDAFIdx = 0;

    // copy over the groupby vector
    // update the outputColumnIndex if returned
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
      groupByNoDist.push_back(groupby);
      aggFuncMap.insert(make_pair(
          boost::make_tuple(keysAgg[i], 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL), i));
    }

    // locate the return column position in the aggregated rowgroup
    uint64_t outIdx = 0;
    RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
    uint32_t prevRetKey = 0;

    for (uint64_t i = 0; i < returnedColVec.size(); i++)
    {
      udafc = NULL;
      pUDAFFunc = NULL;
      uint32_t retKey = returnedColVec[i].first;
      RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
      RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
      int colAgg = -1;
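      // Extra parms of a multi-parm aggregate appear as their own ROWAGG_MULTI_PARM
      // entries in returnedColVec, right after the aggregate they belong to; the
      // block below undoes any duplicate folding done for that preceding aggregate.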

      if (aggOp == ROWAGG_MULTI_PARM)
      {
        // Duplicate detection doesn't work for multi-parm

        // If this function was earlier detected as a duplicate, unduplicate it.
        SP_ROWAGG_FUNC_t funct = functionVec2.back();
        if (funct->fAggFunction == ROWAGG_DUP_FUNCT)
          funct->fAggFunction = prevAggOp;

        // Remove it from aggDupFuncMap if it's in there.
        funct->hasMultiParm = true;
        AGG_MAP::iterator it = aggDupFuncMap.find(boost::make_tuple(
            prevRetKey, prevAggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
        if (it != aggDupFuncMap.end())
        {
          aggDupFuncMap.erase(it);
        }

        // Skip on the final agg: extra parms for an aggregate have no work to do there.
        continue;
      }
      else
      {
        // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
        prevAggOp = aggOp;
        prevRetKey = returnedColVec[i].first;
      }

      if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
          jobInfo.distinctColVec.end())
      {
        AGG_MAP::iterator it = aggFuncMap.find(
            boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

        if (it != aggFuncMap.end())
        {
          colAgg = it->second;
        }
        else
        {
          ostringstream emsg;
          emsg << "'" << jobInfo.keyInfo->tupleKeyToName[retKey] << "' isn't in tuple.";
          cerr << "prep1PhaseDistinctAggregate: distinct " << emsg.str()
               << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
               << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable;

          if (jobInfo.keyInfo->tupleKeyVec[retKey].fView.length() > 0)
            cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView;

          cerr << endl;
          throw QueryDataExcept(emsg.str(), aggregateFuncErr);
        }
      }

      if (aggOp == ROWAGG_UDAF)
      {
        std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
        for (; it != jobInfo.projectionCols.end(); it++)
        {
          udafc = dynamic_cast<UDAFColumn*>((*it).get());
          projColsUDAFIdx++;
          if (udafc)
          {
            pUDAFFunc = udafc->getContext().getFunction();
            // Save the multi-parm keys for dup-detection.
            if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
            {
              for (uint64_t k = i + 1;
                   k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
              {
                udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
              }
            }
            break;
          }
        }
        if (it == jobInfo.projectionCols.end())
        {
          throw logic_error(
              "(1)prep1PhaseDistinctAggregate: A UDAF function is called but there's not enough "
              "UDAFColumns");
        }
      }

      switch (aggOp)
      {
        case ROWAGG_DISTINCT_AVG:
        case ROWAGG_DISTINCT_SUM:
        {
          if (typeAgg[colAgg] == CalpontSystemCatalog::CHAR ||
              typeAgg[colAgg] == CalpontSystemCatalog::VARCHAR ||
              typeAgg[colAgg] == CalpontSystemCatalog::BLOB ||
              typeAgg[colAgg] == CalpontSystemCatalog::TEXT ||
              typeAgg[colAgg] == CalpontSystemCatalog::DATE ||
              typeAgg[colAgg] == CalpontSystemCatalog::DATETIME ||
              typeAgg[colAgg] == CalpontSystemCatalog::TIMESTAMP ||
              typeAgg[colAgg] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("sum/average");
            args.add(colTypeIdString(typeAgg[colAgg]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep1PhaseDistinctAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          oidsAggDist.push_back(oidsAgg[colAgg]);
          keysAggDist.push_back(retKey);
          wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg, typeAggDist,
                                  scaleAggDist, precisionAggDist, widthAggDist);
          csNumAggDist.push_back(8);
        }
        break;

        case ROWAGG_COUNT_DISTINCT_COL_NAME:
        {
          oidsAggDist.push_back(oidsAgg[colAgg]);
          keysAggDist.push_back(retKey);
          scaleAggDist.push_back(0);
          // work around count() in select subquery
          precisionAggDist.push_back(rowgroup::MagicPrecisionForCountAgg);
          typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAggDist.push_back(8);
          widthAggDist.push_back(bigIntWidth);
        }
        break;

        case ROWAGG_MIN:
        case ROWAGG_MAX:
        case ROWAGG_SUM:
        case ROWAGG_AVG:
        case ROWAGG_COUNT_ASTERISK:
        case ROWAGG_COUNT_COL_NAME:
        case ROWAGG_STATS:
        case ROWAGG_BIT_AND:
        case ROWAGG_BIT_OR:
        case ROWAGG_BIT_XOR:
        case ROWAGG_SELECT_SOME:
        default:
        {
          AGG_MAP::iterator it = aggFuncMap.find(
              boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

          if (it != aggFuncMap.end())
          {
            colAgg = it->second;
            oidsAggDist.push_back(oidsAgg[colAgg]);
            keysAggDist.push_back(keysAgg[colAgg]);
            scaleAggDist.push_back(scaleAgg[colAgg]);
            precisionAggDist.push_back(precisionAgg[colAgg]);
            typeAggDist.push_back(typeAgg[colAgg]);
            csNumAggDist.push_back(csNumAgg[colAgg]);
            uint32_t width = widthAgg[colAgg];

            if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
            {
              TupleInfo ti = getTupleInfo(retKey, jobInfo);

              if (ti.width > width)
                width = ti.width;
            }

            widthAggDist.push_back(width);
          }

          // not a direct hit -- a returned column is not already in the RG from PMs
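          // (this mirrors the MCOL-5476 handling in prep2PhasesAggregate below:
          // try to reuse an equivalent function column before declaring the
          // returned column missing)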
          else
          {
            uint32_t foundTupleKey{0};
            if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
            {
              AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
                  foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
              colAgg = it->second;
              oidsAggDist.push_back(oidsAgg[colAgg]);
              keysAggDist.push_back(keysAgg[colAgg]);
              scaleAggDist.push_back(scaleAgg[colAgg]);
              precisionAggDist.push_back(precisionAgg[colAgg]);
              typeAggDist.push_back(typeAgg[colAgg]);
              csNumAggDist.push_back(csNumAgg[colAgg]);
              uint32_t width = widthAgg[colAgg];
              if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
              {
                TupleInfo ti = getTupleInfo(retKey, jobInfo);

                if (ti.width > width)
                  width = ti.width;
              }
              widthAggDist.push_back(width);

              // Update the `retKey` to specify that this column is a duplicate.
              retKey = foundTupleKey;
            }
            else
            {
              bool returnColMissing = true;

              // check if a SUM or COUNT is covered by an AVG
              if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
              {
                it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
                                                       udafc ? udafc->getContext().getParamKeys() : NULL));

                if (it != aggFuncMap.end())
                {
                  // false alarm
                  returnColMissing = false;

                  colAgg = it->second;

                  if (aggOp == ROWAGG_SUM)
                  {
                    oidsAggDist.push_back(oidsAgg[colAgg]);
                    keysAggDist.push_back(retKey);
                    csNumAggDist.push_back(8);
                    wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg,
                                            typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
                  }
                  else
                  {
                    // leave the count() to avg
                    aggOp = ROWAGG_COUNT_NO_OP;

                    oidsAggDist.push_back(oidsAgg[colAgg]);
                    keysAggDist.push_back(retKey);
                    scaleAggDist.push_back(0);

                    if (isUnsigned(typeAgg[colAgg]))
                    {
                      typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
                      precisionAggDist.push_back(20);
                    }
                    else
                    {
                      typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
                      precisionAggDist.push_back(19);
                    }
                    csNumAggDist.push_back(8);
                    widthAggDist.push_back(bigIntWidth);
                  }
                }
              }
              else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
                       jobInfo.expressionVec.end())
              {
                // a function on aggregation
                TupleInfo ti = getTupleInfo(retKey, jobInfo);
                oidsAggDist.push_back(ti.oid);
                keysAggDist.push_back(retKey);
                scaleAggDist.push_back(ti.scale);
                precisionAggDist.push_back(ti.precision);
                typeAggDist.push_back(ti.dtype);
                csNumAggDist.push_back(ti.csNum);
                widthAggDist.push_back(ti.width);

                returnColMissing = false;
              }
              else if (aggOp == ROWAGG_CONSTANT)
              {
                TupleInfo ti = getTupleInfo(retKey, jobInfo);
                oidsAggDist.push_back(ti.oid);
                keysAggDist.push_back(retKey);
                scaleAggDist.push_back(ti.scale);
                precisionAggDist.push_back(ti.precision);
                typeAggDist.push_back(ti.dtype);
                csNumAggDist.push_back(ti.csNum);
                widthAggDist.push_back(ti.width);

                returnColMissing = false;
              }

#if 0
              else if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
              {
                TupleInfo ti = getTupleInfo(retKey, jobInfo);
                oidsAggDist.push_back(ti.oid);
                keysAggDist.push_back(retKey);
                scaleAggDist.push_back(ti.scale);
                precisionAggDist.push_back(ti.precision);
                typeAggDist.push_back(ti.dtype);
                csNumAggDist.push_back(ti.csNum);
                widthAggDist.push_back(ti.width);

                returnColMissing = false;
              }

#endif
              else if (jobInfo.groupConcatInfo.columns().find(retKey) !=
                       jobInfo.groupConcatInfo.columns().end())
              {
                // TODO: columns used only by group_concat are not needed in the result set.
                for (uint64_t k = 0; k < keysProj.size(); k++)
                {
                  if (retKey == keysProj[k])
                  {
                    oidsAggDist.push_back(oidsProj[k]);
                    keysAggDist.push_back(retKey);
                    scaleAggDist.push_back(scaleProj[k] >> 8);
                    precisionAggDist.push_back(precisionProj[k]);
                    typeAggDist.push_back(typeProj[k]);
                    csNumAggDist.push_back(csNumProj[k]);
                    widthAggDist.push_back(widthProj[k]);

                    returnColMissing = false;
                    break;
                  }
                }
              }

              else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
              {
                // skip window columns/expressions, which are computed later
                for (uint64_t k = 0; k < keysProj.size(); k++)
                {
                  if (retKey == keysProj[k])
                  {
                    oidsAggDist.push_back(oidsProj[k]);
                    keysAggDist.push_back(retKey);
                    scaleAggDist.push_back(scaleProj[k] >> 8);
                    precisionAggDist.push_back(precisionProj[k]);
                    typeAggDist.push_back(typeProj[k]);
                    csNumAggDist.push_back(csNumProj[k]);
                    widthAggDist.push_back(widthProj[k]);

                    returnColMissing = false;
                    break;
                  }
                }
              }

              if (returnColMissing)
              {
                Message::Args args;
                args.add(keyName(outIdx, retKey, jobInfo));
                string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
                cerr << "prep1PhaseDistinctAggregate: " << emsg
                     << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
                     << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
                     << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
                     << endl;
                throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
              }
            }  // else
          }    // switch
        }
      }

      // update the groupby vector if the groupby column is a returned column
      if (returnedColVec[i].second == 0)
      {
        int dupGroupbyIndex = -1;

        for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
        {
          if (jobInfo.groupByColVec[j] == retKey)
          {
            if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t)-1)
              groupByNoDist[j]->fOutputColumnIndex = outIdx;
            else
              dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
          }
        }

        // a duplicate group by column
        if (dupGroupbyIndex != -1)
          functionVec2.push_back(SP_ROWAGG_FUNC_t(
              new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
      }
      else
      {
        // update the aggregate function vector
        SP_ROWAGG_FUNC_t funct;
        if (aggOp == ROWAGG_UDAF)
        {
          funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, outIdx));
        }
        else
        {
          funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, outIdx));
        }

        if (aggOp == ROWAGG_COUNT_NO_OP)
          funct->fAuxColumnIndex = colAgg;
        else if (aggOp == ROWAGG_CONSTANT)
          funct->fAuxColumnIndex = jobInfo.cntStarPos;

        functionVec2.push_back(funct);

        // find if this func is a duplicate
        AGG_MAP::iterator iter = aggDupFuncMap.find(
            boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

        if (iter != aggDupFuncMap.end())
        {
          if (funct->fAggFunction == ROWAGG_AVG)
            funct->fAggFunction = ROWAGG_DUP_AVG;
          else if (funct->fAggFunction == ROWAGG_STATS)
            funct->fAggFunction = ROWAGG_DUP_STATS;
          else if (funct->fAggFunction == ROWAGG_UDAF)
            funct->fAggFunction = ROWAGG_DUP_UDAF;
          else
            funct->fAggFunction = ROWAGG_DUP_FUNCT;

          funct->fAuxColumnIndex = iter->second;
        }
        else
        {
          aggDupFuncMap.insert(make_pair(
              boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
              funct->fOutputColumnIndex));
        }

        if (returnedColVec[i].second == AggregateColumn::AVG)
          avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
        else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
          avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
      }
      ++outIdx;
    }  // for (i
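    // A ROWAGG_DUP_* entry makes the later occurrence copy its result from the
    // first occurrence's output slot (fAuxColumnIndex points there), so e.g.
    // SELECT avg(c), avg(c) computes the average once and duplicates the value.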

    // now fix the AVG function, locate the count(column) position
    for (uint64_t i = 0; i < functionVec2.size(); i++)
    {
      if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_NO_OP)
      {
        // if the count(k) can be associated with an avg(k)
        map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
            avgFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]);

        if (k != avgFuncMap.end())
          k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex;
      }
    }

    // there is an avg(k), but no count(k) in the select list
    uint64_t lastCol = outIdx;

    for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
    {
      if (k->second->fAuxColumnIndex == (uint32_t)-1)
      {
        k->second->fAuxColumnIndex = lastCol++;
        oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
        keysAggDist.push_back(k->first);
        scaleAggDist.push_back(0);
        precisionAggDist.push_back(19);
        typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAggDist.push_back(8);
        widthAggDist.push_back(bigIntWidth);
      }
    }

    // now fix the AVG distinct function, locate the count(distinct column) position
    for (uint64_t i = 0; i < functionVec2.size(); i++)
    {
      if (functionVec2[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME && !functionVec2[i]->hasMultiParm)
      {
        // if the count(distinct k) can be associated with an avg(distinct k)
        map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
            avgDistFuncMap.find(keysAggDist[functionVec2[i]->fOutputColumnIndex]);

        if (k != avgDistFuncMap.end())
        {
          k->second->fAuxColumnIndex = functionVec2[i]->fOutputColumnIndex;
          functionVec2[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
        }
      }
    }

    // there is an avg(distinct k), but no count(distinct k) in the select list
    for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++)
    {
      if (k->second->fAuxColumnIndex == (uint32_t)-1)
      {
        k->second->fAuxColumnIndex = lastCol++;
        oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
        keysAggDist.push_back(k->first);
        scaleAggDist.push_back(0);
        precisionAggDist.push_back(19);
        typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
        csNumAggDist.push_back(8);
        widthAggDist.push_back(bigIntWidth);
      }
    }

    // add auxiliary fields for UDAF and statistics functions
    for (uint64_t i = 0; i < functionVec2.size(); i++)
    {
      uint64_t j = functionVec2[i]->fInputColumnIndex;

      if (functionVec2[i]->fAggFunction == ROWAGG_UDAF)
      {
        // Column for index of UDAF UserData struct
        RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get());

        if (!udafFuncCol)
        {
          throw logic_error(
              "(4)prep1PhaseDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
        }

        functionVec2[i]->fAuxColumnIndex = lastCol++;
        oidsAggDist.push_back(oidsAgg[j]);  // Dummy?
        keysAggDist.push_back(keysAgg[j]);  // Dummy?
        scaleAggDist.push_back(0);
        precisionAggDist.push_back(0);
        typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAggDist.push_back(8);
        widthAggDist.push_back(sizeof(uint64_t));
        continue;
      }

      if (functionVec2[i]->fAggFunction != ROWAGG_STATS)
        continue;

      functionVec2[i]->fAuxColumnIndex = lastCol;

      // mean(x)
      oidsAggDist.push_back(oidsAgg[j]);
      keysAggDist.push_back(keysAgg[j]);
      scaleAggDist.push_back(0);
      precisionAggDist.push_back(0);
      typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
      csNumAggDist.push_back(8);
      widthAggDist.push_back(sizeof(long double));
      ++lastCol;

      // sum(x_i - mean)^2
      oidsAggDist.push_back(oidsAgg[j]);
      keysAggDist.push_back(keysAgg[j]);
      scaleAggDist.push_back(0);
      precisionAggDist.push_back(-1);
      typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
      csNumAggDist.push_back(8);
      widthAggDist.push_back(sizeof(long double));
      ++lastCol;
    }
  }

  // calculate the offsets and create the row aggregation and rowgroups
  posAgg.push_back(2);

  for (uint64_t i = 0; i < oidsAgg.size(); i++)
    posAgg.push_back(posAgg[i] + widthAgg[i]);

  RowGroup aggRG(oidsAgg.size(), posAgg, oidsAgg, keysAgg, typeAgg, csNumAgg, scaleAgg, precisionAgg,
                 jobInfo.stringTableThreshold);
  SP_ROWAGG_UM_t rowAgg(new RowAggregationUM(groupBy, functionVec1, jobInfo.rm, jobInfo.umMemLimit, false));
  rowAgg->timeZone(jobInfo.timeZone);

  posAggDist.push_back(2);  // rid

  for (uint64_t i = 0; i < oidsAggDist.size(); i++)
    posAggDist.push_back(posAggDist[i] + widthAggDist[i]);

  RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, csNumAggDist,
                     scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
  SP_ROWAGG_DIST rowAggDist(
      new RowAggregationDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit));
  rowAggDist->timeZone(jobInfo.timeZone);
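  // At this point there are two rowgroups: aggRG, filled by the plain aggregator
  // rowAgg, and aggRgDist, the final output of the distinct aggregator; the
  // rowAggDist->addAggregator(rowAgg, aggRG) call below chains the two together.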

  // mapping the group_concat columns, if any.
  if (jobInfo.groupConcatInfo.groupConcat().size() > 0)
  {
    jobInfo.groupConcatInfo.mapColumns(projRG);
    rowAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());
    rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());
  }

  // if the distinct keyword is applied to more than one aggregate column, reset rowAggDist
  vector<RowGroup> subRgVec;

  if (jobInfo.distinctColVec.size() > 1)
  {
    RowAggregationMultiDistinct* multiDistinctAggregator =
        new RowAggregationMultiDistinct(groupByNoDist, functionVec2, jobInfo.rm, jobInfo.umMemLimit);
    multiDistinctAggregator->timeZone(jobInfo.timeZone);
    rowAggDist.reset(multiDistinctAggregator);
    rowAggDist->groupConcat(jobInfo.groupConcatInfo.groupConcat());

    // construct and add sub-aggregators to rowAggDist
    vector<uint32_t> posAggGb, posAggSub;
    vector<uint32_t> oidsAggGb, oidsAggSub;
    vector<uint32_t> keysAggGb, keysAggSub;
    vector<uint32_t> scaleAggGb, scaleAggSub;
    vector<uint32_t> precisionAggGb, precisionAggSub;
    vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub;
    vector<uint32_t> csNumAggGb, csNumAggSub;
    vector<uint32_t> widthAggGb, widthAggSub;

    // populate the groupby column info
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      oidsAggGb.push_back(oidsProj[i]);
      keysAggGb.push_back(keysProj[i]);
      scaleAggGb.push_back(scaleProj[i]);
      precisionAggGb.push_back(precisionProj[i]);
      typeAggGb.push_back(typeProj[i]);
      csNumAggGb.push_back(csNumProj[i]);
      widthAggGb.push_back(widthProj[i]);
    }

    // for distinct, each column requires a separate rowgroup
    vector<SP_ROWAGG_DIST> rowAggSubDistVec;

    uint32_t distinctColKey;
    int64_t j;
    uint64_t k;
    uint64_t outIdx = 0;
    for (uint64_t i = 0; i < returnedColVec.size(); i++)
    {
      if (returnedColVec[i].second == 0)
      {
        ++outIdx;
        continue;
      }

      j = -1;

      distinctColKey = -1;
      // Find the entry in distinctColVec, if any
      for (k = 0; k < jobInfo.distinctColVec.size(); k++)
      {
        distinctColKey = jobInfo.distinctColVec[k];
        if (returnedColVec[i].first == distinctColKey)
          break;
      }
      if (distinctColKey == (uint32_t)-1)
      {
        ++outIdx;
        continue;
      }
      // locate the distinct key in the row group
      for (k = 0; k < keysAgg.size(); k++)
      {
        if (keysProj[k] == distinctColKey)
        {
          j = k;
          break;
        }
      }

      idbassert(j != -1);

      oidsAggSub = oidsAggGb;
      keysAggSub = keysAggGb;
      scaleAggSub = scaleAggGb;
      precisionAggSub = precisionAggGb;
      typeAggSub = typeAggGb;
      csNumAggSub = csNumAggGb;
      widthAggSub = widthAggGb;

      oidsAggSub.push_back(oidsProj[j]);
      keysAggSub.push_back(keysProj[j]);
      scaleAggSub.push_back(scaleProj[j]);
      precisionAggSub.push_back(precisionProj[j]);
      typeAggSub.push_back(typeProj[j]);
      csNumAggSub.push_back(csNumProj[j]);
      widthAggSub.push_back(widthProj[j]);

      // construct the groupby vector
      vector<SP_ROWAGG_GRPBY_t> groupBySub;
      k = 0;

      while (k < jobInfo.groupByColVec.size())
      {
        SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k));
        groupBySub.push_back(groupby);
        k++;
      }
      // add the distinct column as groupby
      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
      groupBySub.push_back(groupby);

      // Add multi parm distinct
      while ((i + 1) < returnedColVec.size() &&
             functionIdMap(returnedColVec[i + 1].second) == ROWAGG_MULTI_PARM)
      {
        ++i;
        uint32_t dColKey = -1;
        j = -1;

        // Find the entry in distinctColVec, if any
        for (k = 0; k < jobInfo.distinctColVec.size(); k++)
        {
          dColKey = jobInfo.distinctColVec[k];
          if (returnedColVec[i].first == dColKey)
            break;
        }
        idbassert(dColKey != (uint32_t)-1);
        // locate the distinct key in the row group
        for (k = 0; k < keysAgg.size(); k++)
        {
          if (keysProj[k] == dColKey)
          {
            j = k;
            break;
          }
        }
        idbassert(j != -1);

        oidsAggSub.push_back(oidsProj[j]);
        keysAggSub.push_back(keysProj[j]);
        scaleAggSub.push_back(scaleProj[j]);
        precisionAggSub.push_back(precisionProj[j]);
        typeAggSub.push_back(typeProj[j]);
        csNumAggSub.push_back(csNumProj[j]);
        widthAggSub.push_back(widthProj[j]);

        SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
        groupBySub.push_back(groupby);
      }

      // construct the sub-rowgroup
      posAggSub.clear();
      posAggSub.push_back(2);  // rid

      for (k = 0; k < oidsAggSub.size(); k++)
        posAggSub.push_back(posAggSub[k] + widthAggSub[k]);

      RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub, csNumAggSub,
                     scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold);
      subRgVec.push_back(subRg);

      // Keep a count of the parms after the first for any aggregate.
      // These will be skipped and the count needs to be subtracted
      // from where the aux column will be.
      int64_t multiParms = 0;

      // tricky part: 2 function vectors
      // -- a dummy function vector for the sub-aggregator, which does distinct only
      // -- the aggregate function on this distinct column for rowAggDist
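      // (functionSub1 below is left empty on purpose: the sub-aggregator only
      // needs to group rows to de-duplicate the distinct values, while
      // functionSub2 is handed to rowAggDist via addSubAggregator() to do the
      // actual aggregation.)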
      vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2;
      // search for the function in functionVec
      vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();

      while (it != functionVec2.end())
      {
        SP_ROWAGG_FUNC_t f = *it++;

        if ((f->fOutputColumnIndex == outIdx) &&
            (f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || f->fAggFunction == ROWAGG_DISTINCT_SUM ||
             f->fAggFunction == ROWAGG_DISTINCT_AVG))
        {
          SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction,
                                                       groupBySub.size() - 1, f->fOutputColumnIndex,
                                                       f->fAuxColumnIndex - multiParms));
          functionSub2.push_back(funct);
        }
      }

      // construct the sub-aggregator
      SP_ROWAGG_UM_t subAgg(
          new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
      subAgg->timeZone(jobInfo.timeZone);
      subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());

      // add to rowAggDist
      multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);

      ++outIdx;
    }

    // cover any non-distinct column functions
    {
      vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
      vector<SP_ROWAGG_FUNC_t> functionSub2;
      int64_t multiParms = 0;

      for (uint64_t k = 0; k < returnedColVec.size(); k++)
      {
        // search for non-distinct functions in functionVec
        vector<SP_ROWAGG_FUNC_t>::iterator it = functionVec2.begin();

        while (it != functionVec2.end())
        {
          SP_ROWAGG_FUNC_t funct;
          SP_ROWAGG_FUNC_t f = *it++;

          if (f->fAggFunction == ROWAGG_UDAF)
          {
            RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
            funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex,
                                               udafFuncCol->fOutputColumnIndex,
                                               udafFuncCol->fAuxColumnIndex - multiParms));
            functionSub2.push_back(funct);
          }
          else if ((f->fOutputColumnIndex == k) &&
                   (f->fAggFunction == ROWAGG_COUNT_ASTERISK || f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
                    f->fAggFunction == ROWAGG_SUM || f->fAggFunction == ROWAGG_AVG ||
                    f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX ||
                    f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND ||
                    f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR ||
                    f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_GROUP_CONCAT ||
                    f->fAggFunction == ROWAGG_JSON_ARRAY || f->fAggFunction == ROWAGG_SELECT_SOME))
          {
            funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex,
                                              f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms));
            functionSub2.push_back(funct);
          }
        }
      }

      if (functionSub1.size() > 0)
      {
        // make sure the group by columns are available for the next aggregate phase.
        vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist;

        for (uint64_t i = 0; i < groupByNoDist.size(); i++)
          groupBySubNoDist.push_back(
              SP_ROWAGG_GRPBY_t(new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i)));

        // construct the sub-aggregator
        SP_ROWAGG_UM_t subAgg(
            new RowAggregationUM(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false));
        subAgg->timeZone(jobInfo.timeZone);
        subAgg->groupConcat(jobInfo.groupConcatInfo.groupConcat());

        // add to rowAggDist
        multiDistinctAggregator->addSubAggregator(subAgg, aggRG, functionSub2);
        subRgVec.push_back(aggRG);
      }
    }
  }

  rowAggDist->addAggregator(rowAgg, aggRG);
  rowgroups.push_back(aggRgDist);
  aggregators.push_back(rowAggDist);

  if (jobInfo.trace)
  {
    cout << "projected RG: " << projRG.toString() << endl << "aggregated RG: " << aggRG.toString() << endl;

    for (uint64_t i = 0; i < subRgVec.size(); i++)
      cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl;

    cout << "aggregatedDist RG: " << aggRgDist.toString() << endl;
  }
}

void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
                                              vector<SP_ROWAGG_t>& aggregators)
{
  // check if there are any aggregate columns
  // a vector that has the aggregate functions to be done by the PM
  vector<pair<uint32_t, int>> aggColVec;
  set<uint32_t> avgSet;
  vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;
  // For UDAF
  uint32_t projColsUDAFIdx = 0;
  uint32_t udafcParamIdx = 0;
  UDAFColumn* udafc = NULL;
  mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;

  for (uint64_t i = 0; i < returnedColVec.size(); i++)
  {
    // skip if not an aggregation column
    if (returnedColVec[i].second == 0)
      continue;

    aggColVec.push_back(returnedColVec[i]);

    // remember if a column has an average function;
    // with an avg function, there is no need for a separate sum or count_column_name
    if (returnedColVec[i].second == AggregateColumn::AVG)
      avgSet.insert(returnedColVec[i].first);
  }
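  // e.g. for SELECT AVG(c), SUM(c) FROM t: avgSet remembers c, so the separate
  // SUM(c) is skipped on the PM below and its value is later recovered from the
  // sum that AVG(c) already maintains.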

  // populate the aggregate rowgroup on the PM and UM
  // PM: projectedRG -> aggregateRGPM
  // UM: aggregateRGPM -> aggregateRGUM
  //
  // Aggregate preparation by the joblist factory:
  // 1. get the projected rowgroup (done by doAggProject) -- input to PM AGG
  // 2. construct an aggregate rowgroup -- output of PM, input of UM
  // 3. construct an aggregate rowgroup -- output of UM
  const RowGroup projRG = rowgroups[0];
  const vector<uint32_t>& oidsProj = projRG.getOIDs();
  const vector<uint32_t>& keysProj = projRG.getKeys();
  const vector<uint32_t>& scaleProj = projRG.getScale();
  const vector<uint32_t>& precisionProj = projRG.getPrecision();
  const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
  const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();

  vector<uint32_t> posAggPm, posAggUm;
  vector<uint32_t> oidsAggPm, oidsAggUm;
  vector<uint32_t> keysAggPm, keysAggUm;
  vector<uint32_t> scaleAggPm, scaleAggUm;
  vector<uint32_t> precisionAggPm, precisionAggUm;
  vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm;
  vector<uint32_t> csNumAggPm, csNumAggUm;
  vector<uint32_t> widthAggPm, widthAggUm;
  vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm;
  vector<SP_ROWAGG_FUNC_t> functionVecPm, functionVecUm;
  uint32_t bigIntWidth = sizeof(int64_t);
  uint32_t bigUintWidth = sizeof(uint64_t);
  AGG_MAP aggFuncMap;

  // associate the columns between the projected RG and the aggregate RG on the PM
  // populate the aggregate columns
  // the groupby columns are put in front, even if not returned columns
  // sum and count(column name) are omitted if avg is present
  {
    // project only unique oids, but they may be repeated in aggregation
    // collect the projected column info, prepare for aggregation
    vector<uint32_t> width;
    map<uint32_t, int> projColPosMap;

    for (uint64_t i = 0; i < keysProj.size(); i++)
    {
      projColPosMap.insert(make_pair(keysProj[i], i));
      width.push_back(projRG.getColumnWidth(i));
    }

    // column index for the PM aggregate rowgroup
    uint64_t colAggPm = 0;

    // for groupby columns
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      uint32_t key = jobInfo.groupByColVec[i];

      if (projColPosMap.find(key) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
        cerr << "prep2PhasesAggregate: groupby " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      uint64_t colProj = projColPosMap[key];

      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
      groupByPm.push_back(groupby);

      // PM: just copy down to the aggregation rowgroup
      oidsAggPm.push_back(oidsProj[colProj]);
      keysAggPm.push_back(key);
      scaleAggPm.push_back(scaleProj[colProj]);
      precisionAggPm.push_back(precisionProj[colProj]);
      typeAggPm.push_back(typeProj[colProj]);
      csNumAggPm.push_back(csNumProj[colProj]);
      widthAggPm.push_back(width[colProj]);

      aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
                                                    udafc ? udafc->getContext().getParamKeys() : NULL),
                                  colAggPm));
      colAggPm++;
    }

    // for distinct columns
    for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
    {
      uint32_t key = jobInfo.distinctColVec[i];

      if (projColPosMap.find(key) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
        cerr << "prep2PhasesAggregate: distinct " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      uint64_t colProj = projColPosMap[key];

      // check for dup distinct column -- @bug6126
      if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end())
        continue;

      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
      groupByPm.push_back(groupby);

      // PM: just copy down to the aggregation rowgroup
      oidsAggPm.push_back(oidsProj[colProj]);
      keysAggPm.push_back(key);
      scaleAggPm.push_back(scaleProj[colProj]);
      typeAggPm.push_back(typeProj[colProj]);
      csNumAggPm.push_back(csNumProj[colProj]);
      widthAggPm.push_back(width[colProj]);
      precisionAggPm.push_back(precisionProj[colProj]);

      aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
                                                    udafc ? udafc->getContext().getParamKeys() : NULL),
                                  colAggPm));
      colAggPm++;
    }

    // vectors for aggregate functions
    for (uint64_t i = 0; i < aggColVec.size(); i++)
    {
      pUDAFFunc = NULL;
      uint32_t aggKey = aggColVec[i].first;
      RowAggFunctionType aggOp = functionIdMap(aggColVec[i].second);
      RowAggFunctionType stats = statsFuncIdMap(aggColVec[i].second);

      // skip on the PM if this is a constant
      if (aggOp == ROWAGG_CONSTANT)
        continue;

      if (projColPosMap.find(aggKey) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
        cerr << "prep2PhasesAggregate: aggregate " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && (avgSet.find(aggKey) != avgSet.end()))
        // skip sum / count(column) if avg is also selected
        continue;

      uint64_t colProj = projColPosMap[aggKey];
      SP_ROWAGG_FUNC_t funct;

      if (aggOp == ROWAGG_UDAF)
      {
        std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
        for (; it != jobInfo.projectionCols.end(); it++)
        {
          udafc = dynamic_cast<UDAFColumn*>((*it).get());
          projColsUDAFIdx++;
          if (udafc)
          {
            pUDAFFunc = udafc->getContext().getFunction();
            // Save the multi-parm keys for dup-detection.
            if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
            {
              for (uint64_t k = i + 1;
                   k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
              {
                udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
              }
            }
            // Create a RowAggFunctionCol (UDAF subtype) with the context.
            funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
            break;
          }
        }
        if (it == jobInfo.projectionCols.end())
        {
          throw logic_error(
              "(1)prep2PhasesAggregate: A UDAF function is called but there's not enough UDAFColumns");
        }
      }
      else
      {
        funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
      }

      // skip if this is a duplicate
      if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM &&
          aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc,
                                            udafc ? udafc->getContext().getParamKeys() : NULL)) !=
              aggFuncMap.end())
      {
        continue;
      }

      functionVecPm.push_back(funct);
      aggFuncMap.insert(make_pair(
          boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
          colAggPm));

      switch (aggOp)
      {
        case ROWAGG_MIN:
        case ROWAGG_MAX:
        case ROWAGG_SELECT_SOME:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(scaleProj[colProj]);
          precisionAggPm.push_back(precisionProj[colProj]);
          typeAggPm.push_back(typeProj[colProj]);
          csNumAggPm.push_back(csNumProj[colProj]);
          widthAggPm.push_back(width[colProj]);
          colAggPm++;
        }
        break;

        case ROWAGG_SUM:
        case ROWAGG_AVG:
        {
          if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
              typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
              typeProj[colProj] == CalpontSystemCatalog::BLOB ||
              typeProj[colProj] == CalpontSystemCatalog::TEXT ||
              typeProj[colProj] == CalpontSystemCatalog::DATE ||
              typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
              typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
              typeProj[colProj] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("sum/average");
            args.add(colTypeIdString(typeProj[colProj]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep2PhasesAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAggPm,
                                  scaleAggPm, precisionAggPm, widthAggPm);

          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          csNumAggPm.push_back(8);
          colAggPm++;
        }

        // PM: put the count column for avg next to the sum
        // let it fall through to add a count column for the average function
        if (aggOp != ROWAGG_AVG)
          break;
        // The AVG aggregation has a special treatment everywhere.
        // This is so because AVG(column) is a SUM(column)/COUNT(column)
        // and these aggregations can be utilized by AVG, if present.
        /* fall through */

        case ROWAGG_COUNT_ASTERISK:
        case ROWAGG_COUNT_COL_NAME:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          // work around count() in select subquery
          precisionAggPm.push_back(rowgroup::MagicPrecisionForCountAgg);
          typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(bigIntWidth);
          colAggPm++;
        }
        break;

        case ROWAGG_STATS:
        {
          if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
              typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
              typeProj[colProj] == CalpontSystemCatalog::BLOB ||
              typeProj[colProj] == CalpontSystemCatalog::TEXT ||
              typeProj[colProj] == CalpontSystemCatalog::DATE ||
              typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
              typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
              typeProj[colProj] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("variance/standard deviation");
            args.add(colTypeIdString(typeProj[colProj]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep2PhasesAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          // count(x)
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(scaleProj[colProj]);
          precisionAggPm.push_back(0);
          typeAggPm.push_back(CalpontSystemCatalog::DOUBLE);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(double));
          funct->fAuxColumnIndex = ++colAggPm;

          // mean(x)
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(-1);
          typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(long double));
          ++colAggPm;

          // sum(x_i - mean)^2
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(-1);
          typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(long double));
          ++colAggPm;
        }
        break;

        case ROWAGG_BIT_AND:
        case ROWAGG_BIT_OR:
        case ROWAGG_BIT_XOR:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(-16);  // for connector to skip null check
          typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(bigIntWidth);
          colAggPm++;
        }
        break;

        case ROWAGG_UDAF:
        {
          // Return column
          RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());

          if (!udafFuncCol)
          {
            throw logic_error(
                "(2)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
          }

          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
          precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
          typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
          csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
          widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
          ++colAggPm;
          // Column for index of UDAF UserData struct
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(0);
          typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(bigUintWidth);
          funct->fAuxColumnIndex = colAggPm++;
          // If the first param is const
          udafcParamIdx = 0;
          ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
          if (cc)
          {
            funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
          }
          ++udafcParamIdx;
          break;
        }

        case ROWAGG_MULTI_PARM:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(scaleProj[colProj]);
          precisionAggPm.push_back(precisionProj[colProj]);
          typeAggPm.push_back(typeProj[colProj]);
          csNumAggPm.push_back(csNumProj[colProj]);
          widthAggPm.push_back(width[colProj]);
          colAggPm++;
          // If the param is const
          if (udafc)
          {
            if (udafcParamIdx > udafc->aggParms().size() - 1)
            {
              throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with too many parms",
                                    aggregateFuncErr);
            }
            ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
            if (cc)
            {
              funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
            }
          }
          else
          {
            throw QueryDataExcept("prep2PhasesAggregate: UDAF multi function with no parms",
                                  aggregateFuncErr);
          }
          ++udafcParamIdx;
        }
        break;

        default:
        {
          ostringstream emsg;
          emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
          cerr << "prep2PhasesAggregate: " << emsg.str() << endl;
          throw QueryDataExcept(emsg.str(), aggregateFuncErr);
        }
      }
    }
  }

  // associate the columns between the aggregate RGs on the PM and UM
  // populate the returned columns
  // drop group-by columns that are not returned
  // add back sum or count(column name) if omitted due to an avg column
  // put the count(column name) column at the end, if it is for avg only
  {
    // check if the count column for AVG is also a returned column,
    // if so, replace the "-1" with the actual position in the returned vec.
    map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
    AGG_MAP aggDupFuncMap;

    projColsUDAFIdx = 0;
    // copy over the groupby vector
    // update the outputColumnIndex if returned
    for (uint64_t i = 0; i < groupByPm.size(); i++)
    {
      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(groupByPm[i]->fOutputColumnIndex, -1));
      groupByUm.push_back(groupby);
    }
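    // The -1 output index is a sentinel: as in prep1PhaseDistinctAggregate above,
    // it is replaced with the real output position once the group-by column is
    // found among the returned columns.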
|
|
|
|
// locate the return column position in aggregated rowgroup from PM
|
|
// outIdx is i without the multi-columns,
|
|
uint64_t outIdx = 0;
|
|
for (uint64_t i = 0; i < returnedColVec.size(); i++)
|
|
{
|
|
uint32_t retKey = returnedColVec[i].first;
|
|
RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
|
|
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
|
|
int colPm = -1;
|
|
|
|
if (aggOp == ROWAGG_MULTI_PARM)
|
|
{
|
|
// Skip on UM: Extra parms for an aggregate have no work on the UM
|
|
continue;
|
|
}
|
|
|
|
      // Is this a UDAF? use the function as part of the key.
      pUDAFFunc = NULL;
      udafc = NULL;
      if (aggOp == ROWAGG_UDAF)
      {
        std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;

        for (; it != jobInfo.projectionCols.end(); it++)
        {
          udafc = dynamic_cast<UDAFColumn*>((*it).get());
          projColsUDAFIdx++;
          if (udafc)
          {
            pUDAFFunc = udafc->getContext().getFunction();
            // Save the multi-parm keys for dup-detection.
            if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
            {
              for (uint64_t k = i + 1;
                   k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
              {
                udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
              }
            }
            break;
          }
        }
        if (it == jobInfo.projectionCols.end())
        {
          throw logic_error(
              "(3)prep2PhasesAggregate: A UDAF function is called but there\'s not enough UDAFColumns");
        }
      }

      AGG_MAP::iterator it = aggFuncMap.find(
          boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

      if (it != aggFuncMap.end())
      {
        colPm = it->second;
        oidsAggUm.push_back(oidsAggPm[colPm]);
        keysAggUm.push_back(retKey);
        scaleAggUm.push_back(scaleAggPm[colPm]);
        precisionAggUm.push_back(precisionAggPm[colPm]);
        typeAggUm.push_back(typeAggPm[colPm]);
        csNumAggUm.push_back(csNumAggPm[colPm]);
        widthAggUm.push_back(widthAggPm[colPm]);
      }

      // not a direct hit -- a returned column is not already in the RG from PMs
      else
      {
        // MCOL-5476.
        uint32_t foundTupleKey{0};
        if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
        {
          AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
              foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
          colPm = it->second;
          oidsAggUm.push_back(oidsAggPm[colPm]);
          keysAggUm.push_back(retKey);
          scaleAggUm.push_back(scaleAggPm[colPm]);
          precisionAggUm.push_back(precisionAggPm[colPm]);
          typeAggUm.push_back(typeAggPm[colPm]);
          csNumAggUm.push_back(csNumAggPm[colPm]);
          widthAggUm.push_back(widthAggPm[colPm]);
          // Update the `retKey` to specify that this column is a duplicate.
          retKey = foundTupleKey;
        }
        else
        {
          bool returnColMissing = true;

          // check if a SUM or COUNT covered by AVG
          if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
          {
            it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
                                                   udafc ? udafc->getContext().getParamKeys() : NULL));

            if (it != aggFuncMap.end())
            {
              // false alarm
              returnColMissing = false;

              colPm = it->second;

              if (aggOp == ROWAGG_SUM)
              {
                wideDecimalOrLongDouble(colPm, typeAggPm[colPm], precisionAggPm, scaleAggPm, widthAggPm,
                                        typeAggUm, scaleAggUm, precisionAggUm, widthAggUm);

                oidsAggUm.push_back(oidsAggPm[colPm]);
                keysAggUm.push_back(retKey);
                csNumAggUm.push_back(8);
              }
              else
              {
                // leave the count() to avg
                aggOp = ROWAGG_COUNT_NO_OP;

                colPm++;
                oidsAggUm.push_back(oidsAggPm[colPm]);
                keysAggUm.push_back(retKey);
                scaleAggUm.push_back(0);
                precisionAggUm.push_back(19);
                typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
                csNumAggUm.push_back(8);
                widthAggUm.push_back(bigIntWidth);
              }
            }
          }
          else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
                   jobInfo.expressionVec.end())
          {
            // a function on aggregation
            TupleInfo ti = getTupleInfo(retKey, jobInfo);
            oidsAggUm.push_back(ti.oid);
            keysAggUm.push_back(retKey);
            scaleAggUm.push_back(ti.scale);
            precisionAggUm.push_back(ti.precision);
            typeAggUm.push_back(ti.dtype);
            csNumAggUm.push_back(ti.csNum);
            widthAggUm.push_back(ti.width);

            returnColMissing = false;
          }
          else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
          {
            // a window function
            TupleInfo ti = getTupleInfo(retKey, jobInfo);
            oidsAggUm.push_back(ti.oid);
            keysAggUm.push_back(retKey);
            scaleAggUm.push_back(ti.scale);
            precisionAggUm.push_back(ti.precision);
            typeAggUm.push_back(ti.dtype);
            csNumAggUm.push_back(ti.csNum);
            widthAggUm.push_back(ti.width);

            returnColMissing = false;
          }
          else if (aggOp == ROWAGG_CONSTANT)
          {
            TupleInfo ti = getTupleInfo(retKey, jobInfo);
            oidsAggUm.push_back(ti.oid);
            keysAggUm.push_back(retKey);
            scaleAggUm.push_back(ti.scale);
            precisionAggUm.push_back(ti.precision);
            typeAggUm.push_back(ti.dtype);
            csNumAggUm.push_back(ti.csNum);
            widthAggUm.push_back(ti.width);

            returnColMissing = false;
          }

          if (returnColMissing)
          {
            Message::Args args;
            args.add(keyName(outIdx, retKey, jobInfo));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
            cerr << "prep2PhasesAggregate: " << emsg
                 << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
                 << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
                 << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
                 << endl;
            throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
          }
        }
      }

      // update groupby vector if the groupby column is a returned column
      if (returnedColVec[i].second == 0)
      {
        int dupGroupbyIndex = -1;

        for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
        {
          if (jobInfo.groupByColVec[j] == retKey)
          {
            if (groupByUm[j]->fOutputColumnIndex == (uint32_t)-1)
              groupByUm[j]->fOutputColumnIndex = outIdx;
            else
              dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
          }
        }

        for (uint64_t j = 0; j < jobInfo.distinctColVec.size(); j++)
        {
          if (jobInfo.distinctColVec[j] == retKey)
          {
            if (groupByUm[j]->fOutputColumnIndex == (uint32_t)-1)
              groupByUm[j]->fOutputColumnIndex = outIdx;
            else
              dupGroupbyIndex = groupByUm[j]->fOutputColumnIndex;
          }
        }

        // a duplicate group by column
        if (dupGroupbyIndex != -1)
        {
          functionVecUm.push_back(SP_ROWAGG_FUNC_t(
              new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
        }
      }
      else
      {
        // update the aggregate function vector
        SP_ROWAGG_FUNC_t funct;
        if (aggOp == ROWAGG_UDAF)
        {
          funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, outIdx));
        }
        else
        {
          funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, outIdx));
        }

        if (aggOp == ROWAGG_COUNT_NO_OP)
          funct->fAuxColumnIndex = colPm;
        else if (aggOp == ROWAGG_CONSTANT)
          funct->fAuxColumnIndex = jobInfo.cntStarPos;

        functionVecUm.push_back(funct);

        // find if this func is a duplicate
        AGG_MAP::iterator iter = aggDupFuncMap.find(
            boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

        if (iter != aggDupFuncMap.end())
        {
          if (funct->fAggFunction == ROWAGG_AVG)
            funct->fAggFunction = ROWAGG_DUP_AVG;
          else if (funct->fAggFunction == ROWAGG_STATS)
            funct->fAggFunction = ROWAGG_DUP_STATS;
          else if (funct->fAggFunction == ROWAGG_UDAF)
            funct->fAggFunction = ROWAGG_DUP_UDAF;
          else
            funct->fAggFunction = ROWAGG_DUP_FUNCT;

          funct->fAuxColumnIndex = iter->second;
        }
        else
        {
          aggDupFuncMap.insert(make_pair(
              boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
              funct->fOutputColumnIndex));
        }

        if (returnedColVec[i].second == AggregateColumn::AVG)
          avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
      }
      ++outIdx;
    }

    // now fix the AVG function, locate the count(column) position
    for (uint64_t i = 0; i < functionVecUm.size(); i++)
    {
      if (functionVecUm[i]->fAggFunction != ROWAGG_COUNT_NO_OP)
        continue;

      // if the count(k) can be associated with an avg(k)
      map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
          avgFuncMap.find(keysAggUm[functionVecUm[i]->fOutputColumnIndex]);

      if (k != avgFuncMap.end())
        k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
    }

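    // e.g. (illustration): for SELECT avg(k), count(k) ... the count(k)
    // output slot found above becomes avg(k)'s fAuxColumnIndex, so the final
    // average can be produced as sum/count without adding an extra column.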
    // there is avg(k), but no count(k) in the select list
    uint64_t lastCol = outIdx;

    for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
    {
      if (k->second->fAuxColumnIndex == (uint32_t)-1)
      {
        k->second->fAuxColumnIndex = lastCol++;
        oidsAggUm.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
        keysAggUm.push_back(k->first);
        scaleAggUm.push_back(0);
        precisionAggUm.push_back(19);
        typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAggUm.push_back(8);
        widthAggUm.push_back(bigIntWidth);
      }
    }

    // add auxiliary fields for UDAF and statistics functions
    for (uint64_t i = 0; i < functionVecUm.size(); i++)
    {
      uint64_t j = functionVecUm[i]->fInputColumnIndex;

      if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
      {
        // Column for index of UDAF UserData struct
        RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());

        if (!udafFuncCol)
        {
          throw logic_error(
              "(4)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
        }

        functionVecUm[i]->fAuxColumnIndex = lastCol++;
        oidsAggUm.push_back(oidsAggPm[j]);  // Dummy?
        keysAggUm.push_back(keysAggPm[j]);  // Dummy?
        scaleAggUm.push_back(0);
        precisionAggUm.push_back(0);
        typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAggUm.push_back(8);
        widthAggUm.push_back(bigUintWidth);
        continue;
      }

      if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
        continue;

      functionVecUm[i]->fAuxColumnIndex = lastCol;

      // mean(x)
      oidsAggUm.push_back(oidsAggPm[j]);
      keysAggUm.push_back(keysAggPm[j]);
      scaleAggUm.push_back(0);
      precisionAggUm.push_back(-1);
      typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
      csNumAggUm.push_back(8);
      widthAggUm.push_back(sizeof(long double));
      ++lastCol;

      // sum(x_i - mean)^2
      oidsAggUm.push_back(oidsAggPm[j]);
      keysAggUm.push_back(keysAggPm[j]);
      scaleAggUm.push_back(0);
      precisionAggUm.push_back(-1);
      typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE);
      csNumAggUm.push_back(8);
      widthAggUm.push_back(sizeof(long double));
      ++lastCol;
    }
  }

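  // Note (for orientation): ROWAGG_STATS reserves two LONGDOUBLE auxiliary
  // columns per function -- a running mean and a running sum of squared
  // deviations -- which are combined into variance/stddev when the final
  // result is evaluated.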
  // calculate the offset and create the rowaggregations, rowgroups
  posAggUm.push_back(2);  // rid

  for (uint64_t i = 0; i < oidsAggUm.size(); i++)
    posAggUm.push_back(posAggUm[i] + widthAggUm[i]);

  RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
                   precisionAggUm, jobInfo.stringTableThreshold);
  SP_ROWAGG_UM_t rowAggUm(
      new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit, false));
  rowAggUm->timeZone(jobInfo.timeZone);
  rowgroups.push_back(aggRgUm);
  aggregators.push_back(rowAggUm);

  posAggPm.push_back(2);  // rid

  for (uint64_t i = 0; i < oidsAggPm.size(); i++)
    posAggPm.push_back(posAggPm[i] + widthAggPm[i]);

  RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm,
                   precisionAggPm, jobInfo.stringTableThreshold);
  SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm, nullptr, nullptr, jobInfo.hasRollup));
  rowAggPm->timeZone(jobInfo.timeZone);
  rowgroups.push_back(aggRgPm);
  aggregators.push_back(rowAggPm);

  if (jobInfo.trace)
    cout << "\n====== Aggregation RowGroups ======" << endl
         << "projected RG: " << projRG.toString() << endl
         << "aggregated1 RG: " << aggRgPm.toString() << endl
         << "aggregated2 RG: " << aggRgUm.toString() << endl;
}

void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector<RowGroup>& rowgroups,
                                                      vector<SP_ROWAGG_t>& aggregators)
{
  // check if there are any aggregate columns
  // a vector that has the aggregate function to be done by PM
  vector<pair<uint32_t, int>> aggColVec, aggNoDistColVec;
  set<uint32_t> avgSet, avgDistSet;
  vector<std::pair<uint32_t, int>>& returnedColVec = jobInfo.returnedColVec;
  // For UDAF
  uint32_t projColsUDAFIdx = 0;
  uint32_t udafcParamIdx = 0;
  UDAFColumn* udafc = NULL;
  mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;

  for (uint64_t i = 0; i < returnedColVec.size(); i++)
  {
    // col should be an aggregate or groupBy or window function
    uint32_t rtcKey = returnedColVec[i].first;
    uint32_t rtcOp = returnedColVec[i].second;

    if (rtcOp == 0 &&
        find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), rtcKey) !=
            jobInfo.distinctColVec.end() &&
        find(jobInfo.groupByColVec.begin(), jobInfo.groupByColVec.end(), rtcKey) ==
            jobInfo.groupByColVec.end() &&
        jobInfo.windowSet.find(rtcKey) != jobInfo.windowSet.end())
    {
      Message::Args args;
      args.add(keyName(i, rtcKey, jobInfo));
      string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
      cerr << "prep2PhasesDistinctAggregate: " << emsg
           << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[rtcKey].fId
           << ", alias=" << jobInfo.keyInfo->tupleKeyVec[rtcKey].fTable
           << ", view=" << jobInfo.keyInfo->tupleKeyVec[rtcKey].fView << endl;
      throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
    }

    // skip if not an aggregation column
    if (returnedColVec[i].second == 0)
      continue;

    aggColVec.push_back(returnedColVec[i]);

    // remember if a column has an average function,
    // with avg function, no need for separate sum or count_column_name
    if (returnedColVec[i].second == AggregateColumn::AVG)
      avgSet.insert(returnedColVec[i].first);

    if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
      avgDistSet.insert(returnedColVec[i].first);
  }

  // populate the aggregate rowgroup on PM and UM
  // PM: projectedRG -> aggregateRGPM
  // UM: aggregateRGPM -> aggregateRGUM
  //
  // Aggregate preparation by joblist factory:
  // 1. get projected rowgroup (done by doAggProject) -- input to PM AGG
  // 2. construct aggregate rowgroup -- output of PM, input of UM
  // 3. construct aggregate rowgroup -- output of UM
  // 4. construct aggregate rowgroup -- output of distinct aggregates

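  // Illustration (hypothetical query): for
  //   SELECT a, COUNT(DISTINCT b), SUM(c) FROM t GROUP BY a;
  // the PM phase groups on (a, b) -- distinct columns ride along as extra
  // group-by keys -- the UM merges the PM partials, and the distinct
  // aggregator then collapses b to finish COUNT(DISTINCT b).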
  const RowGroup projRG = rowgroups[0];
  const vector<uint32_t>& oidsProj = projRG.getOIDs();
  const vector<uint32_t>& keysProj = projRG.getKeys();
  const vector<uint32_t>& scaleProj = projRG.getScale();
  const vector<uint32_t>& precisionProj = projRG.getPrecision();
  const vector<CalpontSystemCatalog::ColDataType>& typeProj = projRG.getColTypes();
  const vector<uint32_t>& csNumProj = projRG.getCharsetNumbers();

  vector<uint32_t> posAggPm, posAggUm, posAggDist;
  vector<uint32_t> oidsAggPm, oidsAggUm, oidsAggDist;
  vector<uint32_t> keysAggPm, keysAggUm, keysAggDist;
  vector<uint32_t> scaleAggPm, scaleAggUm, scaleAggDist;
  vector<uint32_t> precisionAggPm, precisionAggUm, precisionAggDist;
  vector<CalpontSystemCatalog::ColDataType> typeAggPm, typeAggUm, typeAggDist;
  vector<uint32_t> csNumAggPm, csNumAggUm, csNumAggDist;
  vector<uint32_t> widthAggPm, widthAggUm, widthAggDist;

  vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm, groupByNoDist;
  vector<SP_ROWAGG_FUNC_t> functionVecPm, functionNoDistVec, functionVecUm;
  list<uint32_t> multiParmIndexes;

  uint32_t bigIntWidth = sizeof(int64_t);
  map<pair<uint32_t, int>, uint64_t> avgFuncDistMap;
  AGG_MAP aggFuncMap;

  // associate the columns between the projected RG and the aggregate RG on PM
  // populate the aggregate columns
  // the groupby columns are put in front, even if not a returned column
  // sum and count(column name) are omitted, if avg is present
  {
    // project only unique oids, but they may be repeated in aggregation
    // collect the projected column info, prepare for aggregation
    vector<uint32_t> width;
    map<uint32_t, int> projColPosMap;

    for (uint64_t i = 0; i < keysProj.size(); i++)
    {
      projColPosMap.insert(make_pair(keysProj[i], i));
      width.push_back(projRG.getColumnWidth(i));
    }

    // column index for PM aggregate rowgroup
    uint64_t colAggPm = 0;
    uint64_t multiParm = 0;

    // for groupby column
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      uint32_t key = jobInfo.groupByColVec[i];

      if (projColPosMap.find(key) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
        cerr << "prep2PhasesDistinctAggregate: group " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      uint64_t colProj = projColPosMap[key];

      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
      groupByPm.push_back(groupby);

      // PM: just copy down to aggregation rowgroup
      oidsAggPm.push_back(oidsProj[colProj]);
      keysAggPm.push_back(key);
      scaleAggPm.push_back(scaleProj[colProj]);
      precisionAggPm.push_back(precisionProj[colProj]);
      typeAggPm.push_back(typeProj[colProj]);
      csNumAggPm.push_back(csNumProj[colProj]);
      widthAggPm.push_back(width[colProj]);

      aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
                                                    udafc ? udafc->getContext().getParamKeys() : NULL),
                                  colAggPm));
      colAggPm++;
    }

    // for distinct column
    for (uint64_t i = 0; i < jobInfo.distinctColVec.size(); i++)
    {
      uint32_t key = jobInfo.distinctColVec[i];

      if (projColPosMap.find(key) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' isn't in tuple.";
        cerr << "prep2PhasesDistinctAggregate: distinct " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[key].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      // check for dup distinct column -- @bug6126
      if (find(keysAggPm.begin(), keysAggPm.end(), key) != keysAggPm.end())
        continue;

      uint64_t colProj = projColPosMap[key];

      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(colProj, colAggPm));
      groupByPm.push_back(groupby);

      // PM: just copy down to aggregation rowgroup
      oidsAggPm.push_back(oidsProj[colProj]);
      keysAggPm.push_back(key);
      scaleAggPm.push_back(scaleProj[colProj]);
      precisionAggPm.push_back(precisionProj[colProj]);
      typeAggPm.push_back(typeProj[colProj]);
      csNumAggPm.push_back(csNumProj[colProj]);
      widthAggPm.push_back(width[colProj]);

      aggFuncMap.insert(make_pair(boost::make_tuple(keysAggPm[colAggPm], 0, pUDAFFunc,
                                                    udafc ? udafc->getContext().getParamKeys() : NULL),
                                  colAggPm));
      colAggPm++;
    }

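    // Note (for orientation): the distinct columns above are appended to the
    // PM group-by list, so each PM already deduplicates
    // (group-by, distinct) combinations before the UM sees them.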
    // vectors for aggregate functions
    RowAggFunctionType aggOp = ROWAGG_FUNCT_UNDEFINE;
    RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
    for (uint64_t i = 0; i < aggColVec.size(); i++)
    {
      aggOp = functionIdMap(aggColVec[i].second);

      // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
      if (aggOp != ROWAGG_MULTI_PARM)
        prevAggOp = aggOp;

      // skip on PM if this is a constant
      if (aggOp == ROWAGG_CONSTANT)
        continue;

      pUDAFFunc = NULL;
      uint32_t aggKey = aggColVec[i].first;

      if (projColPosMap.find(aggKey) == projColPosMap.end())
      {
        ostringstream emsg;
        emsg << "'" << jobInfo.keyInfo->tupleKeyToName[aggKey] << "' isn't in tuple.";
        cerr << "prep2PhasesDistinctAggregate: aggregate " << emsg.str()
             << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[aggKey].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fTable;

        if (jobInfo.keyInfo->tupleKeyVec[aggKey].fView.length() > 0)
          cerr << ", view=" << jobInfo.keyInfo->tupleKeyVec[aggKey].fView;

        cerr << endl;
        throw logic_error(emsg.str());
      }

      RowAggFunctionType stats = statsFuncIdMap(aggOp);

      // skip sum / count(column) if avg is also selected
      if ((aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME) && (avgSet.find(aggKey) != avgSet.end()))
        continue;

      // We skip distinct aggs, including extra parms. These are handled by adding them to the
      // group-by list above.
      if (aggOp == ROWAGG_DISTINCT_SUM || aggOp == ROWAGG_DISTINCT_AVG ||
          aggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
        continue;
      if (aggOp == ROWAGG_MULTI_PARM && prevAggOp == ROWAGG_COUNT_DISTINCT_COL_NAME)
        continue;

      uint64_t colProj = projColPosMap[aggKey];
      SP_ROWAGG_FUNC_t funct;

      if (aggOp == ROWAGG_UDAF)
      {
        std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
        for (; it != jobInfo.projectionCols.end(); it++)
        {
          udafc = dynamic_cast<UDAFColumn*>((*it).get());
          projColsUDAFIdx++;
          if (udafc)
          {
            pUDAFFunc = udafc->getContext().getFunction();
            // Save the multi-parm keys for dup-detection.
            if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
            {
              for (uint64_t k = i + 1;
                   k < aggColVec.size() && aggColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
              {
                udafc->getContext().getParamKeys()->push_back(aggColVec[k].first);
              }
            }
            // Create a RowAggFunctionCol (UDAF subtype) with the context.
            funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
            break;
          }
        }
        if (it == jobInfo.projectionCols.end())
        {
          throw logic_error(
              "(1)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough "
              "UDAFColumns");
        }
      }
      else
      {
        funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm));
      }

      // skip if this is a duplicate
      if (aggOp != ROWAGG_UDAF && aggOp != ROWAGG_MULTI_PARM &&
          aggFuncMap.find(boost::make_tuple(aggKey, aggOp, pUDAFFunc,
                                            udafc ? udafc->getContext().getParamKeys() : NULL)) !=
              aggFuncMap.end())
      {
        continue;
      }

      functionVecPm.push_back(funct);
      aggFuncMap.insert(make_pair(
          boost::make_tuple(aggKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
          colAggPm - multiParm));

      switch (aggOp)
      {
        case ROWAGG_MIN:
        case ROWAGG_MAX:
        case ROWAGG_SELECT_SOME:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(scaleProj[colProj]);
          precisionAggPm.push_back(precisionProj[colProj]);
          typeAggPm.push_back(typeProj[colProj]);
          csNumAggPm.push_back(csNumProj[colProj]);
          widthAggPm.push_back(width[colProj]);
          colAggPm++;
        }
        break;

        case ROWAGG_SUM:
        case ROWAGG_AVG:
        {
          if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
              typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
              typeProj[colProj] == CalpontSystemCatalog::BLOB ||
              typeProj[colProj] == CalpontSystemCatalog::TEXT ||
              typeProj[colProj] == CalpontSystemCatalog::DATE ||
              typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
              typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
              typeProj[colProj] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("sum/average");
            args.add(colTypeIdString(typeProj[colProj]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          csNumAggPm.push_back(8);
          wideDecimalOrLongDouble(colProj, typeProj[colProj], precisionProj, scaleProj, width, typeAggPm,
                                  scaleAggPm, precisionAggPm, widthAggPm);
          colAggPm++;
        }

          // PM: put the count column for avg next to the sum
          // let fall through to add a count column for average function
          if (aggOp == ROWAGG_AVG)
            funct->fAuxColumnIndex = colAggPm;
          else
            break;
          /* fall through */

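          // (fall through continues here: AVG intentionally drops into the
          // COUNT cases below, so its companion COUNT lands in the next slot,
          // colAggPm, which was just recorded in funct->fAuxColumnIndex.)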
        case ROWAGG_COUNT_ASTERISK:
        case ROWAGG_COUNT_COL_NAME:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          // work around count() in select subquery
          precisionAggPm.push_back(rowgroup::MagicPrecisionForCountAgg);

          if (isUnsigned(typeProj[colProj]))
          {
            typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
          }
          else
          {
            typeAggPm.push_back(CalpontSystemCatalog::BIGINT);
          }

          csNumAggPm.push_back(8);
          widthAggPm.push_back(bigIntWidth);
          colAggPm++;
        }
        break;

        case ROWAGG_STATS:
        {
          if (typeProj[colProj] == CalpontSystemCatalog::CHAR ||
              typeProj[colProj] == CalpontSystemCatalog::VARCHAR ||
              typeProj[colProj] == CalpontSystemCatalog::BLOB ||
              typeProj[colProj] == CalpontSystemCatalog::TEXT ||
              typeProj[colProj] == CalpontSystemCatalog::DATE ||
              typeProj[colProj] == CalpontSystemCatalog::DATETIME ||
              typeProj[colProj] == CalpontSystemCatalog::TIMESTAMP ||
              typeProj[colProj] == CalpontSystemCatalog::TIME)
          {
            Message::Args args;
            args.add("variance/standard deviation");
            args.add(colTypeIdString(typeProj[colProj]));
            string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
            cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
            throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
          }

          // count(x)
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(scaleProj[colProj]);
          precisionAggPm.push_back(0);
          typeAggPm.push_back(CalpontSystemCatalog::DOUBLE);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(double));
          funct->fAuxColumnIndex = ++colAggPm;

          // mean(x)
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(-1);
          typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(long double));
          ++colAggPm;

          // sum(x_i - mean)^2
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(-1);
          typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(long double));
          ++colAggPm;
        }
        break;

        case ROWAGG_BIT_AND:
        case ROWAGG_BIT_OR:
        case ROWAGG_BIT_XOR:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(-16);  // for connector to skip null check
          typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(bigIntWidth);
          ++colAggPm;
        }
        break;

        case ROWAGG_UDAF:
        {
          RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funct.get());

          if (!udafFuncCol)
          {
            throw logic_error(
                "(2)prep2PhasesDistinctAggregate: A UDAF function is called but there's no "
                "RowUDAFFunctionCol");
          }

          // Return column
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(udafFuncCol->fUDAFContext.getScale());
          precisionAggPm.push_back(udafFuncCol->fUDAFContext.getPrecision());
          typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
          csNumAggPm.push_back(udafFuncCol->fUDAFContext.getCharsetNumber());
          widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
          ++colAggPm;
          // Column for index of UDAF UserData struct
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(0);
          precisionAggPm.push_back(0);
          typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
          csNumAggPm.push_back(8);
          widthAggPm.push_back(sizeof(uint64_t));
          funct->fAuxColumnIndex = colAggPm++;
          // If the first param is const
          udafcParamIdx = 0;
          ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
          if (cc)
          {
            funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
          }
          ++udafcParamIdx;
          break;
        }

        case ROWAGG_MULTI_PARM:
        {
          oidsAggPm.push_back(oidsProj[colProj]);
          keysAggPm.push_back(aggKey);
          scaleAggPm.push_back(scaleProj[colProj]);
          precisionAggPm.push_back(precisionProj[colProj]);
          typeAggPm.push_back(typeProj[colProj]);
          csNumAggPm.push_back(csNumProj[colProj]);
          widthAggPm.push_back(width[colProj]);
          multiParmIndexes.push_back(colAggPm);
          ++colAggPm;
          ++multiParm;
          // If the param is const
          if (udafc)
          {
            if (udafcParamIdx > udafc->aggParms().size() - 1)
            {
              throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with too many parms",
                                    aggregateFuncErr);
            }
            ConstantColumn* cc = dynamic_cast<ConstantColumn*>(udafc->aggParms()[udafcParamIdx].get());
            if (cc)
            {
              funct->fpConstCol = udafc->aggParms()[udafcParamIdx];
            }
          }
          else if (prevAggOp != ROWAGG_COUNT_DISTINCT_COL_NAME)
          {
            throw QueryDataExcept("prep2PhasesDistinctAggregate: UDAF multi function with no parms",
                                  aggregateFuncErr);
          }
          ++udafcParamIdx;
        }
        break;

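        // Note (for orientation): multi-parm columns are carried in the PM
        // rowgroup (and remembered in multiParmIndexes), but are dropped again
        // when the UM-side arrays are assembled below.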
        default:
        {
          ostringstream emsg;
          emsg << "aggregate function (" << (uint64_t)aggOp << ") isn't supported";
          cerr << "prep2PhasesDistinctAggregate: " << emsg.str() << endl;
          throw QueryDataExcept(emsg.str(), aggregateFuncErr);
        }
      }
    }
  }

  // associate the columns between the aggregate RGs on PM and UM without the distinct aggregator
  // populate the returned columns
  {
    int64_t multiParms = 0;

    for (uint32_t idx = 0; idx < groupByPm.size(); idx++)
    {
      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(idx, idx));
      groupByUm.push_back(groupby);
    }

    for (uint32_t idx = 0; idx < functionVecPm.size(); idx++)
    {
      SP_ROWAGG_FUNC_t funct;
      SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx];

      if (funcPm->fAggFunction == ROWAGG_MULTI_PARM)
      {
        // Skip on UM: extra parms for an aggregate have no work on the UM
        ++multiParms;
        continue;
      }

      if (funcPm->fAggFunction == ROWAGG_UDAF)
      {
        RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
        if (!udafFuncCol)
        {
          throw logic_error(
              "(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
        }
        funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fOutputColumnIndex,
                                           udafFuncCol->fOutputColumnIndex - multiParms,
                                           udafFuncCol->fAuxColumnIndex - multiParms));
        functionNoDistVec.push_back(funct);
        pUDAFFunc = udafFuncCol->fUDAFContext.getFunction();
      }
      else
      {
        funct.reset(new RowAggFunctionCol(funcPm->fAggFunction, funcPm->fStatsFunction,
                                          funcPm->fOutputColumnIndex, funcPm->fOutputColumnIndex - multiParms,
                                          funcPm->fAuxColumnIndex - multiParms));
        functionNoDistVec.push_back(funct);
        pUDAFFunc = NULL;
      }
    }

    // Copy over the PM arrays to the UM. Skip any that are a multi-parm entry.
    for (uint32_t idx = 0; idx < oidsAggPm.size(); ++idx)
    {
      if (find(multiParmIndexes.begin(), multiParmIndexes.end(), idx) != multiParmIndexes.end())
      {
        continue;
      }
      oidsAggUm.push_back(oidsAggPm[idx]);
      keysAggUm.push_back(keysAggPm[idx]);
      scaleAggUm.push_back(scaleAggPm[idx]);
      precisionAggUm.push_back(precisionAggPm[idx]);
      widthAggUm.push_back(widthAggPm[idx]);
      typeAggUm.push_back(typeAggPm[idx]);
      csNumAggUm.push_back(csNumAggPm[idx]);
    }
  }

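  // e.g. (illustration): if a PM column index appears in multiParmIndexes it
  // is skipped here, and every UM-side output/aux index was already shifted
  // left by the running multiParms count above.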
  // associate the columns between the aggregate RGs on UM and Distinct
  // populate the returned columns
  // remove group-by columns that are not returned
  // add back sum or count(column name) if omitted due to an avg column
  // put the count(column name) column at the end, if it is for avg only
  {
    // Keep a count of the parms after the first for any aggregate.
    // These will be skipped and the count needs to be subtracted
    // from where the aux column will be.
    projColsUDAFIdx = 0;
    // check if the count column for AVG is also a returned column,
    // if so, replace the "-1" with the actual position in the returned vec.
    map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
    AGG_MAP aggDupFuncMap;

    // copy over the groupby vector
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(i, -1));
      groupByNoDist.push_back(groupby);
    }

    // locate the return column position in the aggregated rowgroup from the PM
    // outIdx is i without the multi-parm columns
    uint64_t outIdx = 0;
    RowAggFunctionType prevAggOp = ROWAGG_FUNCT_UNDEFINE;
    uint32_t prevRetKey = 0;

    for (uint64_t i = 0; i < returnedColVec.size(); i++)
    {
      pUDAFFunc = NULL;
      udafc = NULL;
      uint32_t retKey = returnedColVec[i].first;

      RowAggFunctionType aggOp = functionIdMap(returnedColVec[i].second);
      RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
      int colUm = -1;

      if (aggOp == ROWAGG_MULTI_PARM)
      {
        // Duplicate detection doesn't work for multi-parm

        // If this function was earlier detected as a duplicate, unduplicate it.
        SP_ROWAGG_FUNC_t funct = functionVecUm.back();
        if (funct->fAggFunction == ROWAGG_DUP_FUNCT)
          funct->fAggFunction = prevAggOp;

        // Remove it from aggDupFuncMap if it's in there.
        funct->hasMultiParm = true;
        AGG_MAP::iterator it = aggDupFuncMap.find(boost::make_tuple(
            prevRetKey, prevAggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
        if (it != aggDupFuncMap.end())
        {
          aggDupFuncMap.erase(it);
        }
        // Skip further UM processing of the multi-parm: extra parms for an aggregate have no work on the UM
        continue;
      }
      else
      {
        // Save the op for MULTI_PARM exclusion when COUNT(DISTINCT)
        prevAggOp = aggOp;
        prevRetKey = returnedColVec[i].first;
      }

      if (aggOp == ROWAGG_UDAF)
      {
        std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx;
        for (; it != jobInfo.projectionCols.end(); it++)
        {
          udafc = dynamic_cast<UDAFColumn*>((*it).get());
          projColsUDAFIdx++;
          if (udafc)
          {
            pUDAFFunc = udafc->getContext().getFunction();
            // Save the multi-parm keys for dup-detection.
            if (pUDAFFunc && udafc->getContext().getParamKeys()->size() == 0)
            {
              for (uint64_t k = i + 1;
                   k < returnedColVec.size() && returnedColVec[k].second == AggregateColumn::MULTI_PARM; ++k)
              {
                udafc->getContext().getParamKeys()->push_back(returnedColVec[k].first);
              }
            }
            break;
          }
        }
        if (it == jobInfo.projectionCols.end())
        {
          throw logic_error(
              "(4)prep2PhasesDistinctAggregate: A UDAF function is called but there\'s not enough "
              "UDAFColumns");
        }
      }

      if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
          jobInfo.distinctColVec.end())
      {
        AGG_MAP::iterator it = aggFuncMap.find(
            boost::make_tuple(retKey, 0, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

        if (it != aggFuncMap.end())
        {
          colUm = it->second;
        }
      }

      if (colUm > -1)  // Means we found a DISTINCT and have a column number
      {
        switch (aggOp)
        {
          case ROWAGG_DISTINCT_AVG:

            // avgFuncMap.insert(make_pair(key, funct));
          case ROWAGG_DISTINCT_SUM:
          {
            if (typeAggUm[colUm] == CalpontSystemCatalog::CHAR ||
                typeAggUm[colUm] == CalpontSystemCatalog::VARCHAR ||
                typeAggUm[colUm] == CalpontSystemCatalog::BLOB ||
                typeAggUm[colUm] == CalpontSystemCatalog::TEXT ||
                typeAggUm[colUm] == CalpontSystemCatalog::DATE ||
                typeAggUm[colUm] == CalpontSystemCatalog::DATETIME ||
                typeAggUm[colUm] == CalpontSystemCatalog::TIMESTAMP ||
                typeAggUm[colUm] == CalpontSystemCatalog::TIME)
            {
              Message::Args args;
              args.add("sum/average");
              args.add(colTypeIdString(typeAggUm[colUm]));
              string emsg = IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATE_TYPE_NOT_SUPPORT, args);
              cerr << "prep2PhasesDistinctAggregate: " << emsg << endl;
              throw IDBExcept(emsg, ERR_AGGREGATE_TYPE_NOT_SUPPORT);
            }

            oidsAggDist.push_back(oidsAggUm[colUm]);
            keysAggDist.push_back(retKey);
            csNumAggDist.push_back(8);
            wideDecimalOrLongDouble(colUm, typeAggPm[colUm], precisionAggPm, scaleAggPm, widthAggPm,
                                    typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
          }
          // PM: put the count column for avg next to the sum
          // let fall through to add a count column for average function
          // if (aggOp != ROWAGG_DISTINCT_AVG)
          break;

          case ROWAGG_COUNT_DISTINCT_COL_NAME:
          {
            oidsAggDist.push_back(oidsAggUm[colUm]);
            keysAggDist.push_back(retKey);
            scaleAggDist.push_back(0);
            // work around count() in select subquery
            precisionAggDist.push_back(rowgroup::MagicPrecisionForCountAgg);
            typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
            csNumAggDist.push_back(8);
            widthAggDist.push_back(bigIntWidth);
          }
          break;

          default:
            // could happen if agg and agg distinct use same column.
            colUm = -1;
            break;
        }  // switch
      }
      // For non distinct aggregates
      if (colUm == -1)
      {
        AGG_MAP::iterator it = aggFuncMap.find(
            boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

        if (it != aggFuncMap.end())
        {
          colUm = it->second;
          oidsAggDist.push_back(oidsAggUm[colUm]);
          keysAggDist.push_back(keysAggUm[colUm]);
          scaleAggDist.push_back(scaleAggUm[colUm]);
          precisionAggDist.push_back(precisionAggUm[colUm]);
          typeAggDist.push_back(typeAggUm[colUm]);
          csNumAggDist.push_back(csNumAggUm[colUm]);
          widthAggDist.push_back(widthAggUm[colUm]);
        }

        // not a direct hit -- a returned column is not already in the RG from PMs
        else
        {
          // MCOL-5476.
          uint32_t foundTupleKey{0};
          if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
          {
            AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
                foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
            colUm = it->second;
            oidsAggDist.push_back(oidsAggUm[colUm]);
            keysAggDist.push_back(keysAggUm[colUm]);
            scaleAggDist.push_back(scaleAggUm[colUm]);
            precisionAggDist.push_back(precisionAggUm[colUm]);
            typeAggDist.push_back(typeAggUm[colUm]);
            csNumAggDist.push_back(csNumAggUm[colUm]);
            widthAggDist.push_back(widthAggUm[colUm]);
            // Update the `retKey` to specify that this column is a duplicate.
            retKey = foundTupleKey;
          }
          else
          {
            bool returnColMissing = true;

            // check if a SUM or COUNT covered by AVG
            if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
            {
              it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
                                                     udafc ? udafc->getContext().getParamKeys() : NULL));

              if (it != aggFuncMap.end())
              {
                // false alarm
                returnColMissing = false;

                colUm = it->second;

                if (aggOp == ROWAGG_SUM)
                {
                  oidsAggDist.push_back(oidsAggUm[colUm]);
                  keysAggDist.push_back(retKey);
                  csNumAggDist.push_back(8);
                  wideDecimalOrLongDouble(colUm, typeAggUm[colUm], precisionAggUm, scaleAggUm, widthAggUm,
                                          typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
                }
                else
                {
                  // leave the count() to avg
                  aggOp = ROWAGG_COUNT_NO_OP;

                  oidsAggDist.push_back(oidsAggUm[colUm]);
                  keysAggDist.push_back(retKey);
                  scaleAggDist.push_back(0);
                  if (isUnsigned(typeAggUm[colUm]))
                  {
                    precisionAggDist.push_back(20);
                    typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
                  }
                  else
                  {
                    precisionAggDist.push_back(19);
                    typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
                  }
                  csNumAggDist.push_back(8);
                  widthAggDist.push_back(bigIntWidth);
                }
              }
            }
            else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
                     jobInfo.expressionVec.end())
            {
              // a function on aggregation
              TupleInfo ti = getTupleInfo(retKey, jobInfo);
              oidsAggDist.push_back(ti.oid);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(ti.scale);
              precisionAggDist.push_back(ti.precision);
              typeAggDist.push_back(ti.dtype);
              csNumAggDist.push_back(ti.csNum);
              widthAggDist.push_back(ti.width);

              returnColMissing = false;
            }
            else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
            {
              // a window function
              TupleInfo ti = getTupleInfo(retKey, jobInfo);
              oidsAggDist.push_back(ti.oid);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(ti.scale);
              precisionAggDist.push_back(ti.precision);
              typeAggDist.push_back(ti.dtype);
              csNumAggDist.push_back(ti.csNum);
              widthAggDist.push_back(ti.width);

              returnColMissing = false;
            }
            else if (aggOp == ROWAGG_CONSTANT)
            {
              TupleInfo ti = getTupleInfo(retKey, jobInfo);
              oidsAggDist.push_back(ti.oid);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(ti.scale);
              precisionAggDist.push_back(ti.precision);
              typeAggDist.push_back(ti.dtype);
              csNumAggDist.push_back(ti.csNum);
              widthAggDist.push_back(ti.width);

              returnColMissing = false;
            }

            if (returnColMissing)
            {
              Message::Args args;
              args.add(keyName(outIdx, retKey, jobInfo));
              string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
              cerr << "prep2PhasesDistinctAggregate: " << emsg
                   << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
                   << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
                   << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
                   << endl;
              throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
            }
          }  // else not a direct hit
        }
      }  // else not a DISTINCT

      // update groupby vector if the groupby column is a returned column
      if (returnedColVec[i].second == 0)
      {
        int dupGroupbyIndex = -1;

        for (uint64_t j = 0; j < jobInfo.groupByColVec.size(); j++)
        {
          if (jobInfo.groupByColVec[j] == retKey)
          {
            if (groupByNoDist[j]->fOutputColumnIndex == (uint32_t)-1)
              groupByNoDist[j]->fOutputColumnIndex = outIdx;
            else
              dupGroupbyIndex = groupByNoDist[j]->fOutputColumnIndex;
          }
        }

        // a duplicate group by column
        if (dupGroupbyIndex != -1)
          functionVecUm.push_back(SP_ROWAGG_FUNC_t(
              new RowAggFunctionCol(ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, outIdx, dupGroupbyIndex)));
      }
      else
      {
        // update the aggregate function vector
        SP_ROWAGG_FUNC_t funct;
        if (aggOp == ROWAGG_UDAF)
        {
          funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, outIdx));
        }
        else
        {
          funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, outIdx));
        }

        if (aggOp == ROWAGG_COUNT_NO_OP)
          funct->fAuxColumnIndex = colUm;
        else if (aggOp == ROWAGG_CONSTANT)
          funct->fAuxColumnIndex = jobInfo.cntStarPos;

        functionVecUm.push_back(funct);

        // find if this func is a duplicate
        AGG_MAP::iterator iter = aggDupFuncMap.find(
            boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));

        if (iter != aggDupFuncMap.end())
        {
          if (funct->fAggFunction == ROWAGG_AVG)
            funct->fAggFunction = ROWAGG_DUP_AVG;
          else if (funct->fAggFunction == ROWAGG_STATS)
            funct->fAggFunction = ROWAGG_DUP_STATS;
          else if (funct->fAggFunction == ROWAGG_UDAF)
            funct->fAggFunction = ROWAGG_DUP_UDAF;
          else
            funct->fAggFunction = ROWAGG_DUP_FUNCT;

          funct->fAuxColumnIndex = iter->second;
        }
        else
        {
          aggDupFuncMap.insert(make_pair(
              boost::make_tuple(retKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL),
              funct->fOutputColumnIndex));
        }

        if (returnedColVec[i].second == AggregateColumn::AVG)
          avgFuncMap.insert(make_pair(returnedColVec[i].first, funct));
        else if (returnedColVec[i].second == AggregateColumn::DISTINCT_AVG)
          avgDistFuncMap.insert(make_pair(returnedColVec[i].first, funct));
      }
      ++outIdx;
    }  // for (i

    // now fix the AVG function, locate the count(column) position
    for (uint64_t i = 0; i < functionVecUm.size(); i++)
    {
      // if the count(k) can be associated with an avg(k)
      if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_NO_OP)
      {
        map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
            avgFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]);

        if (k != avgFuncMap.end())
          k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
      }
    }

    // there is avg(k), but no count(k) in the select list
    uint64_t lastCol = outIdx;

    for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
    {
      if (k->second->fAuxColumnIndex == (uint32_t)-1)
      {
        k->second->fAuxColumnIndex = lastCol++;
        oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
        keysAggDist.push_back(k->first);
        scaleAggDist.push_back(0);
        precisionAggDist.push_back(19);
        typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAggDist.push_back(8);
        widthAggDist.push_back(bigIntWidth);
      }
    }

    // distinct avg
    for (uint64_t i = 0; i < functionVecUm.size(); i++)
    {
      if (functionVecUm[i]->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME && !functionVecUm[i]->hasMultiParm)
      {
        map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k =
            avgDistFuncMap.find(keysAggDist[functionVecUm[i]->fOutputColumnIndex]);

        if (k != avgDistFuncMap.end())
        {
          k->second->fAuxColumnIndex = functionVecUm[i]->fOutputColumnIndex;
          functionVecUm[i]->fAggFunction = ROWAGG_COUNT_NO_OP;
        }
      }
    }

    // there is avg(distinct k), but no count(distinct k) in the select list
    for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgDistFuncMap.begin(); k != avgDistFuncMap.end(); k++)
    {
      // find count(distinct k) or add it
      if (k->second->fAuxColumnIndex == (uint32_t)-1)
      {
        k->second->fAuxColumnIndex = lastCol++;
        oidsAggDist.push_back(jobInfo.keyInfo->tupleKeyVec[k->first].fId);
        keysAggDist.push_back(k->first);
        scaleAggDist.push_back(0);
        precisionAggDist.push_back(19);
        typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
        csNumAggDist.push_back(8);
        widthAggDist.push_back(bigIntWidth);
      }
    }

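    // e.g. (illustration): for SELECT avg(distinct k) with no explicit
    // count(distinct k), the loop above appends a hidden BIGINT count column
    // and points avg(distinct k)'s fAuxColumnIndex at it.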
    // add auxiliary fields for UDAF and statistics functions
    for (uint64_t i = 0; i < functionVecUm.size(); i++)
    {
      uint64_t j = functionVecUm[i]->fInputColumnIndex;

      if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
      {
        // Column for index of UDAF UserData struct
        RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());

        if (!udafFuncCol)
        {
          throw logic_error(
              "(5)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
        }

        functionVecUm[i]->fAuxColumnIndex = lastCol++;
        oidsAggDist.push_back(oidsAggPm[j]);  // Dummy?
        keysAggDist.push_back(keysAggPm[j]);  // Dummy?
        scaleAggDist.push_back(0);
        precisionAggDist.push_back(0);
        typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
        csNumAggDist.push_back(8);
        widthAggDist.push_back(sizeof(uint64_t));
        continue;
      }

      if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
        continue;

      functionVecUm[i]->fAuxColumnIndex = lastCol;

      // mean(x)
      oidsAggDist.push_back(oidsAggPm[j]);
      keysAggDist.push_back(keysAggPm[j]);
      scaleAggDist.push_back(0);
      precisionAggDist.push_back(-1);
      typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
      csNumAggDist.push_back(8);
      widthAggDist.push_back(sizeof(long double));
      ++lastCol;

      // sum(x_i - mean)^2
      oidsAggDist.push_back(oidsAggPm[j]);
      keysAggDist.push_back(keysAggPm[j]);
      scaleAggDist.push_back(0);
      precisionAggDist.push_back(-1);
      typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE);
      csNumAggDist.push_back(8);
      widthAggDist.push_back(sizeof(long double));
      ++lastCol;
    }
  }

  // calculate the offset and create the rowaggregations, rowgroups
  posAggUm.push_back(2);  // rid

  for (uint64_t i = 0; i < oidsAggUm.size(); i++)
    posAggUm.push_back(posAggUm[i] + widthAggUm[i]);

  RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
                   precisionAggUm, jobInfo.stringTableThreshold);
  SP_ROWAGG_UM_t rowAggUm(
      new RowAggregationUMP2(groupByUm, functionNoDistVec, jobInfo.rm, jobInfo.umMemLimit, false));
  rowAggUm->timeZone(jobInfo.timeZone);

  posAggDist.push_back(2);  // rid

  for (uint64_t i = 0; i < oidsAggDist.size(); i++)
    posAggDist.push_back(posAggDist[i] + widthAggDist[i]);

  RowGroup aggRgDist(oidsAggDist.size(), posAggDist, oidsAggDist, keysAggDist, typeAggDist, csNumAggDist,
                     scaleAggDist, precisionAggDist, jobInfo.stringTableThreshold);
  SP_ROWAGG_DIST rowAggDist(
      new RowAggregationDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit));
  rowAggDist->timeZone(jobInfo.timeZone);

  // if the distinct keyword applied to more than one aggregate column, reset rowAggDist
  vector<RowGroup> subRgVec;

  if (jobInfo.distinctColVec.size() > 1)
  {
    RowAggregationMultiDistinct* multiDistinctAggregator =
        new RowAggregationMultiDistinct(groupByNoDist, functionVecUm, jobInfo.rm, jobInfo.umMemLimit);
    multiDistinctAggregator->timeZone(jobInfo.timeZone);
    rowAggDist.reset(multiDistinctAggregator);

    // construct and add sub-aggregators to rowAggDist
    vector<uint32_t> posAggGb, posAggSub;
    vector<uint32_t> oidsAggGb, oidsAggSub;
    vector<uint32_t> keysAggGb, keysAggSub;
    vector<uint32_t> scaleAggGb, scaleAggSub;
    vector<uint32_t> precisionAggGb, precisionAggSub;
    vector<CalpontSystemCatalog::ColDataType> typeAggGb, typeAggSub;
    vector<uint32_t> csNumAggGb, csNumAggSub;
    vector<uint32_t> widthAggGb, widthAggSub;

    // populate groupby column info
    for (uint64_t i = 0; i < jobInfo.groupByColVec.size(); i++)
    {
      oidsAggGb.push_back(oidsAggUm[i]);
      keysAggGb.push_back(keysAggUm[i]);
      scaleAggGb.push_back(scaleAggUm[i]);
      precisionAggGb.push_back(precisionAggUm[i]);
      typeAggGb.push_back(typeAggUm[i]);
      csNumAggGb.push_back(csNumAggUm[i]);
      widthAggGb.push_back(widthAggUm[i]);
    }

    // for distinct, each column requires a separate rowgroup
    vector<SP_ROWAGG_DIST> rowAggSubDistVec;

    uint32_t distinctColKey;
    int64_t j;
    uint64_t k;
    uint64_t outIdx = 0;
    for (uint64_t i = 0; i < returnedColVec.size(); i++)
    {
      if (returnedColVec[i].second == 0)
      {
        ++outIdx;
        continue;
      }

      j = -1;

      distinctColKey = -1;
      // Find the entry in distinctColVec, if any
      for (k = 0; k < jobInfo.distinctColVec.size(); k++)
      {
        distinctColKey = jobInfo.distinctColVec[k];
        if (returnedColVec[i].first == distinctColKey)
          break;
      }
      if (distinctColKey == (uint32_t)-1)
      {
        ++outIdx;
        continue;
      }
      // locate the distinct key in the row group
      for (k = 0; k < keysAggUm.size(); k++)
      {
        if (keysAggUm[k] == distinctColKey)
        {
          j = k;
          break;
        }
      }

      idbassert(j != -1);

      oidsAggSub = oidsAggGb;
      keysAggSub = keysAggGb;
      scaleAggSub = scaleAggGb;
      precisionAggSub = precisionAggGb;
      typeAggSub = typeAggGb;
      csNumAggSub = csNumAggGb;
      widthAggSub = widthAggGb;

      oidsAggSub.push_back(oidsAggUm[j]);
      keysAggSub.push_back(keysAggUm[j]);
      scaleAggSub.push_back(scaleAggUm[j]);
      precisionAggSub.push_back(precisionAggUm[j]);
      typeAggSub.push_back(typeAggUm[j]);
      csNumAggSub.push_back(csNumAggUm[j]);
      widthAggSub.push_back(widthAggUm[j]);

      // construct groupby vector
      vector<SP_ROWAGG_GRPBY_t> groupBySub;
      k = 0;

      while (k < jobInfo.groupByColVec.size())
      {
        SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(k, k));
        groupBySub.push_back(groupby);
        k++;
      }
      // add the distinct column as groupby
      SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
      groupBySub.push_back(groupby);

      // Add multi parm distinct
      while ((i + 1) < returnedColVec.size() &&
             functionIdMap(returnedColVec[i + 1].second) == ROWAGG_MULTI_PARM)
      {
        ++i;
        uint32_t dColKey = -1;
        j = -1;

        // Find the entry in distinctColVec, if any
        for (k = 0; k < jobInfo.distinctColVec.size(); k++)
        {
          dColKey = jobInfo.distinctColVec[k];
          if (returnedColVec[i].first == dColKey)
            break;
        }
        idbassert(dColKey != (uint32_t)-1);
        // locate the distinct key in the row group
        for (k = 0; k < keysAggUm.size(); k++)
        {
          if (keysAggUm[k] == dColKey)
          {
            j = k;
            break;
          }
        }
        idbassert(j != -1);

        oidsAggSub.push_back(oidsAggUm[j]);
        keysAggSub.push_back(keysAggUm[j]);
        scaleAggSub.push_back(scaleAggUm[j]);
        precisionAggSub.push_back(precisionAggUm[j]);
        typeAggSub.push_back(typeAggUm[j]);
        csNumAggSub.push_back(csNumAggUm[j]);
        widthAggSub.push_back(widthAggUm[j]);

        SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
        groupBySub.push_back(groupby);
      }

      // construct sub-rowgroup
      posAggSub.clear();
      posAggSub.push_back(2);  // rid

      for (k = 0; k < oidsAggSub.size(); k++)
        posAggSub.push_back(posAggSub[k] + widthAggSub[k]);

      RowGroup subRg(oidsAggSub.size(), posAggSub, oidsAggSub, keysAggSub, typeAggSub, csNumAggSub,
                     scaleAggSub, precisionAggSub, jobInfo.stringTableThreshold);
      subRgVec.push_back(subRg);

      // Keep a count of the parms after the first for any aggregate.
      // These will be skipped and the count needs to be subtracted
      // from where the aux column will be.
      int64_t multiParms = 0;

      // tricky part : 2 function vectors
      // -- dummy function vector for sub-aggregator, which does distinct only
      // -- aggregate function on this distinct column for rowAggDist
      vector<SP_ROWAGG_FUNC_t> functionSub1, functionSub2;
      // search the function in functionVec
      vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();

      while (it != functionVecUm.end())
      {
        SP_ROWAGG_FUNC_t f = *it++;

        if ((f->fOutputColumnIndex == outIdx) &&
            (f->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME || f->fAggFunction == ROWAGG_DISTINCT_SUM ||
             f->fAggFunction == ROWAGG_DISTINCT_AVG))
        {
          SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction,
                                                       groupBySub.size() - 1, f->fOutputColumnIndex,
                                                       f->fAuxColumnIndex - multiParms));
          functionSub2.push_back(funct);
        }
      }

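      // Note (for orientation): functionSub1 stays empty on purpose -- the
      // sub-aggregator below only deduplicates (group-by + distinct) rows,
      // while functionSub2 carries the actual COUNT/SUM/AVG(distinct) work
      // that rowAggDist applies over the deduplicated stream.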
      // construct sub-aggregator
      SP_ROWAGG_UM_t subAgg(
          new RowAggregationSubDistinct(groupBySub, functionSub1, jobInfo.rm, jobInfo.umMemLimit));
      subAgg->timeZone(jobInfo.timeZone);

      // add to rowAggDist
      multiDistinctAggregator->addSubAggregator(subAgg, subRg, functionSub2);

      ++outIdx;
    }

    // cover any non-distinct column functions
    {
      vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
      vector<SP_ROWAGG_FUNC_t> functionSub2;
      int64_t multiParms = 0;

      for (uint64_t k = 0; k < returnedColVec.size(); k++)
      {
        // search non-distinct functions in functionVec
        vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();

        while (it != functionVecUm.end())
        {
          SP_ROWAGG_FUNC_t funct;
          SP_ROWAGG_FUNC_t f = *it++;

          if (f->fOutputColumnIndex == k)
          {
            if (f->fAggFunction == ROWAGG_UDAF)
            {
              RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(f.get());
              funct.reset(new RowUDAFFunctionCol(udafFuncCol->fUDAFContext, udafFuncCol->fInputColumnIndex,
                                                 udafFuncCol->fOutputColumnIndex,
                                                 udafFuncCol->fAuxColumnIndex - multiParms));
              functionSub2.push_back(funct);
            }
            else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK || f->fAggFunction == ROWAGG_COUNT_COL_NAME ||
                     f->fAggFunction == ROWAGG_SUM || f->fAggFunction == ROWAGG_AVG ||
                     f->fAggFunction == ROWAGG_MIN || f->fAggFunction == ROWAGG_MAX ||
                     f->fAggFunction == ROWAGG_STATS || f->fAggFunction == ROWAGG_BIT_AND ||
                     f->fAggFunction == ROWAGG_BIT_OR || f->fAggFunction == ROWAGG_BIT_XOR ||
                     f->fAggFunction == ROWAGG_CONSTANT || f->fAggFunction == ROWAGG_SELECT_SOME)
            {
              funct.reset(new RowAggFunctionCol(f->fAggFunction, f->fStatsFunction, f->fInputColumnIndex,
                                                f->fOutputColumnIndex, f->fAuxColumnIndex - multiParms));
              functionSub2.push_back(funct);
            }
          }
        }
      }

      if (functionSub1.size() > 0)
      {
        // make sure the group by columns are available for next aggregate phase.
        vector<SP_ROWAGG_GRPBY_t> groupBySubNoDist;

        for (uint64_t i = 0; i < groupByNoDist.size(); i++)
          groupBySubNoDist.push_back(
              SP_ROWAGG_GRPBY_t(new RowAggGroupByCol(groupByNoDist[i]->fInputColumnIndex, i)));

        // construct sub-aggregator
        SP_ROWAGG_UM_t subAgg(
            new RowAggregationUMP2(groupBySubNoDist, functionSub1, jobInfo.rm, jobInfo.umMemLimit, false));
        subAgg->timeZone(jobInfo.timeZone);

        // add to rowAggDist
        multiDistinctAggregator->addSubAggregator(subAgg, aggRgUm, functionSub2);
        subRgVec.push_back(aggRgUm);
      }
    }
  }

rowAggDist->addAggregator(rowAggUm, aggRgUm);
|
|
rowgroups.push_back(aggRgDist);
|
|
aggregators.push_back(rowAggDist);
|
|
|
|
posAggPm.push_back(2); // rid
|
|
|
|
for (uint64_t i = 0; i < oidsAggPm.size(); i++)
|
|
posAggPm.push_back(posAggPm[i] + widthAggPm[i]);
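  // e.g. with two columns of widths {8, 4}, posAggPm ends up {2, 10, 14}:
  // each offset is the previous offset plus the previous column's width,
  // starting after the leading rid slot at offset 2 pushed above.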

  RowGroup aggRgPm(oidsAggPm.size(), posAggPm, oidsAggPm, keysAggPm, typeAggPm, csNumAggPm, scaleAggPm,
                   precisionAggPm, jobInfo.stringTableThreshold);
  SP_ROWAGG_PM_t rowAggPm(new RowAggregation(groupByPm, functionVecPm));
  rowAggPm->timeZone(jobInfo.timeZone);
  rowgroups.push_back(aggRgPm);
  aggregators.push_back(rowAggPm);

  if (jobInfo.trace)
  {
    cout << "projected RG: " << projRG.toString() << endl
         << "aggregated1 RG: " << aggRgPm.toString() << endl
         << "aggregated2 RG: " << aggRgUm.toString() << endl;

    for (uint64_t i = 0; i < subRgVec.size(); i++)
      cout << "aggregatedSub RG: " << i << " " << subRgVec[i].toString() << endl;

    cout << "aggregatedDist RG: " << aggRgDist.toString() << endl;
  }
}

void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInfo& jobInfo)
{
  map<uint32_t, uint32_t> keyToIndexMap;

  for (uint64_t i = 0; i < fRowGroupOut.getKeys().size(); ++i)
  {
    if (keyToIndexMap.find(fRowGroupOut.getKeys()[i]) == keyToIndexMap.end())
      keyToIndexMap.insert(make_pair(fRowGroupOut.getKeys()[i], i));
  }
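  // keyToIndexMap now maps each tuple key in the output row group to the
  // first column index carrying it; duplicates keep the earliest index,
  // e.g. keys {10, 12, 10} yield {10 -> 0, 12 -> 1}.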

  RetColsVector expressionVec;
  ArithmeticColumn* ac = NULL;
  FunctionColumn* fc = NULL;
  RetColsVector& cols = jobInfo.nonConstCols;
  vector<SimpleColumn*> simpleColumns;

  for (RetColsVector::iterator it = cols.begin(); it != cols.end(); ++it)
  {
    uint64_t eid = -1;

    if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) && (ac->aggColumnList().size() > 0) &&
        (ac->windowfunctionColumnList().size() == 0))
    {
      const vector<SimpleColumn*>& scols = ac->simpleColumnList();
      simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end());

      eid = ac->expressionId();
      expressionVec.push_back(*it);
    }
    else if (((fc = dynamic_cast<FunctionColumn*>(it->get())) != NULL) && (fc->aggColumnList().size() > 0) &&
             (fc->windowfunctionColumnList().size() == 0))
    {
      const vector<SimpleColumn*>& sCols = fc->simpleColumnList();
      simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end());

      eid = fc->expressionId();
      expressionVec.push_back(*it);
    }

    // update the output index
    if (eid != (uint64_t)-1)
    {
      map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(getExpTupleKey(jobInfo, eid));

      if (mit != keyToIndexMap.end())
      {
        it->get()->outputIndex(mit->second);
      }
      else
      {
        ostringstream emsg;
        emsg << "expression " << eid << " cannot be found in tuple.";
        cerr << "prepExpressionOnAggregate: " << emsg.str() << endl;
        throw QueryDataExcept(emsg.str(), aggregateFuncErr);
      }
    }
  }

  // map the input indices
  for (vector<SimpleColumn*>::iterator i = simpleColumns.begin(); i != simpleColumns.end(); i++)
  {
    CalpontSystemCatalog::OID oid = (*i)->oid();
    uint32_t key = getTupleKey(jobInfo, *i);
    CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*i)->colType());

    if (dictOid > 0)
    {
      oid = dictOid;
      key = jobInfo.keyInfo->dictKeyMap[key];
    }
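    // For dictionary-backed (string) columns the simple column's own key
    // refers to the token column; remapping it through dictKeyMap makes the
    // lookup below match the key actually present in the output row group.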

    map<uint32_t, uint32_t>::iterator mit = keyToIndexMap.find(key);

    if (mit != keyToIndexMap.end())
    {
      (*i)->inputIndex(mit->second);
    }
    else
    {
      ostringstream emsg;
      emsg << "'" << jobInfo.keyInfo->tupleKeyToName[key] << "' cannot be found in tuple.";
      cerr << "prepExpressionOnAggregate: " << emsg.str() << " simple column: oid(" << oid << "), alias("
           << extractTableAlias(*i) << ")." << endl;
      throw QueryDataExcept(emsg.str(), aggregateFuncErr);
    }
  }

  // add expression to UM aggregator
  aggUM->expression(expressionVec);
}

void TupleAggregateStep::addConstangAggregate(vector<ConstantAggData>& constAggDataVec)
{
  fAggregator->constantAggregate(constAggDataVec);
}

void TupleAggregateStep::aggregateRowGroups()
{
  RGData rgData;
  bool more = true;
  RowGroupDL* dlIn = NULL;

  if (!fDoneAggregate)
  {
    if (fInputJobStepAssociation.outSize() == 0)
      throw logic_error("No input data list for TupleAggregate step.");

    dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();

    if (dlIn == NULL)
      throw logic_error("Input is not RowGroup data list in TupleAggregate step.");

    if (fInputIter < 0)
      fInputIter = dlIn->getIterator();

    more = dlIn->next(fInputIter, &rgData);

    if (traceOn())
      dlTimes.setFirstReadTime();

    StepTeleStats sts;
    sts.query_uuid = fQueryUuid;
    sts.step_uuid = fStepUuid;
    sts.msg_type = StepTeleStats::ST_START;
    sts.total_units_of_work = 1;
    postStepStartTele(sts);

    try
    {
      // this check covers the no-row case
      if (!more && cancelled())
      {
        fDoneAggregate = true;
        fEndOfResult = true;
      }

      while (more && !fEndOfResult)
      {
        fRowGroupIn.setData(&rgData);
        fAggregator->addRowGroup(&fRowGroupIn);
        more = dlIn->next(fInputIter, &rgData);

        // error checking
        if (cancelled())
        {
          fEndOfResult = true;

          while (more)
            more = dlIn->next(fInputIter, &rgData);
        }
      }
    }  // try
    catch (...)
    {
      handleException(std::current_exception(), logging::tupleAggregateStepErr,
                      logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::aggregateRowGroups()");
      fEndOfResult = true;
    }
  }

  fDoneAggregate = true;
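
  // Drain any leftover input so the upstream producer is not left blocked
  // on a full datalist.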
  while (more)
    more = dlIn->next(fInputIter, &rgData);

  if (traceOn())
  {
    dlTimes.setLastReadTime();
    dlTimes.setEndOfInputTime();
  }
}

void TupleAggregateStep::threadedAggregateFinalize(uint32_t threadID)
{
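  // Each finalizer thread walks every bucket; try_lock lets whichever thread
  // arrives first finalize a bucket while the others skip past it, so no
  // bucket is finalized twice and no thread blocks on a busy one.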
  for (uint32_t i = 0; i < fNumOfBuckets; ++i)
  {
    if (fAgg_mutex[i]->try_lock())
    {
      try
      {
        if (fAggregators[i])
          fAggregators[i]->finalAggregation();
      }
      catch (...)
      {
        fAgg_mutex[i]->unlock();
        throw;
      }
      fAgg_mutex[i]->unlock();
    }
  }
}

void TupleAggregateStep::threadedAggregateRowGroups(uint32_t threadID)
{
  RGData rgData;
  scoped_array<RowBucketVec> rowBucketVecs(new RowBucketVec[fNumOfBuckets]);
  scoped_array<Row> distRow;
  scoped_array<std::shared_ptr<uint8_t[]>> distRowData;
  uint32_t bucketID;
  scoped_array<bool> bucketDone(new bool[fNumOfBuckets]);
  vector<uint32_t> hashLens;
  bool locked = false;
  bool more = true;
  RowGroupDL* dlIn = nullptr;
  uint32_t rgVecShift = float(fNumOfBuckets) / fNumOfThreads * threadID;
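  // Stagger each thread's starting bucket so the threads don't all contend
  // for bucket 0 first; e.g. 8 buckets across 4 threads starts thread 2 at
  // bucket 4.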

  RowAggregationMultiDistinct* multiDist = nullptr;
  if (!fDoneAggregate)
  {
    if (fInputJobStepAssociation.outSize() == 0)
      throw logic_error("No input data list for delivery.");

    dlIn = fInputJobStepAssociation.outAt(0)->rowGroupDL();

    if (dlIn == nullptr)
      throw logic_error("Input is not RowGroup data list in delivery step.");

    vector<RGData> rgDatas;

    try
    {
      // this check covers the no-row case
      if (!more && cancelled())
      {
        fDoneAggregate = true;
        fEndOfResult = true;
      }

      bool firstRead = true;
      Row rowIn;

      while (more && !fEndOfResult)
      {
        fMutex.lock();
        locked = true;

        for (uint32_t c = 0; c < fNumOfRowGroups && !cancelled(); c++)
        {
          more = dlIn->next(fInputIter, &rgData);

          if (firstRead)
          {
            if (threadID == 0)
            {
              if (traceOn())
                dlTimes.setFirstReadTime();

              StepTeleStats sts;
              sts.query_uuid = fQueryUuid;
              sts.step_uuid = fStepUuid;
              sts.msg_type = StepTeleStats::ST_START;
              sts.total_units_of_work = 1;
              postStepStartTele(sts);
            }

            multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get());

            if (multiDist)
            {
              for (uint32_t i = 0; i < fNumOfBuckets; i++)
                rowBucketVecs[i].resize(multiDist->subAggregators().size());

              distRow.reset(new Row[multiDist->subAggregators().size()]);
              distRowData.reset(new std::shared_ptr<uint8_t[]>[multiDist->subAggregators().size()]);

              for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
              {
                multiDist->subAggregators()[j]->getOutputRowGroup()->initRow(&distRow[j], true);
                distRowData[j].reset(new uint8_t[distRow[j].getSize()]);
                distRow[j].setData(rowgroup::Row::Pointer(distRowData[j].get()));
                hashLens.push_back(multiDist->subAggregators()[j]->aggMapKeyLength());
              }
            }
            else
            {
              for (uint32_t i = 0; i < fNumOfBuckets; i++)
                rowBucketVecs[i].resize(1);

              if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()))
                hashLens.push_back(dynamic_cast<RowAggregationDistinct*>(fAggregator.get())
                                       ->aggregator()
                                       ->aggMapKeyLength());
              else
                hashLens.push_back(fAggregator->aggMapKeyLength());
            }

            fRowGroupIns[threadID] = fRowGroupIn;
            fRowGroupIns[threadID].initRow(&rowIn);
            firstRead = false;
          }

          if (more)
          {
            fRowGroupIns[threadID].setData(&rgData);

            bool diskAggAllowed = fRm->getAllowDiskAggregation();
            int64_t memSize = fRowGroupIns[threadID].getSizeWithStrings();
            if (!fRm->getMemory(memSize, fSessionMemLimit, !diskAggAllowed))
            {
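              // Could not reserve memory for this batch. Without disk
              // aggregation this is fatal; with it, keep the row group and
              // stop reading so that what was read so far can be processed.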
              if (!diskAggAllowed)
              {
                rgDatas.clear();  // to short-cut the rest of processing
                more = false;
                fEndOfResult = true;

                if (status() == 0)
                {
                  errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_AGGREGATION_TOO_BIG));
                  status(ERR_AGGREGATION_TOO_BIG);
                }
              }
              else
              {
                rgDatas.push_back(rgData);
              }
              break;
            }
            fMemUsage[threadID] += memSize;
            rgDatas.push_back(rgData);
          }
          else
          {
            break;
          }
        }

        // The input rowgroup and aggregator are finalized only right before
        // the hashjoin starts, if there is one.
        if (fAggregators.empty())
        {
          fAggregators.resize(fNumOfBuckets);

          for (uint32_t i = 0; i < fNumOfBuckets; i++)
          {
            fAggregators[i].reset(fAggregator->clone());
            fAggregators[i]->setInputOutput(fRowGroupIn, &fRowGroupOuts[i]);
          }
        }

        fMutex.unlock();
        locked = false;

        multiDist = dynamic_cast<RowAggregationMultiDistinct*>(fAggregator.get());

        // dispatch rows to row buckets
        if (multiDist)
        {
          for (uint32_t c = 0; c < rgDatas.size(); c++)
          {
            fRowGroupIns[threadID].setData(&rgDatas[c]);

            for (uint32_t j = 0; j < multiDist->subAggregators().size(); j++)
            {
              fRowGroupIns[threadID].getRow(0, &rowIn);
              rowIn.setUserDataStore(rgDatas[c].getUserDataStore());

              for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
              {
                for (uint64_t k = 0; k < multiDist->subAggregators()[j]->getGroupByCols().size(); ++k)
                {
                  rowIn.copyField(
                      distRow[j], k,
                      multiDist->subAggregators()[j]->getGroupByCols()[k].get()->fInputColumnIndex);
                }

                // TBD This approach could potentially
                // put all values in one bucket.
                uint64_t hash = rowgroup::hashRow(distRow[j], hashLens[j] - 1);
                bucketID = hash % fNumOfBuckets;
                rowBucketVecs[bucketID][j].emplace_back(rowIn.getPointer(), hash);
                rowIn.nextRow();
              }
            }
          }
        }
        else
        {
          for (uint32_t c = 0; c < rgDatas.size(); c++)
          {
            fRowGroupIns[threadID].setData(&rgDatas[c]);
            fRowGroupIns[threadID].getRow(0, &rowIn);
            rowIn.setUserDataStore(rgDatas[c].getUserDataStore());

            for (uint64_t i = 0; i < fRowGroupIns[threadID].getRowCount(); ++i)
            {
              // The key is the groupby columns, which are the leading columns.
              // TBD This approach could potentially
              // put all values in one bucket.
              // fAggregator->hasRollup() is true when we perform one-phase
              // aggregation and are also computing subtotals.
              // Subtotals produce new keys whose hash values may not fall in
              // the processing bucket. Consider the key tuples (1,2) and (1,3):
              // their subtotals' keys will both be (1, NULL),
              // but the rows would stay in their original processing buckets
              // and never get aggregated together properly.
              // Due to this, we put all rows into the same bucket 0 when
              // performing single-phase aggregation with subtotals.
              // For all other cases (single-phase without subtotals and two-phase
              // aggregation with and without subtotals) fAggregator->hasRollup() is false.
              // In these cases we have full parallel processing as expected.
              uint64_t hash = fAggregator->hasRollup() ? 0 : rowgroup::hashRow(rowIn, hashLens[0] - 1);
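              // hashRow() covers only the leading group-by key columns
              // (indices 0 .. hashLens[0] - 1), so rows with equal keys
              // always land in the same bucket and each bucket can be
              // aggregated independently.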
              int bucketID = hash % fNumOfBuckets;
              rowBucketVecs[bucketID][0].emplace_back(rowIn.getPointer(), hash);
              rowIn.nextRow();
            }
          }
        }

        // insert into the hash maps owned by each aggregator
        bool done = false;
        fill(&bucketDone[0], &bucketDone[fNumOfBuckets], false);

        while (!fEndOfResult && !done && !cancelled())
        {
          bool didWork = false;
          done = true;

          // each thread starts from its own bucket for better distribution
          uint32_t shift = (rgVecShift++) % fNumOfBuckets;
          for (uint32_t ci = 0; ci < fNumOfBuckets && !cancelled(); ci++)
          {
            uint32_t c = (ci + shift) % fNumOfBuckets;
            if (!fEndOfResult && !bucketDone[c] && fAgg_mutex[c]->try_lock())
            {
              try
              {
                didWork = true;

                if (multiDist)
                  dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[c].get())
                      ->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c]);
                else
                {
                  fAggregators[c]->addRowGroup(&fRowGroupIns[threadID], rowBucketVecs[c][0]);
                }
              }
              catch (...)
              {
                fAgg_mutex[c]->unlock();
                throw;
              }

              rowBucketVecs[c][0].clear();
              bucketDone[c] = true;
              fAgg_mutex[c]->unlock();
            }
            else if (!bucketDone[c])
            {
              done = false;
            }
          }

          if (!didWork)
            usleep(1000);  // avoid using all CPU during busy wait
        }

        rgDatas.clear();
        fRm->returnMemory(fMemUsage[threadID], fSessionMemLimit);
        fMemUsage[threadID] = 0;

        if (cancelled())
        {
          fEndOfResult = true;
          fMutex.lock();

          while (more)
            more = dlIn->next(fInputIter, &rgData);

          fMutex.unlock();
        }
      }
    }  // try
    catch (...)
    {
      handleException(std::current_exception(), logging::tupleAggregateStepErr,
                      logging::ERR_AGGREGATION_TOO_BIG,
                      "TupleAggregateStep::threadedAggregateRowGroups()[" + std::to_string(threadID) + "]");
      fEndOfResult = true;
      fDoneAggregate = true;
    }
  }

  if (!locked)
    fMutex.lock();

  while (more)
    more = dlIn->next(fInputIter, &rgData);

  fMutex.unlock();
  locked = false;

  if (traceOn())
  {
    dlTimes.setLastReadTime();
    dlTimes.setEndOfInputTime();
  }
}

void TupleAggregateStep::doAggregate_singleThread()
{
  AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
  RowGroupDL* dlp = dl->rowGroupDL();
  RGData rgData;

  try
  {
    if (!fDoneAggregate)
      aggregateRowGroups();

    if (fEndOfResult == false)
    {
      // do the final aggregation and deliver the results
      // at least one RowGroup for aggregate results
      if (dynamic_cast<RowAggregationDistinct*>(fAggregator.get()) != NULL)
      {
        dynamic_cast<RowAggregationDistinct*>(fAggregator.get())->doDistinctAggregation();
      }

      while (fAggregator->nextRowGroup())
      {
        fAggregator->finalize();
        fRowsReturned += fRowGroupOut.getRowCount();
        rgData = fRowGroupOut.duplicate();
        fRowGroupDelivered.setData(&rgData);

        if (fRowGroupOut.getColumnCount() > fRowGroupDelivered.getColumnCount())
          pruneAuxColumns();

        dlp->insert(rgData);
      }
    }
  }  // try
  catch (...)
  {
    handleException(std::current_exception(), logging::tupleAggregateStepErr,
                    logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doAggregate_singleThread()");
  }

  if (traceOn())
    printCalTrace();

  StepTeleStats sts;
  sts.query_uuid = fQueryUuid;
  sts.step_uuid = fStepUuid;
  sts.msg_type = StepTeleStats::ST_SUMMARY;
  sts.total_units_of_work = sts.units_of_work_completed = 1;
  sts.rows = fRowsReturned;
  postStepSummaryTele(sts);

  // Bug 3136, let mini stats be formatted if traceOn.
  fEndOfResult = true;
  dlp->endOfInput();
}

void TupleAggregateStep::doAggregate()
{
  // @bug4314. DO NOT access fAggregator before the first read of input,
  // because hashjoin may not have finalized fAggregator.
  if (!fIsMultiThread)
    return doAggregate_singleThread();

  AnyDataListSPtr dl = fOutputJobStepAssociation.outAt(0);
  RowGroupDL* dlp = dl->rowGroupDL();
  ByteStream bs;
  doThreadedAggregate(bs, dlp);
  return;
}

/** @brief Aggregate input row groups in two-phase multi-threaded aggregation.
 * In the second phase, handle three different aggregation cases differently:
 * 1. Query contains at least one aggregation on a DISTINCT column, e.g. SUM(DISTINCT col1), AND at least
 *    one GROUP BY column
 * 2. Query contains at least one aggregation on a DISTINCT column but no GROUP BY column
 * 3. Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column
 * DISTINCT selects (e.g. SELECT DISTINCT col1 FROM ...) are handled in tupleannexstep.cpp.
 */
uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp)
{
  // initialize return value variable
  uint64_t rowCount = 0;

  try
  {
    /*
     * Phase 1: Distribute input rows to different buckets depending on the hash value of the group-by
     * columns per row. Then distribute the buckets equally over the aggregators in fAggregators (number
     * of fAggregators == fNumOfBuckets). Each previously created hash bucket is represented as one
     * RowGroup in a fAggregator.
     */

    if (!fDoneAggregate)
    {
      initializeMultiThread();

      vector<uint64_t> runners;        // thread pool handles
      runners.reserve(fNumOfThreads);  // to prevent a resize during use

      // Start the aggregator threads
      for (uint32_t threadNum = 0; threadNum < fNumOfThreads; threadNum++)
      {
        runners.push_back(jobstepThreadPool.invoke(ThreadedAggregator(this, threadNum)));
      }

      // Now wait for all those threads
      jobstepThreadPool.join(runners);
    }

    if (!cancelled())
    {
      vector<uint64_t> runners;
      // use half of the threads because finalizing requires twice as
      // much memory on average
      uint32_t threads = std::max(1U, fNumOfThreads / 2);
      runners.reserve(threads);
      for (uint32_t threadNum = 0; threadNum < threads; ++threadNum)
      {
        runners.push_back(jobstepThreadPool.invoke(ThreadedAggregateFinalizer(this, threadNum)));
      }
      jobstepThreadPool.join(runners);
    }

    /*
     * Phase 2: Depending on the query type (see below), do aggregation per previously created RowGroup
     * of rows that need to be aggregated, and output the results.
     */

    auto* distinctAggregator = dynamic_cast<RowAggregationDistinct*>(fAggregator.get());
    const bool hasGroupByColumns = fAggregator->aggMapKeyLength() > 0;
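    // aggMapKeyLength() gives the length of the leading group-by key used by
    // the aggregation hash map; a length of zero is taken here to mean the
    // query has no GROUP BY columns.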

    // Case 1: Query contains at least one aggregation on a DISTINCT column AND at least one GROUP BY
    // column, e.g. SELECT SUM(DISTINCT col1) FROM test GROUP BY col2;
    if (distinctAggregator && hasGroupByColumns)
    {
      if (!fEndOfResult)
      {
        // Do multi-threaded second phase aggregation (per row group created for GROUP BY statement)
        if (!fDoneAggregate)
        {
          vector<uint64_t> runners;  // thread pool handles
          fRowGroupsDeliveredData.resize(fNumOfBuckets);

          uint32_t bucketsPerThread = fNumOfBuckets / fNumOfThreads;
          uint32_t numThreads = ((fNumOfBuckets % fNumOfThreads) == 0 ? fNumOfThreads : fNumOfThreads + 1);
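          // e.g. fNumOfBuckets = 10 and fNumOfThreads = 4 give
          // bucketsPerThread = 2 and numThreads = 5, so a fifth runner picks
          // up the remainder buckets 8 and 9.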

          runners.reserve(numThreads);

          for (uint32_t threadNum = 0; threadNum < numThreads; threadNum++)
          {
            runners.push_back(jobstepThreadPool.invoke(
                ThreadedSecondPhaseAggregator(this, threadNum * bucketsPerThread, bucketsPerThread)));
          }

          jobstepThreadPool.join(runners);
        }

        // Deliver results
        fDoneAggregate = true;
        bool done = true;
        while (nextDeliveredRowGroup() && !cancelled())
        {
          done = false;
          rowCount = fRowGroupOut.getRowCount();

          if (rowCount != 0)
          {
            if (!cleanUpAndOutputRowGroup(bs, dlp))
              break;
          }

          done = true;
        }

        if (done)
        {
          fEndOfResult = true;
        }
      }
    }
    // Case 2: Query contains at least one aggregation on a DISTINCT column but no GROUP BY column,
    // e.g. SELECT SUM(DISTINCT col1) FROM test;
    else if (distinctAggregator)
    {
      if (!fEndOfResult)
      {
        if (!fDoneAggregate)
        {
          // Do aggregation over all row groups. As all row groups need to be aggregated together, there
          // is no easy way of multi-threading this, and it's done in a single thread for now.
          for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; bucketNum++)
          {
            if (fEndOfResult == false)
            {
              // The distinctAggregator accumulates the results of all buckets: it is handed each bucket
              // aggregator's row groups in turn and performs an aggregation step after each addition.
              auto* bucketMultiDistinctAggregator =
                  dynamic_cast<RowAggregationMultiDistinct*>(fAggregators[bucketNum].get());
              auto* bucketDistinctAggregator =
                  dynamic_cast<RowAggregationDistinct*>(fAggregators[bucketNum].get());
              distinctAggregator->aggregator(bucketDistinctAggregator->aggregator());

              if (bucketMultiDistinctAggregator)
              {
                (dynamic_cast<RowAggregationMultiDistinct*>(distinctAggregator))
                    ->subAggregators(bucketMultiDistinctAggregator->subAggregators());
              }

              distinctAggregator->aggregator()->finalAggregation();
              distinctAggregator->doDistinctAggregation();
            }
          }
        }

        // Deliver results
        fDoneAggregate = true;
        bool done = true;
        while (fAggregator->nextRowGroup() && !cancelled())
        {
          done = false;
          fAggregator->finalize();
          rowCount = fRowGroupOut.getRowCount();
          fRowsReturned += rowCount;
          fRowGroupDelivered.setData(fRowGroupOut.getRGData());

          if (rowCount != 0)
          {
            if (!cleanUpAndOutputRowGroup(bs, dlp))
              break;
          }
          done = true;
        }
        if (done)
          fEndOfResult = true;
      }
    }
    // Case 3: Query contains no aggregation on a DISTINCT column, but at least one GROUP BY column,
    // e.g. SELECT SUM(col1) FROM test GROUP BY col2;
    // Do aggregation over all row groups. As all row groups need to be aggregated together, there is no
    // easy way of multi-threading this, and it's done in a single thread for now.
    else if (hasGroupByColumns)
    {
      if (!fEndOfResult && !fDoneAggregate)
      {
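        // The phase-1 bucket aggregators are already fully aggregated;
        // append() hands each bucket's results to fAggregator so that
        // nextRowGroup() below can walk all of them.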
        for (uint32_t bucketNum = 0; bucketNum < fNumOfBuckets; ++bucketNum)
        {
          fAggregator->append(fAggregators[bucketNum].get());
        }
      }

      fDoneAggregate = true;
      bool done = true;

      while (fAggregator->nextRowGroup() && !cancelled())
      {
        done = false;
        fAggregator->finalize();
        rowCount = fRowGroupOut.getRowCount();
        fRowsReturned += rowCount;
        fRowGroupDelivered.setData(fRowGroupOut.getRGData());

        if (rowCount != 0)
        {
          if (!cleanUpAndOutputRowGroup(bs, dlp))
            break;
        }
        done = true;
      }

      if (done)
      {
        fEndOfResult = true;
      }
    }
    else
    {
      throw logic_error(
          "TupleAggregateStep::doThreadedAggregate: No DISTINCT columns nested into aggregation function "
          "or "
          "GROUP BY columns found. Should not reach here.");
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::tupleAggregateStepErr,
                    logging::ERR_AGGREGATION_TOO_BIG, "TupleAggregateStep::doThreadedAggregate()");
    fEndOfResult = true;
  }

  if (fEndOfResult)
  {
    StepTeleStats sts;
    sts.query_uuid = fQueryUuid;
    sts.step_uuid = fStepUuid;
    sts.msg_type = StepTeleStats::ST_SUMMARY;
    sts.total_units_of_work = sts.units_of_work_completed = 1;
    sts.rows = fRowsReturned;
    postStepSummaryTele(sts);

    if (dlp)
    {
      dlp->endOfInput();
    }
    else
    {
      // send an empty / error band
      RGData rgData(fRowGroupOut, 0);
      fRowGroupOut.setData(&rgData);
      fRowGroupOut.resetRowGroup(0);
      fRowGroupOut.setStatus(status());
      fRowGroupOut.serializeRGData(bs);
      rowCount = 0;
    }

    if (traceOn())
      printCalTrace();
  }

  return rowCount;
}

bool TupleAggregateStep::cleanUpAndOutputRowGroup(ByteStream& bs, RowGroupDL* dlp)
{
  if (fRowGroupOut.getColumnCount() != fRowGroupDelivered.getColumnCount())
    pruneAuxColumns();

  if (dlp)
  {
    RGData rgData = fRowGroupDelivered.duplicate();
    dlp->insert(rgData);
    return true;
  }

  bs.restart();
  fRowGroupDelivered.serializeRGData(bs);
  return false;
}

void TupleAggregateStep::pruneAuxColumns()
{
  uint64_t rowCount = fRowGroupOut.getRowCount();
  Row row1, row2;
  fRowGroupOut.initRow(&row1);
  fRowGroupOut.getRow(0, &row1);
  fRowGroupDelivered.initRow(&row2);
  fRowGroupDelivered.getRow(0, &row2);

  for (uint64_t i = 1; i < rowCount; i++)
  {
    for (uint32_t j = 0; j < row2.getColumnCount(); j++)
    {
      row2.setNullMark(j, row1.getNullMark(j));
    }
    // skip the first row
    row1.nextRow();
    row2.nextRow();

    // bug4463, memmove for src, dest overlap
    memmove(row2.getData(), row1.getData(), row2.getSize());
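    // row1 walks the wider fRowGroupOut layout while row2 walks the narrower
    // fRowGroupDelivered layout over the same underlying buffer, so the
    // source and destination ranges can overlap; memmove handles that where
    // memcpy would not.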
  }
  for (uint32_t j = 0; j < row2.getColumnCount(); j++)
  {
    row2.setNullMark(j, row1.getNullMark(j));
  }
}

void TupleAggregateStep::printCalTrace()
{
  time_t t = time(0);
  char timeString[50];
  ctime_r(&t, timeString);
  timeString[strlen(timeString) - 1] = '\0';
  ostringstream logStr;
  logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
         << "; total rows returned-" << fRowsReturned << endl
         << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
         << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
         << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
         << "\tJob completion status " << status() << endl;
  logEnd(logStr.str().c_str());
  fExtendedInfo += logStr.str();
  formatMiniStats();
}

void TupleAggregateStep::formatMiniStats()
{
  ostringstream oss;
  oss << "TAS "
      << "UM "
      << "- "
      << "- "
      << "- "
      << "- "
      << "- "
      << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
      << fRowsReturned << " ";
  fMiniInfo += oss.str();
}

uint32_t TupleAggregateStep::getTupleKeyFromTuple(
    const boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>& tuple)
{
  return tuple.get<0>();
}

uint32_t TupleAggregateStep::getTupleKeyFromTuple(uint32_t key)
{
  return key;
}

template <class GroupByMap>
bool TupleAggregateStep::tryToFindEqualFunctionColumnByTupleKey(JobInfo& jobInfo, GroupByMap& groupByMap,
                                                                const uint32_t tupleKey, uint32_t& foundKey)
{
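  // Two tuple keys are considered equivalent here when both map to function
  // columns wrapping the same source column OID with the same function name,
  // e.g. (hypothetically) two occurrences of YEAR(col1) that received
  // different tuple keys during plan construction.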
  auto funcMapIt = jobInfo.functionColumnMap.find(tupleKey);
  if (funcMapIt != jobInfo.functionColumnMap.end())
  {
    const auto& rFunctionInfo = funcMapIt->second;
    // Try to match the given `tupleKey` in `groupByMap`.
    for (const auto& groupByMapPair : groupByMap)
    {
      const auto currentTupleKey = getTupleKeyFromTuple(groupByMapPair.first);
      auto currentFuncMapIt = jobInfo.functionColumnMap.find(currentTupleKey);
      // Skip if the keys are the same.
      if (currentFuncMapIt != jobInfo.functionColumnMap.end() && currentTupleKey != tupleKey)
      {
        const auto& lFunctionInfo = currentFuncMapIt->second;
        // Oid and function name should be the same.
        if (lFunctionInfo.associatedColumnOid == rFunctionInfo.associatedColumnOid &&
            lFunctionInfo.functionName == rFunctionInfo.functionName)
        {
          foundKey = currentTupleKey;
          return true;
        }
      }
    }
  }
  return false;
}
}  // namespace joblist