mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-23 07:05:36 +03:00
std::string fTmpDir = config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates); std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression");
4986 lines
154 KiB
C++
4986 lines
154 KiB
C++
/*
|
|
Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (c) 2019-2021 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA.
|
|
*/
|
|
|
|
/** @file rowaggregation.cpp
|
|
*
|
|
* File contains classes used to perform RowGroup aggregation. RowAggregation
|
|
* is the primary class.
|
|
*/
|
|
|
|
#include <unistd.h>

#include <cassert>
#include <cstring>
#include <limits>
#include <sstream>
#include <stdexcept>
#include <typeinfo>
|
|
|
|
#include "joblisttypes.h"
|
|
#include "mcs_basic_types.h"
|
|
#include "resourcemanager.h"
|
|
#include "groupconcat.h"
|
|
#include "jsonarrayagg.h"
|
|
|
|
#include "blocksize.h"
|
|
#include "errorcodes.h"
|
|
#include "exceptclasses.h"
|
|
#include "errorids.h"
|
|
#include "idberrorinfo.h"
|
|
#include "dataconvert.h"
|
|
#include "returnedcolumn.h"
|
|
#include "arithmeticcolumn.h"
|
|
#include "functioncolumn.h"
|
|
#include "simplecolumn.h"
|
|
#include "rowgroup.h"
|
|
#include "funcexp.h"
|
|
#include "rowaggregation.h"
|
|
#include "calpontsystemcatalog.h"
|
|
#include "vlarray.h"
|
|
|
|
#include "threadnaming.h"
|
|
#include "rowstorage.h"
|
|
|
|
//..comment out NDEBUG to enable assertions, uncomment NDEBUG to disable
|
|
// #define NDEBUG
|
|
#include "mcs_decimal.h"
|
|
|
|
using namespace std;
|
|
using namespace boost;
|
|
using namespace dataconvert;
|
|
|
|
// inlines of RowAggregation that used only in this file
|
|
namespace
|
|
{
|
|
template <typename T>
|
|
inline bool minMax(T d1, T d2, int type)
|
|
{
|
|
if (type == rowgroup::ROWAGG_MIN)
|
|
return d1 < d2;
|
|
else
|
|
return d1 > d2;
|
|
}
|
|
|
|
inline bool minMax(int128_t d1, int128_t d2, int type)
|
|
{
|
|
return (type == rowgroup::ROWAGG_MIN) ? d1 < d2 : d1 > d2;
|
|
}
|
|
|
|
inline int64_t getIntNullValue(int colType)
|
|
{
|
|
switch (colType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT: return joblist::TINYINTNULL;
|
|
|
|
case execplan::CalpontSystemCatalog::SMALLINT: return joblist::SMALLINTNULL;
|
|
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT: return joblist::INTNULL;
|
|
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
default: return joblist::BIGINTNULL;
|
|
}
|
|
}
|
|
|
|
inline uint64_t getUintNullValue(int colType, int colWidth = 0)
|
|
{
|
|
switch (colType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
{
|
|
if (colWidth == 1)
|
|
return joblist::CHAR1NULL;
|
|
else if (colWidth == 2)
|
|
return joblist::CHAR2NULL;
|
|
else if (colWidth < 5)
|
|
return joblist::CHAR4NULL;
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
{
|
|
if (colWidth < 3)
|
|
return joblist::CHAR2NULL;
|
|
else if (colWidth < 5)
|
|
return joblist::CHAR4NULL;
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
return joblist::DATENULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
{
|
|
return joblist::DATETIMENULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
return joblist::TIMESTAMPNULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
return joblist::TIMENULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
switch (colWidth)
|
|
{
|
|
case 1:
|
|
{
|
|
return joblist::TINYINTNULL;
|
|
}
|
|
|
|
case 2:
|
|
{
|
|
return joblist::SMALLINTNULL;
|
|
}
|
|
|
|
case 4:
|
|
{
|
|
return joblist::INTNULL;
|
|
}
|
|
|
|
default:
|
|
{
|
|
return joblist::BIGINTNULL;
|
|
}
|
|
}
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
{
|
|
return joblist::UTINYINTNULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
{
|
|
return joblist::USMALLINTNULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
{
|
|
return joblist::UINTNULL;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
return joblist::UBIGINTNULL;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
return joblist::CHAR8NULL;
|
|
}
|
|
|
|
inline double getDoubleNullValue()
|
|
{
|
|
uint64_t x = joblist::DOUBLENULL;
|
|
auto* y = (double*)&x;
|
|
return *y;
|
|
}
|
|
|
|
inline float getFloatNullValue()
|
|
{
|
|
uint32_t x = joblist::FLOATNULL;
|
|
auto* y = (float*)&x;
|
|
return *y;
|
|
}
|
|
|
|
inline long double getLongDoubleNullValue()
|
|
{
|
|
return joblist::LONGDOUBLENULL;
|
|
}
|
|
|
|
inline utils::NullString getStringNullValue()
|
|
{
|
|
return utils::NullString();
|
|
}
|
|
|
|
} // namespace
|
|
|
|
namespace rowgroup
|
|
{
|
|
const utils::NullString typeStr;
|
|
const static_any::any& RowAggregation::charTypeId((char)1);
|
|
const static_any::any& RowAggregation::scharTypeId((signed char)1);
|
|
const static_any::any& RowAggregation::shortTypeId((short)1);
|
|
const static_any::any& RowAggregation::intTypeId((int)1);
|
|
const static_any::any& RowAggregation::longTypeId((long)1);
|
|
const static_any::any& RowAggregation::llTypeId((long long)1);
|
|
const static_any::any& RowAggregation::int128TypeId((int128_t)1);
|
|
const static_any::any& RowAggregation::ucharTypeId((unsigned char)1);
|
|
const static_any::any& RowAggregation::ushortTypeId((unsigned short)1);
|
|
const static_any::any& RowAggregation::uintTypeId((unsigned int)1);
|
|
const static_any::any& RowAggregation::ulongTypeId((unsigned long)1);
|
|
const static_any::any& RowAggregation::ullTypeId((unsigned long long)1);
|
|
const static_any::any& RowAggregation::floatTypeId((float)1);
|
|
const static_any::any& RowAggregation::doubleTypeId((double)1);
|
|
const static_any::any& RowAggregation::longdoubleTypeId((long double)1);
|
|
const static_any::any& RowAggregation::strTypeId(typeStr);
|
|
|
|
static const string overflowMsg("Aggregation overflow.");
|
|
|
|
// Update the 128-bit MIN/MAX slot `col` of fRow with candidate val1.
// val2 is the value currently held; func selects MIN or MAX.
inline void RowAggregation::updateIntMinMax(int128_t val1, int128_t val2, int64_t col, int func)
{
  // A NULL slot takes the first candidate; otherwise val1 must beat val2.
  const bool takeCandidate = isNull(fRowGroupOut, fRow, col) || minMax(val1, val2, func);

  if (takeCandidate)
    fRow.setInt128Field(val1, col);
}
|
|
|
|
inline void RowAggregation::updateIntMinMax(int64_t val1, int64_t val2, int64_t col, int func)
|
|
{
|
|
if (isNull(fRowGroupOut, fRow, col))
|
|
fRow.setIntField(val1, col);
|
|
else if (minMax(val1, val2, func))
|
|
fRow.setIntField(val1, col);
|
|
}
|
|
|
|
inline void RowAggregation::updateUintMinMax(uint64_t val1, uint64_t val2, int64_t col, int func)
|
|
{
|
|
if (isNull(fRowGroupOut, fRow, col))
|
|
fRow.setUintField(val1, col);
|
|
else if (minMax(val1, val2, func))
|
|
fRow.setUintField(val1, col);
|
|
}
|
|
|
|
inline void RowAggregation::updateCharMinMax(uint64_t val1, uint64_t val2, int64_t col, int func)
|
|
{
|
|
if (isNull(fRowGroupOut, fRow, col))
|
|
fRow.setUintField(val1, col);
|
|
else if (minMax(uint64ToStr(val1), uint64ToStr(val2), func))
|
|
fRow.setUintField(val1, col);
|
|
}
|
|
|
|
inline void RowAggregation::updateDoubleMinMax(double val1, double val2, int64_t col, int func)
|
|
{
|
|
if (isNull(fRowGroupOut, fRow, col))
|
|
fRow.setDoubleField(val1, col);
|
|
else if (minMax(val1, val2, func))
|
|
fRow.setDoubleField(val1, col);
|
|
}
|
|
|
|
inline void RowAggregation::updateLongDoubleMinMax(long double val1, long double val2, int64_t col, int func)
|
|
{
|
|
if (isNull(fRowGroupOut, fRow, col))
|
|
fRow.setLongDoubleField(val1, col);
|
|
else if (minMax(val1, val2, func))
|
|
fRow.setLongDoubleField(val1, col);
|
|
}
|
|
|
|
inline void RowAggregation::updateFloatMinMax(float val1, float val2, int64_t col, int func)
|
|
{
|
|
if (isNull(fRowGroupOut, fRow, col))
|
|
fRow.setFloatField(val1, col);
|
|
else if (minMax(val1, val2, func))
|
|
fRow.setFloatField(val1, col);
|
|
}
|
|
|
|
// Update the string MIN/MAX slot `col` of fRow with candidate val1.
// val2 is the value currently held. Ordering uses the column's collation
// (CHARSET_INFO::strnncoll); func selects ROWAGG_MIN or ROWAGG_MAX.
void RowAggregation::updateStringMinMax(utils::NullString val1, utils::NullString val2, int64_t col, int func)
{
  // An empty (NULL) slot simply takes the candidate.
  if (isNull(fRowGroupOut, fRow, col))
  {
    fRow.setStringField(val1, col);
    return;
  }

  // A NULL candidate cannot move the bounds: any comparison with NULL is false.
  if (val1.isNull())
    return;

  CHARSET_INFO* charset = fRow.getCharset(col);
  const int order = charset->strnncoll(val1.str(), val1.length(), val2.str(), val2.length());

  const bool newMinimum = func == rowgroup::ROWAGG_MIN && order < 0;
  const bool newMaximum = func == rowgroup::ROWAGG_MAX && order > 0;

  if (newMinimum || newMaximum)
    fRow.setStringField(val1, col);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Verify if the column value is NULL
|
|
// row(in) - Row to be included in aggregation.
|
|
// col(in) - column in the input row group
|
|
// return - equal to null or not
|
|
//------------------------------------------------------------------------------
|
|
inline bool RowAggregation::isNull(const RowGroup* pRowGroup, const Row& row, int64_t col)
|
|
{
|
|
/* TODO: Can we replace all of this with a call to row.isNullValue(col)? */
|
|
bool ret = false;
|
|
|
|
int colDataType = (pRowGroup->getColTypes())[col];
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
{
|
|
ret = ((uint8_t)row.getIntField(col) == joblist::TINYINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
{
|
|
ret = ((uint8_t)row.getIntField(col) == joblist::UTINYINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
{
|
|
int colWidth = pRowGroup->getColumnWidth(col);
|
|
|
|
// XXX: this is wrong. NullStrings now contain separate NULL values.
|
|
// bug 1853, use token to check null
|
|
// scale here is used to indicate token, not real string.
|
|
if ((pRowGroup->getScale())[col] > 0)
|
|
{
|
|
uint64_t uintField = row.getUintField(col);
|
|
if (uintField == joblist::UBIGINTNULL)
|
|
ret = true;
|
|
|
|
// break the case block
|
|
break;
|
|
}
|
|
|
|
// real string to check null
|
|
if (colWidth <= 7 || (colWidth == 8 && colDataType == execplan::CalpontSystemCatalog::CHAR))
|
|
{
|
|
if (colWidth == 1)
|
|
ret = ((uint8_t)row.getUintField(col) == joblist::CHAR1NULL);
|
|
else if (colWidth == 2)
|
|
ret = ((uint16_t)row.getUintField(col) == joblist::CHAR2NULL);
|
|
else if (colWidth < 5)
|
|
ret = ((uint32_t)row.getUintField(col) == joblist::CHAR4NULL);
|
|
else
|
|
ret = ((uint64_t)row.getUintField(col) == joblist::CHAR8NULL);
|
|
}
|
|
else
|
|
{
|
|
//@bug 1821
|
|
auto const str = row.getConstString(col);
|
|
ret = str.isNull();
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
{
|
|
ret = ((uint16_t)row.getIntField(col) == joblist::SMALLINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
{
|
|
ret = ((uint16_t)row.getIntField(col) == joblist::USMALLINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
ret = ((uint64_t)row.getUintField(col) == joblist::DOUBLENULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
ret = (row.getLongDoubleField(col) == joblist::LONGDOUBLENULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
{
|
|
ret = ((uint32_t)row.getIntField(col) == joblist::INTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
{
|
|
ret = ((uint32_t)row.getIntField(col) == joblist::UINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
ret = ((uint32_t)row.getUintField(col) == joblist::FLOATNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
ret = ((uint32_t)row.getUintField(col) == joblist::DATENULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
ret = ((uint64_t)row.getIntField(col) == joblist::BIGINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
ret = ((uint64_t)row.getIntField(col) == joblist::UBIGINTNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
ret = row.isNullValue(col);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
{
|
|
ret = ((uint64_t)row.getUintField(col) == joblist::DATETIMENULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
ret = ((uint64_t)row.getUintField(col) == joblist::TIMESTAMPNULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
ret = ((uint64_t)row.getUintField(col) == joblist::TIMENULL);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::VARBINARY:
|
|
case execplan::CalpontSystemCatalog::BLOB:
|
|
{
|
|
auto const str = row.getConstString(col);
|
|
ret = str.isNull();
|
|
break;
|
|
}
|
|
|
|
default: break;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Row Aggregation default constructor
|
|
//------------------------------------------------------------------------------
|
|
RowAggregation::RowAggregation()
|
|
: fRowGroupOut(nullptr)
|
|
, fSmallSideRGs(nullptr)
|
|
, fLargeSideRG(nullptr)
|
|
, fSmallSideCount(0)
|
|
, fOrigFunctionCols(nullptr)
|
|
, fRollupFlag(false)
|
|
{
|
|
}
|
|
|
|
RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
|
joblist::ResourceManager* rm, boost::shared_ptr<int64_t> sl, bool withRollup)
|
|
: fRowGroupOut(nullptr)
|
|
, fSmallSideRGs(nullptr)
|
|
, fLargeSideRG(nullptr)
|
|
, fSmallSideCount(0)
|
|
, fOrigFunctionCols(nullptr)
|
|
, fRm(rm)
|
|
, fSessionMemLimit(std::move(sl))
|
|
, fRollupFlag(withRollup)
|
|
{
|
|
fGroupByCols.assign(rowAggGroupByCols.begin(), rowAggGroupByCols.end());
|
|
fFunctionCols.assign(rowAggFunctionCols.begin(), rowAggFunctionCols.end());
|
|
}
|
|
|
|
// Copy constructor. Only the configuration is copied: the column descriptor
// lists, key placement, UDAF context template, resource manager, memory limit
// and rollup flag. Row-group attachments, join state and the original
// function-column list start out empty/null, as in the default constructor.
RowAggregation::RowAggregation(const RowAggregation& rhs)
 : fRowGroupOut(nullptr),
   fSmallSideRGs(nullptr),
   fLargeSideRG(nullptr),
   fSmallSideCount(0),
   fKeyOnHeap(rhs.fKeyOnHeap),
   fRGContext(rhs.fRGContext),
   fOrigFunctionCols(nullptr),
   fRm(rhs.fRm),
   fSessionMemLimit(rhs.fSessionMemLimit),
   fRollupFlag(rhs.fRollupFlag)
{
  fGroupByCols.assign(rhs.fGroupByCols.cbegin(), rhs.fGroupByCols.cend());
  fFunctionCols.assign(rhs.fFunctionCols.cbegin(), rhs.fFunctionCols.cend());
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Row Aggregation destructor.
|
|
//------------------------------------------------------------------------------
|
|
RowAggregation::~RowAggregation()
|
|
{
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregate the rows in pRows. User should make Multiple calls to
|
|
// addRowGroup() to aggregate multiple RowGroups. When all RowGroups have
|
|
// been input, a call should be made to endOfInput() to signal the end of data.
|
|
// nextRowGroup() can then be called iteratively to access the aggregated
|
|
// results.
|
|
//
|
|
// pRows(in) - RowGroup to be aggregated.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::addRowGroup(const RowGroup* pRows)
|
|
{
|
|
// no group by == no map, everything done in fRow
|
|
if (fGroupByCols.empty())
|
|
{
|
|
fRowGroupOut->setRowCount(1);
|
|
|
|
// special, but very common case -- count(*) without groupby columns
|
|
if (fFunctionCols.size() == 1 && fFunctionCols[0]->fAggFunction == ROWAGG_COUNT_ASTERISK)
|
|
{
|
|
if (countSpecial(pRows))
|
|
return;
|
|
}
|
|
}
|
|
|
|
fRowGroupOut->setDBRoot(pRows->getDBRoot());
|
|
|
|
Row rowIn;
|
|
pRows->initRow(&rowIn);
|
|
pRows->getRow(0, &rowIn);
|
|
|
|
for (uint64_t i = 0; i < pRows->getRowCount(); ++i)
|
|
{
|
|
aggregateRow(rowIn);
|
|
rowIn.nextRow();
|
|
}
|
|
fRowAggStorage->dump();
|
|
}
|
|
|
|
void RowAggregation::addRowGroup(const RowGroup* pRows, vector<std::pair<Row::Pointer, uint64_t>>& inRows)
|
|
{
|
|
// this function is for threaded aggregation, which is for group by and distinct.
|
|
// if (countSpecial(pRows))
|
|
Row rowIn;
|
|
pRows->initRow(&rowIn);
|
|
|
|
for (const auto& inRow : inRows)
|
|
{
|
|
rowIn.setData(inRow.first);
|
|
aggregateRow(rowIn, &inRow.second);
|
|
}
|
|
fRowAggStorage->dump();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Set join rowgroups and mappings
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup* pLargeSideRG)
|
|
{
|
|
fSmallSideRGs = pSmallSideRG;
|
|
fLargeSideRG = pLargeSideRG;
|
|
fSmallSideCount = fSmallSideRGs->size();
|
|
fSmallMappings.reset(new std::shared_ptr<int[]>[fSmallSideCount]);
|
|
|
|
for (uint32_t i = 0; i < fSmallSideCount; i++)
|
|
fSmallMappings[i] = makeMapping((*fSmallSideRGs)[i], fRowGroupIn);
|
|
|
|
fLargeMapping = makeMapping(*fLargeSideRG, fRowGroupIn);
|
|
|
|
rowSmalls.reset(new Row[fSmallSideCount]);
|
|
|
|
for (uint32_t i = 0; i < fSmallSideCount; i++)
|
|
(*fSmallSideRGs)[i].initRow(&rowSmalls[i]);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// For UDAF, we need to sometimes start a new fRGContext.
|
|
//
|
|
// This will be called any number of times by each of the batchprimitiveprocessor
|
|
// threads on the PM and by multple threads on the UM. It must remain
|
|
// thread safe.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColsIdx)
|
|
{
|
|
// RowAggregation and it's functions need to be re-entrant which means
|
|
// each instance (thread) needs its own copy of the context object.
|
|
// Note: operator=() doesn't copy userData.
|
|
fRGContextColl[funcColsIdx] = rowUDAF->fUDAFContext;
|
|
|
|
// Call the user reset for the group userData. Since, at this point,
|
|
// context's userData will be NULL, reset will generate a new one.
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
rc = fRGContextColl[funcColsIdx].getFunction()->reset(&fRGContextColl[funcColsIdx]);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
rowUDAF->bInterrupted = true;
|
|
throw logging::QueryDataExcept(fRGContextColl[funcColsIdx].getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
fRow.setUserDataStore(fRowGroupOut->getRGData()->getUserDataStore());
|
|
fRow.setUserData(fRGContextColl[funcColsIdx], fRGContextColl[funcColsIdx].getUserDataSP(),
|
|
fRGContextColl[funcColsIdx].getUserDataSize(), rowUDAF->fAuxColumnIndex);
|
|
// Prevents calling deleteUserData on the mcsv1Context.
|
|
fRGContextColl[funcColsIdx].setUserData(NULL);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initilalize the data members to meaningful values, setup the hashmap.
|
|
// The fRowGroupOut must have a valid data pointer before this.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::initialize(bool hasGroupConcat)
|
|
{
|
|
// Calculate the length of the hashmap key.
|
|
fAggMapKeyCount = fGroupByCols.size();
|
|
bool disk_agg = fRm ? fRm->getAllowDiskAggregation() : false;
|
|
bool allow_gen = true;
|
|
for (auto& fun : fFunctionCols)
|
|
{
|
|
if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT ||
|
|
fun->fAggFunction == ROWAGG_JSON_ARRAY)
|
|
{
|
|
allow_gen = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
auto* compressor = compress::getCompressInterfaceByName(fCompStr);
|
|
|
|
if (fKeyOnHeap)
|
|
{
|
|
fRowAggStorage.reset(new RowAggStorage(fTmpDir, fRowGroupOut, &fKeyRG, fAggMapKeyCount, fRm,
|
|
fSessionMemLimit, disk_agg, allow_gen, compressor));
|
|
}
|
|
else
|
|
{
|
|
fRowAggStorage.reset(new RowAggStorage(fTmpDir, fRowGroupOut, fAggMapKeyCount, fRm, fSessionMemLimit,
|
|
disk_agg, allow_gen, compressor));
|
|
}
|
|
|
|
// Initialize the work row.
|
|
fRowGroupOut->initRow(&fRow);
|
|
fRowGroupOut->getRow(0, &fRow);
|
|
makeAggFieldsNull(fRow);
|
|
|
|
// Keep a copy of the null row to initialize new map entries.
|
|
// MCOL-5429 Use stringstore if the datatype of the groupconcat
|
|
// field is a long string.
|
|
if (hasGroupConcat && fRowGroupOut->hasLongString())
|
|
{
|
|
fNullRowGroup = *fRowGroupOut;
|
|
fNullRowGroup.setUseStringTable(true);
|
|
fNullRowRGData.reinit(fNullRowGroup, 1);
|
|
fNullRowGroup.setData(&fNullRowRGData);
|
|
fNullRowGroup.resetRowGroup(0);
|
|
fNullRowGroup.initRow(&fNullRow);
|
|
fNullRowGroup.getRow(0, &fNullRow);
|
|
}
|
|
else
|
|
{
|
|
fRowGroupOut->initRow(&fNullRow, true);
|
|
fNullRowData.reset(new uint8_t[fNullRow.getSize()]);
|
|
fNullRow.setData(rowgroup::Row::Pointer(fNullRowData.get()));
|
|
}
|
|
|
|
copyRow(fRow, &fNullRow);
|
|
|
|
// Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx
|
|
fRGContextColl.resize(fFunctionCols.size());
|
|
|
|
// Need map only if groupby list is not empty.
|
|
if (fGroupByCols.empty())
|
|
{
|
|
fRowGroupOut->setRowCount(1);
|
|
attachGroupConcatAg();
|
|
// For UDAF, reset the data
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
|
{
|
|
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
|
resetUDAF(rowUDAFColumnPtr, i);
|
|
}
|
|
}
|
|
}
|
|
|
|
// for 8k poc: an empty output row group to match message count
|
|
fEmptyRowGroup = *fRowGroupOut;
|
|
fEmptyRowData.reinit(*fRowGroupOut, 1);
|
|
fEmptyRowGroup.setData(&fEmptyRowData);
|
|
fEmptyRowGroup.resetRowGroup(0);
|
|
fEmptyRowGroup.initRow(&fEmptyRow);
|
|
fEmptyRowGroup.getRow(0, &fEmptyRow);
|
|
|
|
copyRow(fNullRow, &fEmptyRow);
|
|
|
|
if (fGroupByCols.empty()) // no groupby
|
|
fEmptyRowGroup.setRowCount(1);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Reset the working data to aggregate next logical block
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::aggReset()
|
|
{
|
|
bool disk_agg = fRm ? fRm->getAllowDiskAggregation() : false;
|
|
bool allow_gen = true;
|
|
for (auto& fun : fFunctionCols)
|
|
{
|
|
if (fun->fAggFunction == ROWAGG_UDAF || fun->fAggFunction == ROWAGG_GROUP_CONCAT ||
|
|
fun->fAggFunction == ROWAGG_JSON_ARRAY)
|
|
{
|
|
allow_gen = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
auto* compressor = compress::getCompressInterfaceByName(fCompStr);
|
|
|
|
if (fKeyOnHeap)
|
|
{
|
|
fRowAggStorage.reset(new RowAggStorage(fTmpDir, fRowGroupOut, &fKeyRG, fAggMapKeyCount, fRm,
|
|
fSessionMemLimit, disk_agg, allow_gen, compressor));
|
|
}
|
|
else
|
|
{
|
|
fRowAggStorage.reset(new RowAggStorage(fTmpDir, fRowGroupOut, fAggMapKeyCount, fRm, fSessionMemLimit,
|
|
disk_agg, allow_gen, compressor));
|
|
}
|
|
fRowGroupOut->getRow(0, &fRow);
|
|
copyNullRow(fRow);
|
|
attachGroupConcatAg();
|
|
|
|
// For UDAF, reset the data
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
|
{
|
|
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
|
resetUDAF(rowUDAFColumnPtr, i);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Append the aggregation storage of `other` into this instance's storage
// via RowAggStorage::append(). `other` must be non-null with initialized storage.
void RowAggregation::append(RowAggregation* other)
{
  auto& source = *other->fRowAggStorage;
  fRowAggStorage->append(source);
}
|
|
|
|
void RowAggregationUM::aggReset()
|
|
{
|
|
if (fKeyOnHeap)
|
|
{
|
|
fKeyRG = fRowGroupIn.truncate(fGroupByCols.size());
|
|
}
|
|
RowAggregation::aggReset();
|
|
}
|
|
|
|
// Fold one input row into the aggregation.
// row           - input row; with ROLLUP its GROUP BY fields and the "mark"
//                 column are modified in place between passes
// hash          - optional precomputed hash of the row's key (nullptr if absent)
// rgContextColl - UDAF contexts forwarded to updateEntry()
void RowAggregation::aggregateRow(Row& row, const uint64_t* hash,
                                  std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
{
  // Reset UDAF user data for every UDAF column of `funcCols`, indexed by position in that list.
  auto resetAllUDAFs = [this](const auto& funcCols)
  {
    for (uint64_t idx = 0; idx < funcCols.size(); idx++)
    {
      if (funcCols[idx]->fAggFunction == ROWAGG_UDAF)
      {
        auto* udafCol = dynamic_cast<RowUDAFFunctionCol*>(funcCols[idx].get());
        this->resetUDAF(udafCol, idx);
      }
    }
  };

  // With ROLLUP the row is aggregated once per GROUP BY column; otherwise exactly once.
  const uint32_t passes = fRollupFlag ? fGroupByCols.size() : 1;

  for (uint32_t pass = 0; pass < passes; pass++)
  {
    // With GROUP BY, locate (or create) the target group row in the storage.
    if (!fGroupByCols.empty())
    {
      const bool isNewRow = (hash != nullptr) ? fRowAggStorage->getTargetRow(row, *hash, fRow)
                                              : fRowAggStorage->getTargetRow(row, fRow);

      if (isNewRow)
      {
        initMapData(row);
        attachGroupConcatAg();

        // In a multi-distinct query fFunctionCols may not contain every UDAF
        // that needs resetting, so the original list is used when present.
        if (fOrigFunctionCols)
          resetAllUDAFs(*fOrigFunctionCols);
        else
          resetAllUDAFs(fFunctionCols);
      }
    }

    updateEntry(row, rgContextColl);

    // Written as pass + 1 < passes (not pass < passes - 1) because both are
    // unsigned and passes can be zero.
    if (pass + 1 < passes)
    {
      // Rolling up: bump the value in the "mark" column (last GROUP BY column)
      // so data rows and the various rollup levels can be told apart, then NULL
      // the next GROUP BY column from the right.
      // E.g. year, store, MARK gives passes == 3: pass 0 NULLs store, pass 1
      // NULLs year, and MARK itself is never NULLed. The smallest index NULLed,
      // passes - 2 - pass, is therefore zero.
      row.setIntField(row.getIntField(passes - 1) + 1, passes - 1);
      row.setToNull(passes - 2 - pass);
    }
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initialize the working row, all aggregation fields to all null values or 0.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::initMapData(const Row& rowIn)
|
|
{
|
|
// First, copy the null row.
|
|
copyNullRow(fRow);
|
|
|
|
// Then, populate the groupby cols.
|
|
for (auto& fGroupByCol : fGroupByCols)
|
|
{
|
|
int64_t colOut = fGroupByCol->fOutputColumnIndex;
|
|
|
|
if (colOut == numeric_limits<unsigned int>::max())
|
|
continue;
|
|
|
|
int64_t colIn = fGroupByCol->fInputColumnIndex;
|
|
int colDataType = ((fRowGroupIn.getColTypes())[colIn]);
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
fRow.setIntField(rowIn.getIntField(colIn), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
if (LIKELY(rowIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
|
{
|
|
int128_t value = rowIn.getTSInt128Field(colIn).getValue();
|
|
fRow.setInt128Field(value, colOut);
|
|
}
|
|
else if (rowIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
fRow.setIntField(rowIn.getIntField(colIn), colOut);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregation::initMapData(): DECIMAL bad length.");
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
fRow.setUintField(rowIn.getUintField(colIn), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
{
|
|
int colWidth = fRowGroupIn.getColumnWidth(colIn);
|
|
|
|
if (colWidth <= 8)
|
|
{
|
|
fRow.setUintField(rowIn.getUintField(colIn), colOut);
|
|
}
|
|
else
|
|
{
|
|
fRow.setStringField(rowIn.getConstString(colIn), colOut);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
fRow.setDoubleField(rowIn.getDoubleField(colIn), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
fRow.setLongDoubleField(rowIn.getLongDoubleField(colIn), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
fRow.setFloatField(rowIn.getFloatField(colIn), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
fRow.setUintField(rowIn.getUintField(colIn), colOut);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Add group_concat to the initialized working row
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::attachGroupConcatAg()
|
|
{
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Make all aggregation fields to null.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::makeAggFieldsNull(Row& row)
|
|
{
|
|
// initialize all bytes to 0
|
|
memset(row.getData(), 0, row.getSize());
|
|
// row.initToNull();
|
|
|
|
for (auto& fFunctionCol : fFunctionCols)
|
|
{
|
|
// Initial count fields to 0.
|
|
int64_t colOut = fFunctionCol->fOutputColumnIndex;
|
|
|
|
if (fFunctionCol->fAggFunction == ROWAGG_COUNT_ASTERISK ||
|
|
fFunctionCol->fAggFunction == ROWAGG_COUNT_COL_NAME ||
|
|
fFunctionCol->fAggFunction == ROWAGG_COUNT_DISTINCT_COL_NAME ||
|
|
fFunctionCol->fAggFunction == ROWAGG_COUNT_NO_OP || fFunctionCol->fAggFunction == ROWAGG_JSON_ARRAY ||
|
|
fFunctionCol->fAggFunction == ROWAGG_GROUP_CONCAT || fFunctionCol->fAggFunction == ROWAGG_STATS)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
// ROWAGG_BIT_AND : 0xFFFFFFFFFFFFFFFFULL;
|
|
// ROWAGG_BIT_OR/ROWAGG_BIT_XOR : 0 (already set).
|
|
if (fFunctionCol->fAggFunction == ROWAGG_BIT_OR || fFunctionCol->fAggFunction == ROWAGG_BIT_XOR)
|
|
{
|
|
continue;
|
|
}
|
|
else if (fFunctionCol->fAggFunction == ROWAGG_BIT_AND)
|
|
{
|
|
row.setUintField(0xFFFFFFFFFFFFFFFFULL, colOut);
|
|
continue;
|
|
}
|
|
|
|
// Initial other aggregation fields to null.
|
|
int colDataType = (fRowGroupOut->getColTypes())[colOut];
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
row.setIntField(getIntNullValue(colDataType), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
row.setUintField(getUintNullValue(colDataType), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
uint32_t colWidth = fRowGroupOut->getColumnWidth(colOut);
|
|
if (LIKELY(colWidth == datatypes::MAXDECIMALWIDTH))
|
|
{
|
|
uint32_t offset = row.getOffset(colOut);
|
|
row.setBinaryField_offset(const_cast<int128_t*>(&datatypes::Decimal128Null), colWidth, offset);
|
|
}
|
|
else if (colWidth <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
row.setIntField(getUintNullValue(colDataType, colWidth), colOut);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregation::makeAggFieldsNull(): DECIMAL bad length.");
|
|
}
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
case execplan::CalpontSystemCatalog::VARBINARY:
|
|
case execplan::CalpontSystemCatalog::BLOB:
|
|
{
|
|
uint32_t colWidth = fRowGroupOut->getColumnWidth(colOut);
|
|
|
|
if (colWidth <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
row.setUintField(getUintNullValue(colDataType, colWidth), colOut);
|
|
}
|
|
else
|
|
{
|
|
row.setStringField(getStringNullValue(), colOut);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
row.setDoubleField(getDoubleNullValue(), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
row.setLongDoubleField(getLongDoubleNullValue(), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
row.setFloatField(getFloatNullValue(), colOut);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
row.setUintField(getUintNullValue(colDataType), colOut);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the min/max fields if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group
|
|
// funcType(in) - aggregation function type
|
|
// Note: NULL value check must be done on UM & PM
|
|
// UM may receive NULL values, too.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::doMinMax(const Row& rowIn, int64_t colIn, int64_t colOut, int funcType)
|
|
{
|
|
int colDataType = (fRowGroupIn.getColTypes())[colIn];
|
|
|
|
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
|
|
return;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
int64_t valIn = rowIn.getIntField(colIn);
|
|
int64_t valOut = fRow.getIntField(colOut);
|
|
updateIntMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
if (LIKELY(rowIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
|
{
|
|
updateIntMinMax(rowIn.getTSInt128Field(colIn).getValue(), fRow.getTSInt128Field(colOut).getValue(),
|
|
colOut, funcType);
|
|
}
|
|
else if (rowIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
int64_t valIn = rowIn.getIntField(colIn);
|
|
int64_t valOut = fRow.getIntField(colOut);
|
|
updateIntMinMax(valIn, valOut, colOut, funcType);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregation::doMinMax(): DECIMAL bad length.");
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
uint64_t valIn = rowIn.getUintField(colIn);
|
|
uint64_t valOut = fRow.getUintField(colOut);
|
|
updateUintMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
{
|
|
auto valIn = rowIn.getStringField(colIn);
|
|
auto valOut = fRow.getStringField(colOut);
|
|
updateStringMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
double valIn = rowIn.getDoubleField(colIn);
|
|
double valOut = fRow.getDoubleField(colOut);
|
|
updateDoubleMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
float valIn = rowIn.getFloatField(colIn);
|
|
float valOut = fRow.getFloatField(colOut);
|
|
updateFloatMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
uint64_t valIn = rowIn.getUintField(colIn);
|
|
uint64_t valOut = fRow.getUintField(colOut);
|
|
updateUintMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
long double valIn = rowIn.getLongDoubleField(colIn);
|
|
long double valOut = fRow.getLongDoubleField(colOut);
|
|
updateLongDoubleMinMax(valIn, valOut, colOut, funcType);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the sum fields if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group
|
|
// funcType(in) - aggregation function type
|
|
// Note: NULL value check must be done on UM & PM
|
|
// UM may receive NULL values, too.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::doSum(const Row& rowIn, int64_t colIn, int64_t colOut, int funcType)
|
|
{
|
|
datatypes::SystemCatalog::ColDataType colDataType = rowIn.getColType(colIn);
|
|
long double valIn = 0;
|
|
bool isWideDataType = false;
|
|
int128_t wideValue = 0;
|
|
|
|
if (rowIn.isNullValue(colIn))
|
|
return;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
valIn = rowIn.getIntField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
valIn = rowIn.getUintField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
uint32_t width = rowIn.getColumnWidth(colIn);
|
|
isWideDataType = width == datatypes::MAXDECIMALWIDTH;
|
|
if (LIKELY(isWideDataType))
|
|
{
|
|
wideValue = rowIn.getTSInt128Field(colIn).getValue();
|
|
}
|
|
else if (width <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
uint32_t scale = rowIn.getScale(colIn);
|
|
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregation::doSum(): DECIMAL bad length.");
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: sum(CHAR[]) is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
valIn = rowIn.getDoubleField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
valIn = rowIn.getFloatField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: sum(date|date time) is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
valIn = rowIn.getLongDoubleField(colIn);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType)
|
|
{
|
|
if (LIKELY(!isNull(fRowGroupOut, fRow, colOut)))
|
|
{
|
|
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
|
|
int128_t sum = valOut + valIn;
|
|
fRow.setBinaryField(&sum, colOut);
|
|
}
|
|
else
|
|
{
|
|
int128_t sum = valIn;
|
|
fRow.setBinaryField(&sum, colOut);
|
|
}
|
|
}
|
|
else if (isWideDataType)
|
|
{
|
|
if (LIKELY(!isNull(fRowGroupOut, fRow, colOut)))
|
|
{
|
|
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
|
|
int128_t sum = valOut + wideValue;
|
|
fRow.setInt128Field(sum, colOut);
|
|
}
|
|
else
|
|
{
|
|
fRow.setInt128Field(wideValue, colOut);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (LIKELY(!isNull(fRowGroupOut, fRow, colOut)))
|
|
{
|
|
long double valOut = fRow.getLongDoubleField(colOut);
|
|
fRow.setLongDoubleField(valIn + valOut, colOut);
|
|
}
|
|
else
|
|
{
|
|
fRow.setLongDoubleField(valIn, colOut);
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the and/or/xor fields if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group
|
|
// funcType(in) - aggregation function type
|
|
// Note: NULL value check must be done on UM & PM
|
|
// UM may receive NULL values, too.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut, int funcType)
|
|
{
|
|
int colDataType = (fRowGroupIn.getColTypes())[colIn];
|
|
|
|
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
|
|
return;
|
|
|
|
int64_t valIn = 0;
|
|
uint64_t uvalIn = 0;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
valIn = rowIn.getIntField(colIn);
|
|
|
|
if ((fRowGroupIn.getScale())[colIn] != 0)
|
|
{
|
|
valIn = rowIn.getIntField(colIn);
|
|
valIn /= IDB_pow[fRowGroupIn.getScale()[colIn] - 1];
|
|
|
|
if (valIn > 0)
|
|
valIn += 5;
|
|
else if (valIn < 0)
|
|
valIn -= 5;
|
|
|
|
valIn /= 10;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
uvalIn = rowIn.getUintField(colIn);
|
|
uint64_t uvalOut = fRow.getUintField(colOut);
|
|
|
|
if (funcType == ROWAGG_BIT_AND)
|
|
fRow.setUintField(uvalIn & uvalOut, colOut);
|
|
else if (funcType == ROWAGG_BIT_OR)
|
|
fRow.setUintField(uvalIn | uvalOut, colOut);
|
|
else
|
|
fRow.setUintField(uvalIn ^ uvalOut, colOut);
|
|
|
|
return;
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
{
|
|
auto str = rowIn.getStringField(colIn);
|
|
valIn = strtoll(str.safeString("").c_str(), nullptr, 10);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
double dbl = 0.0;
|
|
|
|
if (colDataType == execplan::CalpontSystemCatalog::DOUBLE ||
|
|
colDataType == execplan::CalpontSystemCatalog::UDOUBLE)
|
|
dbl = rowIn.getDoubleField(colIn);
|
|
else if (colDataType == execplan::CalpontSystemCatalog::LONGDOUBLE)
|
|
dbl = (double)rowIn.getLongDoubleField(colIn);
|
|
else
|
|
dbl = rowIn.getFloatField(colIn);
|
|
|
|
int64_t maxint = 0x7FFFFFFFFFFFFFFFLL;
|
|
int64_t minint = 0x8000000000000000LL;
|
|
|
|
if (dbl > maxint)
|
|
{
|
|
valIn = maxint;
|
|
}
|
|
else if (dbl < minint)
|
|
{
|
|
valIn = minint;
|
|
}
|
|
else
|
|
{
|
|
dbl += (dbl >= 0) ? 0.5 : -0.5;
|
|
valIn = (int64_t)dbl;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
uint64_t dt = rowIn.getUintField(colIn);
|
|
dt = dt & 0xFFFFFFC0; // no need to set spare bits to 3E, will shift out
|
|
valIn = ((dt >> 16) * 10000) + (((dt >> 12) & 0xF) * 100) + ((dt >> 6) & 077);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
{
|
|
uint64_t dtm = rowIn.getUintField(colIn);
|
|
valIn = ((dtm >> 48) * 10000000000LL) + (((dtm >> 44) & 0xF) * 100000000) +
|
|
(((dtm >> 38) & 077) * 1000000) + (((dtm >> 32) & 077) * 10000) + (((dtm >> 26) & 077) * 100) +
|
|
((dtm >> 20) & 077);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
uint64_t timestamp = rowIn.getUintField(colIn);
|
|
string str = DataConvert::timestampToString1(timestamp, fTimeZone);
|
|
// strip off micro seconds
|
|
str = str.substr(0, 14);
|
|
valIn = strtoll(str.c_str(), nullptr, 10);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
int64_t dtm = rowIn.getUintField(colIn);
|
|
// Handle negative correctly
|
|
int hour = 0;
|
|
|
|
if ((dtm >> 40) & 0x800)
|
|
{
|
|
hour = 0xfffff000;
|
|
}
|
|
|
|
hour |= ((dtm >> 40) & 0xfff);
|
|
valIn = (hour * 10000) + (((dtm >> 32) & 0xff) * 100) + ((dtm >> 24) & 0xff);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
int64_t valOut = fRow.getIntField(colOut);
|
|
|
|
if (funcType == ROWAGG_BIT_AND)
|
|
fRow.setIntField(valIn & valOut, colOut);
|
|
else if (funcType == ROWAGG_BIT_OR)
|
|
fRow.setIntField(valIn | valOut, colOut);
|
|
else
|
|
fRow.setIntField(valIn ^ valOut, colOut);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Marks the end of input into aggregation when aggregating multiple RowGroups.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::endOfInput()
|
|
{
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Serialize this RowAggregation object into the specified ByteStream.
|
|
// Primary information to be serialized is the RowAggGroupByCol and
|
|
// RowAggFunctionCol vectors.
|
|
// bs(out) - ByteStream to be used in serialization.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::serialize(messageqcpp::ByteStream& bs) const
|
|
{
|
|
// groupby
|
|
uint64_t groupbyCount = fGroupByCols.size();
|
|
bs << groupbyCount;
|
|
|
|
for (uint64_t i = 0; i < groupbyCount; i++)
|
|
bs << *(fGroupByCols[i].get());
|
|
|
|
// aggregate function
|
|
uint64_t functionCount = fFunctionCols.size();
|
|
bs << functionCount;
|
|
|
|
for (uint64_t i = 0; i < functionCount; i++)
|
|
fFunctionCols[i]->serialize(bs);
|
|
|
|
messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
|
|
bs << timeZone;
|
|
bs << (int8_t)fRollupFlag;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Unserialze the specified ByteStream into this RowAggregation object.
|
|
// Primary information to be deserialized is the RowAggGroupByCol and
|
|
// RowAggFunctionCol vectors.
|
|
// bs(in) - ByteStream to be deserialized
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::deserialize(messageqcpp::ByteStream& bs)
|
|
{
|
|
// groupby
|
|
uint64_t groupbyCount = 0;
|
|
bs >> groupbyCount;
|
|
|
|
for (uint64_t i = 0; i < groupbyCount; i++)
|
|
{
|
|
SP_ROWAGG_GRPBY_t grpby(new RowAggGroupByCol(0, 0));
|
|
bs >> *(grpby.get());
|
|
fGroupByCols.push_back(grpby);
|
|
}
|
|
|
|
// aggregate function
|
|
uint64_t functionCount = 0;
|
|
bs >> functionCount;
|
|
|
|
for (uint64_t i = 0; i < functionCount; i++)
|
|
{
|
|
uint8_t funcType;
|
|
bs.peek(funcType);
|
|
SP_ROWAGG_FUNC_t funct;
|
|
|
|
if (funcType == ROWAGG_UDAF)
|
|
{
|
|
funct.reset(new RowUDAFFunctionCol(0, 0));
|
|
}
|
|
else
|
|
{
|
|
funct.reset(new RowAggFunctionCol(ROWAGG_FUNCT_UNDEFINE, ROWAGG_FUNCT_UNDEFINE, 0, 0));
|
|
}
|
|
|
|
funct->deserialize(bs);
|
|
fFunctionCols.push_back(funct);
|
|
}
|
|
|
|
messageqcpp::ByteStream::octbyte timeZone;
|
|
bs >> timeZone;
|
|
fTimeZone = timeZone;
|
|
uint8_t tmp8;
|
|
bs >> tmp8;
|
|
fRollupFlag = tmp8;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the aggregation totals in the internal hashmap for the specified row.
|
|
// NULL values are recognized and ignored for all agg functions except for
|
|
// COUNT(*), which counts all rows regardless of value.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
|
|
switch (fFunctionCols[i]->fAggFunction)
|
|
{
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
|
|
// if NOT null, let execution fall through.
|
|
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
|
|
break;
|
|
/* fall through */
|
|
|
|
case ROWAGG_COUNT_ASTERISK: fRow.setUintField<8>(fRow.getUintField<8>(colOut) + 1, colOut); break;
|
|
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX: doMinMax(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_SUM: doSum(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_AVG:
|
|
// count(column) for average is inserted after the sum,
|
|
// colOut+1 is the position of the aux count column.
|
|
doAvg(rowIn, colIn, colOut, colOut + 1);
|
|
break;
|
|
|
|
case ROWAGG_STATS: doStatistics(rowIn, colIn, colOut, colOut + 1); break;
|
|
|
|
case ROWAGG_BIT_AND:
|
|
case ROWAGG_BIT_OR:
|
|
case ROWAGG_BIT_XOR:
|
|
{
|
|
doBitOp(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_COUNT_NO_OP:
|
|
case ROWAGG_DUP_FUNCT:
|
|
case ROWAGG_DUP_AVG:
|
|
case ROWAGG_DUP_STATS:
|
|
case ROWAGG_DUP_UDAF:
|
|
case ROWAGG_CONSTANT:
|
|
case ROWAGG_JSON_ARRAY:
|
|
case ROWAGG_GROUP_CONCAT: break;
|
|
|
|
case ROWAGG_UDAF:
|
|
{
|
|
doUDAF(rowIn, colIn, colOut, colOut + 1, i, rgContextColl);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: function (id = " << (uint64_t)fFunctionCols[i]->fAggFunction
|
|
<< ") is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Merge the aggregation subtotals in the internal hashmap for the specified row.
|
|
// NULL values are recognized and ignored for all agg functions except for
|
|
// COUNT(*), which counts all rows regardless of value.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::mergeEntries(const Row& rowIn)
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
|
|
switch (fFunctionCols[i]->fAggFunction)
|
|
{
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
case ROWAGG_COUNT_ASTERISK:
|
|
fRow.setUintField<8>(fRow.getUintField<8>(colOut) + rowIn.getUintField<8>(colOut), colOut);
|
|
break;
|
|
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX: doMinMax(rowIn, colOut, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_SUM: doSum(rowIn, colOut, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_AVG:
|
|
// count(column) for average is inserted after the sum,
|
|
doAvg(rowIn, colOut, colOut, fFunctionCols[i]->fAuxColumnIndex, true);
|
|
break;
|
|
|
|
case ROWAGG_STATS: mergeStatistics(rowIn, colOut, fFunctionCols[i]->fAuxColumnIndex); break;
|
|
|
|
case ROWAGG_BIT_AND:
|
|
case ROWAGG_BIT_OR:
|
|
case ROWAGG_BIT_XOR: doBitOp(rowIn, colOut, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_COUNT_NO_OP:
|
|
case ROWAGG_DUP_FUNCT:
|
|
case ROWAGG_DUP_AVG:
|
|
case ROWAGG_DUP_STATS:
|
|
case ROWAGG_DUP_UDAF:
|
|
case ROWAGG_CONSTANT:
|
|
case ROWAGG_JSON_ARRAY:
|
|
case ROWAGG_GROUP_CONCAT: break;
|
|
|
|
case ROWAGG_UDAF: doUDAF(rowIn, colOut, colOut, colOut + 1, i); break;
|
|
|
|
default:
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: function (id = " << (uint64_t)fFunctionCols[i]->fAggFunction
|
|
<< ") is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the sum and count fields for average if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group stores the sum
|
|
// colAux(in) - column in the output row group stores the count
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool merge)
|
|
{
|
|
if (rowIn.isNullValue(colIn))
|
|
return;
|
|
|
|
datatypes::SystemCatalog::ColDataType colDataType = rowIn.getColType(colIn);
|
|
long double valIn = 0;
|
|
bool isWideDataType = false;
|
|
int128_t wideValue = 0;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
valIn = rowIn.getIntField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
valIn = rowIn.getUintField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
uint32_t width = fRowGroupIn.getColumnWidth(colIn);
|
|
isWideDataType = width == datatypes::MAXDECIMALWIDTH;
|
|
if (LIKELY(isWideDataType))
|
|
{
|
|
wideValue = rowIn.getTSInt128Field(colIn).getValue();
|
|
}
|
|
else if (width <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
uint32_t scale = fRowGroupIn.getScale()[colIn];
|
|
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregation::doAvg(): DECIMAL bad length.");
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
valIn = rowIn.getDoubleField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
valIn = rowIn.getFloatField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
valIn = rowIn.getLongDoubleField(colIn);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: no average for data type: " << colDataType;
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// min(count) = 0
|
|
uint64_t count = fRow.getUintField(colAux);
|
|
bool notFirstValue = count > 0;
|
|
|
|
// Set count column
|
|
if (merge)
|
|
{
|
|
fRow.setUintField<8>(count + rowIn.getUintField<8>(colAux), colAux);
|
|
}
|
|
else
|
|
{
|
|
fRow.setUintField<8>(count + 1, colAux);
|
|
}
|
|
|
|
// Set sum column
|
|
if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType)
|
|
{
|
|
if (LIKELY(notFirstValue))
|
|
{
|
|
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
|
|
int128_t sum = valOut + valIn;
|
|
fRow.setBinaryField(&sum, colOut);
|
|
}
|
|
else
|
|
{
|
|
int128_t sum = valIn;
|
|
fRow.setBinaryField(&sum, colOut);
|
|
}
|
|
}
|
|
else if (isWideDataType)
|
|
{
|
|
if (LIKELY(notFirstValue))
|
|
{
|
|
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
|
|
int128_t sum = valOut + wideValue;
|
|
fRow.setInt128Field(sum, colOut);
|
|
}
|
|
else
|
|
{
|
|
fRow.setInt128Field(wideValue, colOut);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (LIKELY(notFirstValue))
|
|
{
|
|
long double valOut = fRow.getLongDoubleField(colOut);
|
|
fRow.setLongDoubleField(valIn + valOut, colOut);
|
|
}
|
|
else // This is the first value
|
|
fRow.setLongDoubleField(valIn, colOut);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the sum and count fields for average if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group stores the count
|
|
// colAux(in) - column in the output row group stores the mean(x)
|
|
// colAux + 1 - column in the output row group stores the sum(x_i - mean)^2
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux)
|
|
{
|
|
int colDataType = (fRowGroupIn.getColTypes())[colIn];
|
|
|
|
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
|
|
return;
|
|
|
|
long double valIn = 0.0;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT: valIn = (long double)rowIn.getIntField(colIn); break;
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL: // handle scale later
|
|
case execplan::CalpontSystemCatalog::UDECIMAL: // handle scale later
|
|
if (LIKELY(fRowGroupIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
|
{
|
|
// To save from unaligned memory
|
|
datatypes::TSInt128 val128In = rowIn.getTSInt128Field(colIn);
|
|
valIn = static_cast<long double>(val128In.toTFloat128());
|
|
}
|
|
else if (fRowGroupIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
valIn = (long double)rowIn.getIntField(colIn);
|
|
}
|
|
else
|
|
{
|
|
idbassert(false);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT: valIn = (long double)rowIn.getUintField(colIn); break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE: valIn = (long double)rowIn.getDoubleField(colIn); break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT: valIn = (long double)rowIn.getFloatField(colIn); break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE: valIn = rowIn.getLongDoubleField(colIn); break;
|
|
|
|
default:
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: no average for data type: " << colDataType;
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
|
|
double count = fRow.getDoubleField(colOut) + 1.0;
|
|
long double mean = fRow.getLongDoubleField(colAux);
|
|
long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1);
|
|
volatile long double delta = valIn - mean;
|
|
mean += delta / count;
|
|
scaledMomentum2 += delta * (valIn - mean);
|
|
|
|
fRow.setDoubleField(count, colOut);
|
|
fRow.setLongDoubleField(mean, colAux);
|
|
fRow.setLongDoubleField(scaledMomentum2, colAux + 1);
|
|
}
|
|
|
|
void RowAggregation::mergeStatistics(const Row& rowIn, uint64_t colOut, uint64_t colAux)
|
|
{
|
|
fRow.setDoubleField(fRow.getDoubleField(colOut) + rowIn.getDoubleField(colOut), colOut);
|
|
fRow.setLongDoubleField(fRow.getLongDoubleField(colAux) + rowIn.getLongDoubleField(colAux), colAux);
|
|
fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + rowIn.getLongDoubleField(colAux + 1),
|
|
colAux + 1);
|
|
}
|
|
|
|
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
|
|
uint64_t& funcColsIdx, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
|
{
|
|
std::vector<mcsv1sdk::mcsv1Context>* udafContextsCollPtr = &fRGContextColl;
|
|
if (UNLIKELY(rgContextColl != nullptr))
|
|
{
|
|
udafContextsCollPtr = rgContextColl;
|
|
}
|
|
|
|
std::vector<mcsv1sdk::mcsv1Context>& udafContextsColl = *udafContextsCollPtr;
|
|
uint32_t paramCount = udafContextsColl[funcColsIdx].getParameterCount();
|
|
// doUDAF changes funcColsIdx to skip UDAF arguments so the real UDAF
|
|
// column idx is the initial value of the funcColsIdx
|
|
uint64_t origFuncColsIdx = funcColsIdx;
|
|
// The vector of parameters to be sent to the UDAF
|
|
utils::VLArray<mcsv1sdk::ColumnDatum> valsIn(paramCount);
|
|
utils::VLArray<uint32_t> dataFlags(paramCount);
|
|
execplan::ConstantColumn* cc;
|
|
bool bIsNull = false;
|
|
execplan::CalpontSystemCatalog::ColDataType colDataType;
|
|
|
|
for (uint32_t i = 0; i < paramCount; ++i)
|
|
{
|
|
mcsv1sdk::ColumnDatum& datum = valsIn[i];
|
|
// Turn on NULL flags based on the data
|
|
dataFlags[i] = 0;
|
|
|
|
// If this particular parameter is a constant, then we need
|
|
// to access the constant value rather than a row value.
|
|
cc = nullptr;
|
|
|
|
if (fFunctionCols[funcColsIdx]->fpConstCol)
|
|
{
|
|
cc = dynamic_cast<execplan::ConstantColumn*>(fFunctionCols[funcColsIdx]->fpConstCol.get());
|
|
}
|
|
|
|
if ((cc && cc->isNull()) || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
|
|
{
|
|
if (udafContextsColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
|
{
|
|
// When Ignore nulls, if there are multiple parameters and any
|
|
// one of them is NULL, we ignore the entry. We need to increment
|
|
// funcColsIdx the number of extra parameters.
|
|
funcColsIdx += paramCount - i - 1;
|
|
return;
|
|
}
|
|
|
|
dataFlags[i] |= mcsv1sdk::PARAM_IS_NULL;
|
|
}
|
|
|
|
if (cc)
|
|
{
|
|
colDataType = cc->resultType().colDataType;
|
|
}
|
|
else
|
|
{
|
|
colDataType = fRowGroupIn.getColTypes()[colIn];
|
|
}
|
|
|
|
if (!(dataFlags[i] & mcsv1sdk::PARAM_IS_NULL))
|
|
{
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getIntVal(const_cast<Row&>(rowIn), bIsNull);
|
|
datum.scale = cc->resultType().scale;
|
|
datum.precision = cc->resultType().precision;
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getIntField(colIn);
|
|
datum.scale = fRowGroupIn.getScale()[colIn];
|
|
datum.precision = fRowGroupIn.getPrecision()[colIn];
|
|
}
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
datum.dataType = colDataType;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getDecimalVal(const_cast<Row&>(rowIn), bIsNull).value;
|
|
datum.scale = cc->resultType().scale;
|
|
datum.precision = cc->resultType().precision;
|
|
}
|
|
else
|
|
{
|
|
if (LIKELY(fRowGroupIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
|
{
|
|
// We can't control boost::any asignment
|
|
// so let's get an aligned memory
|
|
datatypes::TSInt128 val = rowIn.getTSInt128Field(colIn);
|
|
datum.columnData = val.s128Value;
|
|
}
|
|
else if (fRowGroupIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
datum.columnData = rowIn.getIntField(colIn);
|
|
}
|
|
else
|
|
{
|
|
idbassert(false);
|
|
}
|
|
datum.scale = fRowGroupIn.getScale()[colIn];
|
|
datum.precision = fRowGroupIn.getPrecision()[colIn];
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getUintVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getUintField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::DOUBLE;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getDoubleVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getDoubleField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::LONGDOUBLE;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getLongDoubleVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getLongDoubleField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::FLOAT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getFloatVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getFloatField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getDateIntVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getUintField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getDatetimeIntVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getUintField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getTimestampIntVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getUintField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getTimeIntVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getIntField(colIn);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
case execplan::CalpontSystemCatalog::VARBINARY:
|
|
case execplan::CalpontSystemCatalog::CLOB:
|
|
case execplan::CalpontSystemCatalog::BLOB:
|
|
{
|
|
datum.dataType = colDataType;
|
|
|
|
if (cc)
|
|
{
|
|
datum.columnData = cc->getStrVal(const_cast<Row&>(rowIn), bIsNull);
|
|
}
|
|
else
|
|
{
|
|
datum.columnData = rowIn.getStringField(colIn);
|
|
}
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation " << udafContextsColl[origFuncColsIdx].getName()
|
|
<< ": No logic for data type: " << colDataType;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// MCOL-1201: If there are multiple parameters, the next fFunctionCols
|
|
// will have the column used. By incrementing the funcColsIdx (passed by
|
|
// ref, we also increment the caller's index.
|
|
if (fFunctionCols.size() > funcColsIdx + 1 &&
|
|
fFunctionCols[funcColsIdx + 1]->fAggFunction == ROWAGG_MULTI_PARM)
|
|
{
|
|
++funcColsIdx;
|
|
colIn = fFunctionCols[funcColsIdx]->fInputColumnIndex;
|
|
colOut = fFunctionCols[funcColsIdx]->fOutputColumnIndex;
|
|
(void)colOut;
|
|
}
|
|
else
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
// The intermediate values are stored in userData referenced by colAux.
|
|
udafContextsColl[origFuncColsIdx].setDataFlags(dataFlags);
|
|
udafContextsColl[origFuncColsIdx].setUserData(fRow.getUserData(colAux));
|
|
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
rc = udafContextsColl[origFuncColsIdx].getFunction()->nextValue(&udafContextsColl[origFuncColsIdx], valsIn);
|
|
udafContextsColl[origFuncColsIdx].setUserData(NULL);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[origFuncColsIdx].get());
|
|
rowUDAF->bInterrupted = true;
|
|
throw logging::QueryDataExcept(udafContextsColl[origFuncColsIdx].getErrorMessage(),
|
|
logging::aggregateFuncErr);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Concatenate multiple RowGroup data into one byte stream. This is for matching
|
|
// the message counts of request and response.
|
|
//
|
|
// This function should be used by PM when result set is large than one RowGroup.
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregation::loadResult(messageqcpp::ByteStream& bs)
|
|
{
|
|
uint32_t sz = 0;
|
|
messageqcpp::ByteStream rgdbs;
|
|
while (auto rgd = fRowAggStorage->getNextRGData())
|
|
{
|
|
++sz;
|
|
fRowGroupOut->setData(rgd.get());
|
|
fRowGroupOut->serializeRGData(rgdbs);
|
|
}
|
|
|
|
if (sz == 0)
|
|
{
|
|
sz = 1;
|
|
RGData rgd(*fRowGroupOut, 1);
|
|
fRowGroupOut->setData(&rgd);
|
|
fRowGroupOut->resetRowGroup(0);
|
|
fRowGroupOut->serializeRGData(rgdbs);
|
|
}
|
|
bs << sz;
|
|
bs.append(rgdbs.buf(), rgdbs.length());
|
|
}
|
|
|
|
// Write a response holding exactly one RowGroup: the serialized fEmptyRowGroup,
// preceded by its uint32_t count (1).
void RowAggregation::loadEmptySet(messageqcpp::ByteStream& bs)
{
  const uint32_t rowGroupCount = 1;
  bs << rowGroupCount;
  fEmptyRowGroup.serializeRGData(bs);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Row Aggregation constructor used on UM
|
|
// For one-phase case, from projected RG to final aggregated RG
|
|
//------------------------------------------------------------------------------
|
|
RowAggregationUM::RowAggregationUM(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
|
|
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
|
|
joblist::ResourceManager* r, boost::shared_ptr<int64_t> sessionLimit,
|
|
bool withRollup)
|
|
: RowAggregation(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
|
|
, fHasAvg(false)
|
|
, fHasStatsFunc(false)
|
|
, fHasUDAF(false)
|
|
, fTotalMemUsage(0)
|
|
, fLastMemUsage(0)
|
|
{
|
|
// Check if there are any avg, stats or UDAF functions.
|
|
// These flags are used in finalize.
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
if (fFunctionCols[i]->fAggFunction == ROWAGG_AVG || fFunctionCols[i]->fAggFunction == ROWAGG_DISTINCT_AVG)
|
|
fHasAvg = true;
|
|
else if (fFunctionCols[i]->fAggFunction == ROWAGG_STATS)
|
|
fHasStatsFunc = true;
|
|
else if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
|
fHasUDAF = true;
|
|
}
|
|
|
|
// Check if all groupby column selected
|
|
for (uint64_t i = 0; i < fGroupByCols.size(); i++)
|
|
{
|
|
if (fGroupByCols[i]->fInputColumnIndex != fGroupByCols[i]->fOutputColumnIndex)
|
|
{
|
|
fKeyOnHeap = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Copy constructor. Copies the base RowAggregation state, then the UM-specific
// feature flags, expressions, constant aggregates, GROUP_CONCAT descriptors and
// memory-usage counters from rhs. Initializers follow member declaration order.
RowAggregationUM::RowAggregationUM(const RowAggregationUM& rhs)
 : RowAggregation(rhs)
 , fHasAvg(rhs.fHasAvg)
 , fHasStatsFunc(rhs.fHasStatsFunc)
 , fHasUDAF(rhs.fHasUDAF)
 , fExpression(rhs.fExpression)
 , fTotalMemUsage(rhs.fTotalMemUsage)
 , fConstantAggregate(rhs.fConstantAggregate)
 , fGroupConcat(rhs.fGroupConcat)
 , fLastMemUsage(rhs.fLastMemUsage)
{
  // Nothing beyond member-wise initialization.
}
|
|
|
|
// Destructor: hand the memory accounted in fTotalMemUsage back to the
// ResourceManager, against this session's limit.
RowAggregationUM::~RowAggregationUM()
{
  const auto accountedBytes = fTotalMemUsage;
  fRm->returnMemory(accountedBytes, fSessionMemLimit);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Marks the end of RowGroup input into aggregation.
|
|
//
|
|
// This function should be used by UM when aggregating multiple RowGroups.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::endOfInput()
|
|
{
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initilalize the Group Concat data
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::initialize(bool hasGroupConcat)
|
|
{
|
|
if (fGroupConcat.size() > 0)
|
|
fFunctionColGc = fFunctionCols;
|
|
|
|
if (fKeyOnHeap)
|
|
{
|
|
fKeyRG = fRowGroupIn.truncate(fGroupByCols.size());
|
|
}
|
|
|
|
RowAggregation::initialize(fGroupConcat.size() > 0);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation finalization can be performed here. For example, this is
|
|
// where fixing the duplicates and dividing the SUM by COUNT to get the AVG.
|
|
//
|
|
// This function should be used by UM when aggregating multiple RowGroups.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::finalize()
|
|
{
|
|
// copy the duplicates functions, except AVG
|
|
fixDuplicates(ROWAGG_DUP_FUNCT);
|
|
|
|
// UM: it is time to divide SUM by COUNT for any AVG cols.
|
|
if (fHasAvg)
|
|
{
|
|
calculateAvgColumns();
|
|
|
|
// copy the duplicate AVGs, if any
|
|
fixDuplicates(ROWAGG_DUP_AVG);
|
|
}
|
|
|
|
// UM: it is time to calculate statistics functions
|
|
if (fHasStatsFunc)
|
|
{
|
|
// covers duplicats, too.
|
|
calculateStatisticsFunctions();
|
|
}
|
|
|
|
if (fHasUDAF)
|
|
{
|
|
calculateUDAFColumns();
|
|
// copy the duplicate UDAF, if any
|
|
fixDuplicates(ROWAGG_DUP_UDAF);
|
|
}
|
|
|
|
if (fGroupConcat.size() > 0)
|
|
setGroupConcatString();
|
|
|
|
if (fConstantAggregate.size() > 0)
|
|
fixConstantAggregate();
|
|
|
|
if (fExpression.size() > 0)
|
|
evaluateExpression();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Add group_concat to the initialized working row
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::attachGroupConcatAg()
|
|
{
|
|
if (fGroupConcat.size() > 0)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
uint64_t i = 0, j = 0;
|
|
|
|
for (; i < fFunctionColGc.size(); i++)
|
|
{
|
|
int64_t colOut = fFunctionColGc[i]->fOutputColumnIndex;
|
|
|
|
if (fFunctionColGc[i]->fAggFunction == ROWAGG_GROUP_CONCAT)
|
|
{
|
|
// save the object's address in the result row
|
|
SP_GroupConcatAg gcc(new joblist::GroupConcatAgUM(fGroupConcat[j++]));
|
|
fGroupConcatAg.push_back(gcc);
|
|
*((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get();
|
|
}
|
|
|
|
if (fFunctionColGc[i]->fAggFunction == ROWAGG_JSON_ARRAY)
|
|
{
|
|
// save the object's address in the result row
|
|
SP_GroupConcatAg gcc(new joblist::JsonArrayAggregatAgUM(fGroupConcat[j++]));
|
|
fGroupConcatAg.push_back(gcc);
|
|
*((GroupConcatAg**)(data + fRow.getOffset(colOut))) = gcc.get();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the aggregation totals in the internal hashmap for the specified row.
|
|
// NULL values are recognized and ignored for all agg functions except for count
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
|
|
|
|
switch (fFunctionCols[i]->fAggFunction)
|
|
{
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
|
|
// if NOT null, let execution fall through.
|
|
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
|
|
break;
|
|
/* fall through */
|
|
|
|
case ROWAGG_COUNT_ASTERISK: fRow.setUintField<8>(fRow.getUintField<8>(colOut) + 1, colOut); break;
|
|
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX: doMinMax(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_SUM: doSum(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_AVG:
|
|
{
|
|
// The sum and count on UM may not be put next to each other:
|
|
// use colOut to store the sum;
|
|
// use colAux to store the count.
|
|
doAvg(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_STATS:
|
|
{
|
|
doStatistics(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_BIT_AND:
|
|
case ROWAGG_BIT_OR:
|
|
case ROWAGG_BIT_XOR:
|
|
{
|
|
doBitOp(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_GROUP_CONCAT:
|
|
{
|
|
doGroupConcat(rowIn, colIn, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_JSON_ARRAY:
|
|
{
|
|
doJsonAgg(rowIn, colIn, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_COUNT_NO_OP:
|
|
case ROWAGG_DUP_FUNCT:
|
|
case ROWAGG_DUP_AVG:
|
|
case ROWAGG_DUP_STATS:
|
|
case ROWAGG_DUP_UDAF:
|
|
case ROWAGG_CONSTANT: break;
|
|
|
|
case ROWAGG_UDAF:
|
|
{
|
|
doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
// need a exception to show the value
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregationUM: function (id = " << (uint64_t)fFunctionCols[i]->fAggFunction
|
|
<< ") is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Concat columns.
|
|
// rowIn(in) - Row that contains the columns to be concatenated.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::doGroupConcat(const Row& rowIn, int64_t, int64_t o)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + fRow.getOffset(o)));
|
|
gccAg->processRow(rowIn);
|
|
}
|
|
|
|
void RowAggregationUM::doJsonAgg(const Row& rowIn, int64_t, int64_t o)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
|
|
gccAg->processRow(rowIn);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// After all PM rowgroups received, calculate the average value.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::calculateAvgColumns()
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
if (fFunctionCols[i]->fAggFunction == ROWAGG_AVG || fFunctionCols[i]->fAggFunction == ROWAGG_DISTINCT_AVG)
|
|
{
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
|
|
|
|
for (uint64_t j = 0; j < fRowGroupOut->getRowCount(); j++)
|
|
{
|
|
fRowGroupOut->getRow(j, &fRow);
|
|
uint64_t cnt = fRow.getIntField(colAux);
|
|
|
|
if (cnt == 0) // empty set, value is initialized to null.
|
|
continue;
|
|
|
|
uint32_t precision = fRow.getPrecision(colOut);
|
|
bool isWideDecimal = datatypes::Decimal::isWideDecimalTypeByPrecision(precision);
|
|
|
|
if (!isWideDecimal)
|
|
{
|
|
long double sum = 0.0;
|
|
long double avg = 0.0;
|
|
sum = fRow.getLongDoubleField(colOut);
|
|
avg = sum / cnt;
|
|
fRow.setLongDoubleField(avg, colOut);
|
|
}
|
|
else
|
|
{
|
|
uint32_t scale = fRow.getScale(colOut);
|
|
// Get multiplied to deliver AVG with the scale closest
|
|
// to the expected original scale + 4.
|
|
// There is a counterpart in buildAggregateColumn.
|
|
datatypes::Decimal::setScalePrecision4Avg(precision, scale);
|
|
int128_t sumPnt = fRow.getTSInt128Field(colOut).getValue();
|
|
uint32_t scaleDiff = scale - fRow.getScale(colOut);
|
|
// multiplication overflow check
|
|
datatypes::MultiplicationOverflowCheck multOp;
|
|
int128_t sum = 0;
|
|
if (scaleDiff > 0)
|
|
multOp(sumPnt, datatypes::mcs_pow_10[scaleDiff], sum);
|
|
else
|
|
sum = sumPnt;
|
|
datatypes::lldiv_t_128 avgAndRem = datatypes::lldiv128(sum, cnt);
|
|
// Round the last digit
|
|
if (datatypes::abs(avgAndRem.rem) * 2 >= (int128_t)cnt)
|
|
{
|
|
if (utils::is_negative(avgAndRem.rem))
|
|
{
|
|
avgAndRem.quot--;
|
|
}
|
|
else
|
|
{
|
|
avgAndRem.quot++;
|
|
}
|
|
}
|
|
fRow.setInt128Field(avgAndRem.quot, colOut);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sets the value from valOut into column colOut, performing any conversions.
|
|
void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
|
|
{
|
|
execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupOut->getColTypes()[colOut];
|
|
|
|
if (valOut.empty())
|
|
{
|
|
// Fields are initialized to NULL, which is what we want for empty;
|
|
return;
|
|
}
|
|
int64_t intOut;
|
|
uint64_t uintOut;
|
|
float floatOut;
|
|
double doubleOut;
|
|
long double longdoubleOut;
|
|
ostringstream oss;
|
|
utils::NullString strOut;
|
|
|
|
bool bSetSuccess = false;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::BIT:
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
if (valOut.compatible(charTypeId))
|
|
{
|
|
intOut = valOut.cast<char>();
|
|
bSetSuccess = true;
|
|
}
|
|
else if (valOut.compatible(scharTypeId))
|
|
{
|
|
intOut = valOut.cast<signed char>();
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
if (bSetSuccess)
|
|
{
|
|
fRow.setIntField<1>(intOut, colOut);
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
if (valOut.compatible(shortTypeId))
|
|
{
|
|
intOut = valOut.cast<short>();
|
|
fRow.setIntField<2>(intOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
if (valOut.compatible(intTypeId))
|
|
{
|
|
intOut = valOut.cast<int>();
|
|
bSetSuccess = true;
|
|
}
|
|
else if (valOut.compatible(longTypeId))
|
|
{
|
|
intOut = valOut.cast<long>();
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
if (bSetSuccess)
|
|
{
|
|
fRow.setIntField<4>(intOut, colOut);
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
if (valOut.compatible(llTypeId))
|
|
{
|
|
intOut = valOut.cast<long long>();
|
|
fRow.setIntField<8>(intOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
else if (valOut.compatible(int128TypeId))
|
|
{
|
|
int128_t int128Out = valOut.cast<int128_t>();
|
|
fRow.setInt128Field(int128Out, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
if (valOut.compatible(ucharTypeId))
|
|
{
|
|
uintOut = valOut.cast<unsigned char>();
|
|
fRow.setUintField<1>(uintOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
if (valOut.compatible(ushortTypeId))
|
|
{
|
|
uintOut = valOut.cast<unsigned short>();
|
|
fRow.setUintField<2>(uintOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
if (valOut.compatible(uintTypeId))
|
|
{
|
|
uintOut = valOut.cast<unsigned int>();
|
|
fRow.setUintField<4>(uintOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
if (valOut.compatible(ulongTypeId))
|
|
{
|
|
uintOut = valOut.cast<unsigned long>();
|
|
fRow.setUintField<8>(uintOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
if (valOut.compatible(ulongTypeId))
|
|
{
|
|
uintOut = valOut.cast<unsigned long>();
|
|
fRow.setUintField<8>(uintOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
if (valOut.compatible(floatTypeId))
|
|
{
|
|
floatOut = valOut.cast<float>();
|
|
fRow.setFloatField(floatOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
if (valOut.compatible(doubleTypeId))
|
|
{
|
|
doubleOut = valOut.cast<double>();
|
|
fRow.setDoubleField(doubleOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
// XXX: check for empty valOut value? E.g., we can use nullptr inside any.
|
|
if (valOut.compatible(strTypeId))
|
|
{
|
|
utils::NullString s = valOut.cast<utils::NullString>();
|
|
fRow.setStringField(s, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::VARBINARY:
|
|
case execplan::CalpontSystemCatalog::CLOB:
|
|
case execplan::CalpontSystemCatalog::BLOB:
|
|
if (valOut.compatible(strTypeId))
|
|
{
|
|
strOut = valOut.cast<NullString>();
|
|
fRow.setVarBinaryField(strOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
if (valOut.compatible(doubleTypeId))
|
|
{
|
|
longdoubleOut = valOut.cast<long double>();
|
|
fRow.setLongDoubleField(longdoubleOut, colOut);
|
|
bSetSuccess = true;
|
|
}
|
|
|
|
break;
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregation: No logic for data type: " << colDataType;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!bSetSuccess)
|
|
{
|
|
// This means the return from the UDAF doesn't match the field
|
|
// This handles the mismatch
|
|
SetUDAFAnyValue(valOut, colOut);
|
|
}
|
|
// reset valOut to be ready for the next value
|
|
valOut.reset();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
// Fallback conversion used by SetUDAFValue() when the type held by valOut is
// not the natural C++ type of output column colOut.
//
// valOut(in) - value returned by the UDAF (not reset here).
// colOut(in) - index of the output column in fRowGroupOut.
//
// The value is first converted to every representation a column might need
// (int64, uint64, int128, double, long double and text). Then the one that
// matches the column type is stored. Throws logging::QueryDataExcept for
// column types with no conversion logic.
//------------------------------------------------------------------------------
void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
{
  execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupOut->getColTypes()[colOut];

  // This may seem a bit convoluted. Users shouldn't return a type
  // that they didn't set in mcsv1_UDAF::init(), but this
  // handles whatever return type is given and casts
  // it to whatever they said to return.
  int64_t intOut = 0;
  uint64_t uintOut = 0;
  double doubleOut = 0.0;
  long double longdoubleOut = 0.0;
  int128_t int128Out = 0;
  ostringstream oss;
  utils::NullString strOut;

  // Signed source: keep the sign in the 128-bit and floating representations.
  // Chained assignment through uint64_t would turn negatives into huge positives.
  auto fromSigned = [&](int64_t v)
  {
    intOut = v;
    uintOut = static_cast<uint64_t>(v);
    int128Out = v;
    doubleOut = static_cast<double>(v);
    longdoubleOut = static_cast<long double>(v);
    oss << v;
  };

  // Unsigned source: widen directly, so values above INT64_MAX stay positive.
  auto fromUnsigned = [&](uint64_t v)
  {
    uintOut = v;
    intOut = static_cast<int64_t>(v);
    int128Out = v;
    doubleOut = static_cast<double>(v);
    longdoubleOut = static_cast<long double>(v);
    oss << v;
  };

  // Floating source. Converting a negative floating value straight to an
  // unsigned type is undefined, so go through the signed value instead.
  // Should look at scale for decimal and adjust.
  auto fromFloating = [&](long double v)
  {
    longdoubleOut = v;
    doubleOut = static_cast<double>(v);
    int128Out = v;
    intOut = static_cast<int64_t>(doubleOut);
    uintOut = doubleOut < 0 ? static_cast<uint64_t>(intOut) : static_cast<uint64_t>(doubleOut);
    oss << doubleOut;
  };

  // Decimal text of a 128-bit integer (the standard streams do not support it).
  // Digits are produced without negating v, so the minimum value is safe.
  auto int128ToString = [](int128_t v) -> std::string
  {
    if (v == 0)
      return "0";

    const bool negative = v < 0;
    std::string reversed;

    while (v != 0)
    {
      const int digit = static_cast<int>(v % 10);
      reversed.push_back(static_cast<char>('0' + (negative ? -digit : digit)));
      v /= 10;
    }

    if (negative)
      reversed.push_back('-');

    return std::string(reversed.rbegin(), reversed.rend());
  };

  if (valOut.compatible(charTypeId))
    fromSigned(valOut.cast<char>());
  else if (valOut.compatible(scharTypeId))
    fromSigned(valOut.cast<signed char>());
  else if (valOut.compatible(shortTypeId))
    fromSigned(valOut.cast<short>());
  else if (valOut.compatible(intTypeId))
    fromSigned(valOut.cast<int>());
  else if (valOut.compatible(longTypeId))
    fromSigned(valOut.cast<long>());
  else if (valOut.compatible(llTypeId))
    fromSigned(valOut.cast<long long>());
  else if (valOut.compatible(ucharTypeId))
    fromUnsigned(valOut.cast<unsigned char>());
  else if (valOut.compatible(ushortTypeId))
    fromUnsigned(valOut.cast<unsigned short>());
  else if (valOut.compatible(uintTypeId))
    fromUnsigned(valOut.cast<unsigned int>());
  else if (valOut.compatible(ulongTypeId))
    fromUnsigned(valOut.cast<unsigned long>());
  else if (valOut.compatible(ullTypeId))
    fromUnsigned(valOut.cast<unsigned long long>());
  else if (valOut.compatible(int128TypeId))
  {
    int128Out = valOut.cast<int128_t>();
    // Narrow integer outputs keep the low 64 bits.
    uintOut = static_cast<uint64_t>(int128Out);
    intOut = static_cast<int64_t>(uintOut);
    doubleOut = static_cast<double>(int128Out);
    longdoubleOut = static_cast<long double>(int128Out);
    oss << int128ToString(int128Out);
  }
  // static_any::cast only succeeds for the exact stored type, so float and
  // double must be read separately.
  else if (valOut.compatible(floatTypeId))
    fromFloating(valOut.cast<float>());
  else if (valOut.compatible(doubleTypeId))
    fromFloating(valOut.cast<double>());
  else if (valOut.compatible(longdoubleTypeId))
    fromFloating(valOut.cast<long double>());

  if (valOut.compatible(strTypeId))
  {
    strOut = valOut.cast<utils::NullString>();

    // Convert the string to numeric type, just in case. A NULL string has no
    // text to parse, so the numeric values stay 0.
    const char* text = strOut.str();

    if (text != nullptr)
    {
      intOut = atol(text);
      uintOut = strtoul(text, nullptr, 10);
      doubleOut = strtod(text, nullptr);
      longdoubleOut = strtold(text, nullptr);
      int128Out = longdoubleOut;
    }
  }
  else
  {
    strOut.assign(oss.str());
  }

  switch (colDataType)
  {
    case execplan::CalpontSystemCatalog::BIT:
    case execplan::CalpontSystemCatalog::TINYINT: fRow.setIntField<1>(intOut, colOut); break;

    case execplan::CalpontSystemCatalog::SMALLINT: fRow.setIntField<2>(intOut, colOut); break;

    case execplan::CalpontSystemCatalog::MEDINT:
    case execplan::CalpontSystemCatalog::INT: fRow.setIntField<4>(intOut, colOut); break;

    case execplan::CalpontSystemCatalog::BIGINT: fRow.setIntField<8>(intOut, colOut); break;

    case execplan::CalpontSystemCatalog::DECIMAL:
    case execplan::CalpontSystemCatalog::UDECIMAL:
    {
      uint32_t width = fRowGroupOut->getColumnWidth(colOut);

      if (width == datatypes::MAXDECIMALWIDTH)
        fRow.setInt128Field(int128Out, colOut);
      else
        fRow.setIntField<8>(intOut, colOut);

      break;
    }

    case execplan::CalpontSystemCatalog::UTINYINT: fRow.setUintField<1>(uintOut, colOut); break;

    case execplan::CalpontSystemCatalog::USMALLINT: fRow.setUintField<2>(uintOut, colOut); break;

    case execplan::CalpontSystemCatalog::UMEDINT:
    case execplan::CalpontSystemCatalog::UINT: fRow.setUintField<4>(uintOut, colOut); break;

    case execplan::CalpontSystemCatalog::UBIGINT: fRow.setUintField<8>(uintOut, colOut); break;

    case execplan::CalpontSystemCatalog::DATE:
    case execplan::CalpontSystemCatalog::DATETIME:
    case execplan::CalpontSystemCatalog::TIMESTAMP: fRow.setUintField<8>(uintOut, colOut); break;

    case execplan::CalpontSystemCatalog::TIME: fRow.setIntField<8>(intOut, colOut); break;

    case execplan::CalpontSystemCatalog::FLOAT:
    case execplan::CalpontSystemCatalog::UFLOAT:
    {
      float floatOut = (float)doubleOut;
      fRow.setFloatField(floatOut, colOut);
      break;
    }

    case execplan::CalpontSystemCatalog::DOUBLE:
    case execplan::CalpontSystemCatalog::UDOUBLE: fRow.setDoubleField(doubleOut, colOut); break;

    case execplan::CalpontSystemCatalog::CHAR:
    case execplan::CalpontSystemCatalog::VARCHAR:
    case execplan::CalpontSystemCatalog::TEXT: fRow.setStringField(strOut, colOut); break;

    case execplan::CalpontSystemCatalog::VARBINARY:
    case execplan::CalpontSystemCatalog::CLOB:
    case execplan::CalpontSystemCatalog::BLOB: fRow.setVarBinaryField(strOut, colOut); break;

    case execplan::CalpontSystemCatalog::LONGDOUBLE: fRow.setLongDoubleField(longdoubleOut, colOut); break;

    default:
    {
      std::ostringstream errmsg;
      errmsg << "RowAggregation: No logic for data type: " << colDataType;
      throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
      break;
    }
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
//
|
|
// For each rowgroup, calculate the final value.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::calculateUDAFColumns()
|
|
{
|
|
RowUDAFFunctionCol* rowUDAF = nullptr;
|
|
static_any::any valOut;
|
|
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
if (fFunctionCols[i]->fAggFunction != ROWAGG_UDAF)
|
|
continue;
|
|
|
|
rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
|
fRGContext = rowUDAF->fUDAFContext;
|
|
|
|
int64_t colOut = rowUDAF->fOutputColumnIndex;
|
|
int64_t colAux = rowUDAF->fAuxColumnIndex;
|
|
|
|
// At this point, each row is an aggregated GROUP BY.
|
|
for (uint64_t j = 0; j < fRowGroupOut->getRowCount(); j++)
|
|
{
|
|
// Get the user data from the row and evaluate.
|
|
fRowGroupOut->getRow(j, &fRow);
|
|
|
|
// Turn the NULL flag off. We can't know NULL at this point
|
|
fRGContext.setDataFlags(nullptr);
|
|
|
|
// The intermediate values are stored in colAux.
|
|
fRGContext.setUserData(fRow.getUserData(colAux));
|
|
// Call the UDAF evaluate function
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
rc = fRGContext.getFunction()->evaluate(&fRGContext, valOut);
|
|
fRGContext.setUserData(nullptr);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
rowUDAF->bInterrupted = true;
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
// Set the returned value into the output row
|
|
SetUDAFValue(valOut, colOut);
|
|
}
|
|
|
|
fRGContext.setUserData(nullptr);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// After all PM rowgroups received, calculate the statistics.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::calculateStatisticsFunctions()
|
|
{
|
|
// ROWAGG_DUP_STATS may be not strictly duplicates, covers for statistics functions.
|
|
// They are calculated based on the same set of data: sum(x), sum(x**2) and count.
|
|
// array of <aux index, count> for duplicates
|
|
boost::scoped_array<pair<double, uint64_t>> auxCount(new pair<double, uint64_t>[fRow.getColumnCount()]);
|
|
|
|
fRowGroupOut->getRow(0, &fRow);
|
|
|
|
for (uint64_t j = 0; j < fRowGroupOut->getRowCount(); j++, fRow.nextRow())
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
if (fFunctionCols[i]->fAggFunction == ROWAGG_STATS ||
|
|
fFunctionCols[i]->fAggFunction == ROWAGG_DUP_STATS)
|
|
{
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
|
|
|
|
double cnt = fRow.getDoubleField(colOut);
|
|
|
|
if (fFunctionCols[i]->fAggFunction == ROWAGG_STATS)
|
|
{
|
|
auxCount[colOut].first = cnt;
|
|
auxCount[colOut].second = colAux;
|
|
}
|
|
else // ROWAGG_DUP_STATS
|
|
{
|
|
cnt = auxCount[colAux].first;
|
|
colAux = auxCount[colAux].second;
|
|
}
|
|
|
|
if (cnt == 0.0) // empty set, set null.
|
|
{
|
|
fRow.setUintField(joblist::DOUBLENULL, colOut);
|
|
}
|
|
else if (cnt == 1.0)
|
|
{
|
|
if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_SAMP ||
|
|
fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_SAMP)
|
|
fRow.setUintField(joblist::DOUBLENULL, colOut);
|
|
else
|
|
fRow.setDoubleField(0.0, colOut);
|
|
}
|
|
else // count > 1
|
|
{
|
|
long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1);
|
|
|
|
uint32_t scale = fRow.getScale(colOut);
|
|
auto factor = datatypes::scaleDivisor<long double>(scale);
|
|
|
|
if (scale != 0) // adjust the scale if necessary
|
|
{
|
|
scaledMomentum2 /= factor * factor;
|
|
}
|
|
|
|
if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_POP)
|
|
scaledMomentum2 = sqrt(scaledMomentum2 / cnt);
|
|
else if (fFunctionCols[i]->fStatsFunction == ROWAGG_STDDEV_SAMP)
|
|
scaledMomentum2 = sqrt(scaledMomentum2 / (cnt - 1));
|
|
else if (fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_POP)
|
|
scaledMomentum2 = scaledMomentum2 / cnt;
|
|
else if (fFunctionCols[i]->fStatsFunction == ROWAGG_VAR_SAMP)
|
|
scaledMomentum2 = scaledMomentum2 / (cnt - 1);
|
|
|
|
fRow.setDoubleField(scaledMomentum2, colOut);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Fix the duplicate function columns -- same function same column id repeated
|
|
//------------------------------------------------------------------------------
|
|
// For every function column whose aggregate type equals funct, call
// fRow.copyField(outputColumn, auxColumn) on each row of the output RowGroup.
// NOTE(review): presumably this copies the aux column (the column being duplicated) into
// the duplicate's output column; confirm the argument order of Row::copyField.
// Does nothing when no function column matches funct.
void RowAggregationUM::fixDuplicates(RowAggFunctionType funct)
{
  vector<SP_ROWAGG_FUNC_t> matching;

  for (const auto& func : fFunctionCols)
  {
    if (func->fAggFunction == funct)
      matching.push_back(func);
  }

  if (matching.empty())
    return;

  fRowGroupOut->getRow(0, &fRow);

  for (uint64_t row = 0; row < fRowGroupOut->getRowCount(); ++row, fRow.nextRow())
  {
    for (const auto& func : matching)
      fRow.copyField(func->fOutputColumnIndex, func->fAuxColumnIndex);
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Evaluate the functions and expressions
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::evaluateExpression()
|
|
{
|
|
funcexp::FuncExp* fe = funcexp::FuncExp::instance();
|
|
fRowGroupOut->getRow(0, &fRow);
|
|
|
|
for (uint64_t i = 0; i < fRowGroupOut->getRowCount(); i++, fRow.nextRow())
|
|
{
|
|
fe->evaluate(fRow, fExpression);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Calculate the aggregate(constant) columns
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::fixConstantAggregate()
|
|
{
|
|
// find the field has the count(*).
|
|
int64_t cntIdx = 0;
|
|
|
|
for (uint64_t k = 0; k < fFunctionCols.size(); k++)
|
|
{
|
|
if (fFunctionCols[k]->fAggFunction == ROWAGG_CONSTANT)
|
|
{
|
|
cntIdx = fFunctionCols[k]->fAuxColumnIndex;
|
|
break;
|
|
}
|
|
}
|
|
|
|
fRowGroupOut->getRow(0, &fRow);
|
|
|
|
for (uint64_t i = 0; i < fRowGroupOut->getRowCount(); i++, fRow.nextRow())
|
|
{
|
|
int64_t rowCnt = fRow.getIntField(cntIdx);
|
|
vector<ConstantAggData>::iterator j = fConstantAggregate.begin();
|
|
|
|
for (uint64_t k = 0; k < fFunctionCols.size(); k++)
|
|
{
|
|
if (fFunctionCols[k]->fAggFunction == ROWAGG_CONSTANT)
|
|
{
|
|
if (j->isNull() || rowCnt == 0)
|
|
doNullConstantAggregate(*j, k);
|
|
else
|
|
doNotNullConstantAggregate(*j, k);
|
|
|
|
j++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Calculate the aggregate(null) columns
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, uint64_t i)
|
|
{
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
int colDataType = (fRowGroupOut->getColTypes())[colOut];
|
|
|
|
switch (aggData.fOp)
|
|
{
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX:
|
|
case ROWAGG_AVG:
|
|
case ROWAGG_SUM:
|
|
case ROWAGG_DISTINCT_AVG:
|
|
case ROWAGG_DISTINCT_SUM:
|
|
case ROWAGG_STATS:
|
|
{
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
fRow.setIntField(getIntNullValue(colDataType), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
auto width = fRow.getColumnWidth(colOut);
|
|
if (fRow.getColumnWidth(colOut) == datatypes::MAXDECIMALWIDTH)
|
|
{
|
|
fRow.setInt128Field(datatypes::Decimal128Null, colOut);
|
|
}
|
|
else if (width <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
fRow.setIntField(getIntNullValue(colDataType), colOut);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregationUM::doNullConstantAggregate(): DECIMAL bad length.");
|
|
}
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
fRow.setUintField(getUintNullValue(colDataType), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
fRow.setDoubleField(getDoubleNullValue(), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
fRow.setFloatField(getFloatNullValue(), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
fRow.setUintField(getUintNullValue(colDataType), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
fRow.setIntField(getIntNullValue(colDataType), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
default:
|
|
{
|
|
utils::NullString nullstr;
|
|
fRow.setStringField(nullstr, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
fRow.setLongDoubleField(getLongDoubleNullValue(), colOut);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
case ROWAGG_COUNT_DISTINCT_COL_NAME:
|
|
{
|
|
fRow.setUintField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_BIT_AND:
|
|
{
|
|
fRow.setUintField(0xFFFFFFFFFFFFFFFFULL, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_BIT_OR:
|
|
case ROWAGG_BIT_XOR:
|
|
{
|
|
fRow.setUintField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_UDAF:
|
|
{
|
|
// For a NULL constant, call nextValue with NULL and then evaluate.
|
|
bool bInterrupted = false;
|
|
fRGContext.setInterrupted(bInterrupted);
|
|
fRGContext.createUserData();
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
mcsv1sdk::ColumnDatum valsIn[1];
|
|
|
|
// Call a reset, then nextValue, then execute. This will evaluate
|
|
// the UDAF for the constant.
|
|
rc = fRGContext.getFunction()->reset(&fRGContext);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
fRGContext.setInterrupted(true);
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
#if 0
|
|
uint32_t dataFlags[fRGContext.getParameterCount()];
|
|
|
|
for (uint32_t i = 0; i < fRGContext.getParameterCount(); ++i)
|
|
{
|
|
mcsv1sdk::ColumnDatum& datum = valsIn[i];
|
|
// Turn on NULL flags
|
|
dataFlags[i] = 0;
|
|
}
|
|
|
|
#endif
|
|
// Turn the NULL and CONSTANT flags on.
|
|
uint32_t flags[1];
|
|
flags[0] = mcsv1sdk::PARAM_IS_NULL | mcsv1sdk::PARAM_IS_CONSTANT;
|
|
fRGContext.setDataFlags(flags);
|
|
|
|
// Create a dummy datum
|
|
mcsv1sdk::ColumnDatum& datum = valsIn[0];
|
|
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
|
|
datum.columnData = 0;
|
|
|
|
rc = fRGContext.getFunction()->nextValue(&fRGContext, valsIn);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
fRGContext.setInterrupted(true);
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
static_any::any valOut;
|
|
rc = fRGContext.getFunction()->evaluate(&fRGContext, valOut);
|
|
fRGContext.setUserData(nullptr);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
fRGContext.setInterrupted(true);
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
// Set the returned value into the output row
|
|
SetUDAFValue(valOut, colOut);
|
|
fRGContext.setDataFlags(nullptr);
|
|
}
|
|
break;
|
|
|
|
default:
|
|
{
|
|
utils::NullString nullstr;
|
|
fRow.setStringField(nullstr, colOut);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Calculate the aggregate(const) columns
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData, uint64_t i)
|
|
{
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
auto colDataType = (fRowGroupOut->getColTypes())[colOut];
|
|
int64_t rowCnt = fRow.getIntField(fFunctionCols[i]->fAuxColumnIndex);
|
|
|
|
switch (aggData.fOp)
|
|
{
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX:
|
|
case ROWAGG_AVG:
|
|
case ROWAGG_DISTINCT_AVG:
|
|
case ROWAGG_DISTINCT_SUM:
|
|
{
|
|
switch (colDataType)
|
|
{
|
|
// AVG should not be int result type.
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
fRow.setIntField(strtol(aggData.fConstValue.safeString("").c_str(), nullptr, 10), colOut);
|
|
}
|
|
break;
|
|
|
|
// AVG should not be uint32_t result type.
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
fRow.setUintField(strtoul(aggData.fConstValue.safeString("").c_str(), nullptr, 10), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
auto width = fRow.getColumnWidth(colOut);
|
|
if (width == datatypes::MAXDECIMALWIDTH)
|
|
{
|
|
datatypes::TypeHolderStd colType;
|
|
colType.colWidth = width;
|
|
colType.precision = fRow.getPrecision(i);
|
|
colType.scale = fRow.getScale(i);
|
|
colType.colDataType = colDataType;
|
|
fRow.setInt128Field(colType.decimal128FromString(aggData.fConstValue), colOut);
|
|
}
|
|
else if (width <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
double dbl = strtod(aggData.fConstValue.safeString("").c_str(), 0);
|
|
auto scale = datatypes::scaleDivisor<double>(fRowGroupOut->getScale()[i]);
|
|
// TODO: isn't overflow possible below:
|
|
fRow.setIntField((int64_t)(scale * dbl), colOut);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregationUM::doNotNullConstantAggregate(): DECIMAL bad length.");
|
|
}
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
fRow.setDoubleField(strtod(aggData.fConstValue.safeString("").c_str(), nullptr), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
fRow.setLongDoubleField(strtold(aggData.fConstValue.safeString("").c_str(), nullptr), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
fRow.setFloatField(strtof(aggData.fConstValue.safeString("").c_str(), nullptr), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
fRow.setUintField(DataConvert::stringToDate(aggData.fConstValue), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
{
|
|
fRow.setUintField(DataConvert::stringToDatetime(aggData.fConstValue), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
fRow.setUintField(DataConvert::stringToTimestamp(aggData.fConstValue, fTimeZone), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
fRow.setIntField(DataConvert::stringToTime(aggData.fConstValue), colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
default:
|
|
{
|
|
fRow.setStringField(aggData.fConstValue, colOut);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_SUM:
|
|
{
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
int64_t constVal = strtol(aggData.fConstValue.safeString("").c_str(), nullptr, 10);
|
|
|
|
if (constVal != 0)
|
|
{
|
|
int64_t tmp = numeric_limits<int64_t>::max() / constVal;
|
|
|
|
if (constVal < 0)
|
|
tmp = numeric_limits<int64_t>::min() / constVal;
|
|
|
|
if (rowCnt > tmp)
|
|
throw logging::QueryDataExcept(overflowMsg, logging::aggregateDataErr);
|
|
}
|
|
|
|
fRow.setIntField(constVal * rowCnt, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
uint64_t constVal = strtoul(aggData.fConstValue.safeString("").c_str(), nullptr, 10);
|
|
fRow.setUintField(constVal * rowCnt, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
auto width = fRow.getColumnWidth(colOut);
|
|
if (width == datatypes::MAXDECIMALWIDTH)
|
|
{
|
|
datatypes::TypeHolderStd colType;
|
|
colType.colWidth = width;
|
|
colType.precision = fRow.getPrecision(i);
|
|
colType.scale = fRow.getScale(i);
|
|
colType.colDataType = colDataType;
|
|
int128_t constValue = colType.decimal128FromString(aggData.fConstValue);
|
|
int128_t sum;
|
|
|
|
datatypes::MultiplicationOverflowCheck multOp;
|
|
multOp(constValue, rowCnt, sum);
|
|
fRow.setInt128Field(sum, colOut);
|
|
}
|
|
else if (width == datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
double dbl = strtod(aggData.fConstValue.safeString("").c_str(), 0);
|
|
// TODO: isn't precision loss possible below?
|
|
dbl *= datatypes::scaleDivisor<double>(fRowGroupOut->getScale()[i]);
|
|
dbl *= rowCnt;
|
|
|
|
if ((dbl > 0 && dbl > (double)numeric_limits<int64_t>::max()) ||
|
|
(dbl < 0 && dbl < (double)numeric_limits<int64_t>::min()))
|
|
throw logging::QueryDataExcept(overflowMsg, logging::aggregateDataErr);
|
|
fRow.setIntField((int64_t)dbl, colOut);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error(
|
|
"RowAggregationUM::doNotNullConstantAggregate(): sum() DECIMAL bad length.");
|
|
}
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
double dbl = strtod(aggData.fConstValue.safeString("").c_str(), nullptr) * rowCnt;
|
|
fRow.setDoubleField(dbl, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
long double dbl = strtold(aggData.fConstValue.safeString("").c_str(), nullptr) * rowCnt;
|
|
fRow.setLongDoubleField(dbl, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
double flt;
|
|
flt = strtof(aggData.fConstValue.safeString("").c_str(), nullptr) * rowCnt;
|
|
fRow.setFloatField(flt, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
default:
|
|
{
|
|
// will not be here, checked in tupleaggregatestep.cpp.
|
|
utils::NullString nullstr;
|
|
fRow.setStringField(nullstr, colOut);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_STATS:
|
|
{
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
fRow.setIntField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
fRow.setUintField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
fRow.setDoubleField(0.0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
fRow.setLongDoubleField(0.0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
fRow.setFloatField(0.0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
fRow.setUintField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
fRow.setUintField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
default:
|
|
{
|
|
utils::NullString nullstr;
|
|
fRow.setStringField(nullstr, colOut);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
{
|
|
fRow.setUintField(rowCnt, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_COUNT_DISTINCT_COL_NAME:
|
|
{
|
|
fRow.setUintField(1, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_BIT_AND:
|
|
case ROWAGG_BIT_OR:
|
|
{
|
|
double dbl = strtod(aggData.fConstValue.safeString("").c_str(), nullptr);
|
|
dbl += (dbl > 0) ? 0.5 : -0.5;
|
|
int64_t intVal = (int64_t)dbl;
|
|
fRow.setUintField(intVal, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_BIT_XOR:
|
|
{
|
|
fRow.setUintField(0, colOut);
|
|
}
|
|
break;
|
|
|
|
case ROWAGG_UDAF:
|
|
{
|
|
bool bInterrupted = false;
|
|
fRGContext.setInterrupted(bInterrupted);
|
|
fRGContext.createUserData();
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
mcsv1sdk::ColumnDatum valsIn[1];
|
|
|
|
// Call a reset, then nextValue, then execute. This will evaluate
|
|
// the UDAF for the constant.
|
|
rc = fRGContext.getFunction()->reset(&fRGContext);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
fRGContext.setInterrupted(true);
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
// Turn the CONSTANT flags on.
|
|
uint32_t flags[1];
|
|
flags[0] = mcsv1sdk::PARAM_IS_CONSTANT;
|
|
fRGContext.setDataFlags(flags);
|
|
|
|
// Create a datum item for sending to UDAF
|
|
mcsv1sdk::ColumnDatum& datum = valsIn[0];
|
|
datum.dataType = (execplan::CalpontSystemCatalog::ColDataType)colDataType;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
datum.columnData = strtol(aggData.fConstValue.safeString("").c_str(), nullptr, 10);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
datum.columnData = strtoul(aggData.fConstValue.safeString("").c_str(), nullptr, 10);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
double dbl = strtod(aggData.fConstValue.safeString("").c_str(), 0);
|
|
// TODO: isn't overflow possible below?
|
|
datum.columnData = (int64_t)(dbl * datatypes::scaleDivisor<double>(fRowGroupOut->getScale()[i]));
|
|
datum.scale = fRowGroupOut->getScale()[i];
|
|
datum.precision = fRowGroupOut->getPrecision()[i];
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
datum.columnData = strtod(aggData.fConstValue.safeString("").c_str(), nullptr);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
datum.columnData = strtold(aggData.fConstValue.safeString("").c_str(), nullptr);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
datum.columnData = strtof(aggData.fConstValue.safeString("").c_str(), nullptr);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATE:
|
|
{
|
|
datum.columnData = DataConvert::stringToDate(aggData.fConstValue);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::DATETIME:
|
|
{
|
|
datum.columnData = DataConvert::stringToDatetime(aggData.fConstValue);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::TIMESTAMP:
|
|
{
|
|
datum.columnData = DataConvert::stringToTimestamp(aggData.fConstValue, fTimeZone);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::TIME:
|
|
{
|
|
datum.columnData = DataConvert::stringToTime(aggData.fConstValue);
|
|
}
|
|
break;
|
|
|
|
case execplan::CalpontSystemCatalog::CHAR:
|
|
case execplan::CalpontSystemCatalog::VARCHAR:
|
|
case execplan::CalpontSystemCatalog::TEXT:
|
|
case execplan::CalpontSystemCatalog::VARBINARY:
|
|
case execplan::CalpontSystemCatalog::BLOB:
|
|
default:
|
|
{
|
|
datum.columnData = aggData.fConstValue;
|
|
}
|
|
break;
|
|
}
|
|
|
|
rc = fRGContext.getFunction()->nextValue(&fRGContext, valsIn);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
fRGContext.setInterrupted(true);
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
static_any::any valOut;
|
|
rc = fRGContext.getFunction()->evaluate(&fRGContext, valOut);
|
|
fRGContext.setUserData(nullptr);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
fRGContext.setInterrupted(true);
|
|
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
|
|
// Set the returned value into the output row
|
|
SetUDAFValue(valOut, colOut);
|
|
fRGContext.setDataFlags(nullptr);
|
|
}
|
|
break;
|
|
|
|
default:
|
|
{
|
|
fRow.setStringField(aggData.fConstValue, colOut);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Replace the ROWAGG_GROUP_CONCAT / ROWAGG_JSON_ARRAY aggregator pointer stored in each
// output row with that aggregator's final string result.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUM::setGroupConcatString()
|
|
{
|
|
fRowGroupOut->getRow(0, &fRow);
|
|
|
|
for (uint64_t i = 0; i < fRowGroupOut->getRowCount(); i++, fRow.nextRow())
|
|
{
|
|
for (uint64_t j = 0; j < fFunctionCols.size(); j++)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
|
|
if (fFunctionCols[j]->fAggFunction == ROWAGG_GROUP_CONCAT)
|
|
{
|
|
uint8_t* buff = data + fRow.getOffset(fFunctionCols[j]->fOutputColumnIndex);
|
|
uint8_t* gcString;
|
|
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)buff);
|
|
gcString = gccAg->getResult();
|
|
utils::ConstString str((char*)gcString, gcString ? strlen((const char*)gcString) : 0);
|
|
fRow.setStringField(str, fFunctionCols[j]->fOutputColumnIndex);
|
|
// gccAg->getResult(buff);
|
|
}
|
|
|
|
if (fFunctionCols[j]->fAggFunction == ROWAGG_JSON_ARRAY)
|
|
{
|
|
uint8_t* buff = data + fRow.getOffset(fFunctionCols[j]->fOutputColumnIndex);
|
|
uint8_t* gcString;
|
|
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)buff);
|
|
gcString = gccAg->getResult();
|
|
utils::ConstString str((char*)gcString, gcString ? strlen((char*)gcString) : 0);
|
|
fRow.setStringField(str, fFunctionCols[j]->fOutputColumnIndex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void RowAggregationUM::setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut)
|
|
{
|
|
RowAggregation::setInputOutput(pRowGroupIn, pRowGroupOut);
|
|
|
|
if (fKeyOnHeap)
|
|
{
|
|
fKeyRG = fRowGroupIn.truncate(fGroupByCols.size());
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Returns the next group of aggregated rows.
|
|
// We do not yet cache large aggregations (more than 1 RowGroup result set)
|
|
// to disk, which means, the hashmap is limited to the size of RowGroups in mem
|
|
// (since we use the memory from the output RowGroups for our internal hashmap).
|
|
//
|
|
// This function should be used by UM when aggregating multiple RowGroups.
|
|
//
|
|
// return - false indicates all aggregated RowGroups have been returned,
|
|
// else more aggregated RowGroups remain.
|
|
//------------------------------------------------------------------------------
|
|
bool RowAggregationUM::nextRowGroup()
|
|
{
|
|
fCurRGData = fRowAggStorage->getNextRGData();
|
|
bool more = static_cast<bool>(fCurRGData);
|
|
|
|
if (more)
|
|
{
|
|
// load the top result set
|
|
fRowGroupOut->setData(fCurRGData.get());
|
|
}
|
|
|
|
return more;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Row Aggregation constructor used on UM
|
|
// For 2nd phase of two-phase case, from partial RG to final aggregated RG
|
|
//------------------------------------------------------------------------------
|
|
// Second-phase UM aggregator constructor. It adds no state of its own and forwards every
// argument unchanged to RowAggregationUM.
RowAggregationUMP2::RowAggregationUMP2(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
                                       const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
                                       joblist::ResourceManager* r,
                                       boost::shared_ptr<int64_t> sessionLimit,
                                       bool withRollup)
  : RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, withRollup)
{
  // Nothing beyond the base-class initialization.
}
|
|
|
|
// Copy constructor: copies only the RowAggregationUM base part of rhs.
RowAggregationUMP2::RowAggregationUMP2(const RowAggregationUMP2& rhs)
  : RowAggregationUM(rhs)
{
  // Nothing beyond the base-class copy.
}
|
|
|
|
// Destructor: no resources of its own; base-class and member destructors do the work.
RowAggregationUMP2::~RowAggregationUMP2() = default;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the aggregation totals in the internal hashmap for the specified row.
|
|
// NULL values are recognized and ignored for all agg functions except for count
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUMP2::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
|
|
|
|
switch (fFunctionCols[i]->fAggFunction)
|
|
{
|
|
case ROWAGG_COUNT_ASTERISK:
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
{
|
|
uint64_t count = fRow.getUintField<8>(colOut) + rowIn.getUintField<8>(colIn);
|
|
fRow.setUintField<8>(count, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX: doMinMax(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_SUM: doSum(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_AVG:
|
|
{
|
|
// The sum and count on UM may not be put next to each other:
|
|
// use colOut to store the sum;
|
|
// use colAux to store the count.
|
|
doAvg(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_STATS:
|
|
{
|
|
doStatistics(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_BIT_AND:
|
|
case ROWAGG_BIT_OR:
|
|
case ROWAGG_BIT_XOR:
|
|
{
|
|
doBitOp(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_GROUP_CONCAT:
|
|
{
|
|
doGroupConcat(rowIn, colIn, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_JSON_ARRAY:
|
|
{
|
|
doJsonAgg(rowIn, colIn, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_COUNT_NO_OP:
|
|
case ROWAGG_DUP_FUNCT:
|
|
case ROWAGG_DUP_AVG:
|
|
case ROWAGG_DUP_STATS:
|
|
case ROWAGG_DUP_UDAF:
|
|
case ROWAGG_CONSTANT: break;
|
|
|
|
case ROWAGG_UDAF:
|
|
{
|
|
doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregationUMP2: function (id = " << (uint64_t)fFunctionCols[i]->fAggFunction
|
|
<< ") is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the sum and count fields for average if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group stores the sum
|
|
// colAux(in) - column in the output row group stores the count
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUMP2::doAvg(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, bool merge)
|
|
{
|
|
if (rowIn.isNullValue(colIn))
|
|
return;
|
|
|
|
datatypes::SystemCatalog::ColDataType colDataType = rowIn.getColType(colIn);
|
|
long double valIn = 0;
|
|
bool isWideDataType = false;
|
|
int128_t wideValue = 0;
|
|
|
|
switch (colDataType)
|
|
{
|
|
case execplan::CalpontSystemCatalog::TINYINT:
|
|
case execplan::CalpontSystemCatalog::SMALLINT:
|
|
case execplan::CalpontSystemCatalog::MEDINT:
|
|
case execplan::CalpontSystemCatalog::INT:
|
|
case execplan::CalpontSystemCatalog::BIGINT:
|
|
{
|
|
valIn = rowIn.getIntField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::UTINYINT:
|
|
case execplan::CalpontSystemCatalog::USMALLINT:
|
|
case execplan::CalpontSystemCatalog::UMEDINT:
|
|
case execplan::CalpontSystemCatalog::UINT:
|
|
case execplan::CalpontSystemCatalog::UBIGINT:
|
|
{
|
|
valIn = rowIn.getUintField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DECIMAL:
|
|
case execplan::CalpontSystemCatalog::UDECIMAL:
|
|
{
|
|
uint32_t width = rowIn.getColumnWidth(colIn);
|
|
isWideDataType = width == datatypes::MAXDECIMALWIDTH;
|
|
if (LIKELY(isWideDataType))
|
|
{
|
|
wideValue = rowIn.getTSInt128Field(colIn).getValue();
|
|
}
|
|
else if (width <= datatypes::MAXLEGACYWIDTH)
|
|
{
|
|
uint32_t scale = rowIn.getScale(colIn);
|
|
valIn = rowIn.getScaledSInt64FieldAsXFloat<long double>(colIn, scale);
|
|
}
|
|
else
|
|
{
|
|
idbassert(0);
|
|
throw std::logic_error("RowAggregationUMP2::doAvg(): DECIMAL bad length.");
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::DOUBLE:
|
|
case execplan::CalpontSystemCatalog::UDOUBLE:
|
|
{
|
|
valIn = rowIn.getDoubleField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::FLOAT:
|
|
case execplan::CalpontSystemCatalog::UFLOAT:
|
|
{
|
|
valIn = rowIn.getFloatField(colIn);
|
|
break;
|
|
}
|
|
|
|
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
|
{
|
|
valIn = rowIn.getLongDoubleField(colIn);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregationUMP2: no average for data type: " << colDataType;
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
|
|
uint64_t cnt = fRow.getUintField(colAux);
|
|
auto colAuxIn = merge ? colAux : (colIn + 1);
|
|
|
|
if (datatypes::hasUnderlyingWideDecimalForSumAndAvg(colDataType) && !isWideDataType)
|
|
{
|
|
if (LIKELY(cnt > 0))
|
|
{
|
|
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
|
|
int128_t sum = valIn + valOut;
|
|
fRow.setBinaryField(&sum, colOut);
|
|
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
|
|
}
|
|
else
|
|
{
|
|
int128_t sum = valIn;
|
|
fRow.setBinaryField(&sum, colOut);
|
|
fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
|
|
}
|
|
}
|
|
else if (isWideDataType)
|
|
{
|
|
if (LIKELY(cnt > 0))
|
|
{
|
|
int128_t valOut = fRow.getTSInt128Field(colOut).getValue();
|
|
;
|
|
int128_t sum = valOut + wideValue;
|
|
fRow.setInt128Field(sum, colOut);
|
|
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
|
|
}
|
|
else
|
|
{
|
|
fRow.setInt128Field(wideValue, colOut);
|
|
fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (LIKELY(cnt > 0))
|
|
{
|
|
long double valOut = fRow.getLongDoubleField(colOut);
|
|
fRow.setLongDoubleField(valIn + valOut, colOut);
|
|
fRow.setUintField(rowIn.getUintField(colAuxIn) + cnt, colAux);
|
|
}
|
|
else
|
|
{
|
|
fRow.setLongDoubleField(valIn, colOut);
|
|
fRow.setUintField(rowIn.getUintField(colAuxIn), colAux);
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Merge a block's count, mean and sum(x_i - mean)^2 into the running statistics fields.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group stores the count/logical block
|
|
// colIn + 1 - column in the input row group stores the mean(x)/logical block
|
|
// colIn + 2 - column in the input row group stores the sum(x_i - mean)^2/logical block
|
|
// colOut(in) - column in the output row group stores the count
|
|
// colAux(in) - column in the output row group stores the mean(x)
|
|
// colAux + 1 - column in the output row group stores the sum(x_i - mean)^2
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUMP2::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux)
|
|
{
|
|
double count = fRow.getDoubleField(colOut);
|
|
long double mean = fRow.getLongDoubleField(colAux);
|
|
long double scaledMomentum2 = fRow.getLongDoubleField(colAux + 1);
|
|
|
|
double blockCount = rowIn.getDoubleField(colIn);
|
|
long double blockMean = rowIn.getLongDoubleField(colIn + 1);
|
|
long double blockScaledMomentum2 = rowIn.getLongDoubleField(colIn + 2);
|
|
|
|
double nextCount = count + blockCount;
|
|
long double nextMean;
|
|
long double nextScaledMomentum2;
|
|
if (nextCount == 0)
|
|
{
|
|
nextMean = 0;
|
|
nextScaledMomentum2 = 0;
|
|
}
|
|
else
|
|
{
|
|
volatile long double delta = mean - blockMean;
|
|
nextMean = (mean * count + blockMean * blockCount) / nextCount;
|
|
nextScaledMomentum2 =
|
|
scaledMomentum2 + blockScaledMomentum2 + delta * delta * (count * blockCount / nextCount);
|
|
}
|
|
fRow.setDoubleField(nextCount, colOut);
|
|
fRow.setLongDoubleField(nextMean, colAux);
|
|
fRow.setLongDoubleField(nextScaledMomentum2, colAux + 1);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Concat columns.
|
|
// rowIn(in) - Row that contains the columns to be concatenated.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUMP2::doGroupConcat(const Row& rowIn, int64_t i, int64_t o)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + fRow.getOffset(o)));
|
|
gccAg->merge(rowIn, i);
|
|
}
|
|
|
|
void RowAggregationUMP2::doJsonAgg(const Row& rowIn, int64_t i, int64_t o)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
|
|
gccAg->merge(rowIn, i);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the and/or/xor fields if input is not null.
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group
|
|
// funcType(in) - aggregation function type
|
|
// Note: NULL value check must be done on UM & PM
|
|
// UM may receive NULL values, too.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut, int funcType)
|
|
{
|
|
uint64_t valIn = rowIn.getUintField(colIn);
|
|
uint64_t valOut = fRow.getUintField(colOut);
|
|
|
|
if (funcType == ROWAGG_BIT_AND)
|
|
fRow.setUintField(valIn & valOut, colOut);
|
|
else if (funcType == ROWAGG_BIT_OR)
|
|
fRow.setUintField(valIn | valOut, colOut);
|
|
else
|
|
fRow.setUintField(valIn ^ valOut, colOut);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Subaggregate the UDAF. This calls subEvaluate for each partially
|
|
// aggregated row returned by the PM
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// colIn(in) - column in the input row group
|
|
// colOut(in) - column in the output row group
|
|
// colAux(in) - Where the UDAF userdata resides
|
|
// funcColsIdx(in) - index of this UDAF in fFunctionCols and in the UDAF context collection
|
|
// rgContextColl(in) - optional ptr to a vector of UDAF contexts; fRGContextColl is used when null
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
|
|
uint64_t& funcColsIdx, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
|
{
|
|
static_any::any valOut;
|
|
std::vector<mcsv1sdk::mcsv1Context>* udafContextsCollPtr = &fRGContextColl;
|
|
if (UNLIKELY(rgContextColl != nullptr))
|
|
{
|
|
udafContextsCollPtr = rgContextColl;
|
|
}
|
|
|
|
std::vector<mcsv1sdk::mcsv1Context>& udafContextsColl = *udafContextsCollPtr;
|
|
|
|
// Get the user data
|
|
boost::shared_ptr<mcsv1sdk::UserData> userDataIn = rowIn.getUserData(colIn + 1);
|
|
|
|
// Unlike other aggregates, the data isn't in colIn, so testing it for NULL
|
|
// there won't help. In case of NULL, userData will be NULL.
|
|
uint32_t flags[1];
|
|
|
|
flags[0] = 0;
|
|
|
|
if (!userDataIn)
|
|
{
|
|
if (udafContextsColl[funcColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
|
{
|
|
return;
|
|
}
|
|
|
|
// Turn on NULL flags
|
|
flags[0] |= mcsv1sdk::PARAM_IS_NULL;
|
|
}
|
|
|
|
udafContextsColl[funcColsIdx].setDataFlags(flags);
|
|
|
|
// The intermediate values are stored in colAux.
|
|
udafContextsColl[funcColsIdx].setUserData(fRow.getUserData(colAux));
|
|
|
|
// Call the UDAF subEvaluate method
|
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
|
rc = udafContextsColl[funcColsIdx].getFunction()->subEvaluate(&udafContextsColl[funcColsIdx],
|
|
userDataIn.get());
|
|
udafContextsColl[funcColsIdx].setUserData(NULL);
|
|
|
|
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
|
{
|
|
auto* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColsIdx].get());
|
|
rowUDAF->bInterrupted = true;
|
|
throw logging::IDBExcept(udafContextsColl[funcColsIdx].getErrorMessage(), logging::aggregateFuncErr);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
|
|
// DISTINCT aggregation is always constructed with withRollup = false.
RowAggregationDistinct::RowAggregationDistinct(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
                                               const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
                                               joblist::ResourceManager* r,
                                               boost::shared_ptr<int64_t> sessionLimit)
 : RowAggregationUMP2(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, false)
{
}
|
|
|
|
// Copy constructor: deep-copies the inner aggregator so both instances can run independently.
RowAggregationDistinct::RowAggregationDistinct(const RowAggregationDistinct& rhs)
 : RowAggregationUMP2(rhs), fRowGroupDist(rhs.fRowGroupDist)
{
  // fAggregator is only assigned in addAggregator(); copying an instance where it is still empty
  // must not dereference a null pointer.
  if (rhs.fAggregator)
    fAggregator.reset(rhs.fAggregator->clone());
}
|
|
|
|
// Members release their own resources.
RowAggregationDistinct::~RowAggregationDistinct() = default;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationDistinct::setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut)
|
|
{
|
|
fRowGroupIn = fRowGroupDist;
|
|
fRowGroupOut = pRowGroupOut;
|
|
initialize();
|
|
fDataForDist.reinit(fRowGroupDist, RowAggStorage::getMaxRows(fRm ? fRm->getAllowDiskAggregation() : false));
|
|
fRowGroupDist.setData(&fDataForDist);
|
|
fAggregator->setInputOutput(pRowGroupIn, &fRowGroupDist);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation DISTINCT columns
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationDistinct::addAggregator(const boost::shared_ptr<RowAggregation>& agg, const RowGroup& rg)
{
  // Keep the inner aggregator together with the layout of the row group it produces.
  fAggregator = agg;
  fRowGroupDist = rg;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation DISTINCT columns
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationDistinct::addRowGroup(const RowGroup* pRows)
|
|
{
|
|
fAggregator->addRowGroup(pRows);
|
|
}
|
|
|
|
void RowAggregationDistinct::addRowGroup(const RowGroup* pRows,
|
|
vector<std::pair<Row::Pointer, uint64_t>>& inRows)
|
|
{
|
|
fAggregator->addRowGroup(pRows, inRows);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation DISTINCT columns
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationDistinct::doDistinctAggregation()
|
|
{
|
|
while (dynamic_cast<RowAggregationUM*>(fAggregator.get())->nextRowGroup())
|
|
{
|
|
fRowGroupIn.setData(fAggregator->getOutputRowGroup()->getRGData());
|
|
|
|
Row rowIn;
|
|
fRowGroupIn.initRow(&rowIn);
|
|
fRowGroupIn.getRow(0, &rowIn);
|
|
|
|
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i, rowIn.nextRow())
|
|
{
|
|
aggregateRow(rowIn);
|
|
}
|
|
}
|
|
}
|
|
|
|
void RowAggregationDistinct::doDistinctAggregation_rowVec(vector<std::pair<Row::Pointer, uint64_t>>& inRows)
{
  Row rowIn;
  fRowGroupIn.initRow(&rowIn);

  // Each entry pairs a row pointer with the value passed to aggregateRow() as its second argument.
  for (auto& entry : inRows)
  {
    rowIn.setData(entry.first);
    aggregateRow(rowIn, &entry.second);
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update the aggregation totals in the internal hashmap for the specified row.
|
|
// for non-DISTINCT columns works partially aggregated results
|
|
// rowIn(in) - Row to be included in aggregation.
|
|
// rgContextColl(in) - ptr to a vector of UDAF contexts
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationDistinct::updateEntry(const Row& rowIn, std::vector<mcsv1sdk::mcsv1Context>* rgContextColl)
|
|
{
|
|
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
|
{
|
|
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
|
|
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
|
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
|
|
|
|
switch (fFunctionCols[i]->fAggFunction)
|
|
{
|
|
case ROWAGG_COUNT_ASTERISK:
|
|
case ROWAGG_COUNT_COL_NAME:
|
|
{
|
|
uint64_t count = fRow.getUintField<8>(colOut) + rowIn.getUintField<8>(colIn);
|
|
fRow.setUintField<8>(count, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_COUNT_DISTINCT_COL_NAME:
|
|
if (isNull(&fRowGroupIn, rowIn, colIn) != true)
|
|
fRow.setUintField<8>(fRow.getUintField<8>(colOut) + 1, colOut);
|
|
|
|
break;
|
|
|
|
case ROWAGG_MIN:
|
|
case ROWAGG_MAX: doMinMax(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_SUM:
|
|
case ROWAGG_DISTINCT_SUM: doSum(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction); break;
|
|
|
|
case ROWAGG_AVG:
|
|
{
|
|
// The sum and count on UM may not be put next to each other:
|
|
// use colOut to store the sum;
|
|
// use colAux to store the count.
|
|
doAvg(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_DISTINCT_AVG:
|
|
{
|
|
// The sum and count on UM may not be put next to each other:
|
|
// use colOut to store the sum;
|
|
// use colAux to store the count.
|
|
RowAggregation::doAvg(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_STATS:
|
|
{
|
|
doStatistics(rowIn, colIn, colOut, colAux);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_BIT_AND:
|
|
case ROWAGG_BIT_OR:
|
|
case ROWAGG_BIT_XOR:
|
|
{
|
|
doBitOp(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_GROUP_CONCAT:
|
|
{
|
|
doGroupConcat(rowIn, colIn, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_JSON_ARRAY:
|
|
{
|
|
doJsonAgg(rowIn, colIn, colOut);
|
|
break;
|
|
}
|
|
|
|
case ROWAGG_COUNT_NO_OP:
|
|
case ROWAGG_DUP_FUNCT:
|
|
case ROWAGG_DUP_AVG:
|
|
case ROWAGG_DUP_STATS:
|
|
case ROWAGG_DUP_UDAF:
|
|
case ROWAGG_CONSTANT: break;
|
|
|
|
case ROWAGG_UDAF:
|
|
{
|
|
doUDAF(rowIn, colIn, colOut, colAux, i, rgContextColl);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "RowAggregationDistinct: function (id = " << (uint64_t)fFunctionCols[i]->fAggFunction
|
|
<< ") is not supported.";
|
|
cerr << errmsg.str() << endl;
|
|
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Constructor / destructor
|
|
//------------------------------------------------------------------------------
|
|
// Constructed without rollup; fKeyOnHeap is switched off for this aggregator.
RowAggregationSubDistinct::RowAggregationSubDistinct(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
                                                     const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
                                                     joblist::ResourceManager* r,
                                                     boost::shared_ptr<int64_t> sessionLimit)
 : RowAggregationUM(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit, false)
{
  fKeyOnHeap = false;
}
|
|
|
|
// Only the RowAggregationUM part is copied; fDistRow and its buffer are rebuilt by setInputOutput().
RowAggregationSubDistinct::RowAggregationSubDistinct(const RowAggregationSubDistinct& rhs)
 : RowAggregationUM(rhs)
{
}
|
|
|
|
// Members release their own resources.
RowAggregationSubDistinct::~RowAggregationSubDistinct() = default;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Setup the rowgroups and data associations
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationSubDistinct::setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut)
|
|
{
|
|
// set up input/output association
|
|
RowAggregation::setInputOutput(pRowGroupIn, pRowGroupOut);
|
|
|
|
// initialize the aggregate row
|
|
fRowGroupOut->initRow(&fDistRow, true);
|
|
fDistRowData.reset(new uint8_t[fDistRow.getSize()]);
|
|
fDistRow.setData(rowgroup::Row::Pointer(fDistRowData.get()));
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
|
|
// Add rowgroup
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationSubDistinct::addRowGroup(const RowGroup* pRows)
|
|
{
|
|
Row rowIn;
|
|
uint32_t i, j;
|
|
|
|
pRows->initRow(&rowIn);
|
|
pRows->getRow(0, &rowIn);
|
|
|
|
for (i = 0; i < pRows->getRowCount(); ++i, rowIn.nextRow())
|
|
{
|
|
/* TODO: We can make the functors a little smarter and avoid doing this copy before the
|
|
* tentative insert */
|
|
for (j = 0; j < fGroupByCols.size(); j++)
|
|
{
|
|
rowIn.copyField(fDistRow, j, fGroupByCols[j]->fInputColumnIndex);
|
|
}
|
|
|
|
tmpRow = &fDistRow;
|
|
if (fRowAggStorage->getTargetRow(fDistRow, fRow))
|
|
{
|
|
copyRow(fDistRow, &fRow);
|
|
}
|
|
}
|
|
}
|
|
|
|
void RowAggregationSubDistinct::addRowGroup(const RowGroup* pRows,
|
|
std::vector<std::pair<Row::Pointer, uint64_t>>& inRows)
|
|
{
|
|
Row rowIn;
|
|
uint32_t i, j;
|
|
|
|
pRows->initRow(&rowIn);
|
|
|
|
for (i = 0; i < inRows.size(); ++i, rowIn.nextRow())
|
|
{
|
|
rowIn.setData(inRows[i].first);
|
|
|
|
/* TODO: We can make the functors a little smarter and avoid doing this copy before the
|
|
* tentative insert */
|
|
for (j = 0; j < fGroupByCols.size(); j++)
|
|
rowIn.copyField(fDistRow, j, fGroupByCols[j]->fInputColumnIndex);
|
|
|
|
tmpRow = &fDistRow;
|
|
if (fRowAggStorage->getTargetRow(fDistRow, fRow))
|
|
{
|
|
copyRow(fDistRow, &fRow);
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Concat columns.
|
|
// rowIn(in) - Row that contains the columns to be concatenated.
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationSubDistinct::doGroupConcat(const Row& rowIn, int64_t i, int64_t o)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + fRow.getOffset(o)));
|
|
gccAg->merge(rowIn, i);
|
|
}
|
|
|
|
void RowAggregationSubDistinct::doJsonAgg(const Row& rowIn, int64_t i, int64_t o)
|
|
{
|
|
uint8_t* data = fRow.getData();
|
|
joblist::JsonArrayAggregatAgUM* gccAg = *((joblist::JsonArrayAggregatAgUM**)(data + fRow.getOffset(o)));
|
|
gccAg->merge(rowIn, i);
|
|
}
|
|
//------------------------------------------------------------------------------
|
|
// Constructor / destructor
|
|
//------------------------------------------------------------------------------
|
|
// All arguments are forwarded to RowAggregationDistinct; sub-aggregators are attached later.
RowAggregationMultiDistinct::RowAggregationMultiDistinct(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCols,
                                                         const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
                                                         joblist::ResourceManager* r,
                                                         boost::shared_ptr<int64_t> sessionLimit)
 : RowAggregationDistinct(rowAggGroupByCols, rowAggFunctionCols, r, sessionLimit)
{
}
|
|
|
|
// Copy constructor: clones every sub-aggregator and gives each copied sub-rowgroup a fresh
// RGData buffer, so the copy shares no aggregation buffers with rhs.
RowAggregationMultiDistinct::RowAggregationMultiDistinct(const RowAggregationMultiDistinct& rhs)
 : RowAggregationDistinct(rhs), fSubRowGroups(rhs.fSubRowGroups), fSubFunctions(rhs.fSubFunctions)
{
  // fAggregator is already cloned by the RowAggregationDistinct copy constructor; cloning it a
  // second time here would only discard that copy and repeat the work.
  // fSubAggregators and fSubRowData start out empty (they are not in the initializer list).

  const bool allowDiskAggregation = fRm ? fRm->getAllowDiskAggregation() : false;

  for (uint32_t i = 0; i < rhs.fSubAggregators.size(); i++)
  {
    boost::shared_ptr<RGData> data(
        new RGData(fSubRowGroups[i], RowAggStorage::getMaxRows(allowDiskAggregation)));
    fSubRowData.push_back(data);
    fSubRowGroups[i].setData(data.get());

    boost::shared_ptr<RowAggregationUM> agg(rhs.fSubAggregators[i]->clone());
    fSubAggregators.push_back(agg);
  }
}
|
|
|
|
// Members release their own resources.
RowAggregationMultiDistinct::~RowAggregationMultiDistinct() = default;
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Setup the rowgroups and data associations
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationMultiDistinct::setInputOutput(const RowGroup& pRowGroupIn, RowGroup* pRowGroupOut)
|
|
{
|
|
// set up base class aggregators
|
|
RowAggregationDistinct::setInputOutput(pRowGroupIn, pRowGroupOut);
|
|
|
|
// set up sub aggregators
|
|
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
|
|
fSubAggregators[i]->setInputOutput(pRowGroupIn, &fSubRowGroups[i]);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Add sub aggregator for each distinct column with aggregate functions
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationMultiDistinct::addSubAggregator(const boost::shared_ptr<RowAggregationUM>& agg,
                                                   const RowGroup& rg, const vector<SP_ROWAGG_FUNC_t>& funct)
{
  // Output buffer for this sub-aggregator (memory accounting for it is currently compiled out).
  const bool allowDiskAgg = fRm ? fRm->getAllowDiskAggregation() : false;
  boost::shared_ptr<RGData> data(new RGData(rg, RowAggStorage::getMaxRows(allowDiskAgg)));
  fSubRowData.push_back(data);

  // Register the aggregator, its row group (bound to the buffer above) and its functions
  // under the same index.
  fSubAggregators.push_back(agg);
  fSubRowGroups.push_back(rg);
  fSubRowGroups.back().setData(data.get());
  fSubFunctions.push_back(funct);
}
|
|
|
|
void RowAggregationMultiDistinct::addRowGroup(const RowGroup* pRows)
|
|
{
|
|
// aggregate to sub-subAggregators
|
|
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
|
|
fSubAggregators[i]->addRowGroup(pRows);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation DISTINCT columns
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationMultiDistinct::addRowGroup(const RowGroup* pRowGroupIn,
|
|
vector<vector<std::pair<Row::Pointer, uint64_t>>>& inRows)
|
|
{
|
|
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
|
|
{
|
|
fSubAggregators[i]->addRowGroup(pRowGroupIn, inRows[i]);
|
|
inRows[i].clear();
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Aggregation DISTINCT columns
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
void RowAggregationMultiDistinct::doDistinctAggregation()
|
|
{
|
|
// backup the function column vector for finalize().
|
|
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
|
|
fOrigFunctionCols = &origFunctionCols;
|
|
// aggregate data from each sub-aggregator to distinct aggregator
|
|
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
|
|
{
|
|
fFunctionCols = fSubFunctions[i];
|
|
fRowGroupIn = fSubRowGroups[i];
|
|
auto* rgContextColl = fSubAggregators[i]->rgContextColl();
|
|
Row rowIn;
|
|
fRowGroupIn.initRow(&rowIn);
|
|
|
|
while (dynamic_cast<RowAggregationUM*>(fSubAggregators[i].get())->nextRowGroup())
|
|
{
|
|
fRowGroupIn.setData(fSubAggregators[i]->getOutputRowGroup()->getRGData());
|
|
|
|
// no group by == no map, everything done in fRow
|
|
if (fGroupByCols.empty())
|
|
fRowGroupOut->setRowCount(1);
|
|
|
|
fRowGroupIn.getRow(0, &rowIn);
|
|
|
|
for (uint64_t j = 0; j < fRowGroupIn.getRowCount(); ++j, rowIn.nextRow())
|
|
{
|
|
aggregateRow(rowIn, nullptr, rgContextColl);
|
|
}
|
|
}
|
|
}
|
|
|
|
// restore the function column vector
|
|
fFunctionCols = origFunctionCols;
|
|
fOrigFunctionCols = nullptr;
|
|
}
|
|
|
|
// Aggregate per-sub-aggregator row vectors into this distinct aggregator. inRows[i] is consumed
// with sub-aggregator i's functions and layout, then cleared.
void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(
    vector<vector<std::pair<Row::Pointer, uint64_t>>>& inRows)
{
  // backup the function column vector for finalize().
  vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
  fOrigFunctionCols = &origFunctionCols;

  // fFunctionCols is overwritten per sub-aggregator and fOrigFunctionCols points at a local.
  // Both must be restored on every exit path: if aggregation throws, the object would otherwise
  // keep the sub-aggregator's functions and a dangling fOrigFunctionCols.
  auto restoreFunctionCols = [this, &origFunctionCols]()
  {
    fFunctionCols = origFunctionCols;
    fOrigFunctionCols = nullptr;
  };

  try
  {
    // aggregate data from each sub-aggregator to distinct aggregator
    for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
    {
      fFunctionCols = fSubFunctions[i];
      fRowGroupIn = fSubRowGroups[i];
      auto* rgContextColl = fSubAggregators[i]->rgContextColl();
      Row rowIn;
      fRowGroupIn.initRow(&rowIn);

      for (uint64_t j = 0; j < inRows[i].size(); ++j)
      {
        rowIn.setData(inRows[i][j].first);
        aggregateRow(rowIn, &inRows[i][j].second, rgContextColl);
      }

      inRows[i].clear();
    }
  }
  catch (...)
  {
    restoreFunctionCols();
    throw;
  }

  // restore the function column vector
  restoreFunctionCols();
}
|
|
|
|
// Stores gcc in fGroupConcat; no further setup is performed.
GroupConcatAg::GroupConcatAg(SP_GroupConcat& gcc)
 : fGroupConcat(gcc)
{
}
|
|
|
|
// Members release their own resources.
GroupConcatAg::~GroupConcatAg() = default;
|
|
|
|
} // namespace rowgroup
|