diff --git a/dbcon/mysql/install_calpont_mysql.sh b/dbcon/mysql/install_calpont_mysql.sh
index 311e03784..a5d7150a2 100755
--- a/dbcon/mysql/install_calpont_mysql.sh
+++ b/dbcon/mysql/install_calpont_mysql.sh
@@ -89,6 +89,7 @@ CREATE AGGREGATE FUNCTION regr_avgy RETURNS REAL soname 'libregr_mysql.so';
 CREATE AGGREGATE FUNCTION regr_count RETURNS INTEGER soname 'libregr_mysql.so';
 CREATE AGGREGATE FUNCTION regr_slope RETURNS REAL soname 'libregr_mysql.so';
 CREATE AGGREGATE FUNCTION regr_intercept RETURNS REAL soname 'libregr_mysql.so';
+CREATE AGGREGATE FUNCTION regr_r2 RETURNS REAL soname 'libregr_mysql.so';
 
 CREATE AGGREGATE FUNCTION distinct_count RETURNS INTEGER soname 'libudf_mysql.so';
 
diff --git a/utils/regr/CMakeLists.txt b/utils/regr/CMakeLists.txt
index 47db83c63..16f44d9af 100755
--- a/utils/regr/CMakeLists.txt
+++ b/utils/regr/CMakeLists.txt
@@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
 
 ########### next target ###############
 
-set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept)
+set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept.cpp regr_r2.cpp)
 
 add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
 
