diff --git a/dbcon/execplan/constantcolumn.h b/dbcon/execplan/constantcolumn.h index 04098faae..be0731044 100644 --- a/dbcon/execplan/constantcolumn.h +++ b/dbcon/execplan/constantcolumn.h @@ -38,6 +38,8 @@ class ByteStream; */ namespace execplan { +class ConstantColumn; + /** * @brief A class to represent a constant return column * diff --git a/dbcon/joblist/jlf_common.cpp b/dbcon/joblist/jlf_common.cpp index f5dbeee17..4b1980d49 100644 --- a/dbcon/joblist/jlf_common.cpp +++ b/dbcon/joblist/jlf_common.cpp @@ -405,7 +405,7 @@ uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add) if (add) { - // setTupleInfo first if add is ture, ok if already set. + // setTupleInfo first if add is true, ok if already set. const SimpleColumn* sc = dynamic_cast(srcp.get()); if (sc != NULL) diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 4cf7bccc5..033bf2643 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -300,6 +300,7 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo) { const ArithmeticColumn* ac = NULL; const FunctionColumn* fc = NULL; + const ConstantColumn* cc = NULL; uint64_t eid = -1; CalpontSystemCatalog::ColType ct; ExpressionStep* es = new ExpressionStep(jobInfo); @@ -316,6 +317,11 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo) eid = fc->expressionId(); ct = fc->resultType(); } + else if ((cc = dynamic_cast(retCols[i].get())) != NULL) + { + eid = cc->expressionId(); + ct = cc->resultType(); + } else { std::ostringstream errmsg; @@ -1004,7 +1010,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo for (uint32_t parm = 0; parm < aggParms.size(); ++parm) { - if (aggc->constCol().get() != NULL) + // Only do the optimization of converting to count(*) if + // there is only one parameter. + if (aggParms.size() == 1 && aggc->constCol().get() != NULL) { // replace the aggregate on constant with a count(*) SRCP clone; diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 8f7755ad9..491f86a8f 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -1097,7 +1097,8 @@ void TupleAggregateStep::prep1PhaseAggregate( uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); // For UDAF - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // for count column of average function @@ -1286,11 +1287,11 @@ void TupleAggregateStep::prep1PhaseAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -1477,6 +1478,14 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(udafFuncCol->fUDAFContext.getPrecision()); typeAgg.push_back(udafFuncCol->fUDAFContext.getResultType()); widthAgg.push_back(udafFuncCol->fUDAFContext.getColWidth()); + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -1488,6 +1497,13 @@ void TupleAggregateStep::prep1PhaseAggregate( precisionAgg.push_back(precisionProj[colProj]); typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(width[colProj]); + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -1676,7 +1692,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( // fOR udaf UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; // for count column of average function map avgFuncMap, avgDistFuncMap; @@ -1840,12 +1857,12 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { @@ -2071,6 +2088,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(CalpontSystemCatalog::UBIGINT); widthAgg.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAgg++; + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -2083,6 +2108,13 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(typeProj[colProj]); widthAgg.push_back(widthProj[colProj]); ++colAgg; + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -2133,7 +2165,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i)); } - projColsUDAFIndex = 0; + projColsUDAFIdx = 0; // locate the return column position in aggregated rowgroup for (uint64_t i = 0; i < returnedColVec.size(); i++) { @@ -2146,11 +2178,11 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -2893,7 +2925,8 @@ void TupleAggregateStep::prep2PhasesAggregate( set avgSet; vector >& returnedColVec = jobInfo.returnedColVec; // For UDAF - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; @@ -3073,11 +3106,11 @@ void TupleAggregateStep::prep2PhasesAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -3305,6 +3338,14 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); widthAggPm.push_back(bigUintWidth); funct->fAuxColumnIndex = colAggPm++; + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -3317,6 +3358,13 @@ void TupleAggregateStep::prep2PhasesAggregate( typeAggPm.push_back(typeProj[colProj]); widthAggPm.push_back(width[colProj]); colAggPm++; + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -3342,7 +3390,7 @@ void TupleAggregateStep::prep2PhasesAggregate( map avgFuncMap; AGG_MAP aggDupFuncMap; - projColsUDAFIndex = 0; + projColsUDAFIdx = 0; // copy over the groupby vector // update the outputColumnIndex if returned for (uint64_t i = 0; i < groupByPm.size(); i++) @@ -3372,12 +3420,12 @@ void TupleAggregateStep::prep2PhasesAggregate( udafc = NULL; if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -3703,7 +3751,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( set avgSet, avgDistSet; vector >& returnedColVec = jobInfo.returnedColVec; // For UDAF - uint32_t projColsUDAFIndex = 0; + uint32_t projColsUDAFIdx = 0; + uint32_t udafcParamIdx = 0; UDAFColumn* udafc = NULL; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; @@ -3919,11 +3968,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); @@ -4147,6 +4196,14 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(CalpontSystemCatalog::UBIGINT); widthAggPm.push_back(sizeof(uint64_t)); funct->fAuxColumnIndex = colAggPm++; + // If the first param is const + udafcParamIdx = 0; + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; break; } @@ -4160,6 +4217,13 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( widthAggPm.push_back(width[colProj]); multiParmIndexes.push_back(colAggPm); colAggPm++; + // If the param is const + ConstantColumn* cc = dynamic_cast(udafc->aggParms()[udafcParamIdx].get()); + if (cc) + { + funct->fpConstCol = udafc->aggParms()[udafcParamIdx]; + } + ++udafcParamIdx; } break; @@ -4251,7 +4315,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( // These will be skipped and the count needs to be subtracted // from where the aux column will be. int64_t multiParms = 0; - projColsUDAFIndex = 0; + projColsUDAFIdx = 0; // check if the count column for AVG is also a returned column, // if so, replace the "-1" to actual position in returned vec. map avgFuncMap, avgDistFuncMap; @@ -4286,11 +4350,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( if (aggOp == ROWAGG_UDAF) { - std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIdx; for (; it != jobInfo.projectionCols.end(); it++) { udafc = dynamic_cast((*it).get()); - projColsUDAFIndex++; + projColsUDAFIdx++; if (udafc) { pUDAFFunc = udafc->getContext().getFunction(); diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp index 4d24f0b4b..2a93f680b 100644 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -569,6 +569,7 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo) for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++) { + bool isUDAF = false; // window function type WindowFunctionColumn* wc = dynamic_cast(i->get()); uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index @@ -590,6 +591,7 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo) // if (boost::iequals(wc->functionName(),"UDAF_FUNC") if (wc->functionName() == "UDAF_FUNC") { + isUDAF = true; ++wfsUserFunctionCount; } @@ -646,10 +648,13 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo) // column type for functor templates int ct = 0; + if (isUDAF) + { + ct = wc->getUDAFContext().getResultType(); + } // make sure index is in range - if (fields.size() > 1 && fields[1] >= 0 && static_cast(fields[1]) < types.size()) + else if (fields.size() > 1 && fields[1] >= 0 && static_cast(fields[1]) < types.size()) ct = types[fields[1]]; - // workaround for functions using "within group (order by)" syntax string fn = boost::to_upper_copy(wc->functionName()); diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 701e1c14f..b02712409 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -4206,8 +4206,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) // treat as count(*) if (ac->aggOp() == AggregateColumn::COUNT) ac->aggOp(AggregateColumn::COUNT_ASTERISK); - - ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError))); + parm.reset(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)); + ac->constCol(parm); break; } @@ -4485,17 +4485,20 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) // @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will // be applied in ExeMgr. When the ExeMgr fix is available, this checking // will be taken out. - if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty()) + if (isp->sum_func() != Item_sum::UDF_SUM_FUNC) { - gwi.fatalParseError = true; - gwi.parseErrorText = "No project column found for aggregate function"; - if (ac) - delete ac; - return NULL; - } - else if (ac->constCol()) - { - gwi.count_asterisk_list.push_back(ac); + if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty()) + { + gwi.fatalParseError = true; + gwi.parseErrorText = "No project column found for aggregate function"; + if (ac) + delete ac; + return NULL; + } + else if (ac->constCol()) + { + gwi.count_asterisk_list.push_back(ac); + } } // For UDAF, populate the context and call the UDAF init() function. @@ -7903,8 +7906,15 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi); return ER_CHECK_NOT_IMPLEMENTED; } - - (*coliter)->aggParms().push_back(minSc); + // Replace the last (presumably constant) object with minSc + if ((*coliter)->aggParms().empty()) + { + (*coliter)->aggParms().push_back(minSc); + } + else + { + (*coliter)->aggParms()[0] = minSc; + } } std::vector::iterator funciter; diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index bc56a7430..019761d39 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1677,15 +1677,11 @@ void BatchPrimitiveProcessor::execute() } catch (logging::QueryDataExcept& qex) { - ostringstream os; - os << qex.what() << endl; - writeErrorMsg(os.str(), qex.errorCode()); + writeErrorMsg(qex.what(), qex.errorCode()); } catch (logging::DictionaryBufferOverflow& db) { - ostringstream os; - os << db.what() << endl; - writeErrorMsg(os.str(), db.errorCode()); + writeErrorMsg(db.what(), db.errorCode()); } catch (scalar_exception& se) { @@ -1758,15 +1754,11 @@ void BatchPrimitiveProcessor::execute() } catch (IDBExcept& iex) { - ostringstream os; - os << iex.what() << endl; - writeErrorMsg(os.str(), iex.errorCode(), true, false); + writeErrorMsg(iex.what(), iex.errorCode(), true, false); } catch (const std::exception& ex) { - ostringstream os; - os << ex.what() << endl; - writeErrorMsg(os.str(), logging::batchPrimitiveProcessorErr); + writeErrorMsg(ex.what(), logging::batchPrimitiveProcessorErr); } catch (...) { diff --git a/utils/common/any.hpp b/utils/common/any.hpp index 5408c5c87..63d05d3d2 100755 --- a/utils/common/any.hpp +++ b/utils/common/any.hpp @@ -11,15 +11,12 @@ #include #include +#include namespace static_any { namespace anyimpl { - struct bad_any_cast - { - }; - struct empty_any { }; @@ -266,7 +263,7 @@ public: T& cast() { if (policy != anyimpl::get_policy()) - throw anyimpl::bad_any_cast(); + throw std::runtime_error("static_any: type mismatch in cast"); T* r = reinterpret_cast(policy->get_value(&object)); return *r; } diff --git a/utils/loggingcpp/errorcodes.cpp b/utils/loggingcpp/errorcodes.cpp index 60919c906..4b4196800 100644 --- a/utils/loggingcpp/errorcodes.cpp +++ b/utils/loggingcpp/errorcodes.cpp @@ -29,7 +29,7 @@ using namespace std; namespace logging { -ErrorCodes::ErrorCodes(): fErrorCodes(), fPreamble("An unexpected condition within the query caused an internal processing error within InfiniDB. Please check the log files for more details. Additional Information: ") +ErrorCodes::ErrorCodes(): fErrorCodes(), fPreamble("An unexpected condition within the query caused an internal processing error within Columnstore. Please check the log files for more details. Additional Information: ") { fErrorCodes[batchPrimitiveStepErr] = "error in BatchPrimitiveStep."; fErrorCodes[tupleBPSErr] = "error in TupleBPS."; diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index d1a3f4988..f8453843e 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -35,6 +35,7 @@ #include "exceptclasses.h" #include "serializeable.h" +#include "any.hpp" class ByteStreamTestSuite; diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 043dcaac2..6339554f1 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -1723,17 +1723,7 @@ void RowAggregation::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(pFunctionCol.get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colOut + 1, rowUDAF, i); - } - else - { - throw logic_error("(3)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colOut + 1, i); break; } @@ -2012,31 +2002,60 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + valIn * valIn, colAux + 1); } -void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, - RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx) +void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, + int64_t colAux, uint64_t& funcColsIdx) { uint32_t paramCount = fRGContext.getParameterCount(); // The vector of parameters to be sent to the UDAF mcsv1sdk::ColumnDatum valsIn[paramCount]; uint32_t dataFlags[paramCount]; - + ConstantColumn* cc; + bool bIsNull = false; execplan::CalpontSystemCatalog::ColDataType colDataType; for (uint32_t i = 0; i < paramCount; ++i) { + // If UDAF_IGNORE_NULLS is on, bIsNull gets set the first time + // we find a null. We still need to eat the rest of the parameters + // to sync updateEntry + if (bIsNull) + { + ++funcColsIdx; + continue; + } + SP_ROWAGG_FUNC_t pFunctionCol = fFunctionCols[funcColsIdx]; mcsv1sdk::ColumnDatum& datum = valsIn[i]; // Turn on NULL flags dataFlags[i] = 0; - if (isNull(&fRowGroupIn, rowIn, colIn) == true) + + // If this particular parameter is a constant, then we need + // to acces the constant value rather than a row value. + cc = NULL; + if (pFunctionCol->fpConstCol) + { + cc = dynamic_cast(pFunctionCol->fpConstCol.get()); + } + + if ((cc && cc->type() == ConstantColumn::NULLDATA) + || (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true)) { if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) { - return; + bIsNull = true; + ++funcColsIdx; + continue; } dataFlags[i] |= mcsv1sdk::PARAM_IS_NULL; } - - colDataType = fRowGroupIn.getColTypes()[colIn]; - if (!fRGContext.isParamNull(i)) + + if (cc) + { + colDataType = cc->resultType().colDataType; + } + else + { + colDataType = fRowGroupIn.getColTypes()[colIn]; + } + if (!(dataFlags[i] & mcsv1sdk::PARAM_IS_NULL)) { switch (colDataType) { @@ -2045,13 +2064,38 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: case execplan::CalpontSystemCatalog::BIGINT: + { + datum.dataType = execplan::CalpontSystemCatalog::BIGINT; + if (cc) + { + datum.columnData = cc->getIntVal(const_cast(rowIn), bIsNull); + datum.scale = cc->resultType().scale; + datum.precision = cc->resultType().precision; + } + else + { + datum.columnData = rowIn.getIntField(colIn); + datum.scale = fRowGroupIn.getScale()[colIn]; + datum.precision = fRowGroupIn.getPrecision()[colIn]; + } + break; + } case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: { - datum.dataType = execplan::CalpontSystemCatalog::BIGINT; - datum.columnData = rowIn.getIntField(colIn); - datum.scale = fRowGroupIn.getScale()[colIn]; - datum.precision = fRowGroupIn.getPrecision()[colIn]; + datum.dataType = colDataType; + if (cc) + { + datum.columnData = cc->getDecimalVal(const_cast(rowIn), bIsNull).value; + datum.scale = cc->resultType().scale; + datum.precision = cc->resultType().precision; + } + else + { + datum.columnData = rowIn.getIntField(colIn); + datum.scale = fRowGroupIn.getScale()[colIn]; + datum.precision = fRowGroupIn.getPrecision()[colIn]; + } break; } @@ -2062,7 +2106,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::UBIGINT: { datum.dataType = execplan::CalpontSystemCatalog::UBIGINT; - datum.columnData = rowIn.getUintField(colIn); + if (cc) + { + datum.columnData = cc->getUintVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getUintField(colIn); + } break; } @@ -2070,7 +2121,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::UDOUBLE: { datum.dataType = execplan::CalpontSystemCatalog::DOUBLE; - datum.columnData = rowIn.getDoubleField(colIn); + if (cc) + { + datum.columnData = cc->getDoubleVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getDoubleField(colIn); + } break; } @@ -2078,22 +2136,55 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::UFLOAT: { datum.dataType = execplan::CalpontSystemCatalog::FLOAT; - datum.columnData = rowIn.getFloatField(colIn); + if (cc) + { + datum.columnData = cc->getFloatVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getFloatField(colIn); + } break; } case execplan::CalpontSystemCatalog::DATE: + { + datum.dataType = execplan::CalpontSystemCatalog::UBIGINT; + if (cc) + { + datum.columnData = cc->getDateIntVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getUintField(colIn); + } + break; + } case execplan::CalpontSystemCatalog::DATETIME: { datum.dataType = execplan::CalpontSystemCatalog::UBIGINT; - datum.columnData = rowIn.getUintField(colIn); + if (cc) + { + datum.columnData = cc->getDatetimeIntVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getUintField(colIn); + } break; } case execplan::CalpontSystemCatalog::TIME: { datum.dataType = execplan::CalpontSystemCatalog::BIGINT; - datum.columnData = rowIn.getIntField(colIn); + if (cc) + { + datum.columnData = cc->getTimeIntVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getIntField(colIn); + } break; } @@ -2105,7 +2196,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int case execplan::CalpontSystemCatalog::BLOB: { datum.dataType = colDataType; - datum.columnData = rowIn.getStringField(colIn); + if (cc) + { + datum.columnData = cc->getStrVal(const_cast(rowIn), bIsNull); + } + else + { + datum.columnData = rowIn.getStringField(colIn); + } break; } @@ -2147,6 +2245,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { + RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[funcColsIdx].get()); rowUDAF->bInterrupted = true; throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr); } @@ -2443,17 +2542,7 @@ void RowAggregationUM::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[i].get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i); - } - else - { - throw logic_error("(5)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colAux, i); break; } @@ -3991,17 +4080,7 @@ void RowAggregationUMP2::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[i].get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i); - } - else - { - throw logic_error("(6)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colAux, i); break; } @@ -4199,8 +4278,8 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut // colAux(in) - Where the UDAF userdata resides // rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance //------------------------------------------------------------------------------ -void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, int64_t colAux, - RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx) +void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, + int64_t colAux, uint64_t& funcColsIdx) { static_any::any valOut; @@ -4235,6 +4314,7 @@ void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut, if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { + RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[funcColsIdx].get()); rowUDAF->bInterrupted = true; throw logging::IDBExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr); } @@ -4429,17 +4509,7 @@ void RowAggregationDistinct::updateEntry(const Row& rowIn) case ROWAGG_UDAF: { - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[i].get()); - - if (rowUDAF) - { - doUDAF(rowIn, colIn, colOut, colAux, rowUDAF, i); - } - else - { - throw logic_error("(7)A UDAF function is called but there's no RowUDAFFunctionCol"); - } - + doUDAF(rowIn, colIn, colOut, colAux, i); break; } diff --git a/utils/rowgroup/rowaggregation.h b/utils/rowgroup/rowaggregation.h index 282f354fc..14e4313cf 100644 --- a/utils/rowgroup/rowaggregation.h +++ b/utils/rowgroup/rowaggregation.h @@ -50,6 +50,7 @@ #include "stlpoolallocator.h" #include "returnedcolumn.h" #include "mcsv1_udaf.h" +#include "constantcolumn.h" // To do: move code that depends on joblist to a proper subsystem. namespace joblist @@ -200,6 +201,13 @@ struct RowAggFunctionCol // 4. for duplicate - point to the real aggretate column to be copied from // Set only on UM, the fAuxColumnIndex is defaulted to fOutputColumnIndex+1 on PM. uint32_t fAuxColumnIndex; + + // For UDAF that have more than one parameter and some parameters are constant. + // There will be a series of RowAggFunctionCol created, one for each parameter. + // The first will be a RowUDAFFunctionCol. Subsequent ones will be RowAggFunctionCol + // with fAggFunction == ROWAGG_MULTI_PARM. Order is important. + // If this parameter is constant, that value is here. + SRCP fpConstCol; }; @@ -220,8 +228,11 @@ struct RowUDAFFunctionCol : public RowAggFunctionCol inputColIndex, outputColIndex, auxColIndex), bInterrupted(false) {} - RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) : RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, - rhs.fInputColumnIndex, rhs.fOutputColumnIndex, rhs.fAuxColumnIndex), fUDAFContext(rhs.fUDAFContext) + RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) : + RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex, + rhs.fOutputColumnIndex, rhs.fAuxColumnIndex), + fUDAFContext(rhs.fUDAFContext), + bInterrupted(false) {} virtual ~RowUDAFFunctionCol() {} @@ -238,6 +249,16 @@ inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const bs << (uint8_t)fAggFunction; bs << fInputColumnIndex; bs << fOutputColumnIndex; + if (fpConstCol) + { + bs << (uint8_t)1; + fpConstCol.get()->serialize(bs); + } + else + { + bs << (uint8_t)0; + } + } inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs) @@ -245,6 +266,13 @@ inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs) bs >> (uint8_t&)fAggFunction; bs >> fInputColumnIndex; bs >> fOutputColumnIndex; + uint8_t t; + bs >> t; + if (t) + { + fpConstCol.reset(new ConstantColumn); + fpConstCol.get()->unserialize(bs); + } } inline void RowUDAFFunctionCol::serialize(messageqcpp::ByteStream& bs) const @@ -586,7 +614,7 @@ protected: virtual void doAvg(const Row&, int64_t, int64_t, int64_t); virtual void doStatistics(const Row&, int64_t, int64_t, int64_t); virtual void doBitOp(const Row&, int64_t, int64_t, int); - virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx); + virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx); virtual bool countSpecial(const RowGroup* pRG) { fRow.setIntField<8>(fRow.getIntField<8>(0) + pRG->getRowCount(), 0); @@ -902,7 +930,7 @@ protected: void doStatistics(const Row&, int64_t, int64_t, int64_t); void doGroupConcat(const Row&, int64_t, int64_t); void doBitOp(const Row&, int64_t, int64_t, int); - void doUDAF(const Row&, int64_t, int64_t, int64_t, RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx); + void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx); bool countSpecial(const RowGroup* pRG) { return false; diff --git a/utils/udfsdk/allnull.h b/utils/udfsdk/allnull.h index da17f5d6b..6a727caf6 100644 --- a/utils/udfsdk/allnull.h +++ b/utils/udfsdk/allnull.h @@ -48,7 +48,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/avg_mode.h b/utils/udfsdk/avg_mode.h index 5722c5fea..fba1fcdcc 100644 --- a/utils/udfsdk/avg_mode.h +++ b/utils/udfsdk/avg_mode.h @@ -56,7 +56,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/avgx.h b/utils/udfsdk/avgx.h index 0569b6091..a830c6803 100644 --- a/utils/udfsdk/avgx.h +++ b/utils/udfsdk/avgx.h @@ -35,7 +35,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/mcsv1_udaf.h b/utils/udfsdk/mcsv1_udaf.h index df3f47649..e09228d77 100644 --- a/utils/udfsdk/mcsv1_udaf.h +++ b/utils/udfsdk/mcsv1_udaf.h @@ -68,7 +68,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/median.h b/utils/udfsdk/median.h index 142be6ba8..48bd93c70 100644 --- a/utils/udfsdk/median.h +++ b/utils/udfsdk/median.h @@ -56,7 +56,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/regr_avgx.cpp b/utils/udfsdk/regr_avgx.cpp index c7cc5b56e..aec4f361f 100644 --- a/utils/udfsdk/regr_avgx.cpp +++ b/utils/udfsdk/regr_avgx.cpp @@ -82,7 +82,7 @@ mcsv1_UDAF::ReturnCode regr_avgx::nextValue(mcsv1Context* context, ColumnDatum* { return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. } - if (valIn_x.empty() || valIn_y.empty()) + if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant { return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on. } @@ -107,10 +107,6 @@ mcsv1_UDAF::ReturnCode regr_avgx::nextValue(mcsv1Context* context, ColumnDatum* { val = valIn_x.cast(); } - else if (valIn_x.compatible(longTypeId)) - { - val = valIn_x.cast(); - } else if (valIn_x.compatible(llTypeId)) { val = valIn_x.cast(); diff --git a/utils/udfsdk/regr_avgx.h b/utils/udfsdk/regr_avgx.h index f70f30d8c..27b8708f7 100644 --- a/utils/udfsdk/regr_avgx.h +++ b/utils/udfsdk/regr_avgx.h @@ -35,7 +35,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/ssq.h b/utils/udfsdk/ssq.h index 2cac61c2c..e27ecf1fa 100644 --- a/utils/udfsdk/ssq.h +++ b/utils/udfsdk/ssq.h @@ -56,7 +56,6 @@ #include #include #include -#include #ifdef _MSC_VER #include #else diff --git a/utils/udfsdk/udfsdk.vpj b/utils/udfsdk/udfsdk.vpj index 3d3ac39ca..fe1f3fd0e 100755 --- a/utils/udfsdk/udfsdk.vpj +++ b/utils/udfsdk/udfsdk.vpj @@ -238,38 +238,5 @@ N="Makefile" Type="Makefile"/> - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/utils/windowfunction/wf_udaf.cpp b/utils/windowfunction/wf_udaf.cpp index 5cd5243c5..2876fbf7e 100644 --- a/utils/windowfunction/wf_udaf.cpp +++ b/utils/windowfunction/wf_udaf.cpp @@ -451,7 +451,7 @@ void WF_udaf::operator()(int64_t b, int64_t e, int64_t c) { mcsv1sdk::mcsv1_UDAF::ReturnCode rc; uint64_t colOut = fFieldIndex[0]; - + bool isNull = false; if ((fFrameUnit == WF__FRAME_ROWS) || (fPrev == -1) || (!fPeer->operator()(getPointer(fRowData->at(c)), getPointer(fRowData->at(fPrev))))) @@ -468,13 +468,24 @@ void WF_udaf::operator()(int64_t b, int64_t e, int64_t c) // Put the parameter metadata (type, scale, precision) into valsIn mcsv1sdk::ColumnDatum valsIn[getContext().getParameterCount()]; + ConstantColumn* cc = NULL; for (uint32_t i = 0; i < getContext().getParameterCount(); ++i) { - uint64_t colIn = fFieldIndex[i+1]; mcsv1sdk::ColumnDatum& datum = valsIn[i]; - datum.dataType = fRow.getColType(colIn); - datum.scale = fRow.getScale(colIn); - datum.precision = fRow.getPrecision(colIn); + cc = static_cast(fConstantParms[i].get()); + if (cc) + { + datum.dataType = cc->resultType().colDataType; + datum.scale = cc->resultType().scale; + datum.precision = cc->resultType().precision; + } + else + { + uint64_t colIn = fFieldIndex[i+1]; + datum.dataType = fRow.getColType(colIn); + datum.scale = fRow.getScale(colIn); + datum.precision = fRow.getPrecision(colIn); + } } if (b <= c && c <= e) @@ -494,12 +505,14 @@ void WF_udaf::operator()(int64_t b, int64_t e, int64_t c) uint32_t flags[getContext().getParameterCount()]; for (uint32_t k = 0; k < getContext().getParameterCount(); ++k) { + cc = static_cast(fConstantParms[k].get()); uint64_t colIn = fFieldIndex[k+1]; mcsv1sdk::ColumnDatum& datum = valsIn[k]; // Turn on Null flags or skip based on respect nulls flags[k] = 0; - if (fRow.isNullValue(colIn) == true) + if ((!cc && fRow.isNullValue(colIn) == true) + || (cc && cc->type() == ConstantColumn::NULLDATA)) { if (!bRespectNulls) { @@ -510,133 +523,196 @@ void WF_udaf::operator()(int64_t b, int64_t e, int64_t c) flags[k] |= mcsv1sdk::PARAM_IS_NULL; } - // MCOL-1201 Multi-Paramter calls - switch (datum.dataType) + if (!bHasNull && !(flags[k] & mcsv1sdk::PARAM_IS_NULL)) { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - case CalpontSystemCatalog::DECIMAL: + switch (datum.dataType) { - int64_t valIn; - getValue(colIn, valIn); - // Check for distinct, if turned on. - // Currently, distinct only works on the first parameter. - if (k == 0) + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: { - if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + int64_t valIn; + if (cc) { - continue; + valIn = cc->getIntVal(fRow, isNull); } + else + { + getValue(colIn, valIn); + } + // Check for distinct, if turned on. + // Currently, distinct only works on the first parameter. + if (k == 0) + { + if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + { + continue; + } - if (fDistinct) - fDistinctSet.insert(valIn); + if (fDistinct) + fDistinctSet.insert(valIn); + } + datum.columnData = valIn; + break; } - datum.columnData = valIn; - break; - } - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - case CalpontSystemCatalog::UDECIMAL: - { - uint64_t valIn; - getValue(colIn, valIn); - // Check for distinct, if turned on. - // Currently, distinct only works on the first parameter. - if (k == 0) + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: { - if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + int64_t valIn; + if (cc) { - continue; + valIn = cc->getDecimalVal(fRow, isNull).value; } + else + { + getValue(colIn, valIn); + } + // Check for distinct, if turned on. + // Currently, distinct only works on the first parameter. + if (k == 0) + { + if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + { + continue; + } - if (fDistinct) - fDistinctSet.insert(valIn); + if (fDistinct) + fDistinctSet.insert(valIn); + } + datum.columnData = valIn; + break; } - datum.columnData = valIn; - break; - } - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - double valIn; - getValue(colIn, valIn); - // Check for distinct, if turned on. - // Currently, distinct only works on the first parameter. - if (k == 0) + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: { - if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + uint64_t valIn; + if (cc) { - continue; + valIn = cc->getUintVal(fRow, isNull); } + else + { + getValue(colIn, valIn); + } + // Check for distinct, if turned on. + // Currently, distinct only works on the first parameter. + if (k == 0) + { + if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + { + continue; + } - if (fDistinct) - fDistinctSet.insert(valIn); + if (fDistinct) + fDistinctSet.insert(valIn); + } + datum.columnData = valIn; + break; } - datum.columnData = valIn; - break; - } - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - float valIn; - getValue(colIn, valIn); - // Check for distinct, if turned on. - // Currently, distinct only works on the first parameter. - if (k == 0) + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: { - if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + double valIn; + if (cc) { - continue; + valIn = cc->getDoubleVal(fRow, isNull); } + else + { + getValue(colIn, valIn); + } + // Check for distinct, if turned on. + // Currently, distinct only works on the first parameter. + if (k == 0) + { + if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + { + continue; + } - if (fDistinct) - fDistinctSet.insert(valIn); + if (fDistinct) + fDistinctSet.insert(valIn); + } + datum.columnData = valIn; + break; } - datum.columnData = valIn; - break; - } - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: - case CalpontSystemCatalog::VARBINARY: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::BLOB: - { - string valIn; - getValue(colIn, valIn); - // Check for distinct, if turned on. - // Currently, distinct only works on the first parameter. - if (k == 0) + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: { - if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + float valIn; + if (cc) { - continue; + valIn = cc->getFloatVal(fRow, isNull); } + else + { + getValue(colIn, valIn); + } + // Check for distinct, if turned on. + // Currently, distinct only works on the first parameter. + if (k == 0) + { + if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + { + continue; + } - if (fDistinct) - fDistinctSet.insert(valIn); + if (fDistinct) + fDistinctSet.insert(valIn); + } + datum.columnData = valIn; + break; } - datum.columnData = valIn; - break; - } - default: - { - string errStr = "(" + colType2String[i] + ")"; - errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr); - cerr << errStr << endl; - throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE); + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::BLOB: + { + string valIn; + if (cc) + { + valIn = cc->getStrVal(fRow, isNull); + } + else + { + getValue(colIn, valIn); + } + // Check for distinct, if turned on. + // Currently, distinct only works on the first parameter. + if (k == 0) + { + if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end())) + { + continue; + } - break; + if (fDistinct) + fDistinctSet.insert(valIn); + } + datum.columnData = valIn; + break; + } + + default: + { + string errStr = "(" + colType2String[i] + ")"; + errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr); + cerr << errStr << endl; + throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE); + + break; + } } } // Skip if any value is NULL and respect nulls is off. diff --git a/utils/windowfunction/wf_udaf.h b/utils/windowfunction/wf_udaf.h index f7a4c4b08..fc3f9006d 100644 --- a/utils/windowfunction/wf_udaf.h +++ b/utils/windowfunction/wf_udaf.h @@ -53,8 +53,6 @@ public: // A class to control the execution of User Define Analytic Functions (UDAnF) // as defined by a specialization of mcsv1sdk::mcsv1_UDAF -// The template parameter is currently only used to support DISTINCT, as -// as that is done via a set template class WF_udaf : public WindowFunctionType { diff --git a/utils/windowfunction/windowfunctiontype.cpp b/utils/windowfunction/windowfunctiontype.cpp index 4c5b4de32..f5598a7e5 100644 --- a/utils/windowfunction/windowfunctiontype.cpp +++ b/utils/windowfunction/windowfunctiontype.cpp @@ -39,7 +39,6 @@ using namespace logging; using namespace ordering; #include "calpontsystemcatalog.h" -#include "constantcolumn.h" #include "dataconvert.h" // int64_t IDB_pow[19] using namespace execplan; @@ -228,6 +227,9 @@ WindowFunctionType::makeWindowFunction(const string& name, int ct, WindowFunctio break; } + // Copy the only the constant parameter pointers + af->constParms(wc->functionParms()); + return af; } @@ -634,6 +636,26 @@ void* WindowFunctionType::getNullValueByType(int ct, int pos) return v; } +void WindowFunctionType::constParms(const std::vector& functionParms) +{ + // fConstantParms will end up with a copy of functionParms, but only + // the constant types will be copied. Other types will take up space but + // be NULL. This allows us to acces the constants without the overhead + // of dynamic_cast for every row. + for (size_t i = 0; i < functionParms.size(); ++i) + { + ConstantColumn* cc = dynamic_cast(functionParms[i].get()); + if (cc) + { + fConstantParms.push_back(functionParms[i]); + } + else + { + fConstantParms.push_back(SRCP(cc)); + } + } +} + } //namespace // vim:ts=4 sw=4: diff --git a/utils/windowfunction/windowfunctiontype.h b/utils/windowfunction/windowfunctiontype.h index 50732d3b5..efa1c548a 100644 --- a/utils/windowfunction/windowfunctiontype.h +++ b/utils/windowfunction/windowfunctiontype.h @@ -31,7 +31,7 @@ #include "returnedcolumn.h" #include "rowgroup.h" #include "windowframe.h" - +#include "constantcolumn.h" namespace ordering { @@ -198,6 +198,8 @@ public: fStep = step; } + void constParms(const std::vector& functionParms); + static boost::shared_ptr makeWindowFunction(const std::string&, int ct, WindowFunctionColumn* wc); protected: @@ -244,6 +246,9 @@ protected: // output and input field indices: [0] - output std::vector fFieldIndex; + // constant function parameters -- needed for udaf with constant + std::vector fConstantParms; + // row meta data rowgroup::RowGroup fRowGroup; rowgroup::Row fRow;