You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-4172 Add support for wide-DECIMAL into statistical aggregate and regr_* UDAF functions
The patch fixes wrong results returned when multiple UDAF exist in projection aggregate over wide decimal literals now works
This commit is contained in:
@ -239,16 +239,20 @@ const static_any::any& RowAggregation::shortTypeId((short)1);
|
||||
const static_any::any& RowAggregation::intTypeId((int)1);
|
||||
const static_any::any& RowAggregation::longTypeId((long)1);
|
||||
const static_any::any& RowAggregation::llTypeId((long long)1);
|
||||
const static_any::any& RowAggregation::int128TypeId((__int128)1);
|
||||
const static_any::any& RowAggregation::ucharTypeId((unsigned char)1);
|
||||
const static_any::any& RowAggregation::ushortTypeId((unsigned short)1);
|
||||
const static_any::any& RowAggregation::uintTypeId((unsigned int)1);
|
||||
const static_any::any& RowAggregation::ulongTypeId((unsigned long)1);
|
||||
const static_any::any& RowAggregation::ullTypeId((unsigned long long)1);
|
||||
const static_any::any& RowAggregation::uint128TypeId((unsigned __int128)1);
|
||||
const static_any::any& RowAggregation::floatTypeId((float)1);
|
||||
const static_any::any& RowAggregation::doubleTypeId((double)1);
|
||||
const static_any::any& RowAggregation::longdoubleTypeId((long double)1);
|
||||
const static_any::any& RowAggregation::strTypeId(typeStr);
|
||||
|
||||
using Dec = datatypes::Decimal;
|
||||
|
||||
KeyStorage::KeyStorage(const RowGroup& keys, Row** tRow) : tmpRow(tRow), rg(keys)
|
||||
{
|
||||
RGData data(rg);
|
||||
@ -619,9 +623,6 @@ RowAggregation::RowAggregation(const RowAggregation& rhs):
|
||||
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
|
||||
fRGContext(rhs.fRGContext), fOrigFunctionCols(NULL)
|
||||
{
|
||||
//fGroupByCols.clear();
|
||||
//fFunctionCols.clear();
|
||||
|
||||
fGroupByCols.assign(rhs.fGroupByCols.begin(), rhs.fGroupByCols.end());
|
||||
fFunctionCols.assign(rhs.fFunctionCols.begin(), rhs.fFunctionCols.end());
|
||||
}
|
||||
@ -721,31 +722,32 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup*
|
||||
// threads on the PM and by multple threads on the UM. It must remain
|
||||
// thread safe.
|
||||
//------------------------------------------------------------------------------
|
||||
void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF)
|
||||
void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF, uint64_t funcColsIdx)
|
||||
{
|
||||
// RowAggregation and it's functions need to be re-entrant which means
|
||||
// each instance (thread) needs its own copy of the context object.
|
||||
// Note: operator=() doesn't copy userData.
|
||||
fRGContext = rowUDAF->fUDAFContext;
|
||||
fRGContextColl[funcColsIdx] = rowUDAF->fUDAFContext;
|
||||
|
||||
// Call the user reset for the group userData. Since, at this point,
|
||||
// context's userData will be NULL, reset will generate a new one.
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
rc = fRGContext.getFunction()->reset(&fRGContext);
|
||||
rc = fRGContextColl[funcColsIdx].getFunction()->reset(&fRGContextColl[funcColsIdx]);
|
||||
|
||||
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
||||
{
|
||||
rowUDAF->bInterrupted = true;
|
||||
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
||||
throw logging::QueryDataExcept(fRGContextColl[funcColsIdx].getErrorMessage(),
|
||||
logging::aggregateFuncErr);
|
||||
}
|
||||
|
||||
fRow.setUserDataStore(fRowGroupOut->getRGData()->getUserDataStore());
|
||||
fRow.setUserData(fRGContext,
|
||||
fRGContext.getUserDataSP(),
|
||||
fRGContext.getUserDataSize(),
|
||||
fRow.setUserData(fRGContextColl[funcColsIdx],
|
||||
fRGContextColl[funcColsIdx].getUserDataSP(),
|
||||
fRGContextColl[funcColsIdx].getUserDataSize(),
|
||||
rowUDAF->fAuxColumnIndex);
|
||||
|
||||
fRGContext.setUserData(NULL); // Prevents calling deleteUserData on the fRGContext.
|
||||
// Prevents calling deleteUserData on the mcsv1Context.
|
||||
fRGContextColl[funcColsIdx].setUserData(NULL);
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -784,13 +786,15 @@ void RowAggregation::initialize()
|
||||
{
|
||||
fRowGroupOut->setRowCount(1);
|
||||
attachGroupConcatAg();
|
||||
|
||||
// Lazy approach w/o a mapping b/w fFunctionCols idx and fRGContextColl idx
|
||||
fRGContextColl.resize(fFunctionCols.size());
|
||||
// For UDAF, reset the data
|
||||
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -842,7 +846,8 @@ void RowAggregation::aggReset()
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -897,7 +902,8 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
|
||||
{
|
||||
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -907,7 +913,8 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -972,7 +979,8 @@ void RowAggregation::aggregateRow(Row& row)
|
||||
{
|
||||
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -982,7 +990,8 @@ void RowAggregation::aggregateRow(Row& row)
|
||||
{
|
||||
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
|
||||
{
|
||||
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
|
||||
auto rowUDAFColumnPtr = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get());
|
||||
resetUDAF(rowUDAFColumnPtr, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1033,7 +1042,7 @@ void RowAggregation::initMapData(const Row& rowIn)
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
if (LIKELY(fRow.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
||||
if (LIKELY(rowIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
||||
{
|
||||
uint32_t colOutOffset = fRow.getOffset(colOut);
|
||||
fRow.setBinaryField_offset(
|
||||
@ -1041,7 +1050,7 @@ void RowAggregation::initMapData(const Row& rowIn)
|
||||
sizeof(int128_t),
|
||||
colOutOffset);
|
||||
}
|
||||
else if (fRow.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
||||
else if (rowIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
||||
{
|
||||
fRow.setIntField(rowIn.getIntField(colIn), colOut);
|
||||
}
|
||||
@ -2025,9 +2034,24 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
|
||||
case execplan::CalpontSystemCatalog::MEDINT:
|
||||
case execplan::CalpontSystemCatalog::INT:
|
||||
case execplan::CalpontSystemCatalog::BIGINT:
|
||||
valIn = (long double) rowIn.getIntField(colIn);
|
||||
break;
|
||||
|
||||
case execplan::CalpontSystemCatalog::DECIMAL: // handle scale later
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL: // handle scale later
|
||||
valIn = (long double) rowIn.getIntField(colIn);
|
||||
if (LIKELY(fRowGroupIn.getColumnWidth(colIn) == datatypes::MAXDECIMALWIDTH))
|
||||
{
|
||||
int128_t* val128InPtr = rowIn.getBinaryField<int128_t>(colIn);
|
||||
valIn = Dec::getLongDoubleFromWideDecimal(*val128InPtr);
|
||||
}
|
||||
else if (fRowGroupIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
||||
{
|
||||
valIn = (long double) rowIn.getIntField(colIn);
|
||||
}
|
||||
else
|
||||
{
|
||||
idbassert(false);
|
||||
}
|
||||
break;
|
||||
|
||||
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||
@ -2068,7 +2092,10 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
|
||||
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
int64_t colAux, uint64_t& funcColsIdx)
|
||||
{
|
||||
uint32_t paramCount = fRGContext.getParameterCount();
|
||||
uint32_t paramCount = fRGContextColl[funcColsIdx].getParameterCount();
|
||||
// doUDAF changes funcColsIdx to skip UDAF arguments so the real UDAF
|
||||
// column idx is the initial value of the funcColsIdx
|
||||
uint64_t origFuncColsIdx = funcColsIdx;
|
||||
// The vector of parameters to be sent to the UDAF
|
||||
utils::VLArray<mcsv1sdk::ColumnDatum> valsIn(paramCount);
|
||||
utils::VLArray<uint32_t> dataFlags(paramCount);
|
||||
@ -2094,7 +2121,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
if ((cc && cc->type() == execplan::ConstantColumn::NULLDATA)
|
||||
|| (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
|
||||
{
|
||||
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
if (fRGContextColl[origFuncColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
{
|
||||
// When Ignore nulls, if there are multiple parameters and any
|
||||
// one of them is NULL, we ignore the entry. We need to increment
|
||||
@ -2139,7 +2166,6 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
datum.scale = fRowGroupIn.getScale()[colIn];
|
||||
datum.precision = fRowGroupIn.getPrecision()[colIn];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@ -2156,7 +2182,19 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
}
|
||||
else
|
||||
{
|
||||
datum.columnData = rowIn.getIntField(colIn);
|
||||
if (LIKELY(fRowGroupIn.getColumnWidth(colIn)
|
||||
== datatypes::MAXDECIMALWIDTH))
|
||||
{
|
||||
datum.columnData = rowIn.getInt128Field(colIn);
|
||||
}
|
||||
else if (fRowGroupIn.getColumnWidth(colIn) <= datatypes::MAXLEGACYWIDTH)
|
||||
{
|
||||
datum.columnData = rowIn.getIntField(colIn);
|
||||
}
|
||||
else
|
||||
{
|
||||
idbassert(false);
|
||||
}
|
||||
datum.scale = fRowGroupIn.getScale()[colIn];
|
||||
datum.precision = fRowGroupIn.getPrecision()[colIn];
|
||||
}
|
||||
@ -2322,7 +2360,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
default:
|
||||
{
|
||||
std::ostringstream errmsg;
|
||||
errmsg << "RowAggregation " << fRGContext.getName() <<
|
||||
errmsg << "RowAggregation " << fRGContextColl[origFuncColsIdx].getName() <<
|
||||
": No logic for data type: " << colDataType;
|
||||
throw logging::QueryDataExcept(errmsg.str(), logging::aggregateFuncErr);
|
||||
break;
|
||||
@ -2348,18 +2386,20 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
}
|
||||
|
||||
// The intermediate values are stored in userData referenced by colAux.
|
||||
fRGContext.setDataFlags(dataFlags);
|
||||
fRGContext.setUserData(fRow.getUserData(colAux));
|
||||
fRGContextColl[origFuncColsIdx].setDataFlags(dataFlags);
|
||||
fRGContextColl[origFuncColsIdx].setUserData(fRow.getUserData(colAux));
|
||||
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
rc = fRGContext.getFunction()->nextValue(&fRGContext, valsIn);
|
||||
fRGContext.setUserData(NULL);
|
||||
rc = fRGContextColl[origFuncColsIdx].getFunction()->nextValue(&fRGContextColl[origFuncColsIdx],
|
||||
valsIn);
|
||||
fRGContextColl[origFuncColsIdx].setUserData(NULL);
|
||||
|
||||
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
||||
{
|
||||
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColsIdx].get());
|
||||
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[origFuncColsIdx].get());
|
||||
rowUDAF->bInterrupted = true;
|
||||
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
|
||||
throw logging::QueryDataExcept(fRGContextColl[origFuncColsIdx].getErrorMessage(),
|
||||
logging::aggregateFuncErr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2758,11 +2798,11 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t intOut = 0;
|
||||
uint64_t uintOut = 0;
|
||||
float floatOut = 0.0;
|
||||
double doubleOut = 0.0;
|
||||
long double longdoubleOut = 0.0;
|
||||
int64_t intOut;
|
||||
uint64_t uintOut;
|
||||
float floatOut;
|
||||
double doubleOut;
|
||||
long double longdoubleOut;
|
||||
ostringstream oss;
|
||||
std::string strOut;
|
||||
|
||||
@ -2829,6 +2869,12 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
|
||||
fRow.setIntField<8>(intOut, colOut);
|
||||
bSetSuccess = true;
|
||||
}
|
||||
else if (valOut.compatible(int128TypeId))
|
||||
{
|
||||
int128_t int128Out = valOut.cast<int128_t>();
|
||||
fRow.setInt128Field(int128Out, colOut);
|
||||
bSetSuccess = true;
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
@ -2952,6 +2998,8 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
|
||||
|
||||
if (!bSetSuccess)
|
||||
{
|
||||
// This means the return from the UDAF doesn't match the field
|
||||
// This handles the mismatch
|
||||
SetUDAFAnyValue(valOut, colOut);
|
||||
}
|
||||
}
|
||||
@ -2964,104 +3012,102 @@ void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
|
||||
// that they didn't set in mcsv1_UDAF::init(), but this
|
||||
// handles whatever return type is given and casts
|
||||
// it to whatever they said to return.
|
||||
// TODO: Save cpu cycles here.
|
||||
// TODO: Save cpu cycles here. For one, we don't need to initialize these
|
||||
|
||||
int64_t intOut = 0;
|
||||
uint64_t uintOut = 0;
|
||||
float floatOut = 0.0;
|
||||
double doubleOut = 0.0;
|
||||
long double longdoubleOut = 0.0;
|
||||
int128_t int128Out = 0;
|
||||
ostringstream oss;
|
||||
std::string strOut;
|
||||
|
||||
if (valOut.compatible(charTypeId))
|
||||
{
|
||||
uintOut = intOut = valOut.cast<char>();
|
||||
floatOut = intOut;
|
||||
int128Out = uintOut = intOut = valOut.cast<char>();
|
||||
doubleOut = intOut;
|
||||
oss << intOut;
|
||||
}
|
||||
else if (valOut.compatible(scharTypeId))
|
||||
{
|
||||
uintOut = intOut = valOut.cast<signed char>();
|
||||
floatOut = intOut;
|
||||
int128Out = uintOut = intOut = valOut.cast<signed char>();
|
||||
doubleOut = intOut;
|
||||
oss << intOut;
|
||||
}
|
||||
else if (valOut.compatible(shortTypeId))
|
||||
{
|
||||
uintOut = intOut = valOut.cast<short>();
|
||||
floatOut = intOut;
|
||||
int128Out = uintOut = intOut = valOut.cast<short>();
|
||||
doubleOut = intOut;
|
||||
oss << intOut;
|
||||
}
|
||||
else if (valOut.compatible(intTypeId))
|
||||
{
|
||||
uintOut = intOut = valOut.cast<int>();
|
||||
floatOut = intOut;
|
||||
int128Out = uintOut = intOut = valOut.cast<int>();
|
||||
doubleOut = intOut;
|
||||
oss << intOut;
|
||||
}
|
||||
else if (valOut.compatible(longTypeId))
|
||||
{
|
||||
uintOut = intOut = valOut.cast<long>();
|
||||
floatOut = intOut;
|
||||
int128Out = uintOut = intOut = valOut.cast<long>();
|
||||
doubleOut = intOut;
|
||||
oss << intOut;
|
||||
}
|
||||
else if (valOut.compatible(llTypeId))
|
||||
{
|
||||
uintOut = intOut = valOut.cast<long long>();
|
||||
floatOut = intOut;
|
||||
int128Out = uintOut = intOut = valOut.cast<long long>();
|
||||
doubleOut = intOut;
|
||||
oss << intOut;
|
||||
}
|
||||
else if (valOut.compatible(ucharTypeId))
|
||||
{
|
||||
intOut = uintOut = valOut.cast<unsigned char>();
|
||||
floatOut = uintOut;
|
||||
int128Out = intOut = uintOut = valOut.cast<unsigned char>();
|
||||
doubleOut = uintOut;
|
||||
oss << uintOut;
|
||||
}
|
||||
else if (valOut.compatible(ushortTypeId))
|
||||
{
|
||||
intOut = uintOut = valOut.cast<unsigned short>();
|
||||
floatOut = uintOut;
|
||||
int128Out = intOut = uintOut = valOut.cast<unsigned short>();
|
||||
doubleOut = uintOut;
|
||||
oss << uintOut;
|
||||
}
|
||||
else if (valOut.compatible(uintTypeId))
|
||||
{
|
||||
intOut = uintOut = valOut.cast<unsigned int>();
|
||||
floatOut = uintOut;
|
||||
int128Out = intOut = uintOut = valOut.cast<unsigned int>();
|
||||
doubleOut = uintOut;
|
||||
oss << uintOut;
|
||||
}
|
||||
else if (valOut.compatible(ulongTypeId))
|
||||
{
|
||||
intOut = uintOut = valOut.cast<unsigned long>();
|
||||
floatOut = uintOut;
|
||||
int128Out = intOut = uintOut = valOut.cast<unsigned long>();
|
||||
doubleOut = uintOut;
|
||||
oss << uintOut;
|
||||
}
|
||||
else if (valOut.compatible(ullTypeId))
|
||||
{
|
||||
intOut = uintOut = valOut.cast<unsigned long long>();
|
||||
floatOut = uintOut;
|
||||
int128Out = intOut = uintOut = valOut.cast<unsigned long long>();
|
||||
doubleOut = uintOut;
|
||||
oss << uintOut;
|
||||
}
|
||||
else if (valOut.compatible(floatTypeId))
|
||||
else if (valOut.compatible(int128TypeId))
|
||||
{
|
||||
floatOut = valOut.cast<float>();
|
||||
doubleOut = floatOut;
|
||||
longdoubleOut = doubleOut;
|
||||
intOut = uintOut = floatOut;
|
||||
oss << floatOut;
|
||||
intOut = uintOut = int128Out = valOut.cast<int128_t>();
|
||||
doubleOut = uintOut;
|
||||
oss << uintOut;
|
||||
}
|
||||
else if (valOut.compatible(doubleTypeId))
|
||||
else if (valOut.compatible(floatTypeId) || valOut.compatible(doubleTypeId))
|
||||
{
|
||||
doubleOut = valOut.cast<double>();
|
||||
longdoubleOut = doubleOut;
|
||||
floatOut = (float)doubleOut;
|
||||
uintOut = (uint64_t)doubleOut;
|
||||
intOut = (int64_t)doubleOut;
|
||||
// Should look at scale for decimal and adjust
|
||||
doubleOut = valOut.cast<float>();
|
||||
int128Out = doubleOut;
|
||||
intOut = uintOut = doubleOut;
|
||||
oss << doubleOut;
|
||||
}
|
||||
|
||||
else if (valOut.compatible(longdoubleTypeId))
|
||||
{
|
||||
// Should look at scale for decimal and adjust
|
||||
longdoubleOut = valOut.cast<long double>();
|
||||
int128Out = longdoubleOut;
|
||||
doubleOut = (double)longdoubleOut;
|
||||
floatOut = (float)doubleOut;
|
||||
uintOut = (uint64_t)doubleOut;
|
||||
intOut = (int64_t)doubleOut;
|
||||
oss << doubleOut;
|
||||
@ -3075,7 +3121,7 @@ void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
|
||||
uintOut = strtoul(strOut.c_str(), NULL, 10);
|
||||
doubleOut = strtod(strOut.c_str(), NULL);
|
||||
longdoubleOut = strtold(strOut.c_str(), NULL);
|
||||
floatOut = (float)doubleOut;
|
||||
int128Out = longdoubleOut;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3099,11 +3145,20 @@ void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
|
||||
break;
|
||||
|
||||
case execplan::CalpontSystemCatalog::BIGINT:
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
fRow.setIntField<8>(intOut, colOut);
|
||||
break;
|
||||
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
uint32_t width = fRowGroupOut->getColumnWidth(colOut);
|
||||
if (width == datatypes::MAXDECIMALWIDTH)
|
||||
fRow.setInt128Field(int128Out, colOut);
|
||||
else
|
||||
fRow.setIntField<8>(intOut, colOut);
|
||||
break;
|
||||
}
|
||||
|
||||
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||
fRow.setUintField<1>(uintOut, colOut);
|
||||
break;
|
||||
@ -3135,8 +3190,11 @@ void RowAggregationUM::SetUDAFAnyValue(static_any::any& valOut, int64_t colOut)
|
||||
|
||||
case execplan::CalpontSystemCatalog::FLOAT:
|
||||
case execplan::CalpontSystemCatalog::UFLOAT:
|
||||
{
|
||||
float floatOut = (float)doubleOut;
|
||||
fRow.setFloatField(floatOut, colOut);
|
||||
break;
|
||||
}
|
||||
|
||||
case execplan::CalpontSystemCatalog::DOUBLE:
|
||||
case execplan::CalpontSystemCatalog::UDOUBLE:
|
||||
@ -3407,10 +3465,28 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u
|
||||
case execplan::CalpontSystemCatalog::MEDINT:
|
||||
case execplan::CalpontSystemCatalog::INT:
|
||||
case execplan::CalpontSystemCatalog::BIGINT:
|
||||
{
|
||||
fRow.setIntField(getIntNullValue(colDataType), colOut);
|
||||
}
|
||||
break;
|
||||
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
fRow.setIntField(getIntNullValue(colDataType), colOut);
|
||||
auto width = fRow.getColumnWidth(colOut);
|
||||
if (fRow.getColumnWidth(colOut) == datatypes::MAXDECIMALWIDTH)
|
||||
{
|
||||
fRow.setInt128Field(datatypes::Decimal128Null, colOut);
|
||||
}
|
||||
else if (width <= datatypes::MAXLEGACYWIDTH)
|
||||
{
|
||||
fRow.setIntField(getIntNullValue(colDataType), colOut);
|
||||
}
|
||||
else
|
||||
{
|
||||
idbassert(0);
|
||||
throw std::logic_error("RowAggregationUM::doNullConstantAggregate(): DECIMAL bad length.");
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
@ -3570,7 +3646,7 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u
|
||||
void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData, uint64_t i)
|
||||
{
|
||||
int64_t colOut = fFunctionCols[i]->fOutputColumnIndex;
|
||||
int colDataType = (fRowGroupOut->getColTypes())[colOut];
|
||||
auto colDataType = (fRowGroupOut->getColTypes())[colOut];
|
||||
int64_t rowCnt = fRow.getIntField(fFunctionCols[i]->fAuxColumnIndex);
|
||||
|
||||
switch (aggData.fOp)
|
||||
@ -3592,7 +3668,7 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
|
||||
{
|
||||
fRow.setIntField(strtol(aggData.fConstValue.c_str(), 0, 10), colOut);
|
||||
}
|
||||
break;
|
||||
break;
|
||||
|
||||
// AVG should not be uint32_t result type.
|
||||
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||
@ -3608,9 +3684,27 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
double dbl = strtod(aggData.fConstValue.c_str(), 0);
|
||||
double scale = pow(10.0, (double) fRowGroupOut->getScale()[i]);
|
||||
fRow.setIntField((int64_t)(scale * dbl), colOut);
|
||||
auto width = fRow.getColumnWidth(colOut);
|
||||
if (width == datatypes::MAXDECIMALWIDTH)
|
||||
{
|
||||
ColTypeAlias colType;
|
||||
colType.colWidth = width;
|
||||
colType.precision = fRow.getPrecision(i);
|
||||
colType.scale = fRow.getScale(i);
|
||||
colType.colDataType = colDataType;
|
||||
fRow.setInt128Field(Dec::int128FromString(aggData.fConstValue, colType), colOut);
|
||||
}
|
||||
else if (width <= datatypes::MAXLEGACYWIDTH)
|
||||
{
|
||||
double dbl = strtod(aggData.fConstValue.c_str(), 0);
|
||||
double scale = pow(10.0, (double) fRowGroupOut->getScale()[i]);
|
||||
fRow.setIntField((int64_t)(scale * dbl), colOut);
|
||||
}
|
||||
else
|
||||
{
|
||||
idbassert(0);
|
||||
throw std::logic_error("RowAggregationUM::doNotNullConstantAggregate(): DECIMAL bad length.");
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
@ -3715,15 +3809,39 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
double dbl = strtod(aggData.fConstValue.c_str(), 0);
|
||||
dbl *= pow(10.0, (double) fRowGroupOut->getScale()[i]);
|
||||
dbl *= rowCnt;
|
||||
auto width = fRow.getColumnWidth(colOut);
|
||||
if (width == datatypes::MAXDECIMALWIDTH)
|
||||
{
|
||||
ColTypeAlias colType;
|
||||
colType.colWidth = width;
|
||||
colType.precision = fRow.getPrecision(i);
|
||||
colType.scale = fRow.getScale(i);
|
||||
colType.colDataType = colDataType;
|
||||
int128_t constValue = Dec::int128FromString(aggData.fConstValue,
|
||||
colType);
|
||||
int128_t sum;
|
||||
|
||||
if ((dbl > 0 && dbl > (double) numeric_limits<int64_t>::max()) ||
|
||||
(dbl < 0 && dbl < (double) numeric_limits<int64_t>::min()))
|
||||
throw logging::QueryDataExcept(overflowMsg, logging::aggregateDataErr);
|
||||
datatypes::MultiplicationOverflowCheck multOp;
|
||||
multOp(constValue, rowCnt, sum);
|
||||
fRow.setInt128Field(sum, colOut);
|
||||
}
|
||||
else if (width == datatypes::MAXLEGACYWIDTH)
|
||||
{
|
||||
double dbl = strtod(aggData.fConstValue.c_str(), 0);
|
||||
dbl *= pow(10.0, (double) fRowGroupOut->getScale()[i]);
|
||||
dbl *= rowCnt;
|
||||
|
||||
if ((dbl > 0 && dbl > (double) numeric_limits<int64_t>::max()) ||
|
||||
(dbl < 0 && dbl < (double) numeric_limits<int64_t>::min()))
|
||||
throw logging::QueryDataExcept(overflowMsg, logging::aggregateDataErr);
|
||||
fRow.setIntField((int64_t) dbl, colOut);
|
||||
}
|
||||
else
|
||||
{
|
||||
idbassert(0);
|
||||
throw std::logic_error("RowAggregationUM::doNotNullConstantAggregate(): sum() DECIMAL bad length.");
|
||||
}
|
||||
|
||||
fRow.setIntField((int64_t) dbl, colOut);
|
||||
}
|
||||
break;
|
||||
|
||||
@ -4459,7 +4577,7 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
|
||||
if (!userDataIn)
|
||||
{
|
||||
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
if (fRGContextColl[funcColsIdx].getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
|
||||
{
|
||||
return;
|
||||
}
|
||||
@ -4468,14 +4586,14 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
|
||||
flags[0] |= mcsv1sdk::PARAM_IS_NULL;
|
||||
}
|
||||
|
||||
fRGContext.setDataFlags(flags);
|
||||
fRGContextColl[funcColsIdx].setDataFlags(flags);
|
||||
|
||||
// The intermediate values are stored in colAux.
|
||||
fRGContext.setUserData(fRow.getUserData(colAux));
|
||||
fRGContextColl[funcColsIdx].setUserData(fRow.getUserData(colAux));
|
||||
|
||||
// Call the UDAF subEvaluate method
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
rc = fRGContext.getFunction()->subEvaluate(&fRGContext, userDataIn.get());
|
||||
rc = fRGContextColl[funcColsIdx].getFunction()->subEvaluate(&fRGContextColl[funcColsIdx], userDataIn.get());
|
||||
fRGContext.setUserData(NULL);
|
||||
|
||||
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)
|
||||
|
Reference in New Issue
Block a user