diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp old mode 100644 new mode 100755 index 6daa6ee02..c09224899 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -522,7 +522,7 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J igpc < csep->groupByCols().end(); ++igpc) { - if (*igpc->get() == *j->get()) + if ((*igpc)->alias() == (*j)->alias()) { bFound = true; break; diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp old mode 100644 new mode 100755 index 2eda37ba7..8a891ea6f --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4950,6 +4950,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) mcsv1sdk::mcsv1Context& context = udafc->getContext(); context.setName(isp->func_name()); + // Get the return type as defined by CREATE AGGREGATE FUNCTION + // Most functions don't care, but some may. + context.setMariaDBReturnType((mcsv1sdk::enum_mariadb_return_type)isp->field_type()); + // Set up the return type defaults for the call to init() context.setResultType(udafc->resultType().colDataType); context.setColWidth(udafc->resultType().colWidth); diff --git a/storage-manager/include/messageFormat.h b/storage-manager/include/messageFormat.h old mode 100644 new mode 100755 index ae03cec74..d0a20ca96 --- a/storage-manager/include/messageFormat.h +++ b/storage-manager/include/messageFormat.h @@ -65,6 +65,7 @@ static const uint32_t SM_MSG_END=0x9d5bc31b; static const uint32_t SM_HEADER_LEN = sizeof(sm_msg_header); // the unix socket StorageManager is listening on +__attribute__ ((unused)) static const char *socket_name = "\0storagemanager"; #pragma GCC diagnostic pop diff --git a/utils/regr/moda.cpp b/utils/regr/moda.cpp new file mode 100644 index 000000000..180e21b2a --- /dev/null +++ b/utils/regr/moda.cpp @@ -0,0 +1,480 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program 
is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include "moda.h" +#include "bytestream.h" +#include "objectreader.h" + +using namespace mcsv1sdk; + +// Constructing the static addToMap object below stores a new moda instance +// in UDAFMap under the key "moda" so the function can be looked up by name. +class Add_moda_ToUDAFMap +{ +public: + Add_moda_ToUDAFMap() + { + UDAFMap::getMap()["moda"] = new moda(); + } +}; +static Add_moda_ToUDAFMap addToMap; + +// There are a few design options when creating a generic moda function: +// 1) Always use DOUBLE for internal storage +// Pros: can handle data from any native SQL type. +// Cons: If MODA(SUM()) is called, then the LONG DOUBLE returned by SUM will +// be truncated. +// It requires 8 bytes in the hash table and requires streaming 8 bytes +// per entry regardless of how small it could have been. +// 2) Always use LONG DOUBLE for internal storage +// Pros: Solves the problem of MODA(SUM()) +// Cons: It requires 16 bytes in the hash table and requires streaming 16 bytes +// per entry regardless of how small it could have been. +// 3) Use the data type of the column for internal storage +// Pros: Can handle MODA(SUM()) because all types, including LONG DOUBLE, are handled +// Only the data size needed is stored in the hash table and streamed +// +// This class implements option 3 by creating templated classes.
+// There are two moda classes, the main one called moda, which is basically +// an adapter (Pattern) to the templated class called Moda_impl_T. +// +// The way the API works, each function class is instantiated exactly once per +// executable and then accessed via a map. This means that the function classes +// could be used by any active query, or more than once by a single query. These +// classes have no data fields for this reason. All data for a specific query is +// maintained by the context object. +// +// Each possible template instantiation is created at moda creation during startup. +// They are the Moda_impl_T members at the bottom of the moda class definition. +// At runtime getImpl() gets the right one for the datatype involved based on context. +// +// More template magic is done in the ModaData class to create and maintain +// a hash map of the correct type. + +// getImpl returns the modaImpl cached in the context's ModaData or, when none is cached, picks one from the result type (and the column width for DECIMAL/UDECIMAL), caches it and returns it; NULL for any other type. +mcsv1_UDAF* moda::getImpl(mcsv1Context* context) +{ + ModaData* data = static_cast(context->getUserData()); + if (data->modaImpl) + return data->modaImpl; + + switch (context->getResultType()) + { + case execplan::CalpontSystemCatalog::TINYINT: + data->modaImpl = &moda_impl_int8; + break; + case execplan::CalpontSystemCatalog::SMALLINT: + data->modaImpl = &moda_impl_int16; + break; + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + data->modaImpl = &moda_impl_int32; + break; + case execplan::CalpontSystemCatalog::BIGINT: + data->modaImpl = &moda_impl_int64; + break; + case execplan::CalpontSystemCatalog::DECIMAL: + case execplan::CalpontSystemCatalog::UDECIMAL: + switch (context->getColWidth()) + { + case 1: + data->modaImpl = &moda_impl_int8; + break; + case 2: + data->modaImpl = &moda_impl_int16; + break; + case 4: + data->modaImpl = &moda_impl_int32; + break; + default: + data->modaImpl = &moda_impl_int64; + break; + } + break; + case 
execplan::CalpontSystemCatalog::UTINYINT: + data->modaImpl = &moda_impl_uint8; + break; + case execplan::CalpontSystemCatalog::USMALLINT: + data->modaImpl = &moda_impl_uint16; + break; + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + data->modaImpl = &moda_impl_uint32; + break; + case execplan::CalpontSystemCatalog::UBIGINT: + data->modaImpl = &moda_impl_uint64; + break; + case execplan::CalpontSystemCatalog::FLOAT: + data->modaImpl = &moda_impl_float; + break; + case execplan::CalpontSystemCatalog::DOUBLE: + data->modaImpl = &moda_impl_double; + break; + case execplan::CalpontSystemCatalog::LONGDOUBLE: + data->modaImpl = &moda_impl_longdouble; + break; + default: + data->modaImpl = NULL; + } + return data->modaImpl; +} + + +mcsv1_UDAF::ReturnCode moda::init(mcsv1Context* context, + ColumnDatum* colTypes) +{ + if (context->getParameterCount() < 1) + { + // The error message will be prepended with + // "The storage engine for the table doesn't support " + context->setErrorMessage("moda() with 0 arguments"); + return mcsv1_UDAF::ERROR; + } + + if (context->getParameterCount() > 1) + { + context->setErrorMessage("moda() with more than 1 argument"); + return mcsv1_UDAF::ERROR; + } + + if (!(execplan::isNumeric(colTypes[0].dataType))) + { + // The error message will be prepended with + // "The storage engine for the table doesn't support " + context->setErrorMessage("moda() with non-numeric argument"); + return mcsv1_UDAF::ERROR; + } + + context->setResultType(colTypes[0].dataType); + + if (colTypes[0].dataType == execplan::CalpontSystemCatalog::DECIMAL + || colTypes[0].dataType == execplan::CalpontSystemCatalog::UDECIMAL) // choose the storage width from the DECIMAL precision + { + if (colTypes[0].precision < 3) + { + context->setColWidth(1); + } + else if (colTypes[0].precision < 4) + { + context->setColWidth(2); + } + else if (colTypes[0].precision < 9) + { + context->setColWidth(4); + } + else + { + context->setColWidth(8); + } + } + + mcsv1_UDAF* impl = getImpl(context); + 
+ if (!impl) + { + // The error message will be prepended with + // "The storage engine for the table doesn't support " + context->setErrorMessage("moda() with non-numeric argument"); + return mcsv1_UDAF::ERROR; + } + + context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS); + return impl->init(context, colTypes); +} + +template +mcsv1_UDAF::ReturnCode Moda_impl_T::init(mcsv1Context* context, + ColumnDatum* colTypes) +{ + context->setScale(context->getScale()); // re-stores the scale the context already holds + context->setPrecision(19); + return mcsv1_UDAF::SUCCESS; + +} + +template +mcsv1_UDAF::ReturnCode Moda_impl_T::reset(mcsv1Context* context) +{ + ModaData* data = static_cast(context->getUserData()); + data->fReturnType = context->getResultType(); + data->fColWidth = context->getColWidth(); + data->clear(); + return mcsv1_UDAF::SUCCESS; +} + +template +mcsv1_UDAF::ReturnCode Moda_impl_T::nextValue(mcsv1Context* context, ColumnDatum* valsIn) +{ + static_any::any& valIn = valsIn[0].columnData; + ModaData* data = static_cast(context->getUserData()); + std::unordered_map* map = data->getMap(); + + if (valIn.empty()) + { + return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. + } + + T val = convertAnyTo(valIn); + + if (context->getResultType() == execplan::CalpontSystemCatalog::DOUBLE) + { + // Applied only when the result type is DOUBLE: divide by 10^scale to move the decimal point. 
+ uint32_t scale = valsIn[0].scale; + + if (val != 0 && scale > 0) + { + val /= pow(10.0, (double)scale); + } + } + + data->fSum += val; + ++data->fCount; + (*map)[val]++; + + return mcsv1_UDAF::SUCCESS; +} + +template +mcsv1_UDAF::ReturnCode Moda_impl_T::subEvaluate(mcsv1Context* context, const UserData* userDataIn) +{ + if (!userDataIn) + { + return mcsv1_UDAF::SUCCESS; + } + + ModaData* outData = static_cast(context->getUserData()); + const ModaData* inData = static_cast(userDataIn); + std::unordered_map* outMap = outData->getMap(); + std::unordered_map* inMap = inData->getMap(); + typename std::unordered_map::const_iterator iter; + + for (iter = inMap->begin(); iter != inMap->end(); ++iter) + { + (*outMap)[iter->first] += iter->second; + } + // Merge the running sum and count that evaluate() uses to compute the average + outData->fSum += inData->fSum; + outData->fCount += inData->fCount; + + return mcsv1_UDAF::SUCCESS; +} + +template +mcsv1_UDAF::ReturnCode Moda_impl_T::evaluate(mcsv1Context* context, static_any::any& valOut) +{ + uint64_t maxCnt = 0; + T avg = 0; + T val = 0; + ModaData* data = static_cast(context->getUserData()); + std::unordered_map* map = data->getMap(); + + if (map->size() == 0) + { + valOut = (T)0; + return mcsv1_UDAF::SUCCESS; + } + + avg = data->fCount ? data->fSum / data->fCount : 0; + typename std::unordered_map::iterator iter; + + for (iter = map->begin(); iter != map->end(); ++iter) + { + if (iter->second > maxCnt) + { + val = iter->first; + maxCnt = iter->second; + } + else if (iter->second == maxCnt) + { + // Tie breaker: choose the closest to avg. If still tied, choose the smaller absolute value. NOTE(review): for an unsigned T, val-avg wraps instead of going negative - confirm the distance test. + if ((abs(val-avg) > abs(iter->first-avg)) + || ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first)))) + { + val = iter->first; + } + } + } + + // If scale is > 0, then the original type was DECIMAL. Set the + // ResultType to DECIMAL so the delivery logic moves the decimal point. 
+ if (context->getScale() > 0) + context->setResultType(execplan::CalpontSystemCatalog::DECIMAL); + + valOut = val; + return mcsv1_UDAF::SUCCESS; +} + +template +mcsv1_UDAF::ReturnCode Moda_impl_T::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) +{ + static_any::any& valDropped = valsDropped[0].columnData; + ModaData* data = static_cast(context->getUserData()); + std::unordered_map* map = data->getMap(); + + if (valDropped.empty()) + { + return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. + } + + T val = convertAnyTo(valDropped); + + data->fSum -= val; + --data->fCount; + (*map)[val]--; // NOTE(review): the key stays in the map even when its count reaches 0 + + return mcsv1_UDAF::SUCCESS; +} + +void ModaData::serialize(messageqcpp::ByteStream& bs) const +{ + bs << fReturnType; + bs << fSum; + bs << fCount; + bs << fColWidth; // scalar fields first; the map follows, dispatched on fReturnType and fColWidth + + switch ((execplan::CalpontSystemCatalog::ColDataType)fReturnType) + { + case execplan::CalpontSystemCatalog::TINYINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::SMALLINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::BIGINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::DECIMAL: + case execplan::CalpontSystemCatalog::UDECIMAL: + switch (fColWidth) + { + case 1: + serializeMap(bs); + break; + case 2: + serializeMap(bs); + break; + case 4: + serializeMap(bs); + break; + default: + serializeMap(bs); + break; + } + break; + case execplan::CalpontSystemCatalog::UTINYINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::USMALLINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::UBIGINT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::FLOAT: + serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::DOUBLE: + 
serializeMap(bs); + break; + case execplan::CalpontSystemCatalog::LONGDOUBLE: + serializeMap(bs); + break; + default: + throw std::runtime_error("ModaData::serialize with bad data type"); + break; + } +} + +void ModaData::unserialize(messageqcpp::ByteStream& bs) +{ + bs >> fReturnType; + bs >> fSum; + bs >> fCount; + bs >> fColWidth; // same field order as serialize() + + switch ((execplan::CalpontSystemCatalog::ColDataType)fReturnType) + { + case execplan::CalpontSystemCatalog::TINYINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::SMALLINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::BIGINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::DECIMAL: + case execplan::CalpontSystemCatalog::UDECIMAL: + switch (fColWidth) + { + case 1: + unserializeMap(bs); + break; + case 2: + unserializeMap(bs); + break; + case 4: + unserializeMap(bs); + break; + default: + unserializeMap(bs); + break; + } + break; + case execplan::CalpontSystemCatalog::UTINYINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::USMALLINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::UBIGINT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::FLOAT: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::DOUBLE: + unserializeMap(bs); + break; + case execplan::CalpontSystemCatalog::LONGDOUBLE: + unserializeMap(bs); + break; + default: + throw std::runtime_error("ModaData::unserialize with bad data type"); + break; + } +} + diff --git a/utils/regr/moda.h b/utils/regr/moda.h new file mode 100644 index 000000000..2a9e6cbe7 --- /dev/null +++ b/utils/regr/moda.h @@ -0,0 +1,236 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free 
software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +/*********************************************************************** +* $Id$ +* +* moda.h +***********************************************************************/ + +/** + * Columnstore interface for the moda User Defined Aggregate + * Functions (UDAF) and User Defined Analytic Functions (UDAnF). + * + * To notify mysqld about the new function: + * + * CREATE AGGREGATE FUNCTION moda returns STRING soname 'libregr_mysql.so'; + * + * moda returns the value with the greatest number of occurrences in + * the dataset with ties being broken by: + * 1) closest to AVG + * 2) smallest absolute value + */ +#ifndef HEADER_moda +#define HEADER_moda + +#include +#include +#include +#include + +#include "mcsv1_udaf.h" +#include "calpontsystemcatalog.h" +#include "windowfunctioncolumn.h" + +#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT) +#define EXPORT __declspec(dllexport) +#else +#define EXPORT +#endif + +namespace mcsv1sdk +{ +// Override UserData for data storage +struct ModaData : public UserData +{ + ModaData() : fMap(NULL), + fReturnType((uint32_t)execplan::CalpontSystemCatalog::UNDEFINED), + fColWidth(0), + modaImpl(NULL) + {}; // fSum and fCount are not set here; clear() and unserialize() assign them + + virtual ~ModaData() {} // NOTE(review): fMap is allocated with new in getMap() and is not deleted here - confirm who frees it + + virtual void serialize(messageqcpp::ByteStream& bs) const; + virtual void unserialize(messageqcpp::ByteStream& bs); + + template + std::unordered_map* getMap() + { + if (!fMap) + { + 
// Just in time creation + fMap = new std::unordered_map; + } + return (std::unordered_map*) fMap; + } + + // The const version is called by serializeMap() and, through a const ModaData*, by subEvaluate(). + // It shouldn't (and can't) create a new map. + template + std::unordered_map* getMap() const + { + if (!fMap) + { + throw std::runtime_error("ModaData::serialize with no map"); + } + return (std::unordered_map*) fMap; + } + + template + void clear() + { + fSum = 0.0; + fCount = 0; + getMap()->clear(); + } + + long double fSum; + uint64_t fCount; + void* fMap; // Will be of type unordered_map<> + uint32_t fReturnType; + uint32_t fColWidth; + mcsv1_UDAF* modaImpl; // A pointer to one of the Moda_impl_T concrete classes + +private: + // For now, copy construction is unwanted + ModaData(UserData&); // NOTE(review): takes UserData&, so this is not the copy constructor - confirm intent + + // Templated map streamers + template + void serializeMap(messageqcpp::ByteStream& bs) const + { + std::unordered_map* map = getMap(); + typename std::unordered_map::const_iterator iter; + bs << (uint64_t)map->size(); + for (iter = map->begin(); iter != map->end(); ++iter) + { + bs << iter->first; + bs << iter->second; + } + } + + template + void unserializeMap(messageqcpp::ByteStream& bs) + { + uint32_t cnt; + T num; + uint64_t sz; + bs >> sz; + std::unordered_map* map = getMap(); + map->clear(); + for (uint64_t i = 0; i < sz; ++i) + { + bs >> num; + bs >> cnt; + (*map)[num] = cnt; + } + } +}; + +template +class Moda_impl_T : public mcsv1_UDAF +{ +public: + // Defaults OK + Moda_impl_T() {}; + virtual ~Moda_impl_T() {}; + + virtual mcsv1_UDAF::ReturnCode init(mcsv1Context* context, + ColumnDatum* colTypes); + + virtual mcsv1_UDAF::ReturnCode reset(mcsv1Context* context); + virtual mcsv1_UDAF::ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn); + virtual mcsv1_UDAF::ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn); + virtual mcsv1_UDAF::ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut); + virtual mcsv1_UDAF::ReturnCode dropValue(mcsv1Context* context, 
ColumnDatum* valsDropped); + + // Dummy: not used + virtual mcsv1_UDAF::ReturnCode createUserData(UserData*& userData, int32_t& length) + { + return mcsv1_UDAF::SUCCESS; + } +}; + +// moda returns the modal value of the dataset. If more than one value +// has the same maximum number of occurrences, then the one closest to +// AVG wins. If two are the same distance from AVG, then the one with the smaller absolute value wins. +class moda : public mcsv1_UDAF +{ +public: + // Defaults OK + moda() : mcsv1_UDAF() {}; + virtual ~moda() {}; + + virtual mcsv1_UDAF::ReturnCode init(mcsv1Context* context, + ColumnDatum* colTypes); + + virtual ReturnCode reset(mcsv1Context* context) + { + return getImpl(context)->reset(context); + } + + virtual mcsv1_UDAF::ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn) + { + return getImpl(context)->nextValue(context, valsIn); + } + + virtual mcsv1_UDAF::ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn) + { + return getImpl(context)->subEvaluate(context, valIn); + } + + virtual mcsv1_UDAF::ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut) + { + return getImpl(context)->evaluate(context, valOut); + } + + virtual mcsv1_UDAF::ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped) + { + return getImpl(context)->dropValue(context, valsDropped); + } + + mcsv1_UDAF::ReturnCode createUserData(UserData*& userData, int32_t& length) + { + userData = new ModaData; + length = sizeof(ModaData); + return mcsv1_UDAF::SUCCESS; + } + + mcsv1_UDAF* getImpl(mcsv1Context* context); + +protected: + Moda_impl_T moda_impl_int8; + Moda_impl_T moda_impl_int16; + Moda_impl_T moda_impl_int32; + Moda_impl_T moda_impl_int64; + Moda_impl_T moda_impl_uint8; + Moda_impl_T moda_impl_uint16; + Moda_impl_T moda_impl_uint32; + Moda_impl_T moda_impl_uint64; + Moda_impl_T moda_impl_float; + Moda_impl_T moda_impl_double; + Moda_impl_T moda_impl_longdouble; +}; + + +}; // namespace + +#undef EXPORT + +#endif // HEADER_moda + +diff --git 
a/utils/regr/modamysql.cpp b/utils/regr/modamysql.cpp new file mode 100644 index 000000000..0d22fa343 --- /dev/null +++ b/utils/regr/modamysql.cpp @@ -0,0 +1,288 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "idb_mysql.h" + +namespace +{ +inline bool isNumeric(int type, const char* attr) +{ + if (type == INT_RESULT || type == REAL_RESULT || type == DECIMAL_RESULT) + { + return true; + } +#if _MSC_VER + if (_strnicmp("NULL", attr, 4) == 0)) +#else + if (strncasecmp("NULL", attr, 4) == 0) +#endif + { + return true; + } + return false; +} + +struct moda_data +{ + long double fSum; + uint64_t fCount; + enum Item_result fReturnType; + std::tr1::unordered_map mapINT; + std::tr1::unordered_map mapREAL; + std::tr1::unordered_map mapDECIMAL; + void clear() + { + fSum = 0.0; + fCount = 0; + mapINT.clear(); + mapREAL.clear(); + mapDECIMAL.clear(); + } +}; + +} + +extern "C" +{ + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +my_bool moda_init(UDF_INIT* initid, UDF_ARGS* args, char* message) +{ + struct moda_data* data; + if (args->arg_count != 1) + { + strcpy(message,"moda() requires one argument"); + return 1; + } + if (!isNumeric(args->arg_type[0], args->attributes[0])) + { + strcpy(message,"moda() with a non-numeric argument"); + return 1; + } + + data = new moda_data; + data->fReturnType = args->arg_type[0]; + data->fCount = 0; + data->fSum = 0.0; + initid->ptr = (char*)data; + return 0; +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void moda_deinit(UDF_INIT* initid) +{ + struct moda_data* data = (struct moda_data*)initid->ptr; + data->clear(); + delete data; +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void moda_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), + char* message __attribute__((unused))) +{ + struct moda_data* data = (struct moda_data*)initid->ptr; + data->clear(); +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void moda_add(UDF_INIT* initid, + UDF_ARGS* args, + char* is_null, + 
char* message __attribute__((unused))) +{ + // Test for NULL + if (args->args[0] == 0) + { + return; + } + + struct moda_data* data = (struct moda_data*)initid->ptr; + data->fCount++; + + switch (args->arg_type[0]) + { + case INT_RESULT: + { + int64_t val = *((int64_t*)args->args[0]); + data->fSum += (long double)val; + data->mapINT[val]++; + break; + } + case REAL_RESULT: + { + double val = *((double*)args->args[0]); + data->fSum += val; + data->mapREAL[val]++; + break; + } + case DECIMAL_RESULT: + case STRING_RESULT: + { + long double val = strtold(args->args[0], 0); + data->fSum += val; + data->mapDECIMAL[val]++; + break; + } + default: + break; + } +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void moda_remove(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, + char* message __attribute__((unused))) +{ + // Test for NULL + if (args->args[0] == 0) + { + return; + } + struct moda_data* data = (struct moda_data*)initid->ptr; + data->fCount--; + + switch (args->arg_type[0]) + { + case INT_RESULT: + { + int64_t val = *((int64_t*)args->args[0]); + data->fSum -= (long double)val; + data->mapINT[val]--; + break; + } + case REAL_RESULT: + { + double val = *((double*)args->args[0]); + data->fSum -= val; + data->mapREAL[val]--; + break; + } + case DECIMAL_RESULT: + case STRING_RESULT: + { + long double val = strtold(args->args[0], 0); + data->fSum -= val; + data->mapDECIMAL[val]--; + break; + } + default: + break; + } +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +char* moda(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + char* is_null, char* error __attribute__((unused))) +{ + struct moda_data* data = (struct moda_data*)initid->ptr; + uint32_t maxCnt = 0.0; + + switch (args->arg_type[0]) + { + case INT_RESULT: + { + typename std::tr1::unordered_map::iterator iter; + int64_t avg = (int64_t)data->fCount ? 
data->fSum / data->fCount : 0; + int64_t val = 0.0; + for (iter = data->mapINT.begin(); iter != data->mapINT.end(); ++iter) + { + if (iter->second > maxCnt) + { + val = iter->first; + maxCnt = iter->second; + } + else if (iter->second == maxCnt) + { + // Tie breaker: choose the closest to avg. If still tie, choose smallest + if ((abs(val-avg) > abs(iter->first-avg)) + || ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first)))) + { + val = iter->first; + } + } + } + std::ostringstream oss; + oss << val; + return const_cast(oss.str().c_str()); + break; + } + case REAL_RESULT: + { + typename std::tr1::unordered_map::iterator iter; + double avg = data->fCount ? data->fSum / data->fCount : 0; + double val = 0.0; + for (iter = data->mapREAL.begin(); iter != data->mapREAL.end(); ++iter) + { + if (iter->second > maxCnt) + { + val = iter->first; + maxCnt = iter->second; + } + else if (iter->second == maxCnt) + { + // Tie breaker: choose the closest to avg. If still tie, choose smallest + if ((abs(val-avg) > abs(iter->first-avg)) + || ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first)))) + { + val = iter->first; + } + } + } + std::ostringstream oss; + oss << val; + return const_cast(oss.str().c_str()); + break; + } + case DECIMAL_RESULT: + case STRING_RESULT: + { + typename std::tr1::unordered_map::iterator iter; + long double avg = data->fCount ? data->fSum / data->fCount : 0; + long double val = 0.0; + for (iter = data->mapDECIMAL.begin(); iter != data->mapDECIMAL.end(); ++iter) + { + if (iter->second > maxCnt) + { + val = iter->first; + maxCnt = iter->second; + } + else if (iter->second == maxCnt) + { + long double thisVal = iter->first; + // Tie breaker: choose the closest to avg. 
If still tie, choose smallest + if ((abs(val-avg) > abs(thisVal-avg)) + || ((abs(val-avg) == abs(thisVal-avg)) && (abs(val) > abs(thisVal)))) + { + val = thisVal; + } + } + } + std::ostringstream oss; + oss << val; + return const_cast(oss.str().c_str()); + break; + } + default: + break; + } + return NULL; +} + +} // Extern "C" diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp old mode 100644 new mode 100755 index a6e529274..71f43652c --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -2671,7 +2671,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut) case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: - if (valOut.compatible(uintTypeId)) + if (valOut.compatible(intTypeId)) { intOut = valOut.cast(); bSetSuccess = true; diff --git a/utils/udfsdk/mcsv1_udaf.cpp b/utils/udfsdk/mcsv1_udaf.cpp old mode 100644 new mode 100755 index b16036e05..326c7fee0 --- a/utils/udfsdk/mcsv1_udaf.cpp +++ b/utils/udfsdk/mcsv1_udaf.cpp @@ -218,6 +218,7 @@ void mcsv1Context::serialize(messageqcpp::ByteStream& b) const // Dont send context flags, These are set for each call b << fUserDataSize; b << (uint32_t)fResultType; + b << fColWidth; b << fResultscale; b << fResultPrecision; b << errorMsg; @@ -228,6 +229,7 @@ void mcsv1Context::serialize(messageqcpp::ByteStream& b) const b << fStartConstant; b << fEndConstant; b << fParamCount; + b << (uint32_t)mariadbReturnType; } void mcsv1Context::unserialize(messageqcpp::ByteStream& b) @@ -239,6 +241,7 @@ void mcsv1Context::unserialize(messageqcpp::ByteStream& b) uint32_t iResultType; b >> iResultType; fResultType = (execplan::CalpontSystemCatalog::ColDataType)iResultType; + b >> fColWidth; b >> fResultscale; b >> fResultPrecision; b >> errorMsg; @@ -250,6 +253,9 @@ void mcsv1Context::unserialize(messageqcpp::ByteStream& b) b >> fStartConstant; b >> fEndConstant; b >> fParamCount; + uint32_t mrt; + b >> mrt; + 
mariadbReturnType = (enum_mariadb_return_type)mrt; } void UserData::serialize(messageqcpp::ByteStream& bs) const diff --git a/utils/udfsdk/mcsv1_udaf.h b/utils/udfsdk/mcsv1_udaf.h old mode 100644 new mode 100755 index 79d40ccf5..3057cc1aa --- a/utils/udfsdk/mcsv1_udaf.h +++ b/utils/udfsdk/mcsv1_udaf.h @@ -86,6 +86,14 @@ namespace mcsv1sdk { +// The return type of the CREATE AGGREGATE statement +enum enum_mariadb_return_type { + MYSQL_TYPE_DOUBLE = 5, + MYSQL_TYPE_LONGLONG = 8, + MYSQL_TYPE_VARCHAR=15, + MYSQL_TYPE_NEWDECIMAL = 246 +}; + /** * A map from name to function object. * @@ -303,7 +311,7 @@ public: EXPORT int32_t getColWidth(); // For non-numric return types, set the return column width. This defaults - // to the the length of the input. + // to a length determined by the input datatype. // valid in init() EXPORT bool setColWidth(int32_t colWidth); @@ -355,6 +363,9 @@ public: // Get the name of the function EXPORT const std::string& getName() const; + + // Get the return type as set by CREATE AGGREGATE FUNCTION + EXPORT enum_mariadb_return_type getMariaDBReturnType() const; EXPORT mcsv1Context& operator=(const mcsv1Context& rhs); EXPORT mcsv1Context& copy(const mcsv1Context& rhs); @@ -380,6 +391,7 @@ private: mcsv1sdk::mcsv1_UDAF* func; int32_t fParamCount; std::vector paramKeys; + enum_mariadb_return_type mariadbReturnType; public: // For use by the framework @@ -403,6 +415,7 @@ public: EXPORT boost::shared_ptr getUserDataSP(); EXPORT void setParamCount(int32_t paramCount); std::vector* getParamKeys(); + EXPORT void setMariaDBReturnType(enum_mariadb_return_type rt); }; // Since aggregate functions can operate on any data type, we use the following structure @@ -420,7 +433,7 @@ public: // For char, varchar, text, varbinary and blob types, columnData will be std::string. 
struct ColumnDatum { - execplan::CalpontSystemCatalog::ColDataType dataType; // defined in calpontsystemcatalog.h + execplan::CalpontSystemCatalog::ColDataType dataType; // defined in calpontsystemcatalog.h static_any::any columnData; // Not valid in init() uint32_t scale; // If dataType is a DECIMAL type uint32_t precision; // If dataType is a DECIMAL type @@ -956,6 +969,16 @@ inline std::vector* mcsv1Context::getParamKeys() return &paramKeys; } +inline enum_mariadb_return_type mcsv1Context::getMariaDBReturnType() const +{ + return mariadbReturnType; // the value last stored by setMariaDBReturnType() or unserialize() +} + +inline void mcsv1Context::setMariaDBReturnType(enum_mariadb_return_type rt) +{ + mariadbReturnType = rt; // read back through getMariaDBReturnType() +} + inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) { return NOT_IMPLEMENTED; @@ -968,8 +991,6 @@ inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::createUserData(UserData*& userData, in return SUCCESS; } - - // Handy helper functions template inline T mcsv1_UDAF::convertAnyTo(static_any::any& valIn)