1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #504 from mariadb-corporation/1.1-merge-up-20180621

Merge develop-1.1 into develop
This commit is contained in:
Roman Nozdrin
2018-07-04 22:52:04 +03:00
committed by GitHub
54 changed files with 1905 additions and 1010 deletions

View File

@@ -1420,6 +1420,7 @@ DataConvert::convertColumnData(const CalpontSystemCatalog::ColType& colType,
{
pushWarning = true;
}
value = (int64_t) * (reinterpret_cast<int64_t*>(&aTime));
}
break;
@@ -1928,6 +1929,7 @@ int64_t DataConvert::convertColumnTime(
{
return value;
}
if (dataOrgLen < 3)
{
// Not enough chars to be a time

View File

@@ -75,7 +75,7 @@ int64_t Func_date::getIntVal(rowgroup::Row& row,
break;
}
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
{
int64_t val;

View File

@@ -62,7 +62,7 @@ int64_t Func_day::getIntVal(rowgroup::Row& row,
val = parm[0]->data()->getIntVal(row, isNull);
return (uint32_t)((val >> 38) & 0x3f);
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -73,7 +73,7 @@ int64_t Func_dayname::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);
@@ -160,8 +160,10 @@ string Func_dayname::getStrVal(rowgroup::Row& row,
CalpontSystemCatalog::ColType& op_ct)
{
int32_t weekday = getIntVal(row, parm, isNull, op_ct);
if (weekday == -1)
return "";
return helpers::weekdayFullNames[weekday];
}

View File

@@ -71,7 +71,7 @@ int64_t Func_dayofweek::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -71,7 +71,7 @@ int64_t Func_dayofyear::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -72,7 +72,7 @@ int64_t Func_last_day::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -61,7 +61,7 @@ int64_t Func_month::getIntVal(rowgroup::Row& row,
val = parm[0]->data()->getIntVal(row, isNull);
return (unsigned)((val >> 44) & 0xf);
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -48,8 +48,10 @@ string Func_monthname::getStrVal(rowgroup::Row& row,
CalpontSystemCatalog::ColType& op_ct)
{
int32_t month = getIntVal(row, parm, isNull, op_ct);
if (month == -1)
return "";
return helpers::monthFullNames[month];
}
@@ -90,7 +92,7 @@ int64_t Func_monthname::getIntVal(rowgroup::Row& row,
val = parm[0]->data()->getIntVal(row, isNull);
return (unsigned)((val >> 44) & 0xf);
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -65,7 +65,7 @@ int64_t Func_quarter::getIntVal(rowgroup::Row& row,
month = (val >> 44) & 0xf;
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -85,7 +85,7 @@ int64_t Func_to_days::getIntVal(rowgroup::Row& row,
break;
}
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
{
int64_t val;

View File

@@ -75,7 +75,7 @@ int64_t Func_week::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -71,7 +71,7 @@ int64_t Func_weekday::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -61,7 +61,7 @@ int64_t Func_year::getIntVal(rowgroup::Row& row,
val = parm[0]->data()->getIntVal(row, isNull);
return (unsigned)((val >> 48) & 0xffff);
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -78,7 +78,7 @@ int64_t Func_yearweek::getIntVal(rowgroup::Row& row,
day = (uint32_t)((val >> 38) & 0x3f);
break;
// Time adds to now() and then gets value
// Time adds to now() and then gets value
case CalpontSystemCatalog::TIME:
aDateTime = static_cast<DateTime>(nowDatetime());
aTime = parm[0]->data()->getTimeIntVal(row, isNull);

View File

@@ -120,12 +120,17 @@ int LibMySQL::run(const char* query)
void LibMySQL::handleMySqlError(const char* errStr, unsigned int errCode)
{
ostringstream oss;
oss << errStr << "(" << errCode << ")";
if (errCode == (unsigned int) - 1)
oss << "(null pointer)";
if (getErrno())
{
oss << errStr << " (" << getErrno() << ")";
oss << " (" << getErrorMsg() << ")";
}
else
oss << "(" << errCode << ")";
{
oss << errStr << " (" << errCode << ")";
oss << " (unknown)";
}
throw logging::IDBExcept(oss.str(), logging::ERR_CROSS_ENGINE_CONNECT);

View File

@@ -71,6 +71,14 @@ public:
{
return fErrStr;
}
unsigned int getErrno()
{
return mysql_errno(fCon);
}
const char* getErrorMsg()
{
return mysql_error(fCon);
}
private:
MYSQL* fCon;

View File

@@ -2002,7 +2002,7 @@ void RowAggregation::doStatistics(const Row& rowIn, int64_t colIn, int64_t colOu
fRow.setLongDoubleField(fRow.getLongDoubleField(colAux + 1) + valIn * valIn, colAux + 1);
}
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
int64_t colAux, uint64_t& funcColsIdx)
{
uint32_t paramCount = fRGContext.getParameterCount();
@@ -2012,6 +2012,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
ConstantColumn* cc;
bool bIsNull = false;
execplan::CalpontSystemCatalog::ColDataType colDataType;
for (uint32_t i = 0; i < paramCount; ++i)
{
// If UDAF_IGNORE_NULLS is on, bIsNull gets set the first time
@@ -2022,6 +2023,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
++funcColsIdx;
continue;
}
SP_ROWAGG_FUNC_t pFunctionCol = fFunctionCols[funcColsIdx];
mcsv1sdk::ColumnDatum& datum = valsIn[i];
// Turn on NULL flags
@@ -2030,13 +2032,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
// If this particular parameter is a constant, then we need
// to acces the constant value rather than a row value.
cc = NULL;
if (pFunctionCol->fpConstCol)
{
cc = dynamic_cast<ConstantColumn*>(pFunctionCol->fpConstCol.get());
}
if ((cc && cc->type() == ConstantColumn::NULLDATA)
|| (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
|| (!cc && isNull(&fRowGroupIn, rowIn, colIn) == true))
{
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))
{
@@ -2044,6 +2047,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
++funcColsIdx;
continue;
}
dataFlags[i] |= mcsv1sdk::PARAM_IS_NULL;
}
@@ -2055,6 +2059,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
colDataType = fRowGroupIn.getColTypes()[colIn];
}
if (!(dataFlags[i] & mcsv1sdk::PARAM_IS_NULL))
{
switch (colDataType)
@@ -2066,6 +2071,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::BIGINT:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
if (cc)
{
datum.columnData = cc->getIntVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2078,12 +2084,15 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
datum.scale = fRowGroupIn.getScale()[colIn];
datum.precision = fRowGroupIn.getPrecision()[colIn];
}
break;
}
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
datum.dataType = colDataType;
if (cc)
{
datum.columnData = cc->getDecimalVal(const_cast<Row&>(rowIn), bIsNull).value;
@@ -2096,6 +2105,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
datum.scale = fRowGroupIn.getScale()[colIn];
datum.precision = fRowGroupIn.getPrecision()[colIn];
}
break;
}
@@ -2106,6 +2116,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::UBIGINT:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
if (cc)
{
datum.columnData = cc->getUintVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2114,6 +2125,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getUintField(colIn);
}
break;
}
@@ -2121,6 +2133,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::UDOUBLE:
{
datum.dataType = execplan::CalpontSystemCatalog::DOUBLE;
if (cc)
{
datum.columnData = cc->getDoubleVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2129,6 +2142,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getDoubleField(colIn);
}
break;
}
@@ -2136,6 +2150,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::UFLOAT:
{
datum.dataType = execplan::CalpontSystemCatalog::FLOAT;
if (cc)
{
datum.columnData = cc->getFloatVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2144,12 +2159,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getFloatField(colIn);
}
break;
}
case execplan::CalpontSystemCatalog::DATE:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
if (cc)
{
datum.columnData = cc->getDateIntVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2158,11 +2175,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getUintField(colIn);
}
break;
}
case execplan::CalpontSystemCatalog::DATETIME:
{
datum.dataType = execplan::CalpontSystemCatalog::UBIGINT;
if (cc)
{
datum.columnData = cc->getDatetimeIntVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2171,12 +2191,14 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getUintField(colIn);
}
break;
}
case execplan::CalpontSystemCatalog::TIME:
{
datum.dataType = execplan::CalpontSystemCatalog::BIGINT;
if (cc)
{
datum.columnData = cc->getTimeIntVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2185,6 +2207,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getIntField(colIn);
}
break;
}
@@ -2196,6 +2219,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
case execplan::CalpontSystemCatalog::BLOB:
{
datum.dataType = colDataType;
if (cc)
{
datum.columnData = cc->getStrVal(const_cast<Row&>(rowIn), bIsNull);
@@ -2204,6 +2228,7 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
{
datum.columnData = rowIn.getStringField(colIn);
}
break;
}
@@ -2221,8 +2246,8 @@ void RowAggregation::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
// MCOL-1201: If there are multiple parameters, the next fFunctionCols
// will have the column used. By incrementing the funcColsIdx (passed by
// ref, we also increment the caller's index.
if (fFunctionCols.size() > funcColsIdx + 1
&& fFunctionCols[funcColsIdx+1]->fAggFunction == ROWAGG_MULTI_PARM)
if (fFunctionCols.size() > funcColsIdx + 1
&& fFunctionCols[funcColsIdx + 1]->fAggFunction == ROWAGG_MULTI_PARM)
{
++funcColsIdx;
SP_ROWAGG_FUNC_t pFunctionCol = fFunctionCols[funcColsIdx];
@@ -2718,6 +2743,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
std::string strOut;
bool bSetSuccess = false;
switch (colDataType)
{
case execplan::CalpontSystemCatalog::BIT:
@@ -2732,10 +2758,12 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
intOut = valOut.cast<signed char>();
bSetSuccess = true;
}
if (bSetSuccess)
{
fRow.setIntField<1>(intOut, colOut);
}
break;
case execplan::CalpontSystemCatalog::SMALLINT:
@@ -2746,6 +2774,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setIntField<2>(intOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::INT:
@@ -2759,10 +2788,12 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
intOut = valOut.cast<long>();
bSetSuccess = true;
}
if (bSetSuccess)
{
fRow.setIntField<4>(intOut, colOut);
}
break;
case execplan::CalpontSystemCatalog::BIGINT:
@@ -2774,6 +2805,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setIntField<8>(intOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
@@ -2783,6 +2815,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setUintField<1>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::USMALLINT:
@@ -2793,6 +2826,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setUintField<2>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::UINT:
@@ -2802,6 +2836,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setUintField<4>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::UBIGINT:
@@ -2811,6 +2846,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setUintField<8>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::DATE:
@@ -2821,6 +2857,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setUintField<8>(uintOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::FLOAT:
@@ -2831,6 +2868,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setFloatField(floatOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::DOUBLE:
@@ -2841,6 +2879,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setDoubleField(doubleOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::CHAR:
@@ -2852,6 +2891,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setStringField(strOut, colOut);
bSetSuccess = true;
}
break;
case execplan::CalpontSystemCatalog::VARBINARY:
@@ -2863,6 +2903,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
fRow.setVarBinaryField(strOut, colOut);
bSetSuccess = true;
}
break;
default:
@@ -2873,6 +2914,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
break;
}
}
if (!bSetSuccess)
{
SetUDAFAnyValue(valOut, colOut);
@@ -3404,14 +3446,17 @@ void RowAggregationUM::doNullConstantAggregate(const ConstantAggData& aggData, u
fRGContext.setInterrupted(true);
throw logging::QueryDataExcept(fRGContext.getErrorMessage(), logging::aggregateFuncErr);
}
#if 0
uint32_t dataFlags[fRGContext.getParameterCount()];
for (uint32_t i = 0; i < fRGContext.getParameterCount(); ++i)
{
mcsv1sdk::ColumnDatum& datum = valsIn[i];
// Turn on NULL flags
dataFlags[i] = 0;
}
#endif
// Turn the NULL and CONSTANT flags on.
uint32_t flags[1];
@@ -4278,19 +4323,20 @@ void RowAggregationUMP2::doBitOp(const Row& rowIn, int64_t colIn, int64_t colOut
// colAux(in) - Where the UDAF userdata resides
// rowUDAF(in) - pointer to the RowUDAFFunctionCol for this UDAF instance
//------------------------------------------------------------------------------
void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
void RowAggregationUMP2::doUDAF(const Row& rowIn, int64_t colIn, int64_t colOut,
int64_t colAux, uint64_t& funcColsIdx)
{
static_any::any valOut;
// Get the user data
boost::shared_ptr<mcsv1sdk::UserData> userDataIn = rowIn.getUserData(colIn+1);
boost::shared_ptr<mcsv1sdk::UserData> userDataIn = rowIn.getUserData(colIn + 1);
// Unlike other aggregates, the data isn't in colIn, so testing it for NULL
// there won't help. In case of NULL, userData will be NULL.
uint32_t flags[1];
flags[0] = 0;
if (!userDataIn)
{
if (fRGContext.getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS))

View File

@@ -228,9 +228,9 @@ struct RowUDAFFunctionCol : public RowAggFunctionCol
inputColIndex, outputColIndex, auxColIndex),
bInterrupted(false)
{}
RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) :
RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex,
rhs.fOutputColumnIndex, rhs.fAuxColumnIndex),
RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) :
RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex,
rhs.fOutputColumnIndex, rhs.fAuxColumnIndex),
fUDAFContext(rhs.fUDAFContext),
bInterrupted(false)
{}
@@ -249,6 +249,7 @@ inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const
bs << (uint8_t)fAggFunction;
bs << fInputColumnIndex;
bs << fOutputColumnIndex;
if (fpConstCol)
{
bs << (uint8_t)1;
@@ -258,7 +259,7 @@ inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const
{
bs << (uint8_t)0;
}
}
inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
@@ -268,6 +269,7 @@ inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
bs >> fOutputColumnIndex;
uint8_t t;
bs >> t;
if (t)
{
fpConstCol.reset(new ConstantColumn);

View File

@@ -33,6 +33,8 @@ using namespace logging;
#include "prioritythreadpool.h"
using namespace boost;
#include "dbcon/joblist/primitivemsg.h"
namespace threadpool
{
@@ -51,9 +53,9 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads
cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads
<< " low.\n";
threadCounts[HIGH] = highThreads;
threadCounts[MEDIUM] = midThreads;
threadCounts[LOW] = lowThreads;
defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads;
defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads;
defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads;
}
PriorityThreadPool::~PriorityThreadPool()
@@ -68,6 +70,25 @@ void PriorityThreadPool::addJob(const Job& job, bool useLock)
if (useLock)
lk.lock();
// Create any missing threads
if (defaultThreadCounts[HIGH] != threadCounts[HIGH])
{
threads.create_thread(ThreadHelper(this, HIGH));
threadCounts[HIGH]++;
}
if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM])
{
threads.create_thread(ThreadHelper(this, MEDIUM));
threadCounts[MEDIUM]++;
}
if (defaultThreadCounts[LOW] != threadCounts[LOW])
{
threads.create_thread(ThreadHelper(this, LOW));
threadCounts[LOW]++;
}
if (job.priority > 66)
jobQueues[HIGH].push_back(job);
else if (job.priority > 33)
@@ -113,80 +134,148 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
vector<bool> reschedule;
uint32_t rescheduleCount;
uint32_t queueSize;
bool running = false;
while (!_stop)
try
{
mutex::scoped_lock lk(mutex);
queue = pickAQueue(preferredQueue);
if (jobQueues[queue].empty())
while (!_stop)
{
newJob.wait(lk);
continue;
}
queueSize = jobQueues[queue].size();
weight = 0;
// 3 conditions stop this thread from grabbing all jobs in the queue
//
// 1: The weight limit has been exceeded
// 2: The queue is empty
// 3: It has grabbed more than half of the jobs available &
// should leave some to the other threads
mutex::scoped_lock lk(mutex);
while ((weight < weightPerRun) && (!jobQueues[queue].empty())
&& (runList.size() <= queueSize / 2))
{
runList.push_back(jobQueues[queue].front());
jobQueues[queue].pop_front();
weight += runList.back().weight;
}
queue = pickAQueue(preferredQueue);
lk.unlock();
if (jobQueues[queue].empty())
{
newJob.wait(lk);
continue;
}
reschedule.resize(runList.size());
rescheduleCount = 0;
queueSize = jobQueues[queue].size();
weight = 0;
// 3 conditions stop this thread from grabbing all jobs in the queue
//
// 1: The weight limit has been exceeded
// 2: The queue is empty
// 3: It has grabbed more than half of the jobs available &
// should leave some to the other threads
for (i = 0; i < runList.size() && !_stop; i++)
{
try
while ((weight < weightPerRun) && (!jobQueues[queue].empty())
&& (runList.size() <= queueSize / 2))
{
runList.push_back(jobQueues[queue].front());
jobQueues[queue].pop_front();
weight += runList.back().weight;
}
lk.unlock();
reschedule.resize(runList.size());
rescheduleCount = 0;
for (i = 0; i < runList.size() && !_stop; i++)
{
reschedule[i] = false;
running = true;
reschedule[i] = (*(runList[i].functor))();
running = false;
if (reschedule[i])
rescheduleCount++;
}
catch (std::exception& e)
// no real work was done, prevent intensive busy waiting
if (rescheduleCount == runList.size())
usleep(1000);
if (rescheduleCount > 0)
{
cerr << e.what() << endl;
lk.lock();
for (i = 0; i < runList.size(); i++)
if (reschedule[i])
addJob(runList[i], false);
if (rescheduleCount > 1)
newJob.notify_all();
else
newJob.notify_one();
lk.unlock();
}
runList.clear();
}
// no real work was done, prevent intensive busy waiting
if (rescheduleCount == runList.size())
usleep(1000);
if (rescheduleCount > 0)
{
lk.lock();
for (i = 0; i < runList.size(); i++)
if (reschedule[i])
addJob(runList[i], false);
if (rescheduleCount > 1)
newJob.notify_all();
else
newJob.notify_one();
lk.unlock();
}
runList.clear();
}
catch (std::exception& ex)
{
// Log the exception and exit this thread
try
{
threadCounts[queue]--;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("threadFcn: Caught exception: ");
args.add(ex.what());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
}
catch (...)
{
}
}
catch (...)
{
// Log the exception and exit this thread
try
{
threadCounts[queue]--;
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(6);
args.add("threadFcn: Caught unknown exception!");
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
}
catch (...)
{
}
}
}
void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0};
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*) &ism, sizeof(ism));
msg.append((uint8_t*) &ph, sizeof(ph));
sock->write(msg);
}
void PriorityThreadPool::stop()