diff --git a/utils/regr/regr.vpj b/utils/regr/regr.vpj
index a22c54ced..0de8c7282 100644
--- a/utils/regr/regr.vpj
+++ b/utils/regr/regr.vpj
@@ -198,6 +198,7 @@
+
@@ -208,6 +209,7 @@
+
+#include
+#include
+#include
+#include "regr_intercept.h"
+#include "bytestream.h"
+#include "objectreader.h"
+
+using namespace mcsv1sdk;
+
+class Add_regr_intercept_ToUDAFMap
+{
+public:
+    Add_regr_intercept_ToUDAFMap()
+    {
+        UDAFMap::getMap()["regr_intercept"] = new regr_intercept();
+    }
+};
+
+static Add_regr_intercept_ToUDAFMap addToMap;
+
+// Running sums kept per group/frame; args are (y, x) in that order
+struct regr_intercept_data
+{
+    uint64_t cnt;
+    double sumx;
+    double sumx2; // sum of (x squared)
+    double sumy;
+    double sumxy; // sum of (x*y)
+};
+
+
+mcsv1_UDAF::ReturnCode regr_intercept::init(mcsv1Context* context,
+                                             ColumnDatum* colTypes)
+{
+    if (context->getParameterCount() != 2)
+    {
+        // The error message will be prepended with
+        // "The storage engine for the table doesn't support "
+        context->setErrorMessage("regr_intercept() with other than 2 arguments");
+        return mcsv1_UDAF::ERROR;
+    }
+
+    context->setUserDataSize(sizeof(regr_intercept_data));
+    context->setResultType(CalpontSystemCatalog::DOUBLE);
+    context->setColWidth(8);
+    context->setScale(colTypes[0].scale + 8);
+    context->setPrecision(19);
+    context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
+    return mcsv1_UDAF::SUCCESS;
+
+}
+
+mcsv1_UDAF::ReturnCode regr_intercept::reset(mcsv1Context* context)
+{
+    struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
+    data->cnt = 0;
+    data->sumx = 0.0;
+    data->sumx2 = 0.0;
+    data->sumy = 0.0;
+    data->sumxy = 0.0;
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_intercept::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
+{
+    static_any::any& valIn_y = valsIn[0].columnData;
+    static_any::any& valIn_x = valsIn[1].columnData;
+    struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
+    double valx = 0.0;
+    double valy = 0.0;
+
+    valx = convertAnyTo(valIn_x);
+    valy = convertAnyTo(valIn_y);
+
+    // For decimal types, we need to move the decimal point.
+    uint32_t scaley = valsIn[0].scale;
+
+    if (valy != 0 && scaley > 0)
+    {
+        valy /= pow(10.0, (double)scaley);
+    }
+
+    data->sumy += valy;
+
+    // Same for x: descale by x's own scale (scalex), which may differ from y's.
+    uint32_t scalex = valsIn[1].scale;
+
+    if (valx != 0 && scalex > 0)
+    {
+        valx /= pow(10.0, (double)scalex);
+    }
+
+    data->sumx += valx;
+    data->sumx2 += valx*valx;
+
+    data->sumxy += valx*valy;
+    ++data->cnt;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_intercept::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
+{
+    if (!userDataIn)
+    {
+        return mcsv1_UDAF::SUCCESS;
+    }
+
+    struct regr_intercept_data* outData = (struct regr_intercept_data*)context->getUserData()->data;
+    struct regr_intercept_data* inData = (struct regr_intercept_data*)userDataIn->data;
+
+    outData->sumx += inData->sumx;
+    outData->sumx2 += inData->sumx2;
+    outData->sumy += inData->sumy;
+    outData->sumxy += inData->sumxy;
+    outData->cnt += inData->cnt;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_intercept::evaluate(mcsv1Context* context, static_any::any& valOut)
+{
+    struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
+    double N = data->cnt;
+    if (N > 0)
+    {
+        double sumx = data->sumx;
+        double sumy = data->sumy;
+        double sumx2 = data->sumx2;
+        double sumxy = data->sumxy;
+        double slope = 0.0;
+        double variance = (N * sumx2) - (sumx * sumx);
+        if (variance != 0) // otherwise valOut stays unassigned
+        {
+            slope = ((N * sumxy) - (sumx * sumy)) / variance;
+            valOut = (sumy - (slope * sumx)) / N;
+        }
+    }
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_intercept::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
+{
+    static_any::any& valIn_y = valsDropped[0].columnData;
+    static_any::any& valIn_x = valsDropped[1].columnData;
+    struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
+
+    double valx = 0.0;
+    double valy = 0.0;
+
+    valx = convertAnyTo(valIn_x);
+    valy = convertAnyTo(valIn_y);
+
+    // For decimal types, we need to move the decimal point.
+    uint32_t scaley = valsDropped[0].scale;
+
+    if (valy != 0 && scaley > 0)
+    {
+        valy /= pow(10.0, (double)scaley);
+    }
+
+    data->sumy -= valy;
+
+    // Same for x: must mirror nextValue exactly, so descale by scalex here too.
+    uint32_t scalex = valsDropped[1].scale;
+
+    if (valx != 0 && scalex > 0)
+    {
+        valx /= pow(10.0, (double)scalex);
+    }
+
+    data->sumx -= valx;
+    data->sumx2 -= valx*valx;
+
+    data->sumxy -= valx*valy;
+    --data->cnt;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
diff --git a/utils/regr/regr_intercept.h b/utils/regr/regr_intercept.h
new file mode 100644
index 000000000..ed82477cd
--- /dev/null
+++ b/utils/regr/regr_intercept.h
@@ -0,0 +1,88 @@
+/* Copyright (C) 2017 MariaDB Corporaton
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA.
*/
+
+/***********************************************************************
+* $Id$
+*
+* regr_intercept.h
+***********************************************************************/
+
+/**
+ * Columnstore interface for the regr_intercept function
+ *
+ *
+ * CREATE AGGREGATE FUNCTION regr_intercept returns REAL
+ * soname 'libregr_mysql.so';
+ *
+ */
+#ifndef HEADER_regr_intercept
+#define HEADER_regr_intercept
+
+#include
+#include
+#include
+#ifdef _MSC_VER
+#include
+#else
+#include
+#endif
+
+#include "mcsv1_udaf.h"
+#include "calpontsystemcatalog.h"
+#include "windowfunctioncolumn.h"
+using namespace execplan;
+
+#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
+#define EXPORT __declspec(dllexport)
+#else
+#define EXPORT
+#endif
+
+namespace mcsv1sdk
+{
+
+// UDAF returning the least-squares regression intercept; arguments are (y, x) in that order
+
+class regr_intercept : public mcsv1_UDAF
+{
+public:
+    // Defaults OK
+    regr_intercept() : mcsv1_UDAF() {};
+    virtual ~regr_intercept() {};
+
+    virtual ReturnCode init(mcsv1Context* context,
+                            ColumnDatum* colTypes); // ERROR unless exactly 2 parameters; result type is DOUBLE
+
+    virtual ReturnCode reset(mcsv1Context* context); // zero the count and running sums
+
+    virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn); // add one (y, x) row to the sums
+
+    virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn); // add another partial set of sums into this one
+
+    virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut); // valOut left unassigned when no rows or N*sumx2 == sumx*sumx
+
+    virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped); // subtract one (y, x) row from the sums
+
+protected:
+};
+
+}; // namespace
+
+#undef EXPORT
+
+#endif // HEADER_regr_intercept.h
+
diff --git a/utils/regr/regr_r2.cpp b/utils/regr/regr_r2.cpp
new file mode 100644
index 000000000..052b5dcfc
--- /dev/null
+++ b/utils/regr/regr_r2.cpp
@@ -0,0 +1,216 @@
+/* Copyright (C) 2017 MariaDB Corporaton
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA. */
+
+#include
+#include
+#include
+#include "regr_r2.h"
+#include "bytestream.h"
+#include "objectreader.h"
+
+using namespace mcsv1sdk;
+
+class Add_regr_r2_ToUDAFMap
+{
+public:
+    Add_regr_r2_ToUDAFMap()
+    {
+        UDAFMap::getMap()["regr_r2"] = new regr_r2();
+    }
+};
+
+static Add_regr_r2_ToUDAFMap addToMap;
+
+// Running sums kept per group/frame; args are (y, x) in that order
+struct regr_r2_data
+{
+    uint64_t cnt;
+    double sumx;
+    double sumx2; // sum of (x squared)
+    double sumy;
+    double sumy2; // sum of (y squared)
+    double sumxy; // sum of x * y
+};
+
+
+mcsv1_UDAF::ReturnCode regr_r2::init(mcsv1Context* context,
+                                      ColumnDatum* colTypes)
+{
+    if (context->getParameterCount() != 2)
+    {
+        // The error message will be prepended with
+        // "The storage engine for the table doesn't support "
+        context->setErrorMessage("regr_r2() with other than 2 arguments");
+        return mcsv1_UDAF::ERROR;
+    }
+
+    context->setUserDataSize(sizeof(regr_r2_data));
+    context->setResultType(CalpontSystemCatalog::DOUBLE);
+    context->setColWidth(8);
+    context->setScale(colTypes[0].scale + 8);
+    context->setPrecision(19);
+    context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
+    return mcsv1_UDAF::SUCCESS;
+
+}
+
+mcsv1_UDAF::ReturnCode regr_r2::reset(mcsv1Context* context)
+{
+    struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
+    data->cnt = 0;
+    data->sumx = 0.0;
+    data->sumx2 = 0.0;
+    data->sumy = 0.0;
+    data->sumy2 = 0.0;
+    data->sumxy = 0.0;
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_r2::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
+{
+    static_any::any& valIn_y = valsIn[0].columnData;
+    static_any::any& valIn_x = valsIn[1].columnData;
+    struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
+    double valx = 0.0;
+    double valy = 0.0;
+
+    valx = convertAnyTo(valIn_x);
+    valy = convertAnyTo(valIn_y);
+
+    // For decimal types, we need to move the decimal point.
+    uint32_t scaley = valsIn[0].scale;
+
+    if (valy != 0 && scaley > 0)
+    {
+        valy /= pow(10.0, (double)scaley);
+    }
+
+    data->sumy += valy;
+    data->sumy2 += valy*valy;
+
+    // Same for x: descale by x's own scale (scalex), which may differ from y's.
+    uint32_t scalex = valsIn[1].scale;
+
+    if (valx != 0 && scalex > 0)
+    {
+        valx /= pow(10.0, (double)scalex);
+    }
+
+    data->sumx += valx;
+    data->sumx2 += valx*valx;
+
+    data->sumxy += valx*valy;
+
+    ++data->cnt;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_r2::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
+{
+    if (!userDataIn)
+    {
+        return mcsv1_UDAF::SUCCESS;
+    }
+
+    struct regr_r2_data* outData = (struct regr_r2_data*)context->getUserData()->data;
+    struct regr_r2_data* inData = (struct regr_r2_data*)userDataIn->data;
+
+    outData->sumx += inData->sumx;
+    outData->sumx2 += inData->sumx2;
+    outData->sumy += inData->sumy;
+    outData->sumy2 += inData->sumy2;
+    outData->sumxy += inData->sumxy;
+    outData->cnt += inData->cnt;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_r2::evaluate(mcsv1Context* context, static_any::any& valOut)
+{
+    struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
+    double N = data->cnt;
+    if (N > 0)
+    {
+        double sumx = data->sumx;
+        double sumy = data->sumy;
+        double sumx2 = data->sumx2;
+        double sumy2 = data->sumy2;
+        double sumxy = data->sumxy;
+
+        double var_popx = (sumx2 - (sumx * sumx / N)) / N;
+        if (var_popx == 0)
+        {
+            // When var_popx is 0, NULL is the result (valOut is left unassigned).
+            return mcsv1_UDAF::SUCCESS;
+        }
+        double var_popy = (sumy2 - (sumy * sumy / N)) / N;
+        if (var_popy == 0)
+        {
+            // When var_popy is 0, 1 is the result
+            valOut = 1.0;
+            return mcsv1_UDAF::SUCCESS;
+        }
+        double std_popx = sqrt(var_popx);
+        double std_popy = sqrt(var_popy);
+        double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
+        double corr = covar_pop / (std_popy * std_popx);
+        valOut = corr * corr;
+    }
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode regr_r2::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
+{
+    static_any::any& valIn_y = valsDropped[0].columnData;
+    static_any::any& valIn_x = valsDropped[1].columnData;
+    struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
+
+    double valx = 0.0;
+    double valy = 0.0;
+
+    valx = convertAnyTo(valIn_x);
+    valy = convertAnyTo(valIn_y);
+
+    // For decimal types, we need to move the decimal point.
+    uint32_t scaley = valsDropped[0].scale;
+
+    if (valy != 0 && scaley > 0)
+    {
+        valy /= pow(10.0, (double)scaley);
+    }
+
+    data->sumy -= valy;
+    data->sumy2 -= valy*valy;
+
+    // Same for x: must mirror nextValue exactly, so descale by scalex here too.
+    uint32_t scalex = valsDropped[1].scale;
+
+    if (valx != 0 && scalex > 0)
+    {
+        valx /= pow(10.0, (double)scalex);
+    }
+
+    data->sumx -= valx;
+    data->sumx2 -= valx*valx;
+
+    data->sumxy -= valx*valy;
+    --data->cnt;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
diff --git a/utils/regr/regr_r2.h b/utils/regr/regr_r2.h
new file mode 100644
index 000000000..6ff65009a
--- /dev/null
+++ b/utils/regr/regr_r2.h
@@ -0,0 +1,88 @@
+/* Copyright (C) 2017 MariaDB Corporaton
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA. */
+
+/***********************************************************************
+* $Id$
+*
+* regr_r2.h
+***********************************************************************/
+
+/**
+ * Columnstore interface for the regr_r2 function
+ *
+ *
+ * CREATE AGGREGATE FUNCTION regr_r2 returns REAL
+ * soname 'libregr_mysql.so';
+ *
+ */
+#ifndef HEADER_regr_r2
+#define HEADER_regr_r2
+
+#include
+#include
+#include
+#ifdef _MSC_VER
+#include
+#else
+#include
+#endif
+
+#include "mcsv1_udaf.h"
+#include "calpontsystemcatalog.h"
+#include "windowfunctioncolumn.h"
+using namespace execplan;
+
+#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
+#define EXPORT __declspec(dllexport)
+#else
+#define EXPORT
+#endif
+
+namespace mcsv1sdk
+{
+
+// UDAF returning the squared correlation coefficient of the (y, x) pairs
+
+class regr_r2 : public mcsv1_UDAF
+{
+public:
+    // Defaults OK
+    regr_r2() : mcsv1_UDAF() {};
+    virtual ~regr_r2() {};
+
+    virtual ReturnCode init(mcsv1Context* context,
+                            ColumnDatum* colTypes); // ERROR unless exactly 2 parameters; result type is DOUBLE
+
+    virtual ReturnCode reset(mcsv1Context* context); // zero the count and running sums
+
+    virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn); // add one (y, x) row to the sums
+
+    virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn); // add another partial set of sums into this one
+
+    virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut); // unassigned (NULL) if no rows or var_pop(x) == 0; 1 if var_pop(y) == 0
+
+    virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped); // subtract one (y, x) row from the sums
+
+protected:
+};
+
+}; // namespace
+
+#undef EXPORT
+
+#endif // HEADER_regr_r2.h
+
diff --git a/utils/regr/regr_slope.cpp b/utils/regr/regr_slope.cpp
index c4178c56b..51f649046 100644
--- a/utils/regr/regr_slope.cpp
+++ b/utils/regr/regr_slope.cpp
@@ -60,11 +60,8 @@ mcsv1_UDAF::ReturnCode regr_slope::init(mcsv1Context* context,
context->setUserDataSize(sizeof(regr_slope_data)); context->setResultType(CalpontSystemCatalog::DOUBLE); context->setColWidth(8); - if (colTypes[0].scale) - { - context->setScale(colTypes[0].scale + 8); - context->setPrecision(19); - } + context->setScale(colTypes[0].scale + 8); + context->setPrecision(19); context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS); return mcsv1_UDAF::SUCCESS; @@ -77,6 +74,7 @@ mcsv1_UDAF::ReturnCode regr_slope::reset(mcsv1Context* context) data->sumx = 0.0; data->sumx2 = 0.0; data->sumy = 0.0; + data->sumxy = 0.0; return mcsv1_UDAF::SUCCESS; } @@ -150,7 +148,8 @@ mcsv1_UDAF::ReturnCode regr_slope::evaluate(mcsv1Context* context, static_any::a double variance = (N * sumx2) - (sumx * sumx); if (variance != 0) { - valOut = ((N * sumxy) - (sumx * sumy)) / variance; + double slope = ((N * sumxy) - (sumx * sumy)) / variance; + valOut = slope; } } return mcsv1_UDAF::SUCCESS; diff --git a/utils/regr/regrmysql.cpp b/utils/regr/regrmysql.cpp index 3b048f14d..fce6bb440 100644 --- a/utils/regr/regrmysql.cpp +++ b/utils/regr/regrmysql.cpp @@ -125,7 +125,7 @@ extern "C" //======================================================================= /** - * regr_avgx connector stub + * regr_avgx */ struct regr_avgx_data { @@ -209,7 +209,7 @@ extern "C" //======================================================================= /** - * regr_avgy connector stub + * regr_avgy */ struct regr_avgy_data { @@ -293,7 +293,7 @@ extern "C" //======================================================================= /** - * regr_count connector stub + * regr_count */ struct regr_count_data { @@ -372,11 +372,11 @@ extern "C" //======================================================================= /** - * regr_slope connector stub + * regr_slope */ struct regr_slope_data { - int64_t cnt; + int64_t cnt; double sumx; double sumx2; // sum of (x squared) double sumy; @@ -404,6 +404,7 @@ extern "C" data->sumx = 0.0; data->sumx2 = 0.0; data->sumy = 0.0; + data->sumxy = 0.0; 
initid->ptr = (char*)data;
         return 0;
@@ -429,6 +430,7 @@ extern "C"
         data->sumx = 0.0;
         data->sumx2 = 0.0;
         data->sumy = 0.0;
+        data->sumxy = 0.0;
     }
 
     #ifdef _MSC_VER
