1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-523 Add UDAF and UDAnF SDK

This commit is contained in:
David Hall
2017-07-26 11:53:08 -05:00
parent 630b113565
commit bc2a4e7795
75 changed files with 10250 additions and 4523 deletions

744
utils/rowgroup/rowaggregation.cpp Normal file → Executable file
View File

@ -28,7 +28,7 @@
#include <sstream>
#include <stdexcept>
#include <limits>
#include <typeinfo>
#include "joblisttypes.h"
#include "resourcemanager.h"
#include "groupconcat.h"
@ -459,7 +459,6 @@ inline void RowAggregation::updateFloatSum(float val1, float val2, int64_t col)
fRow.setFloatField(val1 + val2, col);
}
//------------------------------------------------------------------------------
// Verify if the column value is NULL
// row(in) - Row to be included in aggregation.
@ -721,6 +720,41 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup> *pSmallSideRG, RowGroup *
(*fSmallSideRGs)[i].initRow(&rowSmalls[i]);
}
//------------------------------------------------------------------------------
// For UDAF, we need to sometimes start a new context.
//
// This will be called any number of times by each of the batchprimitiveprocessor
// threads on the PM and by multiple threads on the UM. It must remain
// thread safe.
//------------------------------------------------------------------------------
// Creates fresh UDAF user data for the current output row (fRow) by calling
// the user's reset() and storing the resulting user data in the row's
// auxiliary column.
//
// funcColID(in) - index into fFunctionCols of the UDAF function column.
//
// Throws std::logic_error if the function column is not a RowUDAFFunctionCol,
// and logging::QueryDataExcept if the user's reset() reports an error.
void RowAggregation::resetUDAF(uint64_t funcColID)
{
    // Get the UDAF class pointer stored in the row definition object.
    RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColID].get());

    // Same guard the updateEntry() call sites apply before using the cast result.
    if (!rowUDAF)
    {
        throw std::logic_error("(resetUDAF)A UDAF function is called but there's no RowUDAFFunctionCol");
    }

    // resetUDAF needs to be re-entrant. Since we're modifying the context object
    // by creating a new userData, we need a local copy. The copy constructor
    // doesn't copy userData.
    mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);

    // Call the user reset for the group userData. Since, at this point,
    // context's userData will be NULL, reset will generate a new one.
    mcsv1sdk::mcsv1_UDAF::ReturnCode rc = rgContext.getFunction()->reset(&rgContext);

    if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
    {
        rowUDAF->bInterrupted = true;
        throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
    }

    // Hand the freshly created user data over to the output row.
    fRow.setUserDataStore(fRowGroupOut->getRGData()->getUserDataStore());
    fRow.setUserData(rgContext,
                     rgContext.getUserDataSP(),
                     rgContext.getUserDataSize(),
                     rowUDAF->fAuxColumnIndex);

    rgContext.setUserData(NULL); // Prevents calling deleteUserData on the context.
}
//------------------------------------------------------------------------------
// Initialize the data members to meaningful values, setup the hashmap.
@ -780,7 +814,7 @@ void RowAggregation::initialize()
//------------------------------------------------------------------------------
// Reset the working data to aggregate next logical block
//------------------------------------------------------------------------------
void RowAggregation::reset()
void RowAggregation::aggReset()
{
fTotalRowCount = 0;
fMaxTotalRowCount = AGG_ROWGROUP_SIZE;
@ -798,15 +832,23 @@ void RowAggregation::reset()
delete fAggMapPtr;
fAggMapPtr = new RowAggMap_t(10, *fHasher, *fEq, *fAlloc);
}
fResultDataVec.clear();
fResultDataVec.push_back(fRowGroupOut->getRGData());
// For UDAF, reset the data
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
}
}
}
void RowAggregationUM::reset()
void RowAggregationUM::aggReset()
{
RowAggregation::reset();
RowAggregation::aggReset();
if (fKeyOnHeap)
{
@ -843,6 +885,15 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
attachGroupConcatAg();
inserted.first->second = RowPosition(fResultDataVec.size()-1, fRowGroupOut->getRowCount()-1);
// If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
}
}
// replace the key value with an equivalent copy, yes this is OK
const_cast<RowPosition &>((inserted.first->first)) = pos;
}
@ -893,6 +944,16 @@ void RowAggregation::aggregateRow(Row& row)
// replace the key value with an equivalent copy, yes this is OK
const_cast<RowPosition &>(*(inserted.first)) =
RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
}
}
}
else {
//fRow.setData(*(inserted.first));
@ -1065,6 +1126,8 @@ void RowAggregation::makeAggFieldsNull(Row& row)
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::BLOB:
{
int colWidth = fRowGroupOut->getColumnWidth(colOut);
if (colWidth <= 8)
@ -1386,7 +1449,7 @@ void RowAggregation::serialize(messageqcpp::ByteStream& bs) const
bs << functionCount;
for (uint64_t i = 0; i < functionCount; i++)
bs << *(fFunctionCols[i].get());
fFunctionCols[i]->serialize(bs);
}
@ -1415,9 +1478,18 @@ void RowAggregation::deserialize(messageqcpp::ByteStream& bs)
for (uint64_t i = 0; i < functionCount; i++)
{
SP_ROWAGG_FUNC_t funct(
new RowAggFunctionCol(ROWAGG_FUNCT_UNDEFINE, ROWAGG_FUNCT_UNDEFINE, 0, 0));
bs >> *(funct.get());
uint8_t funcType;
bs.peek(funcType);
SP_ROWAGG_FUNC_t funct;
if (funcType == ROWAGG_UDAF)
{
funct.reset(new RowUDAFFunctionCol(0, 0));
}
else
{
funct.reset(new RowAggFunctionCol(ROWAGG_FUNCT_UNDEFINE, ROWAGG_FUNCT_UNDEFINE, 0, 0));
}
funct->deserialize(bs);
fFunctionCols.push_back(funct);
}
}
@ -1477,6 +1549,20 @@ void RowAggregation::updateEntry(const Row& rowIn)
case ROWAGG_GROUP_CONCAT:
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colOut + 1, rowUDAF);
}
else
{
throw logic_error("(3)A UDAF function is called but there's no RowUDAFFunctionCol");
}
break;
}
default:
{
std::ostringstream errmsg;
@ -1729,6 +1815,113 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
fRow.setLongDoubleField(fRow.getLongDoubleField(colAux+1) + valIn*valIn, colAux+1);
}
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
RowUDAFFunctionCol* rowUDAF)
{
std::vector<mcsv1sdk::ColumnDatum> valsIn;
execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupIn.getColTypes()[colIn];
std::vector<uint32_t> dataFlags;
// Get the context for this rowGroup. Make a copy so we're thread safe.
mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);
// Turn on NULL flags
std::vector<uint32_t> flags;
uint32_t flag = 0;
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
{
if (rgContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
{
return;
}
flag |= mcsv1sdk::PARAM_IS_NULL;
}
flags.push_back(flag);
rgContext.setDataFlags(&flags);
mcsv1sdk::ColumnDatum datum;
switch (colDataType)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = rowIn.getIntField(colIn);
datum.scale = fRowGroupIn.getScale()[colIn];
datum.precision = fRowGroupIn.getPrecision()[colIn];
break;
}
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
datum.columnData = rowIn.getUintField(colIn);
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
datum.dataType = execplan::CalpontSystemCatalog::DOUBLE;
datum.columnData = rowIn.getDoubleField(colIn);
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
datum.dataType = execplan::CalpontSystemCatalog::FLOAT;
datum.columnData = rowIn.getFloatField(colIn);
break;
}
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
datum.columnData = rowIn.getUintField(colIn);
break;
}
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::CLOB:
case execplan::CalpontSystemCatalog::BLOB:
{
datum.dataType = colDataType;
datum.columnData = rowIn.getStringField(colIn);
break;
}
default:
{
std::ostringstream errmsg;
errmsg << "RowAggregation " << rgContext.getName() <<
": No logic for data type: " << colDataType;
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
break;
}
}
valsIn.push_back(datum);
// The intermediate values are stored in userData referenced by colAux.
rgContext.setUserData(fRow.getUserData(colAux));
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
rc = rgContext.getFunction()->nextValue(&rgContext, valsIn);
rgContext.setUserData(NULL);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
rowUDAF->bInterrupted = true;
throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
}
}
//------------------------------------------------------------------------------
// Allocate a new data array for the output RowGroup
@ -1781,7 +1974,6 @@ void RowAggregation::loadEmptySet(messageqcpp::ByteStream& bs)
fEmptyRowGroup.serializeRGData(bs);
}
//------------------------------------------------------------------------------
// Row Aggregation constructor used on UM
// For one-phase case, from projected RG to final aggregated RG
@ -1790,10 +1982,11 @@ RowAggregationUM::RowAggregationUM(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupB
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols,
joblist::ResourceManager *r, boost::shared_ptr<int64_t> sessionLimit) :
RowAggregation(rowAggGroupByCols, rowAggFunctionCols), fHasAvg(false), fKeyOnHeap(false),
fHasStatsFunc(false), fTotalMemUsage(0), fRm(r), fSessionMemLimit(sessionLimit),
fLastMemUsage(0), fNextRGIndex(0)
fHasStatsFunc(false), fHasUDAF(false),fTotalMemUsage(0), fRm(r),
fSessionMemLimit(sessionLimit), fLastMemUsage(0), fNextRGIndex(0)
{
// Check if there are any avg functions.
// Check if there are any avg, stats or UDAF functions.
// These flags are used in finalize.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_AVG ||
@ -1801,6 +1994,8 @@ RowAggregationUM::RowAggregationUM(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupB
fHasAvg = true;
else if (fFunctionCols[i]->fAggFunction == ROWAGG_STATS)
fHasStatsFunc = true;
else if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
fHasUDAF = true;
}
// Check if all groupby column selected
@ -1904,6 +2099,11 @@ void RowAggregationUM::finalize()
calculateStatisticsFunctions();
}
if (fHasUDAF)
{
calculateUDAFColumns();
}
if (fGroupConcat.size() > 0)
setGroupConcatString();
@ -1950,6 +2150,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn)
{
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
switch (fFunctionCols[i]->fAggFunction)
{
@ -1971,14 +2172,12 @@ void RowAggregationUM::updateEntry(const Row& rowIn)
// The sum and count on UM may not be put next to each other:
// use colOut to store the sum;
// use colAux to store the count.
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
doAvg(rowIn, colIn, colOut, colAux);
break;
}
case ROWAGG_STATS:
{
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
doStatistics(rowIn, colIn, colOut, colAux);
break;
}
@ -2004,6 +2203,20 @@ void RowAggregationUM::updateEntry(const Row& rowIn)
case ROWAGG_CONSTANT:
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF);
}
else
{
throw logic_error("(5)A UDAF function is called but there's no RowUDAFFunctionCol");
}
break;
}
default:
{
// need a exception to show the value
@ -2143,6 +2356,251 @@ void RowAggregationUM::calculateAvgColumns()
}
}
// Sets the value from valOut into column colOut, performing any conversions.
// Sets the value from valOut into column colOut of fRow, performing any
// conversions needed to match the output column's data type.
//
// valOut(in) - value produced by the UDAF's evaluate(). An empty value leaves
//              the field untouched.
// colOut(in) - column in the output row group to receive the value.
//
// The value is first converted into every representation (signed, unsigned,
// float, double, string); the representation matching the output column type
// is then stored.
//
// Throws logging::QueryDataExcept if the output column type is not handled.
void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
{
    // One exemplar per supported return type, used only for type comparison.
    static const static_any::any& charTypeId = (char)1;
    static const static_any::any& scharTypeId = (signed char)1;
    static const static_any::any& shortTypeId = (short)1;
    static const static_any::any& intTypeId = (int)1;
    static const static_any::any& longTypeId = (long)1;
    static const static_any::any& llTypeId = (long long)1;
    static const static_any::any& ucharTypeId = (unsigned char)1;
    static const static_any::any& ushortTypeId = (unsigned short)1;
    static const static_any::any& uintTypeId = (unsigned int)1;
    static const static_any::any& ulongTypeId = (unsigned long)1;
    static const static_any::any& ullTypeId = (unsigned long long)1;
    static const static_any::any& floatTypeId = (float)1;
    static const static_any::any& doubleTypeId = (double)1;
    static const std::string typeStr("");
    static const static_any::any& strTypeId = typeStr;

    execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupOut->getColTypes()[colOut];

    if (valOut.empty())
    {
        // Fields are initialized to NULL, which is what we want for empty;
        return;
    }

    // This may seem a bit convoluted. Users shouldn't return a type
    // that they didn't set in mcsv1_UDAF::init(), but this
    // handles whatever return type is given and casts
    // it to whatever they said to return.
    int64_t intOut = 0;
    uint64_t uintOut = 0;
    float floatOut = 0.0;
    double doubleOut = 0.0;
    std::ostringstream oss;
    std::string strOut;

    // Signed integer results: doubleOut is filled as well so that DOUBLE
    // output columns receive the value rather than 0.0.
    if (valOut.compatible(charTypeId))
    {
        uintOut = intOut = valOut.cast<char>();
        floatOut = intOut;
        doubleOut = intOut;
        oss << intOut;
    }
    else if (valOut.compatible(scharTypeId))
    {
        uintOut = intOut = valOut.cast<signed char>();
        floatOut = intOut;
        doubleOut = intOut;
        oss << intOut;
    }
    else if (valOut.compatible(shortTypeId))
    {
        uintOut = intOut = valOut.cast<short>();
        floatOut = intOut;
        doubleOut = intOut;
        oss << intOut;
    }
    else if (valOut.compatible(intTypeId))
    {
        uintOut = intOut = valOut.cast<int>();
        floatOut = intOut;
        doubleOut = intOut;
        oss << intOut;
    }
    else if (valOut.compatible(longTypeId))
    {
        uintOut = intOut = valOut.cast<long>();
        floatOut = intOut;
        doubleOut = intOut;
        oss << intOut;
    }
    else if (valOut.compatible(llTypeId))
    {
        uintOut = intOut = valOut.cast<long long>();
        floatOut = intOut;
        doubleOut = intOut;
        oss << intOut;
    }
    // Unsigned integer results.
    else if (valOut.compatible(ucharTypeId))
    {
        intOut = uintOut = valOut.cast<unsigned char>();
        floatOut = uintOut;
        doubleOut = uintOut;
        oss << uintOut;
    }
    else if (valOut.compatible(ushortTypeId))
    {
        intOut = uintOut = valOut.cast<unsigned short>();
        floatOut = uintOut;
        doubleOut = uintOut;
        oss << uintOut;
    }
    else if (valOut.compatible(uintTypeId))
    {
        intOut = uintOut = valOut.cast<unsigned int>();
        floatOut = uintOut;
        doubleOut = uintOut;
        oss << uintOut;
    }
    else if (valOut.compatible(ulongTypeId))
    {
        intOut = uintOut = valOut.cast<unsigned long>();
        floatOut = uintOut;
        doubleOut = uintOut;
        oss << uintOut;
    }
    else if (valOut.compatible(ullTypeId))
    {
        intOut = uintOut = valOut.cast<unsigned long long>();
        floatOut = uintOut;
        doubleOut = uintOut;
        oss << uintOut;
    }
    // Floating point results. The signed and unsigned integer forms are each
    // derived directly from the floating value (as the double branch does)
    // so the signed value does not pass through an unsigned conversion.
    else if (valOut.compatible(floatTypeId))
    {
        floatOut = valOut.cast<float>();
        doubleOut = floatOut;
        uintOut = static_cast<uint64_t>(floatOut);
        intOut = static_cast<int64_t>(floatOut);
        oss << floatOut;
    }
    else if (valOut.compatible(doubleTypeId))
    {
        doubleOut = valOut.cast<double>();
        floatOut = static_cast<float>(doubleOut);
        uintOut = static_cast<uint64_t>(doubleOut);
        intOut = static_cast<int64_t>(doubleOut);
        oss << doubleOut;
    }

    if (valOut.compatible(strTypeId))
    {
        // Assign to the function-level strOut (not a new local) so that
        // string output columns receive the returned text.
        strOut = valOut.cast<std::string>();
        // Convert the string to numeric type, just in case.
        intOut = atol(strOut.c_str());
        uintOut = strtoul(strOut.c_str(), NULL, 10);
        doubleOut = strtod(strOut.c_str(), NULL);
        floatOut = static_cast<float>(doubleOut);
    }
    else
    {
        // Numeric results: the string form is the formatted number.
        strOut = oss.str();
    }

    // Store the representation that matches the output column type.
    switch (colDataType)
    {
        case execplan::CalpontSystemCatalog::BIT:
        case execplan::CalpontSystemCatalog::TINYINT:
            fRow.setIntField<1>(intOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::SMALLINT:
        case execplan::CalpontSystemCatalog::MEDINT:
            fRow.setIntField<2>(intOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::INT:
            fRow.setIntField<4>(intOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::BIGINT:
        case execplan::CalpontSystemCatalog::DECIMAL:
        case execplan::CalpontSystemCatalog::UDECIMAL:
            fRow.setIntField<8>(intOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::UTINYINT:
            fRow.setUintField<1>(uintOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::USMALLINT:
        case execplan::CalpontSystemCatalog::UMEDINT:
            fRow.setUintField<2>(uintOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::UINT:
            fRow.setUintField<4>(uintOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::UBIGINT:
            fRow.setUintField<8>(uintOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::DATE:
        case execplan::CalpontSystemCatalog::DATETIME:
            fRow.setUintField<8>(uintOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::FLOAT:
        case execplan::CalpontSystemCatalog::UFLOAT:
            fRow.setFloatField(floatOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::DOUBLE:
        case execplan::CalpontSystemCatalog::UDOUBLE:
            fRow.setDoubleField(doubleOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::CHAR:
        case execplan::CalpontSystemCatalog::VARCHAR:
        case execplan::CalpontSystemCatalog::TEXT:
            fRow.setStringField(strOut, colOut);
            break;

        case execplan::CalpontSystemCatalog::VARBINARY:
        case execplan::CalpontSystemCatalog::CLOB:
        case execplan::CalpontSystemCatalog::BLOB:
            fRow.setVarBinaryField(strOut, colOut);
            break;

        default:
        {
            std::ostringstream errmsg;
            errmsg << "RowAggregation: No logic for data type: " << colDataType;
            throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
        }
    }
}
//------------------------------------------------------------------------------
//
// For each rowgroup, calculate the final value.
//------------------------------------------------------------------------------
// For every UDAF function column, calls the user's evaluate() on each row of
// the output row group and stores the result in the function's output column.
//
// Throws std::logic_error if a column flagged ROWAGG_UDAF is not a
// RowUDAFFunctionCol, and logging::QueryDataExcept if evaluate() reports an
// error or the result cannot be stored.
void RowAggregationUM::calculateUDAFColumns()
{
    for (uint64_t i = 0; i < fFunctionCols.size(); i++)
    {
        if (fFunctionCols[i]->fAggFunction != ROWAGG_UDAF)
            continue;

        RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());

        // Same guard the updateEntry() call sites apply before using the cast result.
        if (!rowUDAF)
        {
            throw std::logic_error("(calculateUDAFColumns)A UDAF function is called but there's no RowUDAFFunctionCol");
        }

        // Work on a local copy of the context, as the other UDAF entry points do.
        mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);

        int64_t colOut = rowUDAF->fOutputColumnIndex;
        int64_t colAux = rowUDAF->fAuxColumnIndex;

        // At this point, each row is an aggregated GROUP BY.
        for (uint64_t j = 0; j < fRowGroupOut->getRowCount(); j++)
        {
            // Get the user data from the row and evaluate.
            fRowGroupOut->getRow(j, &fRow);

            // Turn the NULL flag off. We can't know NULL at this point
            rgContext.setDataFlags(NULL);

            // The intermediate values are stored in colAux.
            rgContext.setUserData(fRow.getUserData(colAux));

            // A fresh result holder per row: if evaluate() leaves it unset,
            // SetUDAFValue sees an empty value instead of the previous row's.
            static_any::any valOut;

            // Call the UDAF evaluate function
            mcsv1sdk::mcsv1_UDAF::ReturnCode rc = rgContext.getFunction()->evaluate(&rgContext, valOut);

            // Detach the row's user data from the local context copy.
            rgContext.setUserData(NULL);

            if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
            {
                rowUDAF->bInterrupted = true;
                throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
            }

            // Set the returned value into the output row
            SetUDAFValue(valOut, colOut);
        }
    }
}
//------------------------------------------------------------------------------
// After all PM rowgroups received, calculate the statistics.
@ -2222,7 +2680,6 @@ void RowAggregationUM::calculateStatisticsFunctions()
}
}
//------------------------------------------------------------------------------
// Fix the duplicate function columns -- same function same column id repeated
//------------------------------------------------------------------------------
@ -2248,7 +2705,6 @@ void RowAggregationUM::fixDuplicates(RowAggFunctionType funct)
}
}
//------------------------------------------------------------------------------
// Evaluate the functions and expressions
//------------------------------------------------------------------------------
@ -2262,7 +2718,6 @@ void RowAggregationUM::evaluateExpression()
}
}
//------------------------------------------------------------------------------
// Calculate the aggregate(constant) columns
//------------------------------------------------------------------------------
@ -2395,6 +2850,58 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u
}
break;
case ROWAGG_UDAF:
{
int64_t rowCnt = 0;
// For a NULL constant, call nextValue with NULL and then evaluate.
bool bInterrupted = false;
mcsv1sdk::mcsv1Context context;
context.setRowCnt(rowCnt);
context.setInterrupted(bInterrupted);
context.createUserData();
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
std::vector<mcsv1sdk::ColumnDatum> valsIn;
// Call a reset, then nextValue, then execute. This will evaluate
// the UDAF for the constant.
rc = context.getFunction()->reset(&context);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
}
// Turn the NULL and CONSTANT flags on.
std::vector<uint32_t> flags;
uint32_t flag = mcsv1sdk::PARAM_IS_NULL | mcsv1sdk::PARAM_IS_CONSTANT;
flags.push_back(flag);
context.setDataFlags(&flags);
// Create a dummy datum
mcsv1sdk::ColumnDatum datum;
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = 0;
valsIn.push_back(datum);
rc = context.getFunction()->nextValue(&context, valsIn);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
}
static_any::any valOut;
rc = context.getFunction()->evaluate(&context, valOut);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
}
// Set the returned value into the output row
SetUDAFValue(valOut, colOut);
context.setDataFlags(NULL);
}
break;
default:
{
fRow.setStringField("", colOut);
@ -2674,6 +3181,133 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
}
break;
case ROWAGG_UDAF:
{
int64_t rowCnt = 0;
bool bInterrupted = false;
mcsv1sdk::mcsv1Context context;
context.setRowCnt(rowCnt);
context.setInterrupted(bInterrupted);
// Try the complex data initiation. If not implemented, use the simple,
context.createUserData();
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
std::vector<mcsv1sdk::ColumnDatum> valsIn;
// Call a reset, then nextValue, then execute. This will evaluate
// the UDAF for the constant.
rc = context.getFunction()->reset(&context);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
}
// Turn the CONSTANT flags on.
std::vector<uint32_t> flags;
uint32_t flag = mcsv1sdk::PARAM_IS_CONSTANT;
flags.push_back(flag);
context.setDataFlags(&flags);
// Create a datum item for sending to UDAF
mcsv1sdk::ColumnDatum datum;
datum.dataType = (CalpontSystemCatalog::ColDataType)colDataType;
switch (colDataType)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
{
datum.columnData = strtol(aggData.fConstValue.c_str(), 0, 10);
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
{
datum.columnData = strtoul(aggData.fConstValue.c_str(), 0, 10);
}
break;
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
double dbl = strtod(aggData.fConstValue.c_str(), 0);
double scale = pow(10.0, (double) fRowGroupOut->getScale()[i]);
datum.columnData = (int64_t)(scale*dbl);
datum.scale = scale;
datum.precision = fRowGroupOut->getPrecision()[i];
}
break;
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
datum.columnData = strtod(aggData.fConstValue.c_str(), 0);
}
break;
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
#ifdef _MSC_VER
datum.columnData = strtod(aggData.fConstValue.c_str(), 0);
#else
datum.columnData = strtof(aggData.fConstValue.c_str(), 0);
#endif
}
break;
case execplan::CalpontSystemCatalog::DATE:
{
datum.columnData = DataConvert::stringToDate(aggData.fConstValue);
}
break;
case execplan::CalpontSystemCatalog::DATETIME:
{
datum.columnData = DataConvert::stringToDatetime(aggData.fConstValue);
}
break;
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::BLOB:
default:
{
datum.columnData = aggData.fConstValue;
}
break;
}
valsIn.push_back(datum);
rc = context.getFunction()->nextValue(&context, valsIn);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
}
static_any::any valOut;
rc = context.getFunction()->evaluate(&context, valOut);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
}
// Set the returned value into the output row
SetUDAFValue(valOut, colOut);
context.setDataFlags(NULL);
}
break;
default:
{
fRow.setStringField(aggData.fConstValue, colOut);
@ -2823,6 +3457,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn)
{
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
switch (fFunctionCols[i]->fAggFunction)
{
@ -2845,14 +3480,12 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn)
// The sum and count on UM may not be put next to each other:
// use colOut to store the sum;
// use colAux to store the count.
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
doAvg(rowIn, colIn, colOut, colAux);
break;
}
case ROWAGG_STATS:
{
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
doStatistics(rowIn, colIn, colOut, colAux);
break;
}
@ -2878,6 +3511,20 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn)
case ROWAGG_CONSTANT:
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF);
}
else
{
throw logic_error("(6)A UDAF function is called but there's no RowUDAFFunctionCol");
}
break;
}
default:
{
std::ostringstream errmsg;
@ -3050,6 +3697,43 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut
fRow.setUintField(valIn ^ valOut, colOut);
}
//------------------------------------------------------------------------------
// Subaggregate the UDAF. This calls subaggregate for each partially
// aggregated row returned by the PM
// rowIn(in) - Row to be included in aggregation.
// colIn(in) - column in the input row group
// colOut(in) - column in the output row group
// colAux(in) - Where the UDAF userdata resides
// rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance
//------------------------------------------------------------------------------
// Merges one partially aggregated input row into the UDAF user data of the
// current output row (fRow) by calling the user's subEvaluate().
//
// Throws logging::QueryDataExcept if subEvaluate() reports an error.
void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
                                RowUDAFFunctionCol* rowUDAF)
{
    // Work on a local copy of the context, as the other UDAF entry points do.
    mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);

    // Turn on NULL flags
    uint32_t flag = 0;

    if (isNull(&fRowGroupIn, rowIn, colIn) == true)
        flag |= mcsv1sdk::PARAM_IS_NULL;

    // flags must stay alive until subEvaluate() returns; the context only
    // receives a pointer to it.
    std::vector<uint32_t> flags;
    flags.push_back(flag);
    rgContext.setDataFlags(&flags);

    // The intermediate values are stored in colAux.
    rgContext.setUserData(fRow.getUserData(colAux));

    // Call the UDAF subEvaluate method. The input row's user data is read
    // from the column following colIn.
    mcsv1sdk::mcsv1_UDAF::ReturnCode rc =
        rgContext.getFunction()->subEvaluate(&rgContext, rowIn.getUserData(colIn + 1).get());

    // Detach the row's user data from the local context copy.
    rgContext.setUserData(NULL);

    if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
    {
        rowUDAF->bInterrupted = true;
        throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
    }
}
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
@ -3163,6 +3847,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
{
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
switch (fFunctionCols[i]->fAggFunction)
{
@ -3192,7 +3877,6 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
// The sum and count on UM may not be put next to each other:
// use colOut to store the sum;
// use colAux to store the count.
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
doAvg(rowIn, colIn, colOut, colAux);
break;
}
@ -3202,14 +3886,12 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
// The sum and count on UM may not be put next to each other:
// use colOut to store the sum;
// use colAux to store the count.
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
RowAggregation::doAvg(rowIn, colIn, colOut, colAux);
break;
}
case ROWAGG_STATS:
{
int64_t colAux = fFunctionCols[i]->fAuxColumnIndex;
doStatistics(rowIn, colIn, colOut, colAux);
break;
}
@ -3235,6 +3917,20 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
case ROWAGG_CONSTANT:
break;
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF);
}
else
{
throw logic_error("(7)A UDAF function is called but there's no RowUDAFFunctionCol");
}
break;
}
default:
{
std::ostringstream errmsg;