1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1698 Fix code for DISTINCT in UDAnF

This commit is contained in:
David Hall
2018-09-10 13:53:56 -05:00
parent f4af014435
commit 52cbb62317
5 changed files with 444 additions and 132 deletions

View File

@ -53,69 +53,11 @@ using namespace joblist;
namespace windowfunction
{
template<typename T>
boost::shared_ptr<WindowFunctionType> WF_udaf<T>::makeFunction(int id, const string& name, int ct, mcsv1sdk::mcsv1Context& context)
boost::shared_ptr<WindowFunctionType> WF_udaf::makeFunction(int id, const string& name, int ct, mcsv1sdk::mcsv1Context& context)
{
boost::shared_ptr<WindowFunctionType> func;
switch (ct)
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
{
func.reset(new WF_udaf<int64_t>(id, name, context));
break;
}
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
case CalpontSystemCatalog::UDECIMAL:
{
func.reset(new WF_udaf<uint64_t>(id, name, context));
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
func.reset(new WF_udaf<double>(id, name, context));
break;
}
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
func.reset(new WF_udaf<float>(id, name, context));
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::TEXT:
case CalpontSystemCatalog::BLOB:
{
func.reset(new WF_udaf<string>(id, name, context));
break;
}
default:
{
string errStr = name + "(" + colType2String[ct] + ")";
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE);
break;
}
}
func.reset(new WF_udaf(id, name, context));
// Get the UDAnF function object
WF_udaf* wfUDAF = (WF_udaf*)func.get();
@ -125,30 +67,26 @@ boost::shared_ptr<WindowFunctionType> WF_udaf<T>::makeFunction(int id, const str
return func;
}
template<typename T>
WF_udaf<T>::WF_udaf(WF_udaf& rhs) : fUDAFContext(rhs.getContext()),
WF_udaf::WF_udaf(WF_udaf& rhs) : fUDAFContext(rhs.getContext()),
bInterrupted(rhs.getInterrupted()),
fDistinct(rhs.getDistinct())
{
getContext().setInterrupted(getInterruptedPtr());
}
template<typename T>
WindowFunctionType* WF_udaf<T>::clone() const
WindowFunctionType* WF_udaf::clone() const
{
return new WF_udaf(*const_cast<WF_udaf*>(this));
}
template<typename T>
void WF_udaf<T>::resetData()
void WF_udaf::resetData()
{
getContext().getFunction()->reset(&getContext());
fDistinctSet.clear();
WindowFunctionType::resetData();
}
template<typename T>
void WF_udaf<T>::parseParms(const std::vector<execplan::SRCP>& parms)
void WF_udaf::parseParms(const std::vector<execplan::SRCP>& parms)
{
bRespectNulls = true;
// The last parms: respect null | ignore null
@ -156,10 +94,13 @@ void WF_udaf<T>::parseParms(const std::vector<execplan::SRCP>& parms)
idbassert(cc != NULL);
bool isNull = false; // dummy, harded coded
bRespectNulls = (cc->getIntVal(fRow, isNull) > 0);
if (getContext().getRunFlag(mcsv1sdk::UDAF_DISTINCT))
{
setDistinct();
}
}
template<typename T>
bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
bool WF_udaf::dropValues(int64_t b, int64_t e)
{
if (!bHasDropValue)
{
@ -168,6 +109,7 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
}
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
bool isNull = false;
// Turn on the Analytic flag so the function is aware it is being called
// as a Window Function.
@ -175,14 +117,26 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
// Put the parameter metadata (type, scale, precision) into valsIn
mcsv1sdk::ColumnDatum valsIn[getContext().getParameterCount()];
ConstantColumn* cc = NULL;
for (uint32_t i = 0; i < getContext().getParameterCount(); ++i)
{
uint64_t colIn = fFieldIndex[i + 1];
mcsv1sdk::ColumnDatum& datum = valsIn[i];
datum.dataType = fRow.getColType(colIn);
datum.scale = fRow.getScale(colIn);
datum.precision = fRow.getPrecision(colIn);
cc = static_cast<ConstantColumn*>(fConstantParms[i].get());
if (cc)
{
datum.dataType = cc->resultType().colDataType;
datum.scale = cc->resultType().scale;
datum.precision = cc->resultType().precision;
}
else
{
uint64_t colIn = fFieldIndex[i + 1];
datum.dataType = fRow.getColType(colIn);
datum.scale = fRow.getScale(colIn);
datum.precision = fRow.getPrecision(colIn);
}
}
for (int64_t i = b; i < e; i++)
@ -190,52 +144,325 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
if (i % 1000 == 0 && fStep->cancelled())
break;
bool bHasNull = false;
fRow.setData(getPointer(fRowData->at(i)));
// Turn on NULL flags
// NULL flags
uint32_t flags[getContext().getParameterCount()];
bool bSkipIt = false;
for (uint32_t k = 0; k < getContext().getParameterCount(); ++k)
{
cc = static_cast<ConstantColumn*>(fConstantParms[k].get());
uint64_t colIn = fFieldIndex[k + 1];
mcsv1sdk::ColumnDatum& datum = valsIn[k];
flags[k] = 0;
if (fRow.isNullValue(colIn) == true)
// Turn on Null flags or skip based on respect nulls
flags[k] = 0;
if ((!cc && fRow.isNullValue(colIn) == true)
|| (cc && cc->type() == ConstantColumn::NULLDATA))
{
if (!bRespectNulls)
{
bHasNull = true;
bSkipIt = true;
break;
}
flags[k] |= mcsv1sdk::PARAM_IS_NULL;
}
T valIn;
getValue(colIn, valIn, &datum.dataType);
// Check for distinct, if turned on.
// Currently, distinct only works for param 1
if (k == 0)
if (!bSkipIt && !(flags[k] & mcsv1sdk::PARAM_IS_NULL))
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
switch (datum.dataType)
{
continue;
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
{
int64_t valIn;
if (cc)
{
valIn = cc->getIntVal(fRow, isNull);
}
else
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
{
if (fDistinct)
{
DistinctMap::iterator distinct;
distinct = fDistinctMap.find(valIn);
if (distinct != fDistinctMap.end())
{
// This is a duplicate: decrement the count
--(*distinct).second;
if ((*distinct).second > 0) // still more of these
{
bSkipIt = true;
continue;
}
else
{
fDistinctMap.erase(distinct);
}
}
}
}
datum.columnData = valIn;
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
int64_t valIn;
if (cc)
{
valIn = cc->getDecimalVal(fRow, isNull).value;
}
else
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
{
if (fDistinct)
{
DistinctMap::iterator distinct;
distinct = fDistinctMap.find(valIn);
if (distinct != fDistinctMap.end())
{
// This is a duplicate: decrement the count
--(*distinct).second;
if ((*distinct).second > 0) // still more of these
{
bSkipIt = true;
continue;
}
else
{
fDistinctMap.erase(distinct);
}
}
}
}
datum.columnData = valIn;
break;
}
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
{
uint64_t valIn;
if (cc)
{
valIn = cc->getUintVal(fRow, isNull);
}
else
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
{
if (fDistinct)
{
DistinctMap::iterator distinct;
distinct = fDistinctMap.find(valIn);
if (distinct != fDistinctMap.end())
{
// This is a duplicate: decrement the count
--(*distinct).second;
if ((*distinct).second > 0) // still more of these
{
bSkipIt = true;
continue;
}
else
{
fDistinctMap.erase(distinct);
}
}
}
}
datum.columnData = valIn;
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double valIn;
if (cc)
{
valIn = cc->getDoubleVal(fRow, isNull);
}
else
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
{
if (fDistinct)
{
DistinctMap::iterator distinct;
distinct = fDistinctMap.find(valIn);
if (distinct != fDistinctMap.end())
{
// This is a duplicate: decrement the count
--(*distinct).second;
if ((*distinct).second > 0) // still more of these
{
bSkipIt = true;
continue;
}
else
{
fDistinctMap.erase(distinct);
}
}
}
}
datum.columnData = valIn;
break;
}
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float valIn;
if (cc)
{
valIn = cc->getFloatVal(fRow, isNull);
}
else
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
{
if (fDistinct)
{
DistinctMap::iterator distinct;
distinct = fDistinctMap.find(valIn);
if (distinct != fDistinctMap.end())
{
// This is a duplicate: decrement the count
--(*distinct).second;
if ((*distinct).second > 0) // still more of these
{
bSkipIt = true;
continue;
}
else
{
fDistinctMap.erase(distinct);
}
}
}
}
datum.columnData = valIn;
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::TEXT:
case CalpontSystemCatalog::BLOB:
{
string valIn;
if (cc)
{
valIn = cc->getStrVal(fRow, isNull);
}
else
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
{
if (fDistinct)
{
DistinctMap::iterator distinct;
distinct = fDistinctMap.find(valIn);
if (distinct != fDistinctMap.end())
{
// This is a duplicate: decrement the count
--(*distinct).second;
if ((*distinct).second > 0) // still more of these
{
bSkipIt = true;
continue;
}
else
{
fDistinctMap.erase(distinct);
}
}
}
}
datum.columnData = valIn;
break;
}
default:
{
string errStr = "(" + colType2String[(int)datum.dataType] + ")";
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE);
break;
}
}
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
}
if (bHasNull)
if (bSkipIt)
{
continue;
}
getContext().setDataFlags(flags);
rc = getContext().getFunction()->dropValue(&getContext(), valsIn);
if (rc == mcsv1sdk::mcsv1_UDAF::NOT_IMPLEMENTED)
@ -257,8 +484,7 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
}
// Sets the value from valOut into column colOut, performing any conversions.
template<typename T>
void WF_udaf<T>::SetUDAFValue(static_any::any& valOut, int64_t colOut,
void WF_udaf::SetUDAFValue(static_any::any& valOut, int64_t colOut,
int64_t b, int64_t e, int64_t c)
{
static const static_any::any& charTypeId = (char)1;
@ -279,15 +505,6 @@ void WF_udaf<T>::SetUDAFValue(static_any::any& valOut, int64_t colOut,
CDT colDataType = fRow.getColType(colOut);
if (valOut.empty())
{
// If valOut is empty, we return NULL
T* pv = NULL;
setValue(colDataType, b, e, c, pv);
fPrev = c;
return;
}
// This may seem a bit convoluted. Users shouldn't return a type
// that they didn't set in mcsv1_UDAF::init(), but this
// handles whatever return type is given and casts
@ -405,7 +622,16 @@ void WF_udaf<T>::SetUDAFValue(static_any::any& valOut, int64_t colOut,
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
setValue(colDataType, b, e, c, &intOut);
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
if (valOut.empty())
{
setValue(colDataType, b, e, c, (int64_t*)NULL);
}
else
{
setValue(colDataType, b, e, c, &intOut);
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
@ -416,17 +642,38 @@ void WF_udaf<T>::SetUDAFValue(static_any::any& valOut, int64_t colOut,
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
case execplan::CalpontSystemCatalog::TIME:
setValue(colDataType, b, e, c, &uintOut);
if (valOut.empty())
{
setValue(colDataType, b, e, c, (uint64_t*)NULL);
}
else
{
setValue(colDataType, b, e, c, &uintOut);
}
break;
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
setValue(colDataType, b, e, c, &floatOut);
if (valOut.empty())
{
setValue(colDataType, b, e, c, (float*)NULL);
}
else
{
setValue(colDataType, b, e, c, &floatOut);
}
break;
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
setValue(colDataType, b, e, c, &doubleOut);
if (valOut.empty())
{
setValue(colDataType, b, e, c, (double*)NULL);
}
else
{
setValue(colDataType, b, e, c, &doubleOut);
}
break;
case execplan::CalpontSystemCatalog::CHAR:
@ -435,7 +682,14 @@ void WF_udaf<T>::SetUDAFValue(static_any::any& valOut, int64_t colOut,
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::CLOB:
case execplan::CalpontSystemCatalog::BLOB:
setValue(colDataType, b, e, c, &strOut);
if (valOut.empty())
{
setValue(colDataType, b, e, c, (string*)NULL);
}
else
{
setValue(colDataType, b, e, c, &strOut);
}
break;
default:
@ -449,8 +703,7 @@ void WF_udaf<T>::SetUDAFValue(static_any::any& valOut, int64_t colOut,
}
}
template<typename T>
void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
void WF_udaf::operator()(int64_t b, int64_t e, int64_t c)
{
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
uint64_t colOut = fFieldIndex[0];
@ -499,7 +752,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
else
getContext().clearContextFlag(mcsv1sdk::CONTEXT_HAS_CURRENT_ROW);
bool bHasNull = false;
bool bSkipIt = false;
for (int64_t i = b; i <= e; i++)
{
@ -510,7 +763,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
// NULL flags
uint32_t flags[getContext().getParameterCount()];
bHasNull = false;
bSkipIt = false;
for (uint32_t k = 0; k < getContext().getParameterCount(); ++k)
{
@ -526,14 +779,14 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if (!bRespectNulls)
{
bHasNull = true;
bSkipIt = true;
break;
}
flags[k] |= mcsv1sdk::PARAM_IS_NULL;
}
if (!bHasNull && !(flags[k] & mcsv1sdk::PARAM_IS_NULL))
if (!bSkipIt && !(flags[k] & mcsv1sdk::PARAM_IS_NULL))
{
switch (datum.dataType)
{
@ -542,6 +795,8 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
{
int64_t valIn;
@ -560,7 +815,21 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
{
continue;
// MCOL-1698
std::pair<static_any::any, uint64_t> val = make_pair(valIn, 1);
// Unordered_map will not insert a duplicate key (valIn).
// If it doesn't insert, the original pair will be returned
// in distinct.first and distinct.second will be a bool --
// true if newly inserted, false if a duplicate.
std::pair<typename DistinctMap::iterator, bool> distinct;
distinct = fDistinctMap.insert(val);
if (distinct.second == false)
{
// This is a duplicate: increment the count
++(*distinct.first).second;
bSkipIt = true;
continue;
}
}
if (fDistinct)
@ -591,7 +860,15 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
{
continue;
std::pair<static_any::any, uint64_t> val = make_pair(valIn, 1);
std::pair<typename DistinctMap::iterator, bool> distinct;
distinct = fDistinctMap.insert(val);
if (distinct.second == false)
{
++(*distinct.first).second;
bSkipIt = true;
continue;
}
}
if (fDistinct)
@ -625,7 +902,15 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
{
continue;
std::pair<static_any::any, uint64_t> val = make_pair(valIn, 1);
std::pair<typename DistinctMap::iterator, bool> distinct;
distinct = fDistinctMap.insert(val);
if (distinct.second == false)
{
++(*distinct.first).second;
bSkipIt = true;
continue;
}
}
if (fDistinct)
@ -656,7 +941,15 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
{
continue;
std::pair<static_any::any, uint64_t> val = make_pair(valIn, 1);
std::pair<typename DistinctMap::iterator, bool> distinct;
distinct = fDistinctMap.insert(val);
if (distinct.second == false)
{
++(*distinct.first).second;
bSkipIt = true;
continue;
}
}
if (fDistinct)
@ -687,7 +980,15 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
{
continue;
std::pair<static_any::any, uint64_t> val = make_pair(valIn, 1);
std::pair<typename DistinctMap::iterator, bool> distinct;
distinct = fDistinctMap.insert(val);
if (distinct.second == false)
{
++(*distinct.first).second;
bSkipIt = true;
continue;
}
}
if (fDistinct)
@ -721,7 +1022,15 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
{
continue;
std::pair<static_any::any, uint64_t> val = make_pair(valIn, 1);
std::pair<typename DistinctMap::iterator, bool> distinct;
distinct = fDistinctMap.insert(val);
if (distinct.second == false)
{
++(*distinct.first).second;
bSkipIt = true;
continue;
}
}
if (fDistinct)
@ -734,7 +1043,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
default:
{
string errStr = "(" + colType2String[i] + ")";
string errStr = "(" + colType2String[(int)datum.dataType] + ")";
errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr);
cerr << errStr << endl;
throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE);
@ -746,7 +1055,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
}
// Skip if any value is NULL and respect nulls is off.
if (bHasNull)
if (bSkipIt)
{
continue;
}
@ -780,8 +1089,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
fPrev = c;
}
template
boost::shared_ptr<WindowFunctionType> WF_udaf<int64_t>::makeFunction(int id, const string& name, int ct, mcsv1sdk::mcsv1Context& context);
boost::shared_ptr<WindowFunctionType> WF_udaf::makeFunction(int id, const string& name, int ct, mcsv1sdk::mcsv1Context& context);
} //namespace
// vim:ts=4 sw=4: