You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-869 UDAF with other aggregate gives wrong answer
This commit is contained in:
@ -1338,7 +1338,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
|||||||
|
|
||||||
if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
|
if (functionVec[i]->fAggFunction == ROWAGG_UDAF)
|
||||||
{
|
{
|
||||||
// UDAF user data
|
// Column for index of UDAF UserData struct
|
||||||
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());
|
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec[i].get());
|
||||||
if (!udafFuncCol)
|
if (!udafFuncCol)
|
||||||
{
|
{
|
||||||
@ -1350,8 +1350,8 @@ void TupleAggregateStep::prep1PhaseAggregate(
|
|||||||
scaleAgg.push_back(0);
|
scaleAgg.push_back(0);
|
||||||
precisionAgg.push_back(0);
|
precisionAgg.push_back(0);
|
||||||
precisionAgg.push_back(0);
|
precisionAgg.push_back(0);
|
||||||
typeAgg.push_back(CalpontSystemCatalog::VARBINARY);
|
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAgg.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
|
widthAgg.push_back(bigUintWidth);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1775,8 +1775,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
|||||||
keysAgg.push_back(aggKey);
|
keysAgg.push_back(aggKey);
|
||||||
scaleAgg.push_back(0);
|
scaleAgg.push_back(0);
|
||||||
precisionAgg.push_back(0);
|
precisionAgg.push_back(0);
|
||||||
typeAgg.push_back(CalpontSystemCatalog::VARBINARY);
|
typeAgg.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAgg.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes
|
widthAgg.push_back(sizeof(uint64_t));
|
||||||
funct->fAuxColumnIndex = colAgg++;
|
funct->fAuxColumnIndex = colAgg++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2207,7 +2207,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
|||||||
|
|
||||||
if (functionVec2[i]->fAggFunction == ROWAGG_UDAF)
|
if (functionVec2[i]->fAggFunction == ROWAGG_UDAF)
|
||||||
{
|
{
|
||||||
// Dummy Column for UDAF user data
|
// Column for index of UDAF UserData struct
|
||||||
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get());
|
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVec2[i].get());
|
||||||
if (!udafFuncCol)
|
if (!udafFuncCol)
|
||||||
{
|
{
|
||||||
@ -2218,8 +2218,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
|
|||||||
keysAggDist.push_back(keysAggDist[j]); // Dummy?
|
keysAggDist.push_back(keysAggDist[j]); // Dummy?
|
||||||
scaleAggDist.push_back(0);
|
scaleAggDist.push_back(0);
|
||||||
precisionAggDist.push_back(0);
|
precisionAggDist.push_back(0);
|
||||||
typeAggDist.push_back(CalpontSystemCatalog::VARBINARY);
|
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAggDist.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
|
widthAggDist.push_back(sizeof(uint64_t));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2844,13 +2844,13 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
|||||||
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
|
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
|
||||||
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
|
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
|
||||||
colAggPm++;
|
colAggPm++;
|
||||||
// Dummy Column for UDAF UserData struct
|
// Column for index of UDAF UserData struct
|
||||||
oidsAggPm.push_back(oidsProj[colProj]);
|
oidsAggPm.push_back(oidsProj[colProj]);
|
||||||
keysAggPm.push_back(aggKey);
|
keysAggPm.push_back(aggKey);
|
||||||
scaleAggPm.push_back(0);
|
scaleAggPm.push_back(0);
|
||||||
precisionAggPm.push_back(0);
|
precisionAggPm.push_back(0);
|
||||||
typeAggPm.push_back(CalpontSystemCatalog::VARBINARY);
|
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAggPm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes
|
widthAggPm.push_back(bigUintWidth);
|
||||||
funct->fAuxColumnIndex = colAggPm++;
|
funct->fAuxColumnIndex = colAggPm++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -3111,7 +3111,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
|||||||
|
|
||||||
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
|
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
|
||||||
{
|
{
|
||||||
// Dummy column for UDAF user data
|
// Column for index of UDAF UserData struct
|
||||||
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
|
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
|
||||||
if (!udafFuncCol)
|
if (!udafFuncCol)
|
||||||
{
|
{
|
||||||
@ -3122,8 +3122,8 @@ void TupleAggregateStep::prep2PhasesAggregate(
|
|||||||
keysAggUm.push_back(keysAggUm[j]); // Dummy?
|
keysAggUm.push_back(keysAggUm[j]); // Dummy?
|
||||||
scaleAggUm.push_back(0);
|
scaleAggUm.push_back(0);
|
||||||
precisionAggUm.push_back(0);
|
precisionAggUm.push_back(0);
|
||||||
typeAggUm.push_back(CalpontSystemCatalog::VARBINARY);
|
typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAggUm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
|
widthAggUm.push_back(bigUintWidth);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3560,13 +3560,13 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
|||||||
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
|
typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType());
|
||||||
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
|
widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth());
|
||||||
colAggPm++;
|
colAggPm++;
|
||||||
// Dummy column for UDAF UserData struct
|
// Column for index of UDAF UserData struct
|
||||||
oidsAggPm.push_back(oidsProj[colProj]);
|
oidsAggPm.push_back(oidsProj[colProj]);
|
||||||
keysAggPm.push_back(aggKey);
|
keysAggPm.push_back(aggKey);
|
||||||
scaleAggPm.push_back(0);
|
scaleAggPm.push_back(0);
|
||||||
precisionAggPm.push_back(0);
|
precisionAggPm.push_back(0);
|
||||||
typeAggPm.push_back(CalpontSystemCatalog::VARBINARY);
|
typeAggPm.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAggPm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes
|
widthAggPm.push_back(sizeof(uint64_t));
|
||||||
funct->fAuxColumnIndex = colAggPm++;
|
funct->fAuxColumnIndex = colAggPm++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -3992,7 +3992,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
|||||||
|
|
||||||
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
|
if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF)
|
||||||
{
|
{
|
||||||
// Dummy column for UDAF user data
|
// Column for index of UDAF UserData struct
|
||||||
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
|
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(functionVecUm[i].get());
|
||||||
if (!udafFuncCol)
|
if (!udafFuncCol)
|
||||||
{
|
{
|
||||||
@ -4003,8 +4003,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
|
|||||||
keysAggDist.push_back(keysAggUm[j]); // Dummy?
|
keysAggDist.push_back(keysAggUm[j]); // Dummy?
|
||||||
scaleAggDist.push_back(0);
|
scaleAggDist.push_back(0);
|
||||||
precisionAggDist.push_back(0);
|
precisionAggDist.push_back(0);
|
||||||
typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
|
typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
|
||||||
widthAggDist.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2);
|
widthAggDist.push_back(sizeof(uint64_t));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
|
if (functionVecUm[i]->fAggFunction != ROWAGG_STATS)
|
||||||
|
@ -2863,7 +2863,7 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u
|
|||||||
int64_t rowCnt = 0;
|
int64_t rowCnt = 0;
|
||||||
// For a NULL constant, call nextValue with NULL and then evaluate.
|
// For a NULL constant, call nextValue with NULL and then evaluate.
|
||||||
bool bInterrupted = false;
|
bool bInterrupted = false;
|
||||||
mcsv1sdk::mcsv1Context context;
|
mcsv1sdk::mcsv1Context context(((RowUDAFFunctionCol*)fFunctionCols[i].get())->fUDAFContext);
|
||||||
context.setRowCnt(rowCnt);
|
context.setRowCnt(rowCnt);
|
||||||
context.setInterrupted(bInterrupted);
|
context.setInterrupted(bInterrupted);
|
||||||
context.createUserData();
|
context.createUserData();
|
||||||
@ -3193,10 +3193,9 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData
|
|||||||
{
|
{
|
||||||
int64_t rowCnt = 0;
|
int64_t rowCnt = 0;
|
||||||
bool bInterrupted = false;
|
bool bInterrupted = false;
|
||||||
mcsv1sdk::mcsv1Context context;
|
mcsv1sdk::mcsv1Context context(((RowUDAFFunctionCol*)fFunctionCols[i].get())->fUDAFContext);
|
||||||
context.setRowCnt(rowCnt);
|
context.setRowCnt(rowCnt);
|
||||||
context.setInterrupted(bInterrupted);
|
context.setInterrupted(bInterrupted);
|
||||||
// Try the complex data initiation. If not implemented, use the simple,
|
|
||||||
context.createUserData();
|
context.createUserData();
|
||||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||||
std::vector<mcsv1sdk::ColumnDatum> valsIn;
|
std::vector<mcsv1sdk::ColumnDatum> valsIn;
|
||||||
|
@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
|
|||||||
|
|
||||||
########### next target ###############
|
########### next target ###############
|
||||||
|
|
||||||
set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp median.cpp)
|
set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp median.cpp avg_mode.cpp)
|
||||||
|
|
||||||
add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
|
add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
|
||||||
|
|
||||||
|
@ -35,6 +35,7 @@ UDAF_MAP UDAFMap::fm;
|
|||||||
#include "allnull.h"
|
#include "allnull.h"
|
||||||
#include "ssq.h"
|
#include "ssq.h"
|
||||||
#include "median.h"
|
#include "median.h"
|
||||||
|
#include "avg_mode.h"
|
||||||
UDAF_MAP& UDAFMap::getMap()
|
UDAF_MAP& UDAFMap::getMap()
|
||||||
{
|
{
|
||||||
if (fm.size() > 0)
|
if (fm.size() > 0)
|
||||||
@ -49,6 +50,7 @@ UDAF_MAP& UDAFMap::getMap()
|
|||||||
fm["allnull"] = new allnull();
|
fm["allnull"] = new allnull();
|
||||||
fm["ssq"] = new ssq();
|
fm["ssq"] = new ssq();
|
||||||
fm["median"] = new median();
|
fm["median"] = new median();
|
||||||
|
fm["avg_mode"] = new avg_mode();
|
||||||
|
|
||||||
return fm;
|
return fm;
|
||||||
}
|
}
|
||||||
|
@ -677,6 +677,7 @@ inline mcsv1Context& mcsv1Context::copy(const mcsv1Context& rhs)
|
|||||||
rhs.getEndFrame(fEndFrame, fEndConstant);
|
rhs.getEndFrame(fEndFrame, fEndConstant);
|
||||||
functionName = rhs.getName();
|
functionName = rhs.getName();
|
||||||
bInterrupted = rhs.bInterrupted; // Multiple threads will use the same reference
|
bInterrupted = rhs.bInterrupted; // Multiple threads will use the same reference
|
||||||
|
func = rhs.func;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -401,6 +401,75 @@ long long median(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* avg_mode connector stub
|
||||||
|
*/
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
__declspec(dllexport)
|
||||||
|
#endif
|
||||||
|
my_bool avg_mode_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
|
||||||
|
{
|
||||||
|
if (args->arg_count != 1)
|
||||||
|
{
|
||||||
|
strcpy(message,"avg_mode() requires one argument");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
if (!(data = (struct ssq_data*) malloc(sizeof(struct ssq_data))))
|
||||||
|
{
|
||||||
|
strmov(message,"Couldn't allocate memory");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
data->sumsq = 0;
|
||||||
|
|
||||||
|
initid->ptr = (char*)data;
|
||||||
|
*/
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
__declspec(dllexport)
|
||||||
|
#endif
|
||||||
|
void avg_mode_deinit(UDF_INIT* initid)
|
||||||
|
{
|
||||||
|
// free(initid->ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
__declspec(dllexport)
|
||||||
|
#endif
|
||||||
|
void
|
||||||
|
avg_mode_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
|
||||||
|
char* message __attribute__((unused)))
|
||||||
|
{
|
||||||
|
// struct ssq_data* data = (struct ssq_data*)initid->ptr;
|
||||||
|
// data->sumsq = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
__declspec(dllexport)
|
||||||
|
#endif
|
||||||
|
void
|
||||||
|
avg_mode_add(UDF_INIT* initid, UDF_ARGS* args,
|
||||||
|
char* is_null,
|
||||||
|
char* message __attribute__((unused)))
|
||||||
|
{
|
||||||
|
// struct ssq_data* data = (struct ssq_data*)initid->ptr;
|
||||||
|
// double val = cvtArgToDouble(args->arg_type[0], args->args[0]);
|
||||||
|
// data->sumsq = val*val;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
__declspec(dllexport)
|
||||||
|
#endif
|
||||||
|
long long avg_mode(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
|
||||||
|
char* is_null, char* error __attribute__((unused)))
|
||||||
|
{
|
||||||
|
// struct ssq_data* data = (struct ssq_data*)initid->ptr;
|
||||||
|
// return data->sumsq;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// vim:ts=4 sw=4:
|
// vim:ts=4 sw=4:
|
||||||
|
|
||||||
|
@ -203,16 +203,18 @@
|
|||||||
Name="Source Files"
|
Name="Source Files"
|
||||||
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
|
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
|
||||||
<F N="allnull.cpp"/>
|
<F N="allnull.cpp"/>
|
||||||
|
<F N="avg_mode.cpp"/>
|
||||||
<F N="mcsv1_udaf.cpp"/>
|
<F N="mcsv1_udaf.cpp"/>
|
||||||
<F N="median.cpp"/>
|
<F N="median.cpp"/>
|
||||||
<F N="ssq.cpp"/>
|
<F N="ssq.cpp"/>
|
||||||
<F N="udfinfinidb.cpp"/>
|
|
||||||
<F N="udfmysql.cpp"/>
|
<F N="udfmysql.cpp"/>
|
||||||
|
<F N="udfsdk.cpp"/>
|
||||||
</Folder>
|
</Folder>
|
||||||
<Folder
|
<Folder
|
||||||
Name="Header Files"
|
Name="Header Files"
|
||||||
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
|
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
|
||||||
<F N="allnull.h"/>
|
<F N="allnull.h"/>
|
||||||
|
<F N="avg_mode.h"/>
|
||||||
<F N="mcsv1_udaf.h"/>
|
<F N="mcsv1_udaf.h"/>
|
||||||
<F N="median.h"/>
|
<F N="median.h"/>
|
||||||
<F N="ssq.h"/>
|
<F N="ssq.h"/>
|
||||||
|
Reference in New Issue
Block a user