/************************************************************************************ Copyright (c) 2019 MariaDB Corporation This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; if not see or write to the Free Software Foundation, Inc., 51 Franklin St., Fifth Floor, Boston, MA 02110, USA *************************************************************************************/ //#define NDEBUG #include #include #include #include #include using namespace std; #include using namespace boost; #include "loggingid.h" #include "errorcodes.h" #include "idberrorinfo.h" using namespace logging; #include "rowgroup.h" using namespace rowgroup; #include "idborderby.h" using namespace ordering; #include "joblisttypes.h" #include "calpontsystemcatalog.h" #include "constantcolumn.h" using namespace execplan; #include "windowfunctionstep.h" using namespace joblist; #include "wf_udaf.h" #include "vlarray.h" namespace windowfunction { boost::shared_ptr WF_udaf::makeFunction(int id, const string& name, int ct, mcsv1sdk::mcsv1Context& context, WindowFunctionColumn* wc) { boost::shared_ptr func; func.reset(new WF_udaf(id, name, context)); // Get the UDAnF function object WF_udaf* wfUDAF = (WF_udaf*)func.get(); mcsv1sdk::mcsv1Context& udafContext = wfUDAF->getContext(); udafContext.setInterrupted(wfUDAF->getInterruptedPtr()); wfUDAF->resetData(); return func; } WF_udaf::WF_udaf(WF_udaf& rhs) : WindowFunctionType(rhs.functionId(), rhs.functionName()) , fUDAFContext(rhs.getContext()) , bInterrupted(rhs.getInterrupted()) , fDistinct(rhs.getDistinct()) { getContext().setInterrupted(getInterruptedPtr()); } WindowFunctionType* WF_udaf::clone() const { return new WF_udaf(*const_cast(this)); } void WF_udaf::resetData() { getContext().getFunction()->reset(&getContext()); fDistinctMap.clear(); WindowFunctionType::resetData(); } void WF_udaf::parseParms(const std::vector& parms) { bRespectNulls = true; // The last parms: respect null | ignore null ConstantColumn* cc = dynamic_cast(parms[parms.size() - 1].get()); idbassert(cc != NULL); bool isNull = false; // dummy, harded coded bRespectNulls = (cc->getIntVal(fRow, isNull) > 0); if (getContext().getRunFlag(mcsv1sdk::UDAF_DISTINCT)) { setDistinct(); } } bool WF_udaf::dropValues(int64_t b, int64_t e) { if (!bHasDropValue) { // Save work if we discovered dropValue is not implemented in the UDAnF return false; } mcsv1sdk::mcsv1_UDAF::ReturnCode rc; bool isNull = false; // Turn on the Analytic flag so the function is aware it is being called // as a Window Function. getContext().setContextFlag(mcsv1sdk::CONTEXT_IS_ANALYTIC); // Put the parameter metadata (type, scale, precision) into valsIn utils::VLArray valsIn(getContext().getParameterCount()); ConstantColumn* cc = NULL; for (uint32_t i = 0; i < getContext().getParameterCount(); ++i) { mcsv1sdk::ColumnDatum& datum = valsIn[i]; cc = static_cast(fConstantParms[i].get()); if (cc) { datum.dataType = cc->resultType().colDataType; datum.scale = cc->resultType().scale; datum.precision = cc->resultType().precision; } else { uint64_t colIn = fFieldIndex[i + 1]; datum.dataType = fRow.getColType(colIn); datum.scale = fRow.getScale(colIn); datum.precision = fRow.getPrecision(colIn); } } utils::VLArray flags(getContext().getParameterCount()); for (int64_t i = b; i < e; i++) { if (i % 1000 == 0 && fStep->cancelled()) break; fRow.setData(getPointer(fRowData->at(i))); // NULL flags bool bSkipIt = false; for (uint32_t k = 0; k < getContext().getParameterCount(); ++k) { cc = static_cast(fConstantParms[k].get()); uint64_t colIn = fFieldIndex[k + 1]; mcsv1sdk::ColumnDatum& datum = valsIn[k]; // Turn on Null flags or skip based on respect nulls flags[k] = 0; if ((!cc && fRow.isNullValue(colIn) == true) || (cc && cc->isNull())) { if (!bRespectNulls) { bSkipIt = true; break; } flags[k] |= mcsv1sdk::PARAM_IS_NULL; } if (!bSkipIt && !(flags[k] & mcsv1sdk::PARAM_IS_NULL)) { switch (datum.dataType) { case CalpontSystemCatalog::TINYINT: case CalpontSystemCatalog::SMALLINT: case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: case CalpontSystemCatalog::BIGINT: { int64_t valIn; if (cc) { valIn = cc->getIntVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: { int64_t valIn; if (cc) { valIn = cc->getDecimalVal(fRow, isNull).value; } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } case CalpontSystemCatalog::UTINYINT: case CalpontSystemCatalog::USMALLINT: case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: case CalpontSystemCatalog::UBIGINT: case CalpontSystemCatalog::TIME: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: case CalpontSystemCatalog::TIMESTAMP: { uint64_t valIn; if (cc) { valIn = cc->getUintVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: { double valIn; if (cc) { valIn = cc->getDoubleVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: { float valIn; if (cc) { valIn = cc->getFloatVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } case CalpontSystemCatalog::LONGDOUBLE: { double valIn; if (cc) { valIn = cc->getLongDoubleVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::TEXT: case CalpontSystemCatalog::BLOB: { utils::NullString valIn; if (cc) { valIn = cc->getStrVal(fRow, isNull); // XXX: we probably need to change Distinctmap. } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0) { if (fDistinct) { DistinctMap::iterator distinct; distinct = fDistinctMap.find(valIn); if (distinct != fDistinctMap.end()) { // This is a duplicate: decrement the count --(*distinct).second; if ((*distinct).second > 0) // still more of these { bSkipIt = true; continue; } else { fDistinctMap.erase(distinct); } } } } datum.columnData = valIn; break; } default: { string errStr = "(" + colType2String[(int)datum.dataType] + ")"; errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr); cerr << errStr << endl; throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE); break; } } } } if (bSkipIt) { continue; } getContext().setDataFlags(flags); rc = getContext().getFunction()->dropValue(&getContext(), valsIn); if (rc == mcsv1sdk::mcsv1_UDAF::NOT_IMPLEMENTED) { bHasDropValue = false; return false; } if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { bInterrupted = true; string errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_UDANF_ERROR, getContext().getErrorMessage()); cerr << errStr << endl; throw IDBExcept(errStr, ERR_WF_UDANF_ERROR); } } WindowFunctionType::resetData(); return true; } // Sets the value from valOut into column colOut, performing any conversions. void WF_udaf::SetUDAFValue(static_any::any& valOut, int64_t colOut, int64_t b, int64_t e, int64_t c) { static const static_any::any& charTypeId = (char)1; static const static_any::any& scharTypeId = (signed char)1; static const static_any::any& shortTypeId = (short)1; static const static_any::any& intTypeId = (int)1; static const static_any::any& longTypeId = (long)1; static const static_any::any& llTypeId = (long long)1; static const static_any::any& int128TypeId = (int128_t)1; static const static_any::any& ucharTypeId = (unsigned char)1; static const static_any::any& ushortTypeId = (unsigned short)1; static const static_any::any& uintTypeId = (unsigned int)1; static const static_any::any& ulongTypeId = (unsigned long)1; static const static_any::any& ullTypeId = (unsigned long long)1; static const static_any::any& floatTypeId = (float)1; static const static_any::any& doubleTypeId = (double)1; static const std::string typeStr; static const static_any::any& strTypeId = typeStr; CDT colDataType = fRow.getColType(colOut); // This may seem a bit convoluted. Users shouldn't return a type // that they didn't set in mcsv1_UDAF::init(), but this // handles whatever return type is given and casts // it to whatever they said to return. int64_t intOut = 0; uint64_t uintOut = 0; int128_t int128Out = 0; float floatOut = 0.0; double doubleOut = 0.0; long double longdoubleOut = 0.0; ostringstream oss; std::string strOut; if (valOut.compatible(charTypeId)) { uintOut = intOut = valOut.cast(); floatOut = intOut; oss << intOut; } else if (valOut.compatible(scharTypeId)) { uintOut = intOut = valOut.cast(); floatOut = intOut; oss << intOut; } else if (valOut.compatible(shortTypeId)) { uintOut = intOut = valOut.cast(); floatOut = intOut; oss << intOut; } else if (valOut.compatible(intTypeId)) { uintOut = intOut = valOut.cast(); floatOut = intOut; oss << intOut; } else if (valOut.compatible(longTypeId)) { uintOut = intOut = valOut.cast(); floatOut = intOut; oss << intOut; } else if (valOut.compatible(llTypeId)) { uintOut = intOut = valOut.cast(); floatOut = intOut; oss << intOut; } else if (valOut.compatible(ucharTypeId)) { intOut = uintOut = valOut.cast(); floatOut = uintOut; oss << uintOut; } else if (valOut.compatible(ushortTypeId)) { intOut = uintOut = valOut.cast(); floatOut = uintOut; oss << uintOut; } else if (valOut.compatible(uintTypeId)) { intOut = uintOut = valOut.cast(); floatOut = uintOut; oss << uintOut; } else if (valOut.compatible(ulongTypeId)) { intOut = uintOut = valOut.cast(); floatOut = uintOut; oss << uintOut; } else if (valOut.compatible(ullTypeId)) { intOut = uintOut = valOut.cast(); floatOut = uintOut; oss << uintOut; } else if (valOut.compatible(floatTypeId)) { floatOut = valOut.cast(); doubleOut = floatOut; longdoubleOut = floatOut; intOut = uintOut = floatOut; oss << floatOut; } else if (valOut.compatible(doubleTypeId)) { doubleOut = valOut.cast(); longdoubleOut = doubleOut; floatOut = (float)doubleOut; uintOut = (uint64_t)doubleOut; intOut = (int64_t)doubleOut; oss << doubleOut; } else if (valOut.compatible(int128TypeId)) { int128Out = valOut.cast(); uintOut = intOut = int128Out; // may truncate floatOut = int128Out; doubleOut = int128Out; longdoubleOut = int128Out; oss << longdoubleOut; } if (valOut.compatible(strTypeId)) { strOut = valOut.cast(); // Convert the string to numeric type, just in case. intOut = atol(strOut.c_str()); uintOut = strtoul(strOut.c_str(), NULL, 10); doubleOut = strtod(strOut.c_str(), NULL); longdoubleOut = doubleOut; floatOut = (float)doubleOut; } else { strOut = oss.str(); } switch (colDataType) { case execplan::CalpontSystemCatalog::BIT: case execplan::CalpontSystemCatalog::TINYINT: case execplan::CalpontSystemCatalog::SMALLINT: case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: case execplan::CalpontSystemCatalog::BIGINT: case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: if (valOut.empty()) { setValue(colDataType, b, e, c, (int64_t*)NULL); } else { setValue(colDataType, b, e, c, &intOut); } break; case execplan::CalpontSystemCatalog::UTINYINT: case execplan::CalpontSystemCatalog::USMALLINT: case execplan::CalpontSystemCatalog::UMEDINT: case execplan::CalpontSystemCatalog::UINT: case execplan::CalpontSystemCatalog::UBIGINT: case execplan::CalpontSystemCatalog::DATE: case execplan::CalpontSystemCatalog::DATETIME: case execplan::CalpontSystemCatalog::TIMESTAMP: case execplan::CalpontSystemCatalog::TIME: if (valOut.empty()) { setValue(colDataType, b, e, c, (uint64_t*)NULL); } else { setValue(colDataType, b, e, c, &uintOut); } break; case execplan::CalpontSystemCatalog::FLOAT: case execplan::CalpontSystemCatalog::UFLOAT: if (valOut.empty()) { setValue(colDataType, b, e, c, (float*)NULL); } else { setValue(colDataType, b, e, c, &floatOut); } break; case execplan::CalpontSystemCatalog::DOUBLE: case execplan::CalpontSystemCatalog::UDOUBLE: if (valOut.empty()) { setValue(colDataType, b, e, c, (double*)NULL); } else { setValue(colDataType, b, e, c, &doubleOut); } break; case execplan::CalpontSystemCatalog::LONGDOUBLE: if (valOut.empty()) { setValue(colDataType, b, e, c, (long double*)NULL); } else { setValue(colDataType, b, e, c, &longdoubleOut); } break; case execplan::CalpontSystemCatalog::CHAR: case execplan::CalpontSystemCatalog::VARCHAR: case execplan::CalpontSystemCatalog::TEXT: case execplan::CalpontSystemCatalog::VARBINARY: case execplan::CalpontSystemCatalog::CLOB: case execplan::CalpontSystemCatalog::BLOB: if (valOut.empty()) { setValue(colDataType, b, e, c, (utils::NullString*)NULL); } else { utils::NullString nullStrOut(strOut); setValue(colDataType, b, e, c, &nullStrOut); } break; default: { std::ostringstream errmsg; errmsg << "WF_udaf: No logic for data type: " << colDataType; cerr << errmsg.str() << endl; throw runtime_error(errmsg.str().c_str()); break; } } } void WF_udaf::operator()(int64_t b, int64_t e, int64_t c) { mcsv1sdk::mcsv1_UDAF::ReturnCode rc; uint64_t colOut = fFieldIndex[0]; bool isNull = false; if ((fFrameUnit == WF__FRAME_ROWS) || (fPrev == -1) || (!fPeer->operator()(getPointer(fRowData->at(c)), getPointer(fRowData->at(fPrev))))) { fValOut.reset(); // for unbounded - current row special handling if (fPrev >= b && fPrev < c) b = c; else if (fPrev <= e && fPrev > c) e = c; // Turn on the Analytic flag so the function is aware it is being called // as a Window Function. getContext().setContextFlag(mcsv1sdk::CONTEXT_IS_ANALYTIC); // Put the parameter metadata (type, scale, precision) into valsIn utils::VLArray valsIn(getContext().getParameterCount()); ConstantColumn* cc = NULL; for (uint32_t i = 0; i < getContext().getParameterCount(); ++i) { mcsv1sdk::ColumnDatum& datum = valsIn[i]; cc = static_cast(fConstantParms[i].get()); if (cc) { datum.dataType = cc->resultType().colDataType; datum.scale = cc->resultType().scale; datum.precision = cc->resultType().precision; } else { uint64_t colIn = fFieldIndex[i + 1]; datum.dataType = fRow.getColType(colIn); datum.scale = fRow.getScale(colIn); datum.precision = fRow.getPrecision(colIn); } } if (b <= c && c <= e) getContext().setContextFlag(mcsv1sdk::CONTEXT_HAS_CURRENT_ROW); else getContext().clearContextFlag(mcsv1sdk::CONTEXT_HAS_CURRENT_ROW); bool bSkipIt = false; utils::VLArray flags(getContext().getParameterCount()); for (int64_t i = b; i <= e; i++) { if (i % 1000 == 0 && fStep->cancelled()) break; fRow.setData(getPointer(fRowData->at(i))); // NULL flags bSkipIt = false; for (uint32_t k = 0; k < getContext().getParameterCount(); ++k) { cc = static_cast(fConstantParms[k].get()); uint64_t colIn = fFieldIndex[k + 1]; mcsv1sdk::ColumnDatum& datum = valsIn[k]; // Turn on Null flags or skip based on respect nulls flags[k] = 0; if ((!cc && fRow.isNullValue(colIn) == true) || (cc && cc->isNull())) { if (!bRespectNulls) { bSkipIt = true; break; } flags[k] |= mcsv1sdk::PARAM_IS_NULL; } if (!bSkipIt && !(flags[k] & mcsv1sdk::PARAM_IS_NULL)) { switch (datum.dataType) { case CalpontSystemCatalog::TINYINT: case CalpontSystemCatalog::SMALLINT: case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: case CalpontSystemCatalog::BIGINT: { int64_t valIn; if (cc) { valIn = cc->getIntVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { // MCOL-1698 std::pair val = make_pair(valIn, 1); // Unordered_map will not insert a duplicate key (valIn). // If it doesn't insert, the original pair will be returned // in distinct.first and distinct.second will be a bool -- // true if newly inserted, false if a duplicate. std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { // This is a duplicate: increment the count ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: { if (fRow.getColumnWidth(colIn) < 16) { int64_t valIn; if (cc) { valIn = cc->getDecimalVal(fRow, isNull).value; } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn, 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; } else { int128_t valIn; if (cc) { valIn = cc->getDecimalVal(fRow, isNull).s128Value; } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn, 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; } break; } case CalpontSystemCatalog::UTINYINT: case CalpontSystemCatalog::USMALLINT: case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: case CalpontSystemCatalog::UBIGINT: case CalpontSystemCatalog::TIME: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: case CalpontSystemCatalog::TIMESTAMP: { uint64_t valIn; if (cc) { valIn = cc->getUintVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn, 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; break; } case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: { double valIn; if (cc) { valIn = cc->getDoubleVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn, 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; break; } case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: { float valIn; if (cc) { valIn = cc->getFloatVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn, 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; break; } case CalpontSystemCatalog::LONGDOUBLE: { long double valIn; if (cc) { valIn = cc->getLongDoubleVal(fRow, isNull); } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn, 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn; break; } case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::TEXT: case CalpontSystemCatalog::BLOB: { utils::NullString valIn; if (cc) { valIn = cc->getStrVal(fRow, isNull); // XXX: the same problem with distinct. } else { getValue(colIn, valIn); } // Check for distinct, if turned on. // Currently, distinct only works on the first parameter. if (k == 0 && fDistinct) { std::pair val = make_pair(valIn.isNull() ? nullptr : valIn.safeString(""), 1); std::pair distinct; distinct = fDistinctMap.insert(val); if (distinct.second == false) { ++(*distinct.first).second; bSkipIt = true; continue; } } datum.columnData = valIn.isNull() ? nullptr : valIn.safeString(""); break; } default: { string errStr = "(" + colType2String[(int)datum.dataType] + ")"; errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_PARM_TYPE, errStr); cerr << errStr << endl; throw IDBExcept(errStr, ERR_WF_INVALID_PARM_TYPE); break; } } } } // Skip if any value is NULL and respect nulls is off. if (bSkipIt) { continue; } getContext().setDataFlags(flags); rc = getContext().getFunction()->nextValue(&getContext(), valsIn); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { bInterrupted = true; string errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_UDANF_ERROR, getContext().getErrorMessage()); cerr << errStr << endl; throw IDBExcept(errStr, ERR_WF_UDANF_ERROR); } } rc = getContext().getFunction()->evaluate(&getContext(), fValOut); if (rc == mcsv1sdk::mcsv1_UDAF::ERROR) { bInterrupted = true; string errStr = IDBErrorInfo::instance()->errorMsg(ERR_WF_UDANF_ERROR, getContext().getErrorMessage()); cerr << errStr << endl; throw IDBExcept(errStr, ERR_WF_UDANF_ERROR); } } SetUDAFValue(fValOut, colOut, b, e, c); fPrev = c; } } // namespace windowfunction