1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-4171

This commit is contained in:
David Hall
2020-07-30 17:28:11 -05:00
committed by Roman Nozdrin
parent 5287e6860b
commit 638202417f
40 changed files with 807 additions and 250 deletions

View File

@ -1,4 +1,4 @@
/* Copyright (C) 2014 InfiniDB, Inc.
/* CopyrighT (C) 2014 InfiniDB, Inc.
Copyright (c) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
@ -50,112 +50,116 @@ using namespace joblist;
#include "wf_sum_avg.h"
#if 0
namespace
namespace windowfunction
{
template<typename T>
void checkSumLimit(T sum, T val)
template<typename T_IN, typename T_OUT>
void WF_sum_avg<T_IN, T_OUT>::checkSumLimit(long double val, long double sum)
{
}
template<>
void checkSumLimit<int64_t>(int64_t sum, int64_t val)
template<typename T_IN, typename T_OUT>
inline void WF_sum_avg<T_IN, T_OUT>::checkSumLimit(int128_t val, int128_t sum)
{
if (((sum >= 0) && ((numeric_limits<int64_t>::max() - sum) < val)) ||
((sum < 0) && ((numeric_limits<int64_t>::min() - sum) > val)))
if (((sum >= 0) && ((fMax128 - sum) < val)) ||
((sum < 0) && ((fMin128 - sum) > val)))
{
string errStr = "SUM(int):";
string errStr = "SUM(int128_t)";
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
}
}
ostringstream oss;
oss << sum << "+" << val;
template<typename T_IN, typename T_OUT>
inline void WF_sum_avg<T_IN, T_OUT>::checkSumLimit(uint128_t val, uint128_t sum)
{
if ((fMaxu128 - sum) < val)
{
string errStr = "SUM(uint128_t)";
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
}
}
if (sum > 0)
oss << " > " << numeric_limits<uint64_t>::max();
template<typename T_IN, typename T_OUT>
int128_t WF_sum_avg<T_IN, T_OUT>::calculateAvg(int128_t sum, uint64_t count, int scale)
{
int128_t factor = pow(10.0, scale);
int128_t avg;
if (scale > 0)
{
if ((sum * factor) / factor == sum)
{
avg = sum * factor;
avg /= count;
}
else
oss << " < " << numeric_limits<uint64_t>::min();
errStr += oss.str();
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
{
// scale won't fit before divide, we're gonna lose precision.
avg = sum / count;
if ((avg * factor) / factor != avg) // Still won't fit
{
string errStr = string("AVG(int)");
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
}
avg *= factor;
}
}
}
template<>
void checkSumLimit<uint64_t>(uint64_t sum, uint64_t val)
{
if ((sum >= 0) && ((numeric_limits<uint64_t>::max() - sum) < val))
else
{
string errStr = "SUM(unsigned):";
ostringstream oss;
oss << sum << "+" << val << " > " << numeric_limits<uint64_t>::max();
errStr += oss.str();
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
avg = sum / count;
}
}
template<>
long double calculateAvg(long double sum, uint64_t count, int s)
{
return sum / count;
}
long double avgWithLimit(long double sum, uint64_t count, int scale, long double u, long double l)
{
long double factor = pow(10.0, scale);
long double avg = sum / count;
avg *= factor;
avg += (avg < 0) ? (-0.5) : (0.5);
if (avg > u || avg < l)
{
string errStr = string("AVG") + (l < 0 ? "(int):" : "(unsign)");
ostringstream oss;
oss << avg;
errStr += oss.str();
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
}
return avg;
}
template<>
int64_t calculateAvg<int64_t>(int64_t sum, uint64_t count, int scale)
template<typename T_IN, typename T_OUT>
uint128_t WF_sum_avg<T_IN, T_OUT>::calculateAvg(uint128_t sum, uint64_t count, int scale)
{
int64_t t = (int64_t) avgWithLimit(sum, count, scale,
numeric_limits<int64_t>::max(), numeric_limits<int64_t>::min());
return t;
uint128_t factor = pow(10.0, scale);
uint128_t avg = sum;
if (scale > 0)
{
if ((sum * factor) / factor == sum)
{
avg = sum * factor;
avg /= count;
}
else
{
// scale won't fit before divide, we're gonna lose precision.
avg = sum / count;
if ((avg * factor) / factor != avg) // Still won't fit
{
string errStr = string("AVG(int)");
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_OVERFLOW, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_OVERFLOW);
}
avg *= factor;
}
}
else
{
avg = sum / count;
}
avg += 0.5;
return avg;
}
template<>
uint64_t calculateAvg<uint64_t>(uint64_t sum, uint64_t count, int scale)
template<typename T_IN, typename T_OUT>
inline long double WF_sum_avg<T_IN, T_OUT>::calculateAvg(long double sum, uint64_t count, int scale)
{
uint64_t t = (uint64_t) avgWithLimit(sum, count, scale, numeric_limits<uint64_t>::max(), 0);
return t;
return sum / count;
}
}
#endif
namespace windowfunction
{
template<typename T>
boost::shared_ptr<WindowFunctionType> WF_sum_avg<T>::makeFunction(int id, const string& name, int ct)
// For the static function makeFunction, the template parameters are ignored
template<typename T_IN, typename T_OUT>
boost::shared_ptr<WindowFunctionType> WF_sum_avg<T_IN, T_OUT>::makeFunction(int id, const string& name, int ct, WindowFunctionColumn* wc)
{
boost::shared_ptr<WindowFunctionType> func;
switch (ct)
@ -165,9 +169,9 @@ boost::shared_ptr<WindowFunctionType> WF_sum_avg<T>::makeFunction(int id, const
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
{
func.reset(new WF_sum_avg<int64_t>(id, name));
// Look into using int128_t instead of long double
func.reset(new WF_sum_avg<int64_t, long double>(id, name));
break;
}
@ -176,29 +180,54 @@ boost::shared_ptr<WindowFunctionType> WF_sum_avg<T>::makeFunction(int id, const
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
{
func.reset(new WF_sum_avg<uint64_t, long double>(id, name));
break;
}
case CalpontSystemCatalog::DECIMAL:
{
if (wc->functionParms()[0]->resultType().colWidth < 16)
{
func.reset(new WF_sum_avg<int64_t, int128_t>(id, name));
}
else
{
func.reset(new WF_sum_avg<int128_t, int128_t>(id, name));
}
break;
}
case CalpontSystemCatalog::UDECIMAL:
{
func.reset(new WF_sum_avg<uint64_t>(id, name));
if (wc->functionParms()[0]->resultType().colWidth < 16)
{
func.reset(new WF_sum_avg<uint64_t, uint128_t>(id, name));
}
else
{
func.reset(new WF_sum_avg<uint128_t, uint128_t>(id, name));
}
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
func.reset(new WF_sum_avg<double>(id, name));
func.reset(new WF_sum_avg<double, long double>(id, name));
break;
}
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
func.reset(new WF_sum_avg<float>(id, name));
func.reset(new WF_sum_avg<float, long double>(id, name));
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
func.reset(new WF_sum_avg<long double>(id, name));
func.reset(new WF_sum_avg<long double, long double>(id, name));
break;
}
default:
@ -216,15 +245,15 @@ boost::shared_ptr<WindowFunctionType> WF_sum_avg<T>::makeFunction(int id, const
}
template<typename T>
WindowFunctionType* WF_sum_avg<T>::clone() const
template<typename T_IN, typename T_OUT>
WindowFunctionType* WF_sum_avg<T_IN, T_OUT>::clone() const
{
return new WF_sum_avg<T>(*this);
return new WF_sum_avg<T_IN, T_OUT>(*this);
}
template<typename T>
void WF_sum_avg<T>::resetData()
template<typename T_IN, typename T_OUT>
void WF_sum_avg<T_IN, T_OUT>::resetData()
{
fAvg = 0;
fSum = 0;
@ -235,8 +264,8 @@ void WF_sum_avg<T>::resetData()
}
template<typename T>
void WF_sum_avg<T>::operator()(int64_t b, int64_t e, int64_t c)
template<typename T_IN, typename T_OUT>
void WF_sum_avg<T_IN, T_OUT>::operator()(int64_t b, int64_t e, int64_t c)
{
uint64_t colOut = fFieldIndex[0];
@ -251,8 +280,7 @@ void WF_sum_avg<T>::operator()(int64_t b, int64_t e, int64_t c)
e = c;
uint64_t colIn = fFieldIndex[1];
double scale = fRow.getScale(colIn);
int scale = fRow.getScale(colOut) - fRow.getScale(colIn);
for (int64_t i = b; i <= e; i++)
{
if (i % 1000 == 0 && fStep->cancelled())
@ -263,34 +291,27 @@ void WF_sum_avg<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fRow.isNullValue(colIn) == true)
continue;
T valIn;
CDT cdt;
getValue(colIn, valIn, &cdt);
// checkSumLimit(fSum, valIn);
getValue(colIn, fVal, &cdt);
if ((!fDistinct) || (fSet.find(valIn) == fSet.end()))
if ((!fDistinct) || (fSet.find(fVal) == fSet.end()))
{
long double val = valIn;
if (scale &&
cdt != CalpontSystemCatalog::LONGDOUBLE)
{
val /= pow(10.0, scale);
}
fSum += val;
checkSumLimit(fVal, fSum);
fSum += (T_OUT)fVal;
fCount++;
if (fDistinct)
fSet.insert(valIn);
fSet.insert(fVal);
}
}
if ((fCount > 0) && (fFunctionId == WF__AVG || fFunctionId == WF__AVG_DISTINCT))
{
fAvg = fSum / fCount;
fAvg = calculateAvg(fSum, fCount, scale);
}
}
long double* v = NULL;
T_OUT* v = NULL;
if (fCount > 0)
{
@ -307,7 +328,7 @@ void WF_sum_avg<T>::operator()(int64_t b, int64_t e, int64_t c)
template
boost::shared_ptr<WindowFunctionType> WF_sum_avg<int64_t>::makeFunction(int, const string&, int);
boost::shared_ptr<WindowFunctionType> WF_sum_avg<int64_t, long double>::makeFunction(int, const string&, int, WindowFunctionColumn*);
} //namespace