You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-3503 add MODA aggregate function
This commit is contained in:
2
dbcon/joblist/windowfunctionstep.cpp
Normal file → Executable file
2
dbcon/joblist/windowfunctionstep.cpp
Normal file → Executable file
@ -522,7 +522,7 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J
|
||||
igpc < csep->groupByCols().end();
|
||||
++igpc)
|
||||
{
|
||||
if (*igpc->get() == *j->get())
|
||||
if ((*igpc)->alias() == (*j)->alias())
|
||||
{
|
||||
bFound = true;
|
||||
break;
|
||||
|
4
dbcon/mysql/ha_calpont_execplan.cpp
Normal file → Executable file
4
dbcon/mysql/ha_calpont_execplan.cpp
Normal file → Executable file
@ -4950,6 +4950,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
|
||||
mcsv1sdk::mcsv1Context& context = udafc->getContext();
|
||||
context.setName(isp->func_name());
|
||||
|
||||
// Get the return type as defined by CREATE AGGREGATE FUNCTION
|
||||
// Most functions don't care, but some may.
|
||||
context.setMariaDBReturnType((mcsv1sdk::enum_mariadb_return_type)isp->field_type());
|
||||
|
||||
// Set up the return type defaults for the call to init()
|
||||
context.setResultType(udafc->resultType().colDataType);
|
||||
context.setColWidth(udafc->resultType().colWidth);
|
||||
|
1
storage-manager/include/messageFormat.h
Normal file → Executable file
1
storage-manager/include/messageFormat.h
Normal file → Executable file
@ -65,6 +65,7 @@ static const uint32_t SM_MSG_END=0x9d5bc31b;
|
||||
static const uint32_t SM_HEADER_LEN = sizeof(sm_msg_header);
|
||||
|
||||
// the unix socket StorageManager is listening on
|
||||
__attribute__ ((unused))
|
||||
static const char *socket_name = "\0storagemanager";
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
480
utils/regr/moda.cpp
Normal file
480
utils/regr/moda.cpp
Normal file
@ -0,0 +1,480 @@
|
||||
/* Copyright (C) 2019 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <sstream>
|
||||
#include <cstring>
|
||||
#include <typeinfo>
|
||||
#include "moda.h"
|
||||
#include "bytestream.h"
|
||||
#include "objectreader.h"
|
||||
|
||||
using namespace mcsv1sdk;
|
||||
|
||||
// This is the standard way to get a UDAF function into the system's
|
||||
// map of UDAF for lookup
|
||||
class Add_moda_ToUDAFMap
|
||||
{
|
||||
public:
|
||||
Add_moda_ToUDAFMap()
|
||||
{
|
||||
UDAFMap::getMap()["moda"] = new moda();
|
||||
}
|
||||
};
|
||||
static Add_moda_ToUDAFMap addToMap;
|
||||
|
||||
// There are a few design options when creating a generic moda function:
|
||||
// 1) Always use DOUBLE for internal storage
|
||||
// Pros: can handle data from any native SQL type.
|
||||
// Cons: If MODA(SUM()) is called, then the LONG DOUBLE returned by SUM will
|
||||
// be truncated.
|
||||
// It requires 8 bytes in the hash table and requires streaming 8 bytes
|
||||
// per entry regardles of how small it could have been.
|
||||
// 2) Always use LONG DOUBLE for internal storage
|
||||
// Pros: Solves the problem of MODA(SUM())
|
||||
// Cons: It requires 16 bytes in the hash table and requires streaming 16 bytes
|
||||
// per entry regardles of how small it could have been.
|
||||
// 3) Use the data type of the column for internal storage
|
||||
// Pros: Can handle MODA(SUM()) because LONG DOUBLE all types are handeled
|
||||
// Only the data size needed is stored in the hash table and streamed
|
||||
//
|
||||
// This class implements option 3 by creating templated classes.
|
||||
// There are two moda classes, the main one called moda, which is basically
|
||||
// an adapter (Pattern) to the templated class called Moda_impl_T.
|
||||
//
|
||||
// The way the API works, each function class is instantiated exactly once per
|
||||
// executable and then accessed via a map. This means that the function classes
|
||||
// could be used by any active query, or more than once by a single query. These
|
||||
// classes have no data fields for this reason. All data for a specific query is
|
||||
// maintained by the context object.
|
||||
//
|
||||
// Each possible templated instantation is created ate moda creation during startup.
|
||||
// They are the Moda_impl_T members at the bottom of the moda class definition.
|
||||
// At runtime getImpl() gets the right one for the datatype involved based on context.
|
||||
//
|
||||
// More template magic is done in the ModaData class to create and maintained
|
||||
// a hash of the correct type.
|
||||
|
||||
// getImpl returns the current modaImpl or gets the correct one based on context.
|
||||
mcsv1_UDAF* moda::getImpl(mcsv1Context* context)
|
||||
{
|
||||
ModaData* data = static_cast<ModaData*>(context->getUserData());
|
||||
if (data->modaImpl)
|
||||
return data->modaImpl;
|
||||
|
||||
switch (context->getResultType())
|
||||
{
|
||||
case execplan::CalpontSystemCatalog::TINYINT:
|
||||
data->modaImpl = &moda_impl_int8;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::SMALLINT:
|
||||
data->modaImpl = &moda_impl_int16;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::MEDINT:
|
||||
case execplan::CalpontSystemCatalog::INT:
|
||||
data->modaImpl = &moda_impl_int32;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::BIGINT:
|
||||
data->modaImpl = &moda_impl_int64;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
switch (context->getColWidth())
|
||||
{
|
||||
case 1:
|
||||
data->modaImpl = &moda_impl_int8;
|
||||
break;
|
||||
case 2:
|
||||
data->modaImpl = &moda_impl_int16;
|
||||
break;
|
||||
case 4:
|
||||
data->modaImpl = &moda_impl_int32;
|
||||
break;
|
||||
default:
|
||||
data->modaImpl = &moda_impl_int64;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||
data->modaImpl = &moda_impl_uint8;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::USMALLINT:
|
||||
data->modaImpl = &moda_impl_uint16;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UMEDINT:
|
||||
case execplan::CalpontSystemCatalog::UINT:
|
||||
data->modaImpl = &moda_impl_uint32;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UBIGINT:
|
||||
data->modaImpl = &moda_impl_uint64;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::FLOAT:
|
||||
data->modaImpl = &moda_impl_float;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::DOUBLE:
|
||||
data->modaImpl = &moda_impl_double;
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
||||
data->modaImpl = &moda_impl_longdouble;
|
||||
break;
|
||||
default:
|
||||
data->modaImpl = NULL;
|
||||
}
|
||||
return data->modaImpl;
|
||||
}
|
||||
|
||||
|
||||
mcsv1_UDAF::ReturnCode moda::init(mcsv1Context* context,
|
||||
ColumnDatum* colTypes)
|
||||
{
|
||||
if (context->getParameterCount() < 1)
|
||||
{
|
||||
// The error message will be prepended with
|
||||
// "The storage engine for the table doesn't support "
|
||||
context->setErrorMessage("moda() with 0 arguments");
|
||||
return mcsv1_UDAF::ERROR;
|
||||
}
|
||||
|
||||
if (context->getParameterCount() > 1)
|
||||
{
|
||||
context->setErrorMessage("moda() with more than 1 argument");
|
||||
return mcsv1_UDAF::ERROR;
|
||||
}
|
||||
|
||||
if (!(execplan::isNumeric(colTypes[0].dataType)))
|
||||
{
|
||||
// The error message will be prepended with
|
||||
// "The storage engine for the table doesn't support "
|
||||
context->setErrorMessage("moda() with non-numeric argument");
|
||||
return mcsv1_UDAF::ERROR;
|
||||
}
|
||||
|
||||
context->setResultType(colTypes[0].dataType);
|
||||
|
||||
if (colTypes[0].dataType == execplan::CalpontSystemCatalog::DECIMAL
|
||||
|| colTypes[0].dataType == execplan::CalpontSystemCatalog::UDECIMAL)
|
||||
{
|
||||
if (colTypes[0].precision < 3)
|
||||
{
|
||||
context->setColWidth(1);
|
||||
}
|
||||
else if (colTypes[0].precision < 4)
|
||||
{
|
||||
context->setColWidth(2);
|
||||
}
|
||||
else if (colTypes[0].precision < 9)
|
||||
{
|
||||
context->setColWidth(4);
|
||||
}
|
||||
else
|
||||
{
|
||||
context->setColWidth(8);
|
||||
}
|
||||
}
|
||||
|
||||
mcsv1_UDAF* impl = getImpl(context);
|
||||
|
||||
if (!impl)
|
||||
{
|
||||
// The error message will be prepended with
|
||||
// "The storage engine for the table doesn't support "
|
||||
context->setErrorMessage("moda() with non-numeric argument");
|
||||
return mcsv1_UDAF::ERROR;
|
||||
}
|
||||
|
||||
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
|
||||
return impl->init(context, colTypes);
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::init(mcsv1Context* context,
|
||||
ColumnDatum* colTypes)
|
||||
{
|
||||
context->setScale(context->getScale());
|
||||
context->setPrecision(19);
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::reset(mcsv1Context* context)
|
||||
{
|
||||
ModaData* data = static_cast<ModaData*>(context->getUserData());
|
||||
data->fReturnType = context->getResultType();
|
||||
data->fColWidth = context->getColWidth();
|
||||
data->clear<T>();
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
|
||||
{
|
||||
static_any::any& valIn = valsIn[0].columnData;
|
||||
ModaData* data = static_cast<ModaData*>(context->getUserData());
|
||||
std::unordered_map<T, uint32_t>* map = data->getMap<T>();
|
||||
|
||||
if (valIn.empty())
|
||||
{
|
||||
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
|
||||
}
|
||||
|
||||
T val = convertAnyTo<T>(valIn);
|
||||
|
||||
if (context->getResultType() == execplan::CalpontSystemCatalog::DOUBLE)
|
||||
{
|
||||
// For decimal types, we need to move the decimal point.
|
||||
uint32_t scale = valsIn[0].scale;
|
||||
|
||||
if (val != 0 && scale > 0)
|
||||
{
|
||||
val /= pow(10.0, (double)scale);
|
||||
}
|
||||
}
|
||||
|
||||
data->fSum += val;
|
||||
++data->fCount;
|
||||
(*map)[val]++;
|
||||
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
|
||||
{
|
||||
if (!userDataIn)
|
||||
{
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
ModaData* outData = static_cast<ModaData*>(context->getUserData());
|
||||
const ModaData* inData = static_cast<const ModaData*>(userDataIn);
|
||||
std::unordered_map<T, uint32_t>* outMap = outData->getMap<T>();
|
||||
std::unordered_map<T, uint32_t>* inMap = inData->getMap<T>();
|
||||
typename std::unordered_map<T, uint32_t>::const_iterator iter;
|
||||
|
||||
for (iter = inMap->begin(); iter != inMap->end(); ++iter)
|
||||
{
|
||||
(*outMap)[iter->first] += iter->second;
|
||||
}
|
||||
// AVG
|
||||
outData->fSum += inData->fSum;
|
||||
outData->fCount += inData->fCount;
|
||||
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::evaluate(mcsv1Context* context, static_any::any& valOut)
|
||||
{
|
||||
uint64_t maxCnt = 0;
|
||||
T avg = 0;
|
||||
T val = 0;
|
||||
ModaData* data = static_cast<ModaData*>(context->getUserData());
|
||||
std::unordered_map<T, uint32_t>* map = data->getMap<T>();
|
||||
|
||||
if (map->size() == 0)
|
||||
{
|
||||
valOut = (T)0;
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
avg = data->fCount ? data->fSum / data->fCount : 0;
|
||||
typename std::unordered_map<T, uint32_t>::iterator iter;
|
||||
|
||||
for (iter = map->begin(); iter != map->end(); ++iter)
|
||||
{
|
||||
if (iter->second > maxCnt)
|
||||
{
|
||||
val = iter->first;
|
||||
maxCnt = iter->second;
|
||||
}
|
||||
else if (iter->second == maxCnt)
|
||||
{
|
||||
// Tie breaker: choose the closest to avg. If still tie, choose smallest
|
||||
if ((abs(val-avg) > abs(iter->first-avg))
|
||||
|| ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first))))
|
||||
{
|
||||
val = iter->first;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If scale is > 0, then the original type was DECIMAL. Set the
|
||||
// ResultType to DECIMAL so the delivery logic moves the decimal point.
|
||||
if (context->getScale() > 0)
|
||||
context->setResultType(execplan::CalpontSystemCatalog::DECIMAL);
|
||||
|
||||
valOut = val;
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
|
||||
{
|
||||
static_any::any& valDropped = valsDropped[0].columnData;
|
||||
ModaData* data = static_cast<ModaData*>(context->getUserData());
|
||||
std::unordered_map<T, uint32_t>* map = data->getMap<T>();
|
||||
|
||||
if (valDropped.empty())
|
||||
{
|
||||
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
|
||||
}
|
||||
|
||||
T val = convertAnyTo<T>(valDropped);
|
||||
|
||||
data->fSum -= val;
|
||||
--data->fCount;
|
||||
(*map)[val]--;
|
||||
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
void ModaData::serialize(messageqcpp::ByteStream& bs) const
|
||||
{
|
||||
bs << fReturnType;
|
||||
bs << fSum;
|
||||
bs << fCount;
|
||||
bs << fColWidth;
|
||||
|
||||
switch ((execplan::CalpontSystemCatalog::ColDataType)fReturnType)
|
||||
{
|
||||
case execplan::CalpontSystemCatalog::TINYINT:
|
||||
serializeMap<int8_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::SMALLINT:
|
||||
serializeMap<int16_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::MEDINT:
|
||||
case execplan::CalpontSystemCatalog::INT:
|
||||
serializeMap<int32_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::BIGINT:
|
||||
serializeMap<int64_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
switch (fColWidth)
|
||||
{
|
||||
case 1:
|
||||
serializeMap<int8_t>(bs);
|
||||
break;
|
||||
case 2:
|
||||
serializeMap<int16_t>(bs);
|
||||
break;
|
||||
case 4:
|
||||
serializeMap<int32_t>(bs);
|
||||
break;
|
||||
default:
|
||||
serializeMap<int64_t>(bs);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||
serializeMap<uint8_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::USMALLINT:
|
||||
serializeMap<uint16_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UMEDINT:
|
||||
case execplan::CalpontSystemCatalog::UINT:
|
||||
serializeMap<uint32_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UBIGINT:
|
||||
serializeMap<uint64_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::FLOAT:
|
||||
serializeMap<float>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::DOUBLE:
|
||||
serializeMap<double>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
||||
serializeMap<long double>(bs);
|
||||
break;
|
||||
default:
|
||||
throw std::runtime_error("ModaData::serialize with bad data type");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ModaData::unserialize(messageqcpp::ByteStream& bs)
|
||||
{
|
||||
bs >> fReturnType;
|
||||
bs >> fSum;
|
||||
bs >> fCount;
|
||||
bs >> fColWidth;
|
||||
|
||||
switch ((execplan::CalpontSystemCatalog::ColDataType)fReturnType)
|
||||
{
|
||||
case execplan::CalpontSystemCatalog::TINYINT:
|
||||
unserializeMap<int8_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::SMALLINT:
|
||||
unserializeMap<int16_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::MEDINT:
|
||||
case execplan::CalpontSystemCatalog::INT:
|
||||
unserializeMap<int32_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::BIGINT:
|
||||
unserializeMap<int64_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::DECIMAL:
|
||||
case execplan::CalpontSystemCatalog::UDECIMAL:
|
||||
switch (fColWidth)
|
||||
{
|
||||
case 1:
|
||||
unserializeMap<int8_t>(bs);
|
||||
break;
|
||||
case 2:
|
||||
unserializeMap<int16_t>(bs);
|
||||
break;
|
||||
case 4:
|
||||
unserializeMap<int32_t>(bs);
|
||||
break;
|
||||
default:
|
||||
unserializeMap<int64_t>(bs);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UTINYINT:
|
||||
unserializeMap<uint8_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::USMALLINT:
|
||||
unserializeMap<uint16_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UMEDINT:
|
||||
case execplan::CalpontSystemCatalog::UINT:
|
||||
unserializeMap<uint32_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::UBIGINT:
|
||||
unserializeMap<uint64_t>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::FLOAT:
|
||||
unserializeMap<float>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::DOUBLE:
|
||||
unserializeMap<double>(bs);
|
||||
break;
|
||||
case execplan::CalpontSystemCatalog::LONGDOUBLE:
|
||||
unserializeMap<long double>(bs);
|
||||
break;
|
||||
default:
|
||||
throw std::runtime_error("ModaData::unserialize with bad data type");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
236
utils/regr/moda.h
Normal file
236
utils/regr/moda.h
Normal file
@ -0,0 +1,236 @@
|
||||
/* Copyright (C) 2019 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
/***********************************************************************
|
||||
* $Id$
|
||||
*
|
||||
* moda.h
|
||||
***********************************************************************/
|
||||
|
||||
/**
|
||||
* Columnstore interface for the moda User Defined Aggregate
|
||||
* Functions (UDAF) and User Defined Analytic Functions (UDAnF).
|
||||
*
|
||||
* To notify mysqld about the new function:
|
||||
*
|
||||
* CREATE AGGREGATE FUNCTION moda returns STRING soname 'libregr_mysql.so';
|
||||
*
|
||||
* moda returns the value with the greatest number of occurances in
|
||||
* the dataset with ties being broken by:
|
||||
* 1) closest to AVG
|
||||
* 2) smallest value
|
||||
*/
|
||||
#ifndef HEADER_moda
|
||||
#define HEADER_moda
|
||||
|
||||
#include <cstdlib>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "mcsv1_udaf.h"
|
||||
#include "calpontsystemcatalog.h"
|
||||
#include "windowfunctioncolumn.h"
|
||||
|
||||
#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
|
||||
#define EXPORT __declspec(dllexport)
|
||||
#else
|
||||
#define EXPORT
|
||||
#endif
|
||||
|
||||
namespace mcsv1sdk
|
||||
{
|
||||
// Override UserData for data storage
|
||||
struct ModaData : public UserData
|
||||
{
|
||||
ModaData() : fMap(NULL),
|
||||
fReturnType((uint32_t)execplan::CalpontSystemCatalog::UNDEFINED),
|
||||
fColWidth(0),
|
||||
modaImpl(NULL)
|
||||
{};
|
||||
|
||||
virtual ~ModaData() {}
|
||||
|
||||
virtual void serialize(messageqcpp::ByteStream& bs) const;
|
||||
virtual void unserialize(messageqcpp::ByteStream& bs);
|
||||
|
||||
template<class T>
|
||||
std::unordered_map<T, uint32_t>* getMap()
|
||||
{
|
||||
if (!fMap)
|
||||
{
|
||||
// Just in time creation
|
||||
fMap = new std::unordered_map<T, uint32_t>;
|
||||
}
|
||||
return (std::unordered_map<T, uint32_t>*) fMap;
|
||||
}
|
||||
|
||||
// The const version is only called by serialize()
|
||||
// It shouldn't (and can't) create a new map.
|
||||
template<class T>
|
||||
std::unordered_map<T, uint32_t>* getMap() const
|
||||
{
|
||||
if (!fMap)
|
||||
{
|
||||
throw std::runtime_error("ModaData::serialize with no map");
|
||||
}
|
||||
return (std::unordered_map<T, uint32_t>*) fMap;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void clear()
|
||||
{
|
||||
fSum = 0.0;
|
||||
fCount = 0;
|
||||
getMap<T>()->clear();
|
||||
}
|
||||
|
||||
long double fSum;
|
||||
uint64_t fCount;
|
||||
void* fMap; // Will be of type unordered_map<>
|
||||
uint32_t fReturnType;
|
||||
uint32_t fColWidth;
|
||||
mcsv1_UDAF* modaImpl; // A pointer to one of the Moda_impl_T concrete classes
|
||||
|
||||
private:
|
||||
// For now, copy construction is unwanted
|
||||
ModaData(UserData&);
|
||||
|
||||
// Templated map streamers
|
||||
template<class T>
|
||||
void serializeMap(messageqcpp::ByteStream& bs) const
|
||||
{
|
||||
std::unordered_map<T, uint32_t>* map = getMap<T>();
|
||||
typename std::unordered_map<T, uint32_t>::const_iterator iter;
|
||||
bs << (uint64_t)map->size();
|
||||
for (iter = map->begin(); iter != map->end(); ++iter)
|
||||
{
|
||||
bs << iter->first;
|
||||
bs << iter->second;
|
||||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void unserializeMap(messageqcpp::ByteStream& bs)
|
||||
{
|
||||
uint32_t cnt;
|
||||
T num;
|
||||
uint64_t sz;
|
||||
bs >> sz;
|
||||
std::unordered_map<T, uint32_t>* map = getMap<T>();
|
||||
map->clear();
|
||||
for (uint64_t i = 0; i < sz; ++i)
|
||||
{
|
||||
bs >> num;
|
||||
bs >> cnt;
|
||||
(*map)[num] = cnt;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template<class T>
|
||||
class Moda_impl_T : public mcsv1_UDAF
|
||||
{
|
||||
public:
|
||||
// Defaults OK
|
||||
Moda_impl_T() {};
|
||||
virtual ~Moda_impl_T() {};
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode init(mcsv1Context* context,
|
||||
ColumnDatum* colTypes);
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode reset(mcsv1Context* context);
|
||||
virtual mcsv1_UDAF::ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
|
||||
virtual mcsv1_UDAF::ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn);
|
||||
virtual mcsv1_UDAF::ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
|
||||
virtual mcsv1_UDAF::ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
|
||||
|
||||
// Dummy: not used
|
||||
virtual mcsv1_UDAF::ReturnCode createUserData(UserData*& userData, int32_t& length)
|
||||
{
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
};
|
||||
|
||||
// moda returns the modal value of the dataset. If more than one value
|
||||
// have the same maximum number of occurances, then the one closest to
|
||||
// AVG wins. If two are the same distance from AVG, then the smaller wins.
|
||||
class moda : public mcsv1_UDAF
|
||||
{
|
||||
public:
|
||||
// Defaults OK
|
||||
moda() : mcsv1_UDAF() {};
|
||||
virtual ~moda() {};
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode init(mcsv1Context* context,
|
||||
ColumnDatum* colTypes);
|
||||
|
||||
virtual ReturnCode reset(mcsv1Context* context)
|
||||
{
|
||||
return getImpl(context)->reset(context);
|
||||
}
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn)
|
||||
{
|
||||
return getImpl(context)->nextValue(context, valsIn);
|
||||
}
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn)
|
||||
{
|
||||
return getImpl(context)->subEvaluate(context, valIn);
|
||||
}
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut)
|
||||
{
|
||||
return getImpl(context)->evaluate(context, valOut);
|
||||
}
|
||||
|
||||
virtual mcsv1_UDAF::ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
|
||||
{
|
||||
return getImpl(context)->dropValue(context, valsDropped);
|
||||
}
|
||||
|
||||
mcsv1_UDAF::ReturnCode createUserData(UserData*& userData, int32_t& length)
|
||||
{
|
||||
userData = new ModaData;
|
||||
length = sizeof(ModaData);
|
||||
return mcsv1_UDAF::SUCCESS;
|
||||
}
|
||||
|
||||
mcsv1_UDAF* getImpl(mcsv1Context* context);
|
||||
|
||||
protected:
|
||||
Moda_impl_T<int8_t> moda_impl_int8;
|
||||
Moda_impl_T<int16_t> moda_impl_int16;
|
||||
Moda_impl_T<int32_t> moda_impl_int32;
|
||||
Moda_impl_T<int64_t> moda_impl_int64;
|
||||
Moda_impl_T<uint8_t> moda_impl_uint8;
|
||||
Moda_impl_T<uint16_t> moda_impl_uint16;
|
||||
Moda_impl_T<uint32_t> moda_impl_uint32;
|
||||
Moda_impl_T<uint64_t> moda_impl_uint64;
|
||||
Moda_impl_T<float> moda_impl_float;
|
||||
Moda_impl_T<double> moda_impl_double;
|
||||
Moda_impl_T<long double> moda_impl_longdouble;
|
||||
};
|
||||
|
||||
|
||||
}; // namespace
|
||||
|
||||
#undef EXPORT
|
||||
|
||||
#endif // HEADER_mode.h
|
||||
|
288
utils/regr/modamysql.cpp
Normal file
288
utils/regr/modamysql.cpp
Normal file
@ -0,0 +1,288 @@
|
||||
#include <my_config.h>
|
||||
#include <cmath>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <string.h>
|
||||
#include <tr1/unordered_map>
|
||||
#include <algorithm>
|
||||
|
||||
#include "idb_mysql.h"
|
||||
|
||||
namespace
|
||||
{
|
||||
inline bool isNumeric(int type, const char* attr)
|
||||
{
|
||||
if (type == INT_RESULT || type == REAL_RESULT || type == DECIMAL_RESULT)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
#if _MSC_VER
|
||||
if (_strnicmp("NULL", attr, 4) == 0))
|
||||
#else
|
||||
if (strncasecmp("NULL", attr, 4) == 0)
|
||||
#endif
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
struct moda_data
|
||||
{
|
||||
long double fSum;
|
||||
uint64_t fCount;
|
||||
enum Item_result fReturnType;
|
||||
std::tr1::unordered_map<int64_t, uint32_t> mapINT;
|
||||
std::tr1::unordered_map<double, uint32_t> mapREAL;
|
||||
std::tr1::unordered_map<long double, uint32_t> mapDECIMAL;
|
||||
void clear()
|
||||
{
|
||||
fSum = 0.0;
|
||||
fCount = 0;
|
||||
mapINT.clear();
|
||||
mapREAL.clear();
|
||||
mapDECIMAL.clear();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
extern "C"
|
||||
{
|
||||
|
||||
#ifdef _MSC_VER
|
||||
__declspec(dllexport)
|
||||
#endif
|
||||
my_bool moda_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
|
||||
{
|
||||
struct moda_data* data;
|
||||
if (args->arg_count != 1)
|
||||
{
|
||||
strcpy(message,"moda() requires one argument");
|
||||
return 1;
|
||||
}
|
||||
if (!isNumeric(args->arg_type[0], args->attributes[0]))
|
||||
{
|
||||
strcpy(message,"moda() with a non-numeric argument");
|
||||
return 1;
|
||||
}
|
||||
|
||||
data = new moda_data;
|
||||
data->fReturnType = args->arg_type[0];
|
||||
data->fCount = 0;
|
||||
data->fSum = 0.0;
|
||||
initid->ptr = (char*)data;
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef _MSC_VER
|
||||
__declspec(dllexport)
|
||||
#endif
|
||||
void moda_deinit(UDF_INIT* initid)
|
||||
{
|
||||
struct moda_data* data = (struct moda_data*)initid->ptr;
|
||||
data->clear();
|
||||
delete data;
|
||||
}
|
||||
|
||||
#ifdef _MSC_VER
|
||||
__declspec(dllexport)
|
||||
#endif
|
||||
void moda_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
|
||||
char* message __attribute__((unused)))
|
||||
{
|
||||
struct moda_data* data = (struct moda_data*)initid->ptr;
|
||||
data->clear();
|
||||
}
|
||||
|
||||
#ifdef _MSC_VER
|
||||
__declspec(dllexport)
|
||||
#endif
|
||||
void moda_add(UDF_INIT* initid,
|
||||
UDF_ARGS* args,
|
||||
char* is_null,
|
||||
char* message __attribute__((unused)))
|
||||
{
|
||||
// Test for NULL
|
||||
if (args->args[0] == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
struct moda_data* data = (struct moda_data*)initid->ptr;
|
||||
data->fCount++;
|
||||
|
||||
switch (args->arg_type[0])
|
||||
{
|
||||
case INT_RESULT:
|
||||
{
|
||||
int64_t val = *((int64_t*)args->args[0]);
|
||||
data->fSum += (long double)val;
|
||||
data->mapINT[val]++;
|
||||
break;
|
||||
}
|
||||
case REAL_RESULT:
|
||||
{
|
||||
double val = *((double*)args->args[0]);
|
||||
data->fSum += val;
|
||||
data->mapREAL[val]++;
|
||||
break;
|
||||
}
|
||||
case DECIMAL_RESULT:
|
||||
case STRING_RESULT:
|
||||
{
|
||||
long double val = strtold(args->args[0], 0);
|
||||
data->fSum += val;
|
||||
data->mapDECIMAL[val]++;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef _MSC_VER
|
||||
__declspec(dllexport)
|
||||
#endif
|
||||
void moda_remove(UDF_INIT* initid, UDF_ARGS* args,
|
||||
char* is_null,
|
||||
char* message __attribute__((unused)))
|
||||
{
|
||||
// Test for NULL
|
||||
if (args->args[0] == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
struct moda_data* data = (struct moda_data*)initid->ptr;
|
||||
data->fCount--;
|
||||
|
||||
switch (args->arg_type[0])
|
||||
{
|
||||
case INT_RESULT:
|
||||
{
|
||||
int64_t val = *((int64_t*)args->args[0]);
|
||||
data->fSum -= (long double)val;
|
||||
data->mapINT[val]--;
|
||||
break;
|
||||
}
|
||||
case REAL_RESULT:
|
||||
{
|
||||
double val = *((double*)args->args[0]);
|
||||
data->fSum -= val;
|
||||
data->mapREAL[val]--;
|
||||
break;
|
||||
}
|
||||
case DECIMAL_RESULT:
|
||||
case STRING_RESULT:
|
||||
{
|
||||
long double val = strtold(args->args[0], 0);
|
||||
data->fSum -= val;
|
||||
data->mapDECIMAL[val]--;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef _MSC_VER
|
||||
__declspec(dllexport)
|
||||
#endif
|
||||
char* moda(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
|
||||
char* is_null, char* error __attribute__((unused)))
|
||||
{
|
||||
struct moda_data* data = (struct moda_data*)initid->ptr;
|
||||
uint32_t maxCnt = 0.0;
|
||||
|
||||
switch (args->arg_type[0])
|
||||
{
|
||||
case INT_RESULT:
|
||||
{
|
||||
typename std::tr1::unordered_map<int64_t, uint32_t>::iterator iter;
|
||||
int64_t avg = (int64_t)data->fCount ? data->fSum / data->fCount : 0;
|
||||
int64_t val = 0.0;
|
||||
for (iter = data->mapINT.begin(); iter != data->mapINT.end(); ++iter)
|
||||
{
|
||||
if (iter->second > maxCnt)
|
||||
{
|
||||
val = iter->first;
|
||||
maxCnt = iter->second;
|
||||
}
|
||||
else if (iter->second == maxCnt)
|
||||
{
|
||||
// Tie breaker: choose the closest to avg. If still tie, choose smallest
|
||||
if ((abs(val-avg) > abs(iter->first-avg))
|
||||
|| ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first))))
|
||||
{
|
||||
val = iter->first;
|
||||
}
|
||||
}
|
||||
}
|
||||
std::ostringstream oss;
|
||||
oss << val;
|
||||
return const_cast<char*>(oss.str().c_str());
|
||||
break;
|
||||
}
|
||||
case REAL_RESULT:
|
||||
{
|
||||
typename std::tr1::unordered_map<double, uint32_t>::iterator iter;
|
||||
double avg = data->fCount ? data->fSum / data->fCount : 0;
|
||||
double val = 0.0;
|
||||
for (iter = data->mapREAL.begin(); iter != data->mapREAL.end(); ++iter)
|
||||
{
|
||||
if (iter->second > maxCnt)
|
||||
{
|
||||
val = iter->first;
|
||||
maxCnt = iter->second;
|
||||
}
|
||||
else if (iter->second == maxCnt)
|
||||
{
|
||||
// Tie breaker: choose the closest to avg. If still tie, choose smallest
|
||||
if ((abs(val-avg) > abs(iter->first-avg))
|
||||
|| ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first))))
|
||||
{
|
||||
val = iter->first;
|
||||
}
|
||||
}
|
||||
}
|
||||
std::ostringstream oss;
|
||||
oss << val;
|
||||
return const_cast<char*>(oss.str().c_str());
|
||||
break;
|
||||
}
|
||||
case DECIMAL_RESULT:
|
||||
case STRING_RESULT:
|
||||
{
|
||||
typename std::tr1::unordered_map<long double, uint32_t>::iterator iter;
|
||||
long double avg = data->fCount ? data->fSum / data->fCount : 0;
|
||||
long double val = 0.0;
|
||||
for (iter = data->mapDECIMAL.begin(); iter != data->mapDECIMAL.end(); ++iter)
|
||||
{
|
||||
if (iter->second > maxCnt)
|
||||
{
|
||||
val = iter->first;
|
||||
maxCnt = iter->second;
|
||||
}
|
||||
else if (iter->second == maxCnt)
|
||||
{
|
||||
long double thisVal = iter->first;
|
||||
// Tie breaker: choose the closest to avg. If still tie, choose smallest
|
||||
if ((abs(val-avg) > abs(thisVal-avg))
|
||||
|| ((abs(val-avg) == abs(thisVal-avg)) && (abs(val) > abs(thisVal))))
|
||||
{
|
||||
val = thisVal;
|
||||
}
|
||||
}
|
||||
}
|
||||
std::ostringstream oss;
|
||||
oss << val;
|
||||
return const_cast<char*>(oss.str().c_str());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
} // Extern "C"
|
2
utils/rowgroup/rowaggregation.cpp
Normal file → Executable file
2
utils/rowgroup/rowaggregation.cpp
Normal file → Executable file
@ -2671,7 +2671,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
|
||||
|
||||
case execplan::CalpontSystemCatalog::MEDINT:
|
||||
case execplan::CalpontSystemCatalog::INT:
|
||||
if (valOut.compatible(uintTypeId))
|
||||
if (valOut.compatible(intTypeId))
|
||||
{
|
||||
intOut = valOut.cast<int>();
|
||||
bSetSuccess = true;
|
||||
|
6
utils/udfsdk/mcsv1_udaf.cpp
Normal file → Executable file
6
utils/udfsdk/mcsv1_udaf.cpp
Normal file → Executable file
@ -218,6 +218,7 @@ void mcsv1Context::serialize(messageqcpp::ByteStream& b) const
|
||||
// Dont send context flags, These are set for each call
|
||||
b << fUserDataSize;
|
||||
b << (uint32_t)fResultType;
|
||||
b << fColWidth;
|
||||
b << fResultscale;
|
||||
b << fResultPrecision;
|
||||
b << errorMsg;
|
||||
@ -228,6 +229,7 @@ void mcsv1Context::serialize(messageqcpp::ByteStream& b) const
|
||||
b << fStartConstant;
|
||||
b << fEndConstant;
|
||||
b << fParamCount;
|
||||
b << (uint32_t)mariadbReturnType;
|
||||
}
|
||||
|
||||
void mcsv1Context::unserialize(messageqcpp::ByteStream& b)
|
||||
@ -239,6 +241,7 @@ void mcsv1Context::unserialize(messageqcpp::ByteStream& b)
|
||||
uint32_t iResultType;
|
||||
b >> iResultType;
|
||||
fResultType = (execplan::CalpontSystemCatalog::ColDataType)iResultType;
|
||||
b >> fColWidth;
|
||||
b >> fResultscale;
|
||||
b >> fResultPrecision;
|
||||
b >> errorMsg;
|
||||
@ -250,6 +253,9 @@ void mcsv1Context::unserialize(messageqcpp::ByteStream& b)
|
||||
b >> fStartConstant;
|
||||
b >> fEndConstant;
|
||||
b >> fParamCount;
|
||||
uint32_t mrt;
|
||||
b >> mrt;
|
||||
mariadbReturnType = (enum_mariadb_return_type)mrt;
|
||||
}
|
||||
|
||||
void UserData::serialize(messageqcpp::ByteStream& bs) const
|
||||
|
27
utils/udfsdk/mcsv1_udaf.h
Normal file → Executable file
27
utils/udfsdk/mcsv1_udaf.h
Normal file → Executable file
@ -86,6 +86,14 @@
|
||||
|
||||
namespace mcsv1sdk
|
||||
{
|
||||
// The return type of the CREATE AGGREGATE statement
|
||||
enum enum_mariadb_return_type {
|
||||
MYSQL_TYPE_DOUBLE = 5,
|
||||
MYSQL_TYPE_LONGLONG = 8,
|
||||
MYSQL_TYPE_VARCHAR=15,
|
||||
MYSQL_TYPE_NEWDECIMAL = 246
|
||||
};
|
||||
|
||||
/**
|
||||
* A map from name to function object.
|
||||
*
|
||||
@ -303,7 +311,7 @@ public:
|
||||
EXPORT int32_t getColWidth();
|
||||
|
||||
// For non-numric return types, set the return column width. This defaults
|
||||
// to the the length of the input.
|
||||
// to a length determined by the input datatype.
|
||||
// valid in init()
|
||||
EXPORT bool setColWidth(int32_t colWidth);
|
||||
|
||||
@ -356,6 +364,9 @@ public:
|
||||
// Get the name of the function
|
||||
EXPORT const std::string& getName() const;
|
||||
|
||||
// Get the return type as set by CREATE AGGREGATE FUNCTION
|
||||
EXPORT enum_mariadb_return_type getMariaDBReturnType() const;
|
||||
|
||||
EXPORT mcsv1Context& operator=(const mcsv1Context& rhs);
|
||||
EXPORT mcsv1Context& copy(const mcsv1Context& rhs);
|
||||
|
||||
@ -380,6 +391,7 @@ private:
|
||||
mcsv1sdk::mcsv1_UDAF* func;
|
||||
int32_t fParamCount;
|
||||
std::vector<uint32_t> paramKeys;
|
||||
enum_mariadb_return_type mariadbReturnType;
|
||||
|
||||
public:
|
||||
// For use by the framework
|
||||
@ -403,6 +415,7 @@ public:
|
||||
EXPORT boost::shared_ptr<UserData> getUserDataSP();
|
||||
EXPORT void setParamCount(int32_t paramCount);
|
||||
std::vector<uint32_t>* getParamKeys();
|
||||
EXPORT void setMariaDBReturnType(enum_mariadb_return_type rt);
|
||||
};
|
||||
|
||||
// Since aggregate functions can operate on any data type, we use the following structure
|
||||
@ -956,6 +969,16 @@ inline std::vector<uint32_t>* mcsv1Context::getParamKeys()
|
||||
return ¶mKeys;
|
||||
}
|
||||
|
||||
inline enum_mariadb_return_type mcsv1Context::getMariaDBReturnType() const
|
||||
{
|
||||
return mariadbReturnType;
|
||||
}
|
||||
|
||||
inline void mcsv1Context::setMariaDBReturnType(enum_mariadb_return_type rt)
|
||||
{
|
||||
mariadbReturnType = rt;
|
||||
}
|
||||
|
||||
inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
|
||||
{
|
||||
return NOT_IMPLEMENTED;
|
||||
@ -968,8 +991,6 @@ inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::createUserData(UserData*& userData, in
|
||||
return SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Handy helper functions
|
||||
template<typename T>
|
||||
inline T mcsv1_UDAF::convertAnyTo(static_any::any& valIn)
|
||||
|
Reference in New Issue
Block a user