diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 397b73459..b7e69db56 100755 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -1338,7 +1338,7 @@ void TupleAggregateStep::prep1PhaseAggregate( if (functionVec[i]->fAggFunction == ROWAGG_UDAF) { - // UDAF user data + // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast(functionVec[i].get()); if (!udafFuncCol) { @@ -1350,8 +1350,8 @@ void TupleAggregateStep::prep1PhaseAggregate( scaleAgg.push_back(0); precisionAgg.push_back(0); precisionAgg.push_back(0); - typeAgg.push_back(CalpontSystemCatalog::VARBINARY); - widthAgg.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); + typeAgg.push_back(CalpontSystemCatalog::UBIGINT); + widthAgg.push_back(bigUintWidth); continue; } @@ -1775,8 +1775,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( keysAgg.push_back(aggKey); scaleAgg.push_back(0); precisionAgg.push_back(0); - typeAgg.push_back(CalpontSystemCatalog::VARBINARY); - widthAgg.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes + typeAgg.push_back(CalpontSystemCatalog::UBIGINT); + widthAgg.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAgg++; break; } @@ -2207,7 +2207,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (functionVec2[i]->fAggFunction == ROWAGG_UDAF) { - // Dummy Column for UDAF user data + // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast(functionVec2[i].get()); if (!udafFuncCol) { @@ -2218,8 +2218,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( keysAggDist.push_back(keysAggDist[j]); // Dummy? scaleAggDist.push_back(0); precisionAggDist.push_back(0); - typeAggDist.push_back(CalpontSystemCatalog::VARBINARY); - widthAggDist.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); + typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); + widthAggDist.push_back(sizeof(uint64_t)); continue; } @@ -2844,13 +2844,13 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType()); widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth()); colAggPm++; - // Dummy Column for UDAF UserData struct + // Column for index of UDAF UserData struct oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(0); - typeAggPm.push_back(CalpontSystemCatalog::VARBINARY); - widthAggPm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes + typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); + widthAggPm.push_back(bigUintWidth); funct->fAuxColumnIndex = colAggPm++; break; } @@ -3111,7 +3111,7 @@ void TupleAggregateStep::prep2PhasesAggregate( if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF) { - // Dummy column for UDAF user data + // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast(functionVecUm[i].get()); if (!udafFuncCol) { @@ -3122,8 +3122,8 @@ void TupleAggregateStep::prep2PhasesAggregate( keysAggUm.push_back(keysAggUm[j]); // Dummy? scaleAggUm.push_back(0); precisionAggUm.push_back(0); - typeAggUm.push_back(CalpontSystemCatalog::VARBINARY); - widthAggUm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); + typeAggUm.push_back(CalpontSystemCatalog::UBIGINT); + widthAggUm.push_back(bigUintWidth); continue; } @@ -3560,13 +3560,13 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(udafFuncCol->fUDAFContext.getResultType()); widthAggPm.push_back(udafFuncCol->fUDAFContext.getColWidth()); colAggPm++; - // Dummy column for UDAF UserData struct + // Column for index of UDAF UserData struct oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); scaleAggPm.push_back(0); precisionAggPm.push_back(0); - typeAggPm.push_back(CalpontSystemCatalog::VARBINARY); - widthAggPm.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); // Binary column needs +2 for length bytes + typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); + widthAggPm.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAggPm++; break; } @@ -3992,7 +3992,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (functionVecUm[i]->fAggFunction == ROWAGG_UDAF) { - // Dummy column for UDAF user data + // Column for index of UDAF UserData struct RowUDAFFunctionCol* udafFuncCol = dynamic_cast(functionVecUm[i].get()); if (!udafFuncCol) { @@ -4003,8 +4003,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( keysAggDist.push_back(keysAggUm[j]); // Dummy? scaleAggDist.push_back(0); precisionAggDist.push_back(0); - typeAggDist.push_back(CalpontSystemCatalog::BIGINT); - widthAggDist.push_back(udafFuncCol->fUDAFContext.getUserDataSize()+2); + typeAggDist.push_back(CalpontSystemCatalog::UBIGINT); + widthAggDist.push_back(sizeof(uint64_t)); continue; } if (functionVecUm[i]->fAggFunction != ROWAGG_STATS) diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index f31e2e519..ef6d7a0c0 100755 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -2863,7 +2863,7 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u int64_t rowCnt = 0; // For a NULL constant, call nextValue with NULL and then evaluate. bool bInterrupted = false; - mcsv1sdk::mcsv1Context context; + mcsv1sdk::mcsv1Context context(((RowUDAFFunctionCol*)fFunctionCols[i].get())->fUDAFContext); context.setRowCnt(rowCnt); context.setInterrupted(bInterrupted); context.createUserData(); @@ -3193,10 +3193,9 @@ void RowAggregationUM::doNotNullConstantAggregate(const ConstantAggData& aggData { int64_t rowCnt = 0; bool bInterrupted = false; - mcsv1sdk::mcsv1Context context; + mcsv1sdk::mcsv1Context context(((RowUDAFFunctionCol*)fFunctionCols[i].get())->fUDAFContext); context.setRowCnt(rowCnt); context.setInterrupted(bInterrupted); - // Try the complex data initiation. If not implemented, use the simple, context.createUserData(); mcsv1sdk::mcsv1_UDAF::ReturnCode rc; std::vector valsIn; diff --git a/utils/udfsdk/CMakeLists.txt b/utils/udfsdk/CMakeLists.txt index 07693e223..e69ff4d88 100755 --- a/utils/udfsdk/CMakeLists.txt +++ b/utils/udfsdk/CMakeLists.txt @@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ########### next target ############### -set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp median.cpp) +set(udfsdk_LIB_SRCS udfsdk.cpp mcsv1_udaf.cpp allnull.cpp ssq.cpp median.cpp avg_mode.cpp) add_definitions(-DMYSQL_DYNAMIC_PLUGIN) diff --git a/utils/udfsdk/mcsv1_udaf.cpp b/utils/udfsdk/mcsv1_udaf.cpp index ac0a1ee2e..de01c6033 100755 --- a/utils/udfsdk/mcsv1_udaf.cpp +++ b/utils/udfsdk/mcsv1_udaf.cpp @@ -35,6 +35,7 @@ UDAF_MAP UDAFMap::fm; #include "allnull.h" #include "ssq.h" #include "median.h" +#include "avg_mode.h" UDAF_MAP& UDAFMap::getMap() { if (fm.size() > 0) @@ -49,6 +50,7 @@ UDAF_MAP& UDAFMap::getMap() fm["allnull"] = new allnull(); fm["ssq"] = new ssq(); fm["median"] = new median(); + fm["avg_mode"] = new avg_mode(); return fm; } diff --git a/utils/udfsdk/mcsv1_udaf.h b/utils/udfsdk/mcsv1_udaf.h index 382e01d4a..33e15188d 100755 --- a/utils/udfsdk/mcsv1_udaf.h +++ b/utils/udfsdk/mcsv1_udaf.h @@ -677,6 +677,7 @@ inline mcsv1Context& mcsv1Context::copy(const mcsv1Context& rhs) rhs.getEndFrame(fEndFrame, fEndConstant); functionName = rhs.getName(); bInterrupted = rhs.bInterrupted; // Multiple threads will use the same reference + func = rhs.func; return *this; } diff --git a/utils/udfsdk/udfmysql.cpp b/utils/udfsdk/udfmysql.cpp index 0f1971897..f7d12e750 100755 --- a/utils/udfsdk/udfmysql.cpp +++ b/utils/udfsdk/udfmysql.cpp @@ -401,6 +401,75 @@ long long median(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), return 0; } +/** + * avg_mode connector stub + */ +#ifdef _MSC_VER +__declspec(dllexport) +#endif +my_bool avg_mode_init(UDF_INIT* initid, UDF_ARGS* args, char* message) +{ + if (args->arg_count != 1) + { + strcpy(message,"avg_mode() requires one argument"); + return 1; + } + +/* + if (!(data = (struct ssq_data*) malloc(sizeof(struct ssq_data)))) + { + strmov(message,"Couldn't allocate memory"); + return 1; + } + data->sumsq = 0; + + initid->ptr = (char*)data; +*/ + return 0; +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void avg_mode_deinit(UDF_INIT* initid) +{ +// free(initid->ptr); +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void +avg_mode_clear(UDF_INIT* initid, char* is_null __attribute__((unused)), + char* message __attribute__((unused))) +{ +// struct ssq_data* data = (struct ssq_data*)initid->ptr; +// data->sumsq = 0; +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +void +avg_mode_add(UDF_INIT* initid, UDF_ARGS* args, + char* is_null, + char* message __attribute__((unused))) +{ +// struct ssq_data* data = (struct ssq_data*)initid->ptr; +// double val = cvtArgToDouble(args->arg_type[0], args->args[0]); +// data->sumsq = val*val; +} + +#ifdef _MSC_VER +__declspec(dllexport) +#endif +long long avg_mode(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)), + char* is_null, char* error __attribute__((unused))) +{ +// struct ssq_data* data = (struct ssq_data*)initid->ptr; +// return data->sumsq; + return 0; +} } // vim:ts=4 sw=4: diff --git a/utils/udfsdk/udfsdk.vpj b/utils/udfsdk/udfsdk.vpj index 4307720fc..919c73d17 100755 --- a/utils/udfsdk/udfsdk.vpj +++ b/utils/udfsdk/udfsdk.vpj @@ -203,16 +203,18 @@ Name="Source Files" Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d"> + - + +