1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-1201 manual rebase with develop. Obsoletes branch MCOL-1201

This commit is contained in:
David Hall
2018-05-11 09:50:10 -05:00
parent 12b1d99f51
commit 6fa7dded6f
30 changed files with 2255 additions and 1196 deletions

View File

@ -215,6 +215,22 @@ inline string getStringNullValue()
namespace rowgroup
{
const std::string typeStr("");
const static_any::any& RowAggregation::charTypeId((char)1);
const static_any::any& RowAggregation::scharTypeId((signed char)1);
const static_any::any& RowAggregation::shortTypeId((short)1);
const static_any::any& RowAggregation::intTypeId((int)1);
const static_any::any& RowAggregation::longTypeId((long)1);
const static_any::any& RowAggregation::llTypeId((long long)1);
const static_any::any& RowAggregation::ucharTypeId((unsigned char)1);
const static_any::any& RowAggregation::ushortTypeId((unsigned short)1);
const static_any::any& RowAggregation::uintTypeId((unsigned int)1);
const static_any::any& RowAggregation::ulongTypeId((unsigned long)1);
const static_any::any& RowAggregation::ullTypeId((unsigned long long)1);
const static_any::any& RowAggregation::floatTypeId((float)1);
const static_any::any& RowAggregation::doubleTypeId((double)1);
const static_any::any& RowAggregation::strTypeId(typeStr);
KeyStorage::KeyStorage(const RowGroup& keys, Row** tRow) : tmpRow(tRow), rg(keys)
{
RGData data(rg);
@ -691,7 +707,8 @@ RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCol
RowAggregation::RowAggregation(const RowAggregation& rhs):
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fRGContext(rhs.fRGContext)
{
//fGroupByCols.clear();
//fFunctionCols.clear();
@ -756,7 +773,6 @@ void RowAggregation::addRowGroup(const RowGroup* pRows, vector<Row::Pointer>& in
{
// this function is for threaded aggregation, which is for group by and distinct.
// if (countSpecial(pRows))
Row rowIn;
pRows->initRow(&rowIn);
@ -790,7 +806,7 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup*
}
//------------------------------------------------------------------------------
// For UDAF, we need to sometimes start a new context.
// For UDAF, we need to sometimes start a new fRGContext.
//
// This will be called any number of times by each of the batchprimitiveprocessor
// threads on the PM and by multple threads on the UM. It must remain
@ -801,29 +817,29 @@ void RowAggregation::resetUDAF(uint64_t funcColID)
// Get the UDAF class pointer and store in the row definition object.
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColID].get());
// resetUDAF needs to be re-entrant. Since we're modifying the context object
// by creating a new userData, we need a local copy. The copy constructor
// doesn't copy userData.
mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);
// RowAggregation and it's functions need to be re-entrant which means
// each instance (thread) needs its own copy of the context object.
// Note: operator=() doesn't copy userData.
fRGContext = rowUDAF->fUDAFContext;
// Call the user reset for the group userData. Since, at this point,
// context's userData will be NULL, reset will generate a new one.
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
rc = rgContext.getFunction()->reset(&rgContext);
rc = fRGContext.getFunction()->reset(&fRGContext);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
rowUDAF->bInterrupted = true;
throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
fRow.setUserDataStore(fRowGroupOut->getRGData()->getUserDataStore());
fRow.setUserData(rgContext,
rgContext.getUserDataSP(),
rgContext.getUserDataSize(),
fRow.setUserData(fRGContext,
fRGContext.getUserDataSP(),
fRGContext.getUserDataSize(),
rowUDAF->fAuxColumnIndex);
rgContext.setUserData(NULL); // Prevents calling deleteUserData on the context.
fRGContext.setUserData(NULL); // Prevents calling deleteUserData on the fRGContext.
}
//------------------------------------------------------------------------------
@ -873,7 +889,6 @@ void RowAggregation::initialize()
}
}
// Save the RowGroup data pointer
fResultDataVec.push_back(fRowGroupOut->getRGData());
@ -1658,10 +1673,11 @@ void RowAggregation::updateEntry(const Row& rowIn)
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
int64_t colIn = fFunctionCols[i]->fInputColumnIndex;
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
SP_ROWAGG_FUNC_t pFunctionCol = fFunctionCols[i];
int64_t colIn = pFunctionCol->fInputColumnIndex;
int64_t colOut = pFunctionCol->fOutputColumnIndex;
switch (fFunctionCols[i]->fAggFunction)
switch (pFunctionCol->fAggFunction)
{
case ROWAGG_COUNT_COL_NAME:
@ -1675,7 +1691,7 @@ void RowAggregation::updateEntry(const Row& rowIn)
case ROWAGG_MIN:
case ROWAGG_MAX:
case ROWAGG_SUM:
doMinMaxSum(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction);
doMinMaxSum(rowIn, colIn, colOut, pFunctionCol->fAggFunction);
break;
case ROWAGG_AVG:
@ -1692,7 +1708,7 @@ void RowAggregation::updateEntry(const Row& rowIn)
case ROWAGG_BIT_OR:
case ROWAGG_BIT_XOR:
{
doBitOp(rowIn, colIn, colOut, fFunctionCols[i]->fAggFunction);
doBitOp(rowIn, colIn, colOut, pFunctionCol->fAggFunction);
break;
}
@ -1707,11 +1723,11 @@ void RowAggregation::updateEntry(const Row& rowIn)
case ROWAGG_UDAF:
{
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(pFunctionCol.get());
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colOut + 1, rowUDAF);
doUDAF(rowIn, colIn, colOut, colOut + 1, rowUDAF, i);
}
else
{
@ -1725,7 +1741,7 @@ void RowAggregation::updateEntry(const Row& rowIn)
{
std::ostringstream errmsg;
errmsg << "RowAggregation: function (id = " <<
(uint64_t) fFunctionCols[i]->fAggFunction << ") is not supported.";
(uint64_t) pFunctionCol->fAggFunction << ") is not supported.";
cerr << errmsg.str() << endl;
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
break;
@ -1997,131 +2013,142 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
}
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
RowUDAFFunctionCol* rowUDAF)
RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx)
{
std::vector<mcsv1sdk::ColumnDatum> valsIn;
execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupIn.getColTypes()[colIn];
std::vector<uint32_t> dataFlags;
int32_t paramCount = fRGContext.getParameterCount();
// The vector of parameters to be sent to the UDAF
mcsv1sdk::ColumnDatum valsIn[paramCount];
uint32_t dataFlags[paramCount];
// Get the context for this rowGroup. Make a copy so we're thread safe.
mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);
// Turn on NULL flags
std::vector<uint32_t> flags;
uint32_t flag = 0;
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
execplan::CalpontSystemCatalog::ColDataType colDataType;
for (uint32_t i = 0; i < fRGContext.getParameterCount(); ++i)
{
if (rgContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
mcsv1sdk::ColumnDatum& datum = valsIn[i];
// Turn on NULL flags
dataFlags[i] = 0;
if (isNull(&fRowGroupIn, rowIn, colIn) == true)
{
return;
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
{
return;
}
dataFlags[i] |= mcsv1sdk::PARAM_IS_NULL;
}
colDataType = fRowGroupIn.getColTypes()[colIn];
if (!fRGContext.isParamNull(i))
{
switch (colDataType)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = rowIn.getIntField(colIn);
datum.scale = fRowGroupIn.getScale()[colIn];
datum.precision = fRowGroupIn.getPrecision()[colIn];
break;
}
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
datum.columnData = rowIn.getUintField(colIn);
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
datum.dataType = execplan::CalpontSystemCatalog::DOUBLE;
datum.columnData = rowIn.getDoubleField(colIn);
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
datum.dataType = execplan::CalpontSystemCatalog::FLOAT;
datum.columnData = rowIn.getFloatField(colIn);
break;
}
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
datum.columnData = rowIn.getUintField(colIn);
break;
}
case execplan::CalpontSystemCatalog::TIME:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = rowIn.getIntField(colIn);
break;
}
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::CLOB:
case execplan::CalpontSystemCatalog::BLOB:
{
datum.dataType = colDataType;
datum.columnData = rowIn.getStringField(colIn);
break;
}
default:
{
std::ostringstream errmsg;
errmsg << "RowAggregation " << fRGContext.getName() <<
": No logic for data type: " << colDataType;
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
break;
}
}
}
flag |= mcsv1sdk::PARAM_IS_NULL;
}
flags.push_back(flag);
rgContext.setDataFlags(&flags);
mcsv1sdk::ColumnDatum datum;
if (!rgContext.isParamNull(0))
{
switch (colDataType)
// MCOL-1201: If there are multiple parameters, the next fFunctionCols
// will have the column used. By incrementing the funcColsIdx (passed by
// ref, we also increment the caller's index.
if (fFunctionCols.size() > funcColsIdx + 1
&& fFunctionCols[funcColsIdx+1]->fAggFunction == ROWAGG_MULTI_PARM)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = rowIn.getIntField(colIn);
datum.scale = fRowGroupIn.getScale()[colIn];
datum.precision = fRowGroupIn.getPrecision()[colIn];
break;
}
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
datum.columnData = rowIn.getUintField(colIn);
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
datum.dataType = execplan::CalpontSystemCatalog::DOUBLE;
datum.columnData = rowIn.getDoubleField(colIn);
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
datum.dataType = execplan::CalpontSystemCatalog::FLOAT;
datum.columnData = rowIn.getFloatField(colIn);
break;
}
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
datum.columnData = rowIn.getUintField(colIn);
break;
}
case execplan::CalpontSystemCatalog::TIME:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = rowIn.getIntField(colIn);
break;
}
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::CLOB:
case execplan::CalpontSystemCatalog::BLOB:
{
datum.dataType = colDataType;
datum.columnData = rowIn.getStringField(colIn);
break;
}
default:
{
std::ostringstream errmsg;
errmsg << "RowAggregation " << rgContext.getName() <<
": No logic for data type: " << colDataType;
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
break;
}
++funcColsIdx;
SP_ROWAGG_FUNC_t pFunctionCol = fFunctionCols[funcColsIdx];
colIn = pFunctionCol->fInputColumnIndex;
colOut = pFunctionCol->fOutputColumnIndex;
}
else
{
break;
}
}
valsIn.push_back(datum);
// The intermediate values are stored in userData referenced by colAux.
rgContext.setUserData(fRow.getUserData(colAux));
fRGContext.setDataFlags(dataFlags);
fRGContext.setUserData(fRow.getUserData(colAux));
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
rc = rgContext.getFunction()->nextValue(&rgContext, valsIn);
rgContext.setUserData(NULL);
rc = fRGContext.getFunction()->nextValue(&fRGContext, valsIn);
fRGContext.setUserData(NULL);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
rowUDAF->bInterrupted = true;
throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
}
@ -2218,6 +2245,7 @@ RowAggregationUM::RowAggregationUM(const RowAggregationUM& rhs) :
fHasAvg(rhs.fHasAvg),
fKeyOnHeap(rhs.fKeyOnHeap),
fHasStatsFunc(rhs.fHasStatsFunc),
fHasUDAF(rhs.fHasUDAF),
fExpression(rhs.fExpression),
fTotalMemUsage(rhs.fTotalMemUsage),
fRm(rhs.fRm),
@ -2419,7 +2447,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn)
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF);
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i);
}
else
{
@ -2585,22 +2613,6 @@ void RowAggregationUM::calculateAvgColumns()
// Sets the value from valOut into column colOut, performing any conversions.
void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
{
static const static_any::any& charTypeId((char)1);
static const static_any::any& scharTypeId((signed char)1);
static const static_any::any& shortTypeId((short)1);
static const static_any::any& intTypeId((int)1);
static const static_any::any& longTypeId((long)1);
static const static_any::any& llTypeId((long long)1);
static const static_any::any& ucharTypeId((unsigned char)1);
static const static_any::any& ushortTypeId((unsigned short)1);
static const static_any::any& uintTypeId((unsigned int)1);
static const static_any::any& ulongTypeId((unsigned long)1);
static const static_any::any& ullTypeId((unsigned long long)1);
static const static_any::any& floatTypeId((float)1);
static const static_any::any& doubleTypeId((double)1);
static const std::string typeStr("");
static const static_any::any& strTypeId(typeStr);
execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupOut->getColTypes()[colOut];
if (valOut.empty())
@ -2609,6 +2621,179 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
return;
}
int64_t intOut = 0;
uint64_t uintOut = 0;
float floatOut = 0.0;
double doubleOut = 0.0;
ostringstream oss;
std::string strOut;
bool bSetSuccess = false;
switch (colDataType)
{
case execplan::CalpontSystemCatalog::BIT:
case execplan::CalpontSystemCatalog::TINYINT:
if (valOut.compatible(charTypeId))
{
intOut = valOut.cast<char>();
bSetSuccess = true;
}
else if (valOut.compatible(scharTypeId))
{
intOut = valOut.cast<signed char>();
bSetSuccess = true;
}
if (bSetSuccess)
{
fRow.setIntField<1>(intOut, colOut);
}
break;
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
if (valOut.compatible(shortTypeId))
{
intOut = valOut.cast<short>();
fRow.setIntField<2>(intOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::INT:
if (valOut.compatible(uintTypeId))
{
intOut = valOut.cast<int>();
bSetSuccess = true;
}
else if (valOut.compatible(longTypeId))
{
intOut = valOut.cast<long>();
bSetSuccess = true;
}
if (bSetSuccess)
{
fRow.setIntField<4>(intOut, colOut);
}
break;
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
if (valOut.compatible(llTypeId))
{
intOut = valOut.cast<long long>();
fRow.setIntField<8>(intOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
if (valOut.compatible(ucharTypeId))
{
uintOut = valOut.cast<unsigned char>();
fRow.setUintField<1>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
if (valOut.compatible(ushortTypeId))
{
uintOut = valOut.cast<unsigned short>();
fRow.setUintField<2>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::UINT:
if (valOut.compatible(uintTypeId))
{
uintOut = valOut.cast<unsigned int>();
fRow.setUintField<4>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::UBIGINT:
if (valOut.compatible(ulongTypeId))
{
uintOut = valOut.cast<unsigned long>();
fRow.setUintField<8>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
if (valOut.compatible(ulongTypeId))
{
uintOut = valOut.cast<unsigned long>();
fRow.setUintField<8>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
if (valOut.compatible(floatTypeId))
{
floatOut = valOut.cast<float>();
fRow.setFloatField(floatOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
if (valOut.compatible(doubleTypeId))
{
doubleOut = valOut.cast<double>();
fRow.setDoubleField(doubleOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
if (valOut.compatible(strTypeId))
{
std::string strOut = valOut.cast<std::string>();
fRow.setStringField(strOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::CLOB:
case execplan::CalpontSystemCatalog::BLOB:
if (valOut.compatible(strTypeId))
{
std::string strOut = valOut.cast<std::string>();
fRow.setVarBinaryField(strOut, colOut);
bSetSuccess = true;
}
break;
default:
{
std::ostringstream errmsg;
errmsg << "RowAggregation: No logic for data type: " << colDataType;
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
break;
}
}
if (!bSetSuccess)
{
SetUDAFAnyValue(valOut, colOut);
}
}
void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
{
execplan::CalpontSystemCatalog::ColDataType colDataType = fRowGroupOut->getColTypes()[colOut];
// This may seem a bit convoluted. Users shouldn't return a type
// that they didn't set in mcsv1_UDAF::init(), but this
// handles whatever return type is given and casts
@ -2814,7 +2999,7 @@ void RowAggregationUM::calculateUDAFColumns()
continue;
rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);
fRGContext = rowUDAF->fUDAFContext;
int64_t colOut = rowUDAF->fOutputColumnIndex;
int64_t colAux = rowUDAF->fAuxColumnIndex;
@ -2826,26 +3011,26 @@ void RowAggregationUM::calculateUDAFColumns()
fRowGroupOut->getRow(j, &fRow);
// Turn the NULL flag off. We can't know NULL at this point
rgContext.setDataFlags(NULL);
fRGContext.setDataFlags(NULL);
// The intermediate values are stored in colAux.
rgContext.setUserData(fRow.getUserData(colAux));
fRGContext.setUserData(fRow.getUserData(colAux));
// Call the UDAF evaluate function
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
rc = rgContext.getFunction()->evaluate(&rgContext, valOut);
rgContext.setUserData(NULL);
rc = fRGContext.getFunction()->evaluate(&fRGContext, valOut);
fRGContext.setUserData(NULL);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
rowUDAF->bInterrupted = true;
throw logging::QueryDataExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
// Set the returned value into the output row
SetUDAFValue(valOut, colOut);
}
rgContext.setUserData(NULL);
fRGContext.setUserData(NULL);
}
}
@ -3116,54 +3301,60 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u
{
// For a NULL constant, call nextValue with NULL and then evaluate.
bool bInterrupted = false;
mcsv1sdk::mcsv1Context context(((RowUDAFFunctionCol*)fFunctionCols[i].get())->fUDAFContext);
context.setInterrupted(bInterrupted);
context.createUserData();
fRGContext.setInterrupted(bInterrupted);
fRGContext.createUserData();
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
std::vector<mcsv1sdk::ColumnDatum> valsIn;
mcsv1sdk::ColumnDatum valsIn[1];
// Call a reset, then nextValue, then execute. This will evaluate
// the UDAF for the constant.
rc = context.getFunction()->reset(&context);
rc = fRGContext.getFunction()->reset(&fRGContext);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
#if 0
uint32_t dataFlags[fRGContext.getParameterCount()];
for (uint32_t i = 0; i < fRGContext.getParameterCount(); ++i)
{
mcsv1sdk::ColumnDatum& datum = valsIn[i];
// Turn on NULL flags
dataFlags[i] = 0;
}
#endif
// Turn the NULL and CONSTANT flags on.
std::vector<uint32_t> flags;
uint32_t flag = mcsv1sdk::PARAM_IS_NULL | mcsv1sdk::PARAM_IS_CONSTANT;
flags.push_back(flag);
context.setDataFlags(&flags);
uint32_t flags[1];
flags[0] = mcsv1sdk::PARAM_IS_NULL | mcsv1sdk::PARAM_IS_CONSTANT;
fRGContext.setDataFlags(flags);
// Create a dummy datum
mcsv1sdk::ColumnDatum datum;
mcsv1sdk::ColumnDatum& datum = valsIn[0];
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
datum.columnData = 0;
valsIn.push_back(datum);
rc = context.getFunction()->nextValue(&context, valsIn);
rc = fRGContext.getFunction()->nextValue(&fRGContext, valsIn);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
static_any::any valOut;
rc = context.getFunction()->evaluate(&context, valOut);
rc = fRGContext.getFunction()->evaluate(&fRGContext, valOut);
fRGContext.setUserData(NULL);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
// Set the returned value into the output row
SetUDAFValue(valOut, colOut);
context.setDataFlags(NULL);
fRGContext.setDataFlags(NULL);
}
break;
@ -3460,30 +3651,28 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
case ROWAGG_UDAF:
{
bool bInterrupted = false;
mcsv1sdk::mcsv1Context context(((RowUDAFFunctionCol*)fFunctionCols[i].get())->fUDAFContext);
context.setInterrupted(bInterrupted);
context.createUserData();
fRGContext.setInterrupted(bInterrupted);
fRGContext.createUserData();
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
std::vector<mcsv1sdk::ColumnDatum> valsIn;
mcsv1sdk::ColumnDatum valsIn[1];
// Call a reset, then nextValue, then execute. This will evaluate
// the UDAF for the constant.
rc = context.getFunction()->reset(&context);
rc = fRGContext.getFunction()->reset(&fRGContext);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
// Turn the CONSTANT flags on.
std::vector<uint32_t> flags;
uint32_t flag = mcsv1sdk::PARAM_IS_CONSTANT;
flags.push_back(flag);
context.setDataFlags(&flags);
uint32_t flags[1];
flags[0] = mcsv1sdk::PARAM_IS_CONSTANT;
fRGContext.setDataFlags(flags);
// Create a datum item for sending to UDAF
mcsv1sdk::ColumnDatum datum;
mcsv1sdk::ColumnDatum& datum = valsIn[0];
datum.dataType = (CalpontSystemCatalog::ColDataType)colDataType;
switch (colDataType)
@ -3567,27 +3756,27 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
break;
}
valsIn.push_back(datum);
rc = context.getFunction()->nextValue(&context, valsIn);
rc = fRGContext.getFunction()->nextValue(&fRGContext, valsIn);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
static_any::any valOut;
rc = context.getFunction()->evaluate(&context, valOut);
rc = fRGContext.getFunction()->evaluate(&fRGContext, valOut);
fRGContext.setUserData(NULL);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
context.setInterrupted(true);
throw logging::QueryDataExcept(context.getErrorMessage(), logging::aggregateFuncErr);
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
// Set the returned value into the output row
SetUDAFValue(valOut, colOut);
context.setDataFlags(NULL);
fRGContext.setDataFlags(NULL);
}
break;
@ -3806,7 +3995,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn)
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF);
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i);
}
else
{
@ -4011,45 +4200,43 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut
// rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance
//------------------------------------------------------------------------------
void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux,
RowUDAFFunctionCol* rowUDAF)
RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx)
{
static_any::any valOut;
mcsv1sdk::mcsv1Context rgContext(rowUDAF->fUDAFContext);
// Get the user data
boost::shared_ptr<mcsv1sdk::UserData> userData = rowIn.getUserData(colIn + 1);
// Unlike other aggregates, the data isn't in colIn, so testing it for NULL
// there won't help. In case of NULL, userData will be NULL.
std::vector<uint32_t> flags;
uint32_t flag = 0;
uint32_t flags[1];
flags[0] = 0;
if (!userData)
{
if (rgContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
{
return;
}
// Turn on NULL flags
flag |= mcsv1sdk::PARAM_IS_NULL;
flags[0] |= mcsv1sdk::PARAM_IS_NULL;
}
flags.push_back(flag);
rgContext.setDataFlags(&flags);
fRGContext.setDataFlags(flags);
// The intermediate values are stored in colAux.
rgContext.setUserData(fRow.getUserData(colAux));
fRGContext.setUserData(fRow.getUserData(colAux));
// Call the UDAF subEvaluate method
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
rc = rgContext.getFunction()->subEvaluate(&rgContext, userData.get());
rgContext.setUserData(NULL);
rc = fRGContext.getFunction()->subEvaluate(&fRGContext, userData.get());
fRGContext.setUserData(NULL);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
{
rowUDAF->bInterrupted = true;
throw logging::IDBExcept(rgContext.getErrorMessage(), logging::aggregateFuncErr);
throw logging::IDBExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
}
@ -4246,7 +4433,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn)
if (rowUDAF)
{
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF);
doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i);
}
else
{