@@ -481,7 +483,7 @@ extern "C"
 //=======================================================================
 
     /**
-     * regr_intercept connector stub
+     * regr_intercept
      */
     struct regr_intercept_data
     {
@@ -513,6 +515,7 @@ extern "C"
         data->sumx = 0.0;
         data->sumx2 = 0.0;
         data->sumy = 0.0;
+        data->sumxy = 0.0;
 
         initid->ptr = (char*)data;
         return 0;
@@ -538,6 +541,7 @@ extern "C"
         data->sumx = 0.0;
         data->sumx2 = 0.0;
         data->sumy = 0.0;
+        data->sumxy = 0.0;
     }
 
     #ifdef _MSC_VER
@@ -587,6 +591,135 @@ extern "C"
         *is_null = 1;
         return 0;
     }
+
+//=======================================================================
+
+    /**
+     * regr_r2: squared correlation of the (y, x) pairs; args[0] is y, args[1] is x
+     */
+    struct regr_r2_data
+    {
+        int64_t cnt;
+        double sumx;
+        double sumx2; // sum of (x squared)
+        double sumy;
+        double sumy2; // sum of (y squared)
+        double sumxy; // sum of (x*y)
+    };
+
+    #ifdef _MSC_VER
+    __declspec(dllexport)
+    #endif
+    my_bool regr_r2_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
+    {
+        struct regr_r2_data* data;
+        if (args->arg_count != 2)
+        {
+            strcpy(message,"regr_r2() requires two arguments");
+            return 1;
+        }
+
+        if (!(data = (struct regr_r2_data*) malloc(sizeof(struct regr_r2_data))))
+        {
+            strmov(message,"Couldn't allocate memory");
+            return 1;
+        }
+        data->cnt = 0;
+        data->sumx = 0.0;
+        data->sumx2 = 0.0;
+        data->sumy = 0.0;
+        data->sumy2 = 0.0;
+        data->sumxy = 0.0;
+
+        initid->ptr = (char*)data; // released again in regr_r2_deinit
+        return 0;
+    }
+
+    #ifdef _MSC_VER
+    __declspec(dllexport)
+    #endif
+    void regr_r2_deinit(UDF_INIT* initid)
+    {
+        free(initid->ptr);
+    }
+
+    #ifdef _MSC_VER
+    __declspec(dllexport)
+    #endif
+    void
+    regr_r2_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
+                  char* message __attribute__((unused)))
+    {
+        struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
+        data->cnt = 0;
+        data->sumx = 0.0;
+        data->sumx2 = 0.0;
+        data->sumy = 0.0;
+        data->sumy2 = 0.0;
+        data->sumxy = 0.0;
+    }
+
+    #ifdef _MSC_VER
+    __declspec(dllexport)
+    #endif
+    void
+    regr_r2_add(UDF_INIT* initid, UDF_ARGS* args,
+                char* is_null,
+                char* message __attribute__((unused)))
+    {
+        // Skip the row when either y (args[0]) or x (args[1]) is NULL
+        if (args->args[0] == 0 || args->args[1] == 0)
+        {
+            return;
+        }
+        struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
+        double yval = cvtArgToDouble(args->arg_type[0], args->args[0]);
+        double xval = cvtArgToDouble(args->arg_type[1], args->args[1]);
+        data->sumy += yval;
+        data->sumx += xval;
+        data->sumx2 += xval*xval;
+        data->sumy2 += yval*yval;
+        data->sumxy += xval*yval;
+        ++data->cnt;
+    }
+
+    #ifdef _MSC_VER
+    __declspec(dllexport)
+    #endif
+    double regr_r2(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
+                   char* is_null, char* error __attribute__((unused)))
+    {
+        struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
+        double N = data->cnt;
+        if (N > 0)
+        {
+            double sumx = data->sumx;
+            double sumy = data->sumy;
+            double sumx2 = data->sumx2;
+            double sumy2 = data->sumy2;
+            double sumxy = data->sumxy;
+            double var_popx = (sumx2 - (sumx * sumx / N)) / N;
+            if (var_popx == 0)
+            {
+                // When var_popx is 0, NULL is the result.
+                *is_null = 1;
+                return 0;
+            }
+            double var_popy = (sumy2 - (sumy * sumy / N)) / N;
+            if (var_popy == 0)
+            {
+                // When var_popy is 0, 1 is the result
+                return 1;
+            }
+            double std_popx = sqrt(var_popx);
+            double std_popy = sqrt(var_popy);
+            double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
+            double corr = covar_pop / (std_popy * std_popx);
+            return corr * corr;
+        }
+        *is_null = 1; // no row with both y and x non-NULL was added
+        return 0;
+    }
 }
 
 // vim:ts=4 sw=4:
diff --git a/utils/udfsdk/distinct_count.cpp b/utils/udfsdk/distinct_count.cpp
new file mode 100644
index 000000000..66dcea18f
--- /dev/null
+++ b/utils/udfsdk/distinct_count.cpp
@@ -0,0 +1,99 @@
+/* Copyright (C) 2017 MariaDB Corporaton
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA.
*/
+
+#include "distinct_count.h"
+
+using namespace mcsv1sdk;
+
+struct distinct_count_data
+{
+    uint64_t cnt; // number of values counted so far
+};
+
+#define OUT_TYPE int64_t
+mcsv1_UDAF::ReturnCode distinct_count::init(mcsv1Context* context,
+                                            ColumnDatum* colTypes)
+{
+    context->setUserDataSize(sizeof(distinct_count_data));
+    if (context->getParameterCount() != 1)
+    {
+        // The error message will be prepended with
+        // "The storage engine for the table doesn't support "
+        context->setErrorMessage("distinct_count() with other than 1 arguments");
+        return mcsv1_UDAF::ERROR;
+    }
+    context->setResultType(CalpontSystemCatalog::BIGINT);
+    context->setColWidth(8);
+    context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
+    context->setRunFlag(mcsv1sdk::UDAF_DISTINCT);
+    context->setRunFlag(mcsv1sdk::UDAF_OVER_REQUIRED);
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode distinct_count::reset(mcsv1Context* context)
+{
+    struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
+    data->cnt = 0;
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode distinct_count::nextValue(mcsv1Context* context,
+                                                 ColumnDatum* valsIn)
+{
+    static_any::any& valIn = valsIn[0].columnData;
+    struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
+
+    if (valIn.empty())
+    {
+        return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
+    }
+    data->cnt++;
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode distinct_count::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
+{
+    struct distinct_count_data* outData = (struct distinct_count_data*)context->getUserData()->data;
+    struct distinct_count_data* inData = (struct distinct_count_data*)userDataIn->data;
+    outData->cnt += inData->cnt;
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode distinct_count::evaluate(mcsv1Context* context, static_any::any& valOut)
+{
+    struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
+    valOut = data->cnt;
+    return mcsv1_UDAF::SUCCESS;
+}
+
+mcsv1_UDAF::ReturnCode distinct_count::dropValue(mcsv1Context* context,
+                                                 ColumnDatum* valsDropped)
+{
+    static_any::any& valIn = valsDropped[0].columnData;
+    struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
+
+    if (valIn.empty())
+    {
+        return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
+    }
+
+    data->cnt--;
+
+    return mcsv1_UDAF::SUCCESS;
+}
+
diff --git a/utils/udfsdk/distinct_count.h b/utils/udfsdk/distinct_count.h
new file mode 100644
index 000000000..1d804eaa8
--- /dev/null
+++ b/utils/udfsdk/distinct_count.h
@@ -0,0 +1,222 @@
+/* Copyright (C) 2017 MariaDB Corporaton
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; version 2 of
+   the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+   MA 02110-1301, USA.
*/
+
+/***********************************************************************
+* $Id$
+*
+* distinct_count.h
+***********************************************************************/
+
+/**
+ * Columnstore interface for writing a User Defined Aggregate
+ * Functions (UDAF) and User Defined Analytic Functions (UDAnF)
+ * or a function that can act as either - UDA(n)F
+ *
+ * The basic steps are:
+ *
+ * 1. Create the UDA(n)F function interface in some .h file.
+ * 2. Create the UDF function implementation in some .cpp file
+ * 3. Create the connector stub (MariaDB UDAF definition) for
+ *    this UDF function.
+ * 4. Build the dynamic library using all of the source.
+ * 5. Put the library in $COLUMNSTORE_INSTALL/lib of
+ *    all modules
+ * 6. Restart the Columnstore system.
+ * 7. Notify mysqld about the new functions with commands like:
+ *
+ *    CREATE AGGREGATE FUNCTION distinct_count returns INT
+ *    soname 'libudf_mysql.so';
+ *
+ */
+#ifndef HEADER_distinct_count
+#define HEADER_distinct_count
+
+#include
+#include
+#include
+
+#include "mcsv1_udaf.h"
+#include "calpontsystemcatalog.h"
+#include "windowfunctioncolumn.h"
+using namespace execplan;
+
+#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
+#define EXPORT __declspec(dllexport)
+#else
+#define EXPORT
+#endif
+
+namespace mcsv1sdk
+{
+
+// Override mcsv1_UDAF to build your User Defined Aggregate (UDAF) and/or
+// User Defined Analytic Function (UDAnF).
+// These will be singleton classes, so don't put any instance
+// specific data in here. All instance data is stored in mcsv1Context
+// passed to each user function and retrieved by the getUserData() method.
+//
+// Each API function returns a ReturnCode. If ERROR is returned at any time,
+// the query is aborted, getInterrupted() will begin to return true and the
+// message set in context->setErrorMessage() is returned to MariaDB.
+class distinct_count : public mcsv1_UDAF
+{
+public:
+    // Defaults OK
+    distinct_count() : mcsv1_UDAF(){};
+    virtual ~distinct_count(){};
+
+    /**
+     * init()
+     *
+     * Mandatory. Implement this to initialize flags and instance
+     * data. Called once per SQL statement. You can do any sanity
+     * checks here.
+     *
+     * colTypes (in) - A vector of ColDataType defining the
+     * parameters of the UDA(n)F call. These can be used to decide
+     * to override the default return type. If desired, the new
+     * return type can be set by context->setResultType() and
+     * decimal precision can be set in context->
+     * setResultDecimalCharacteristics.
+     *
+     * Return mcsv1_UDAF::ERROR on any error, such as non-compatible
+     * colTypes or wrong number of arguments. Else return
+     * mcsv1_UDAF::SUCCESS.
+     */
+    virtual ReturnCode init(mcsv1Context* context, ColumnDatum* colTypes);
+
+    /**
+     * reset()
+     *
+     * Mandatory. Reset the UDA(n)F for a new group, partition or,
+     * in some cases, new Window Frame. Do not free any memory
+     * allocated by context->setUserDataSize(). The SDK Framework owns
+     * that memory and will handle that. Use this opportunity to
+     * reset any variables in context->getUserData() needed for the
+     * next aggregation. May be called multiple times if running in
+     * a distributed fashion.
+     *
+     * Use this opportunity to initialize the userData.
+     */
+    virtual ReturnCode reset(mcsv1Context* context);
+
+    /**
+     * nextValue()
+     *
+     * Mandatory. Handle a single row.
+     *
+     * valsIn - A vector of data structures describing the input
+     * data.
+     *
+     * This function is called once for every row in the filtered
+     * result set (before aggregation). It is very important that
+     * this function is efficient.
+     *
+     * If the UDAF is running in a distributed fashion, nextValue
+     * cannot depend on order, as it will only be called for each
+     * row found on the specific PM.
+     *
+     * valsIn (in) - a vector of the parameters from the row.
+     */
+    virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
+
+    /**
+     * subEvaluate()
+     *
+     * Mandatory -- Called if the UDAF is running in a distributed
+     * fashion. Columnstore tries to run all aggregate functions
+     * distributed, depending on context.
+     *
+     * Perform an aggregation on rows partially aggregated by
+     * nextValue. Columnstore calls nextValue for each row on a
+     * given PM for a group (GROUP BY). subEvaluate is called on the
+     * UM to consolidate those values into a single instance of
+     * userData. Keep your aggregated totals in context's userData.
+     * The first time this is called for a group, reset() would have
+     * been called with this version of userData.
+     *
+     * Called for every partial data set in each group in GROUP BY.
+     *
+     * When subEvaluate has been called for all subAggregated data
+     * sets, evaluate will be called with the same context as here.
+     *
+     * userDataIn (in) - This is a pointer to a memory block of the size
+     * set in setUserDataSize. It will contain the value of userData
+     * as seen in the last call to nextValue for a given PM.
+     *
+     */
+    virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* userDataIn);
+
+    /**
+     * evaluate()
+     *
+     * Mandatory. Get the aggregated value.
+     *
+     * Called for every new group if UDAF GROUP BY, UDAnF partition
+     * or, in some cases, new Window Frame.
+     *
+     * Set the aggregated value into valOut. The datatype is assumed
+     * to be the same as that set in the init() function.
+     *
+     * If the UDAF is running in a distributed fashion, evaluate is
+     * called after a series of subEvaluate calls.
+     *
+     * valOut (out) - Set the aggregated value here. The datatype is
+     * assumed to be the same as that set in the init() function.
+     *
+     * To return a NULL value, don't assign to valOut.
+     */
+    virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
+
+    /**
+     * dropValue()
+     *
+     * Optional -- If defined, the server will call this instead of
+     * reset for UDAnF.
+     *
+     * Don't implement if a UDAnF has one or more of the following:
+     * The UDAnF can't be used with a Window Frame
+     * The UDAnF is not reversible in some way
+     * The UDAnF is not interested in optimal performance
+     *
+     * If not implemented, reset() followed by a series of
+     * nextValue() will be called for each movement of the Window
+     * Frame.
+     *
+     * If implemented, then each movement of the Window Frame will
+     * result in dropValue() being called for each row falling out
+     * of the Frame and nextValue() being called for each new row
+     * coming into the Frame.
+     *
+     * valsDropped (in) - a vector of the parameters from the row
+     * leaving the Frame
+     *
+     * dropValue() will not be called for unbounded/current row type
+     * frames, as those are already optimized.
+     */
+    virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
+
+protected:
+
+};
+
+}; // namespace
+
+#undef EXPORT
+
+#endif // HEADER_distinct_count.h
+