View File

@@ -36,6 +36,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include "../winport/winport.h"
#include "primitives/primproc/umsocketselector.h"
namespace threadpool
{
@@ -62,6 +63,9 @@ public:
uint32_t weight;
uint32_t priority;
uint32_t id;
uint32_t uniqueID;
uint32_t stepID;
primitiveprocessor::SP_UM_IOSOCK sock;
};
enum Priority
@@ -112,9 +116,11 @@ private:
Priority pickAQueue(Priority preference);
void threadFcn(const Priority preferredQueue) throw();
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
std::list<Job> jobQueues[3]; // higher indexes = higher priority
uint32_t threadCounts[3];
uint32_t defaultThreadCounts[3];
boost::mutex mutex;
boost::condition newJob;
boost::thread_group threads;

View File

@@ -161,9 +161,11 @@ mcsv1_UDAF::ReturnCode avgx::subEvaluate(mcsv1Context* context, const UserData*
}
struct avgx_data* outData = (struct avgx_data*)context->getUserData()->data;
struct avgx_data* inData = (struct avgx_data*)userDataIn->data;
outData->sum += inData->sum;
outData->cnt += inData->cnt;
return mcsv1_UDAF::SUCCESS;

View File

@@ -120,7 +120,7 @@ bool mcsv1Context::operator==(const mcsv1Context& c) const
// We don't test the per row data fields. They don't determine
// if it's the same Context.
if (getName() != c.getName()
||fRunFlags != c.fRunFlags
|| fRunFlags != c.fRunFlags
|| fContextFlags != c.fContextFlags
|| fUserDataSize != c.fUserDataSize
|| fResultType != c.fResultType

View File

@@ -82,6 +82,7 @@ mcsv1_UDAF::ReturnCode regr_avgx::nextValue(mcsv1Context* context, ColumnDatum*
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
@@ -162,9 +163,11 @@ mcsv1_UDAF::ReturnCode regr_avgx::subEvaluate(mcsv1Context* context, const UserD
}
struct regr_avgx_data* outData = (struct regr_avgx_data*)context->getUserData()->data;
struct regr_avgx_data* inData = (struct regr_avgx_data*)userDataIn->data;
outData->sum += inData->sum;
outData->cnt += inData->cnt;
return mcsv1_UDAF::SUCCESS;
@@ -182,6 +185,7 @@ mcsv1_UDAF::ReturnCode regr_avgx::evaluate(mcsv1Context* context, static_any::an
{
valOut = data->sum / (double)data->cnt;
}
return mcsv1_UDAF::SUCCESS;
}

View File

@@ -498,159 +498,163 @@ extern "C"
*/
struct regr_avgx_data
{
double sumx;
int64_t cnt;
double sumx;
int64_t cnt;
};
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
my_bool regr_avgx_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct regr_avgx_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_avgx() requires two arguments");
return 1;
}
struct regr_avgx_data* data;
if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->sumx = 0;
if (args->arg_count != 2)
{
strcpy(message, "regr_avgx() requires two arguments");
return 1;
}
if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data))))
{
strmov(message, "Couldn't allocate memory");
return 1;
}
data->sumx = 0;
data->cnt = 0;
initid->ptr = (char*)data;
return 0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void regr_avgx_deinit(UDF_INIT* initid)
{
free(initid->ptr);
}
free(initid->ptr);
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void
regr_avgx_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
char* message __attribute__((unused)))
{
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
data->sumx = 0;
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
data->sumx = 0;
data->cnt = 0;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void
regr_avgx_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
char* is_null,
char* message __attribute__((unused)))
{
// TODO test for NULL in x and y
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
++data->cnt;
data->sumx += xval;
data->sumx += xval;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
long long regr_avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
char* is_null, char* error __attribute__((unused)))
{
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
return data->sumx / data->cnt;
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
return data->sumx / data->cnt;
}
//=======================================================================
/**
* avgx connector stub. Exactly the same functionality as the
* built in avg() function. Use to test the performance of the
* API
* avgx connector stub. Exactly the same functionality as the
* built in avg() function. Use to test the performance of the
* API
*/
struct avgx_data
{
double sumx;
int64_t cnt;
double sumx;
int64_t cnt;
};
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
my_bool avgx_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct avgx_data* data;
if (args->arg_count != 1)
{
strcpy(message,"avgx() requires one argument");
return 1;
}
struct avgx_data* data;
if (!(data = (struct avgx_data*) malloc(sizeof(struct avgx_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
}
data->sumx = 0;
if (args->arg_count != 1)
{
strcpy(message, "avgx() requires one argument");
return 1;
}
if (!(data = (struct avgx_data*) malloc(sizeof(struct avgx_data))))
{
strmov(message, "Couldn't allocate memory");
return 1;
}
data->sumx = 0;
data->cnt = 0;
initid->ptr = (char*)data;
return 0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void avgx_deinit(UDF_INIT* initid)
{
free(initid->ptr);
}
free(initid->ptr);
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void
avgx_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
char* message __attribute__((unused)))
{
struct avgx_data* data = (struct avgx_data*)initid->ptr;
data->sumx = 0;
struct avgx_data* data = (struct avgx_data*)initid->ptr;
data->sumx = 0;
data->cnt = 0;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
void
avgx_add(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
char* is_null,
char* message __attribute__((unused)))
{
// TODO test for NULL in x and y
struct avgx_data* data = (struct avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
struct avgx_data* data = (struct avgx_data*)initid->ptr;
double xval = cvtArgToDouble(args->arg_type[1], args->args[0]);
++data->cnt;
data->sumx += xval;
data->sumx += xval;
}
#ifdef _MSC_VER
#ifdef _MSC_VER
__declspec(dllexport)
#endif
#endif
long long avgx(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
char* is_null, char* error __attribute__((unused)))
{
struct avgx_data* data = (struct avgx_data*)initid->ptr;
return data->sumx / data->cnt;
struct avgx_data* data = (struct avgx_data*)initid->ptr;
return data->sumx / data->cnt;
}
}
// vim:ts=4 sw=4:

View File

@@ -152,7 +152,7 @@ void WF_udaf<T>::parseParms(const std::vector<execplan::SRCP>& parms)
{
bRespectNulls = true;
// The last parms: respect null | ignore null
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(parms[parms.size()-1].get());
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(parms[parms.size() - 1].get());
idbassert(cc != NULL);
bool isNull = false; // dummy, harded coded
bRespectNulls = (cc->getIntVal(fRow, isNull) > 0);
@@ -175,9 +175,10 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
// Put the parameter metadata (type, scale, precision) into valsIn
mcsv1sdk::ColumnDatum valsIn[getContext().getParameterCount()];
for (uint32_t i = 0; i < getContext().getParameterCount(); ++i)
{
uint64_t colIn = fFieldIndex[i+1];
uint64_t colIn = fFieldIndex[i + 1];
mcsv1sdk::ColumnDatum& datum = valsIn[i];
datum.dataType = fRow.getColType(colIn);
datum.scale = fRow.getScale(colIn);
@@ -196,9 +197,10 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
for (uint32_t k = 0; k < getContext().getParameterCount(); ++k)
{
uint64_t colIn = fFieldIndex[k+1];
uint64_t colIn = fFieldIndex[k + 1];
mcsv1sdk::ColumnDatum& datum = valsIn[k];
flags[k] = 0;
if (fRow.isNullValue(colIn) == true)
{
if (!bRespectNulls)
@@ -228,6 +230,7 @@ bool WF_udaf<T>::dropValues(int64_t b, int64_t e)
datum.columnData = valIn;
}
if (bHasNull)
{
continue;
@@ -452,6 +455,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
uint64_t colOut = fFieldIndex[0];
bool isNull = false;
if ((fFrameUnit == WF__FRAME_ROWS) ||
(fPrev == -1) ||
(!fPeer->operator()(getPointer(fRowData->at(c)), getPointer(fRowData->at(fPrev)))))
@@ -469,10 +473,12 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
// Put the parameter metadata (type, scale, precision) into valsIn
mcsv1sdk::ColumnDatum valsIn[getContext().getParameterCount()];
ConstantColumn* cc = NULL;
for (uint32_t i = 0; i < getContext().getParameterCount(); ++i)
{
mcsv1sdk::ColumnDatum& datum = valsIn[i];
cc = static_cast<ConstantColumn*>(fConstantParms[i].get());
if (cc)
{
datum.dataType = cc->resultType().colDataType;
@@ -481,7 +487,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
}
else
{
uint64_t colIn = fFieldIndex[i+1];
uint64_t colIn = fFieldIndex[i + 1];
datum.dataType = fRow.getColType(colIn);
datum.scale = fRow.getScale(colIn);
datum.precision = fRow.getPrecision(colIn);
@@ -494,6 +500,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
getContext().clearContextFlag(mcsv1sdk::CONTEXT_HAS_CURRENT_ROW);
bool bHasNull = false;
for (int64_t i = b; i <= e; i++)
{
if (i % 1000 == 0 && fStep->cancelled())
@@ -504,16 +511,18 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
// NULL flags
uint32_t flags[getContext().getParameterCount()];
bHasNull = false;
for (uint32_t k = 0; k < getContext().getParameterCount(); ++k)
{
cc = static_cast<ConstantColumn*>(fConstantParms[k].get());
uint64_t colIn = fFieldIndex[k+1];
uint64_t colIn = fFieldIndex[k + 1];
mcsv1sdk::ColumnDatum& datum = valsIn[k];
// Turn on Null flags or skip based on respect nulls
flags[k] = 0;
if ((!cc && fRow.isNullValue(colIn) == true)
|| (cc && cc->type() == ConstantColumn::NULLDATA))
|| (cc && cc->type() == ConstantColumn::NULLDATA))
{
if (!bRespectNulls)
{
@@ -535,6 +544,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::BIGINT:
{
int64_t valIn;
if (cc)
{
valIn = cc->getIntVal(fRow, isNull);
@@ -543,6 +553,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
@@ -555,6 +566,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
break;
}
@@ -563,6 +575,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::UDECIMAL:
{
int64_t valIn;
if (cc)
{
valIn = cc->getDecimalVal(fRow, isNull).value;
@@ -571,6 +584,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
@@ -583,6 +597,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
break;
}
@@ -594,6 +609,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::UBIGINT:
{
uint64_t valIn;
if (cc)
{
valIn = cc->getUintVal(fRow, isNull);
@@ -602,6 +618,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
@@ -614,6 +631,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
break;
}
@@ -622,6 +640,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::UDOUBLE:
{
double valIn;
if (cc)
{
valIn = cc->getDoubleVal(fRow, isNull);
@@ -630,6 +649,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
@@ -642,6 +662,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
break;
}
@@ -650,6 +671,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::UFLOAT:
{
float valIn;
if (cc)
{
valIn = cc->getFloatVal(fRow, isNull);
@@ -658,6 +680,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
@@ -670,6 +693,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
break;
}
@@ -681,6 +705,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
case CalpontSystemCatalog::BLOB:
{
string valIn;
if (cc)
{
valIn = cc->getStrVal(fRow, isNull);
@@ -689,6 +714,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
{
getValue(colIn, valIn);
}
// Check for distinct, if turned on.
// Currently, distinct only works on the first parameter.
if (k == 0)
@@ -701,6 +727,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
if (fDistinct)
fDistinctSet.insert(valIn);
}
datum.columnData = valIn;
break;
}
@@ -717,13 +744,15 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
}
}
}
// Skip if any value is NULL and respect nulls is off.
if (bHasNull)
{
continue;
}
getContext().setDataFlags(flags);
rc = getContext().getFunction()->nextValue(&getContext(), valsIn);
if (rc == mcsv1sdk::mcsv1_UDAF::ERROR)

View File

@@ -93,7 +93,7 @@ protected:
bool bRespectNulls; // respect null | ignore null
bool bHasDropValue; // Set to false when we discover the UDAnF doesn't implement dropValue.
// To hold distinct values
std::tr1::unordered_set<static_any::any, DistinctHasher, DistinctEqual> fDistinctSet;
std::tr1::unordered_set<static_any::any, DistinctHasher, DistinctEqual> fDistinctSet;
static_any::any fValOut; // The return value
public:

View File

@@ -645,6 +645,7 @@ void WindowFunctionType::constParms(const std::vector<SRCP>& functionParms)
for (size_t i = 0; i < functionParms.size(); ++i)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(functionParms[i].get());
if (cc)
{
fConstantParms.push_back(functionParms[i]);

View File

@@ -199,7 +199,7 @@ public:
}
void constParms(const std::vector<SRCP>& functionParms);
static boost::shared_ptr<WindowFunctionType> makeWindowFunction(const std::string&, int ct, WindowFunctionColumn* wc);
protected: