1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-521 add regr_intecept and regr_r2

This commit is contained in:
David Hall
2018-09-28 13:51:43 -05:00
parent b8bf311c51
commit dc9ba90f96
11 changed files with 1058 additions and 13 deletions

View File

@ -89,6 +89,7 @@ CREATE AGGREGATE FUNCTION regr_avgy RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_count RETURNS INTEGER soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_slope RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_intercept RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION regr_r2 RETURNS REAL soname 'libregr_mysql.so';
CREATE AGGREGATE FUNCTION distinct_count RETURNS INTEGER soname 'libudf_mysql.so';

View File

@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
########### next target ###############
set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept)
set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept regr_r2)
add_definitions(-DMYSQL_DYNAMIC_PLUGIN)

View File

@ -198,6 +198,7 @@
<F N="regr_avgy.cpp"/>
<F N="regr_count.cpp"/>
<F N="regr_intercept.cpp"/>
<F N="regr_r2.cpp"/>
<F N="regr_slope.cpp"/>
<F N="regrmysql.cpp"/>
</Folder>
@ -208,6 +209,7 @@
<F N="regr_avgy.h"/>
<F N="regr_count.h"/>
<F N="regr_intercept.h"/>
<F N="regr_r2.h"/>
<F N="regr_slope.h"/>
</Folder>
<Folder

View File

@ -0,0 +1,197 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sstream>
#include <cstring>
#include <typeinfo>
#include "regr_intercept.h"
#include "bytestream.h"
#include "objectreader.h"
using namespace mcsv1sdk;
class Add_regr_intercept_ToUDAFMap
{
public:
Add_regr_intercept_ToUDAFMap()
{
UDAFMap::getMap()["regr_intercept"] = new regr_intercept();
}
};
static Add_regr_intercept_ToUDAFMap addToMap;
// Use the simple data model
struct regr_intercept_data
{
uint64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumxy; // sum of (x*y)
};
mcsv1_UDAF::ReturnCode regr_intercept::init(mcsv1Context* context,
ColumnDatum* colTypes)
{
if (context->getParameterCount() != 2)
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("regr_intercept() with other than 2 arguments");
return mcsv1_UDAF::ERROR;
}
context->setUserDataSize(sizeof(regr_intercept_data));
context->setResultType(CalpontSystemCatalog::DOUBLE);
context->setColWidth(8);
context->setScale(colTypes[0].scale + 8);
context->setPrecision(19);
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_intercept::reset(mcsv1Context* context)
{
struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumxy = 0.0;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_intercept::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
static_any::any& valIn_y = valsIn[0].columnData;
static_any::any& valIn_x = valsIn[1].columnData;
struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
double valx = 0.0;
double valy = 0.0;
valx = convertAnyTo<double>(valIn_x);
valy = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scaley = valsIn[0].scale;
if (valy != 0 && scaley > 0)
{
valy /= pow(10.0, (double)scaley);
}
data->sumy += valy;
// For decimal types, we need to move the decimal point.
uint32_t scalex = valsIn[1].scale;
if (valx != 0 && scalex > 0)
{
valx /= pow(10.0, (double)scaley);
}
data->sumx += valx;
data->sumx2 += valx*valx;
data->sumxy += valx*valy;
++data->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_intercept::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
{
if (!userDataIn)
{
return mcsv1_UDAF::SUCCESS;
}
struct regr_intercept_data* outData = (struct regr_intercept_data*)context->getUserData()->data;
struct regr_intercept_data* inData = (struct regr_intercept_data*)userDataIn->data;
outData->sumx += inData->sumx;
outData->sumx2 += inData->sumx2;
outData->sumy += inData->sumy;
outData->sumxy += inData->sumxy;
outData->cnt += inData->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_intercept::evaluate(mcsv1Context* context, static_any::any& valOut)
{
struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
double N = data->cnt;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumxy = data->sumxy;
double slope = 0.0;
double variance = (N * sumx2) - (sumx * sumx);
if (variance != 0)
{
slope = ((N * sumxy) - (sumx * sumy)) / variance;
valOut = (sumy - (slope * sumx)) / N;
}
}
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_intercept::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
static_any::any& valIn_y = valsDropped[0].columnData;
static_any::any& valIn_x = valsDropped[1].columnData;
struct regr_intercept_data* data = (struct regr_intercept_data*)context->getUserData()->data;
double valx = 0.0;
double valy = 0.0;
valx = convertAnyTo<double>(valIn_x);
valy = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scaley = valsDropped[0].scale;
if (valy != 0 && scaley > 0)
{
valy /= pow(10.0, (double)scaley);
}
data->sumy -= valy;
// For decimal types, we need to move the decimal point.
uint32_t scalex = valsDropped[1].scale;
if (valx != 0 && scalex > 0)
{
valx /= pow(10.0, (double)scaley);
}
data->sumx -= valx;
data->sumx2 -= valx*valx;
data->sumxy -= valx*valy;
--data->cnt;
return mcsv1_UDAF::SUCCESS;
}

View File

@ -0,0 +1,88 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id$
*
* regr_intercept.h
***********************************************************************/
/**
* Columnstore interface for for the regr_intercept function
*
*
* CREATE AGGREGATE FUNCTION regr_intercept returns REAL
* soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_intercept
#define HEADER_regr_intercept
#include <cstdlib>
#include <string>
#include <vector>
#ifdef _MSC_VER
#include <unordered_map>
#else
#include <tr1/unordered_map>
#endif
#include "mcsv1_udaf.h"
#include "calpontsystemcatalog.h"
#include "windowfunctioncolumn.h"
using namespace execplan;
#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace mcsv1sdk
{
// Return the regr_intercept value of the dataset
class regr_intercept : public mcsv1_UDAF
{
public:
// Defaults OK
regr_intercept() : mcsv1_UDAF() {};
virtual ~regr_intercept() {};
virtual ReturnCode init(mcsv1Context* context,
ColumnDatum* colTypes);
virtual ReturnCode reset(mcsv1Context* context);
virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn);
virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
protected:
};
}; // namespace
#undef EXPORT
#endif // HEADER_regr_intercept.h

216
utils/regr/regr_r2.cpp Normal file
View File

@ -0,0 +1,216 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sstream>
#include <cstring>
#include <typeinfo>
#include "regr_r2.h"
#include "bytestream.h"
#include "objectreader.h"
using namespace mcsv1sdk;
class Add_regr_r2_ToUDAFMap
{
public:
Add_regr_r2_ToUDAFMap()
{
UDAFMap::getMap()["regr_r2"] = new regr_r2();
}
};
static Add_regr_r2_ToUDAFMap addToMap;
// Use the simple data model
struct regr_r2_data
{
uint64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumy2; // sum of (y squared)
double sumxy; // sum of x * y
};
mcsv1_UDAF::ReturnCode regr_r2::init(mcsv1Context* context,
ColumnDatum* colTypes)
{
if (context->getParameterCount() != 2)
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("regr_r2() with other than 2 arguments");
return mcsv1_UDAF::ERROR;
}
context->setUserDataSize(sizeof(regr_r2_data));
context->setResultType(CalpontSystemCatalog::DOUBLE);
context->setColWidth(8);
context->setScale(colTypes[0].scale + 8);
context->setPrecision(19);
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_r2::reset(mcsv1Context* context)
{
struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumy2 = 0.0;
data->sumxy = 0.0;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_r2::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
static_any::any& valIn_y = valsIn[0].columnData;
static_any::any& valIn_x = valsIn[1].columnData;
struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
double valx = 0.0;
double valy = 0.0;
valx = convertAnyTo<double>(valIn_x);
valy = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scaley = valsIn[0].scale;
if (valy != 0 && scaley > 0)
{
valy /= pow(10.0, (double)scaley);
}
data->sumy += valy;
data->sumy2 += valy*valy;
// For decimal types, we need to move the decimal point.
uint32_t scalex = valsIn[1].scale;
if (valx != 0 && scalex > 0)
{
valx /= pow(10.0, (double)scaley);
}
data->sumx += valx;
data->sumx2 += valx*valx;
data->sumxy += valx*valy;
++data->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_r2::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
{
if (!userDataIn)
{
return mcsv1_UDAF::SUCCESS;
}
struct regr_r2_data* outData = (struct regr_r2_data*)context->getUserData()->data;
struct regr_r2_data* inData = (struct regr_r2_data*)userDataIn->data;
outData->sumx += inData->sumx;
outData->sumx2 += inData->sumx2;
outData->sumy += inData->sumy;
outData->sumy2 += inData->sumy2;
outData->sumxy += inData->sumxy;
outData->cnt += inData->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_r2::evaluate(mcsv1Context* context, static_any::any& valOut)
{
struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
double N = data->cnt;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumy2 = data->sumy2;
double sumxy = data->sumxy;
double var_popx = (sumx2 - (sumx * sumx / N)) / N;
if (var_popx == 0)
{
// When var_popx is 0, NULL is the result.
return mcsv1_UDAF::SUCCESS;
}
double var_popy = (sumy2 - (sumy * sumy / N)) / N;
if (var_popy == 0)
{
// When var_popy is 0, 1 is the result
valOut = 1.0;
return mcsv1_UDAF::SUCCESS;
}
double std_popx = sqrt(var_popx);
double std_popy = sqrt(var_popy);
double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
double corr = covar_pop / (std_popy * std_popx);
valOut = corr * corr;
}
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode regr_r2::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
static_any::any& valIn_y = valsDropped[0].columnData;
static_any::any& valIn_x = valsDropped[1].columnData;
struct regr_r2_data* data = (struct regr_r2_data*)context->getUserData()->data;
double valx = 0.0;
double valy = 0.0;
valx = convertAnyTo<double>(valIn_x);
valy = convertAnyTo<double>(valIn_y);
// For decimal types, we need to move the decimal point.
uint32_t scaley = valsDropped[0].scale;
if (valy != 0 && scaley > 0)
{
valy /= pow(10.0, (double)scaley);
}
data->sumy -= valy;
data->sumy2 -= valy*valy;
// For decimal types, we need to move the decimal point.
uint32_t scalex = valsDropped[1].scale;
if (valx != 0 && scalex > 0)
{
valx /= pow(10.0, (double)scaley);
}
data->sumx -= valx;
data->sumx2 -= valx*valx;
data->sumxy -= valx*valy;
--data->cnt;
return mcsv1_UDAF::SUCCESS;
}

88
utils/regr/regr_r2.h Normal file
View File

@ -0,0 +1,88 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id$
*
* regr_r2.h
***********************************************************************/
/**
* Columnstore interface for for the regr_r2 function
*
*
* CREATE AGGREGATE FUNCTION regr_r2 returns REAL
* soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_intercept
#define HEADER_regr_intercept
#include <cstdlib>
#include <string>
#include <vector>
#ifdef _MSC_VER
#include <unordered_map>
#else
#include <tr1/unordered_map>
#endif
#include "mcsv1_udaf.h"
#include "calpontsystemcatalog.h"
#include "windowfunctioncolumn.h"
using namespace execplan;
#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace mcsv1sdk
{
// Return the regr_r2 value of the dataset
class regr_r2 : public mcsv1_UDAF
{
public:
// Defaults OK
regr_r2() : mcsv1_UDAF() {};
virtual ~regr_r2() {};
virtual ReturnCode init(mcsv1Context* context,
ColumnDatum* colTypes);
virtual ReturnCode reset(mcsv1Context* context);
virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn);
virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
protected:
};
}; // namespace
#undef EXPORT
#endif // HEADER_regr_intercept.h

View File

@ -60,11 +60,8 @@ mcsv1_UDAF::ReturnCode regr_slope::init(mcsv1Context* context,
context->setUserDataSize(sizeof(regr_slope_data));
context->setResultType(CalpontSystemCatalog::DOUBLE);
context->setColWidth(8);
if (colTypes[0].scale)
{
context->setScale(colTypes[0].scale + 8);
context->setPrecision(19);
}
context->setScale(colTypes[0].scale + 8);
context->setPrecision(19);
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
return mcsv1_UDAF::SUCCESS;
@ -77,6 +74,7 @@ mcsv1_UDAF::ReturnCode regr_slope::reset(mcsv1Context* context)
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumxy = 0.0;
return mcsv1_UDAF::SUCCESS;
}
@ -150,7 +148,8 @@ mcsv1_UDAF::ReturnCode regr_slope::evaluate(mcsv1Context* context, static_any::a
double variance = (N * sumx2) - (sumx * sumx);
if (variance != 0)
{
valOut = ((N * sumxy) - (sumx * sumy)) / variance;
double slope = ((N * sumxy) - (sumx * sumy)) / variance;
valOut = slope;
}
}
return mcsv1_UDAF::SUCCESS;

View File

@ -125,7 +125,7 @@ extern "C"
//=======================================================================
/**
* regr_avgx connector stub
* regr_avgx
*/
struct regr_avgx_data
{
@ -209,7 +209,7 @@ extern "C"
//=======================================================================
/**
* regr_avgy connector stub
* regr_avgy
*/
struct regr_avgy_data
{
@ -293,7 +293,7 @@ extern "C"
//=======================================================================
/**
* regr_count connector stub
* regr_count
*/
struct regr_count_data
{
@ -372,11 +372,11 @@ extern "C"
//=======================================================================
/**
* regr_slope connector stub
* regr_slope
*/
struct regr_slope_data
{
int64_t cnt;
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
@ -404,6 +404,7 @@ extern "C"
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumxy = 0.0;
initid->ptr = (char*)data;
return 0;
@ -429,6 +430,7 @@ extern "C"
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumxy = 0.0;
}
#ifdef _MSC_VER
@ -481,7 +483,7 @@ extern "C"
//=======================================================================
/**
* regr_intercept connector stub
* regr_intercept
*/
struct regr_intercept_data
{
@ -513,6 +515,7 @@ extern "C"
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumxy = 0.0;
initid->ptr = (char*)data;
return 0;
@ -538,6 +541,7 @@ extern "C"
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumxy = 0.0;
}
#ifdef _MSC_VER
@ -587,6 +591,135 @@ extern "C"
*is_null = 1;
return 0;
}
//=======================================================================
/**
* regr_r2
*/
struct regr_r2_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumy2; // sum of (y squared)
double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool regr_r2_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct regr_r2_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_r2() requires two arguments");
return 1;
}
if (!(data = (struct regr_r2_data*) malloc(sizeof(struct regr_r2_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumy2 = 0.0;
data->sumxy = 0.0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void regr_r2_deinit(UDF_INIT* initid)
{
free(initid->ptr);
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_r2_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
{
struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
data->cnt = 0;
data->sumx = 0.0;
data->sumx2 = 0.0;
data->sumy = 0.0;
data->sumy2 = 0.0;
data->sumxy = 0.0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void
regr_r2_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// Test for NULL in x and y
if (args->args[0] == 0 || args->args[1] == 0)
{
return;
}
struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
double yval = cvtArgToDouble(args->arg_type[0], args->args[0]);
double xval = cvtArgToDouble(args->arg_type[1], args->args[1]);
data->sumy += yval;
data->sumx += xval;
data->sumx2 += xval*xval;
data->sumy2 += yval*yval;
data->sumxy += xval*yval;
++data->cnt;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
double regr_r2(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
double N = data->cnt;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumy2 = data->sumy2;
double sumxy = data->sumxy;
double var_popx = (sumx2 - (sumx * sumx / N)) / N;
if (var_popx == 0)
{
// When var_popx is 0, NULL is the result.
*is_null = 1;
return 0;
}
double var_popy = (sumy2 - (sumy * sumy / N)) / N;
if (var_popy == 0)
{
// When var_popy is 0, 1 is the result
return 1;
}
double std_popx = sqrt(var_popx);
double std_popy = sqrt(var_popy);
double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
double corr = covar_pop / (std_popy * std_popx);
return corr * corr;
}
*is_null = 1;
return 0;
}
}
// vim:ts=4 sw=4:

View File

@ -0,0 +1,99 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "distinct_count.h"
using namespace mcsv1sdk;
struct distinct_count_data
{
uint64_t cnt;
};
#define OUT_TYPE int64_t
mcsv1_UDAF::ReturnCode distinct_count::init(mcsv1Context* context,
ColumnDatum* colTypes)
{
context->setUserDataSize(sizeof(distinct_count_data));
if (context->getParameterCount() != 1)
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("avgx() with other than 1 arguments");
return mcsv1_UDAF::ERROR;
}
context->setResultType(CalpontSystemCatalog::BIGINT);
context->setColWidth(8);
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
context->setRunFlag(mcsv1sdk::UDAF_DISTINCT);
context->setRunFlag(mcsv1sdk::UDAF_OVER_REQUIRED);
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode distinct_count::reset(mcsv1Context* context)
{
struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
data->cnt = 0;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode distinct_count::nextValue(mcsv1Context* context,
ColumnDatum* valsIn)
{
static_any::any& valIn = valsIn[0].columnData;
struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
if (valIn.empty())
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
data->cnt++;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode distinct_count::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
{
struct distinct_count_data* outData = (struct distinct_count_data*)context->getUserData()->data;
struct distinct_count_data* inData = (struct distinct_count_data*)userDataIn->data;
outData->cnt += inData->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode distinct_count::evaluate(mcsv1Context* context, static_any::any& valOut)
{
struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
valOut = data->cnt;
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF::ReturnCode distinct_count::dropValue(mcsv1Context* context,
ColumnDatum* valsDropped)
{
static_any::any& valIn = valsDropped[0].columnData;
struct distinct_count_data* data = (struct distinct_count_data*)context->getUserData()->data;
if (valIn.empty())
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
data->cnt--;
return mcsv1_UDAF::SUCCESS;
}

View File

@ -0,0 +1,222 @@
/* Copyright (C) 2017 MariaDB Corporaton
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id$
*
* mcsv1_UDAF.h
***********************************************************************/
/**
* Columnstore interface for writing a User Defined Aggregate
* Functions (UDAF) and User Defined Analytic Functions (UDAnF)
* or a function that can act as either - UDA(n)F
*
* The basic steps are:
*
* 1. Create a the UDA(n)F function interface in some .h file.
* 2. Create the UDF function implementation in some .cpp file
* 3. Create the connector stub (MariaDB UDAF definition) for
* this UDF function.
* 4. build the dynamic library using all of the source.
* 5 Put the library in $COLUMNSTORE_INSTALL/lib of
* all modules
* 6. restart the Columnstore system.
* 7. notify mysqld about the new functions with commands like:
*
* CREATE AGGREGATE FUNCTION distinct_count returns INT
* soname 'libudf_mysql.so';
*
*/
#ifndef HEADER_distinct_count
#define HEADER_distinct_count
#include <cstdlib>
#include <string>
#include <boost/any.hpp>
#include "mcsv1_udaf.h"
#include "calpontsystemcatalog.h"
#include "windowfunctioncolumn.h"
using namespace execplan;
#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace mcsv1sdk
{
// Override mcsv1_UDAF to build your User Defined Aggregate (UDAF) and/or
// User Defined Analytic Function (UDAnF).
// These will be singleton classes, so don't put any instance
// specific data in here. All instance data is stored in mcsv1Context
// passed to each user function and retrieved by the getUserData() method.
//
// Each API function returns a ReturnCode. If ERROR is returned at any time,
// the query is aborted, getInterrupted() will begin to return true and the
// message set in config->setErrorMessage() is returned to MariaDB.
class distinct_count : public mcsv1_UDAF
{
public:
// Defaults OK
distinct_count() : mcsv1_UDAF(){};
virtual ~distinct_count(){};
/**
* init()
*
* Mandatory. Implement this to initialize flags and instance
* data. Called once per SQL statement. You can do any sanity
* checks here.
*
* colTypes (in) - A vector of ColDataType defining the
* parameters of the UDA(n)F call. These can be used to decide
* to override the default return type. If desired, the new
* return type can be set by context->setReturnType() and
* decimal precision can be set in context->
* setResultDecimalCharacteristics.
*
* Return mcsv1_UDAF::ERROR on any error, such as non-compatible
* colTypes or wrong number of arguments. Else return
* mcsv1_UDAF::SUCCESS.
*/
virtual ReturnCode init(mcsv1Context* context, ColumnDatum* colTypes);
/**
* reset()
*
* Mandatory. Reset the UDA(n)F for a new group, partition or,
* in some cases, new Window Frame. Do not free any memory
* allocated by context->setUserDataSize(). The SDK Framework owns
* that memory and will handle that. Use this opportunity to
* reset any variables in context->getUserData() needed for the
* next aggregation. May be called multiple times if running in
* a ditributed fashion.
*
* Use this opportunity to initialize the userData.
*/
virtual ReturnCode reset(mcsv1Context* context);
/**
* nextValue()
*
* Mandatory. Handle a single row.
*
* colsIn - A vector of data structure describing the input
* data.
*
* This function is called once for every row in the filtered
* result set (before aggregation). It is very important that
* this function is efficient.
*
* If the UDAF is running in a distributed fashion, nextValue
* cannot depend on order, as it will only be called for each
* row found on the specific PM.
*
* valsIn (in) - a vector of the parameters from the row.
*/
virtual ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
/**
* subEvaluate()
*
* Mandatory -- Called if the UDAF is running in a distributed
* fashion. Columnstore tries to run all aggregate functions
* distributed, depending on context.
*
* Perform an aggregation on rows partially aggregated by
* nextValue. Columnstore calls nextValue for each row on a
* given PM for a group (GROUP BY). subEvaluate is called on the
* UM to consolodate those values into a single instance of
* userData. Keep your aggregated totals in context's userData.
* The first time this is called for a group, reset() would have
* been called with this version of userData.
*
* Called for every partial data set in each group in GROUP BY.
*
* When subEvaluate has been called for all subAggregated data
* sets, Evaluate will be called with the same context as here.
*
* valIn (In) - This is a pointer to a memory block of the size
* set in setUserDataSize. It will contain the value of userData
* as seen in the last call to NextValue for a given PM.
*
*/
virtual ReturnCode subEvaluate(mcsv1Context* context, const UserData* userDataIn);
/**
* evaluate()
*
* Mandatory. Get the aggregated value.
*
* Called for every new group if UDAF GROUP BY, UDAnF partition
* or, in some cases, new Window Frame.
*
* Set the aggregated value into valOut. The datatype is assumed
* to be the same as that set in the init() function;
*
* If the UDAF is running in a distributed fashion, evaluate is
* called after a series of subEvaluate calls.
*
* valOut (out) - Set the aggregated value here. The datatype is
* assumed to be the same as that set in the init() function;
*
* To return a NULL value, don't assign to valOut.
*/
virtual ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
/**
* dropValue()
*
* Optional -- If defined, the server will call this instead of
* reset for UDAnF.
*
* Don't implement if a UDAnF has one or more of the following:
* The UDAnF can't be used with a Window Frame
* The UDAnF is not reversable in some way
* The UDAnF is not interested in optimal performance
*
* If not implemented, reset() followed by a series of
* nextValue() will be called for each movement of the Window
* Frame.
*
* If implemented, then each movement of the Window Frame will
* result in dropValue() being called for each row falling out
* of the Frame and nextValue() being called for each new row
* coming into the Frame.
*
* valsDropped (in) - a vector of the parameters from the row
* leaving the Frame
*
* dropValue() will not be called for unbounded/current row type
* frames, as those are already optimized.
*/
virtual ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
protected:
};
}; // namespace
#undef EXPORT
#endif // HEADER_distinct_count.h