mariadb-columnstore-engine (mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git)
fix(aggregation): MCOL-5476 Add support for duplicate expressions in group by. (#3052)
This patch adds support for duplicate expressions (built-in functions taking a single argument) in the SELECT list and the GROUP BY clause.
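As a concrete illustration (taken verbatim from the mcol-5476 regression test added at the end of this commit), the target query shape repeats the same single-argument function in both the projection and the GROUP BY list; before this patch such duplicates could cause the aggregation-preparation code to reject the query with a not-a-GROUP-BY-expression error (the error path visible in the hunks below):

  create table t1 (a int, b int) engine=columnstore;
  insert into t1 values (1, 1), (2, 1), (3, 1), (4, 2), (5, 2);
  -- abs(b) appears twice in the select list and twice in GROUP BY;
  -- both occurrences must resolve to the same aggregation column
  select sum(a), abs(b), abs(b) from t1 group by abs(b), abs(b);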
@@ -93,6 +93,20 @@ struct TupleInfo
  uint32_t csNum;  // For collations
};

+// This struct holds information about `FunctionColumn`.
+struct FunctionColumnInfo
+{
+  // Function argument.
+  uint64_t associatedColumnOid;
+  // Function name.
+  std::string functionName;
+
+  FunctionColumnInfo(uint64_t colOid, std::string funcName)
+   : associatedColumnOid(colOid), functionName(funcName)
+  {
+  }
+};
+
// for compound join
struct JoinData
{
@@ -383,6 +397,8 @@ struct JobInfo
  std::map<std::pair<uint32_t, uint32_t>, int64_t> joinEdgesToRestore;
  // Represents a pair of `table` to be on a large side and weight associated with that table.
  std::unordered_map<uint32_t, int64_t> tablesForLargeSide;
+  // Represents a pair of `tupleId` and `FunctionColumnInfo`.
+  std::unordered_map<uint32_t, FunctionColumnInfo> functionColumnMap;

 private:
  // defaults okay
@@ -172,7 +172,7 @@ void projectSimpleColumn(const SimpleColumn* sc, JobStepVector& jsv, JobInfo& jo
  // This is a double-step step
  // if (jobInfo.trace)
  //   cout << "doProject Emit pGetSignature for SimpleColumn " << dictOid <<
-  //endl;
+  // endl;

  pds = new pDictionaryStep(dictOid, tbl_oid, ct, jobInfo);
  jobInfo.keyInfo->dictOidToColOid[dictOid] = oid;
@@ -557,9 +557,9 @@ void checkGroupByCols(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
    if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
    {
      if (csep->withRollup())
      {
        throw runtime_error("constant GROUP BY columns are not supported when WITH ROLLUP is used");
      }
      continue;
    }

@@ -927,7 +927,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
      {
        if (jobInfo.hasRollup)
        {
-          throw runtime_error("GROUP_CONCAT and JSONARRAYAGG aggregations are not supported when WITH ROLLUP modifier is used");
+          throw runtime_error(
+              "GROUP_CONCAT and JSONARRAYAGG aggregations are not supported when WITH ROLLUP modifier is "
+              "used");
        }
        jobInfo.groupConcatCols.push_back(retCols[i]);

@@ -1246,6 +1248,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
    const WindowFunctionColumn* wc = NULL;
    bool hasAggCols = false;
    bool hasWndCols = false;
+    bool hasFuncColsWithOneArgument = false;

    if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
    {
@@ -1263,6 +1266,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
        hasAggCols = true;
      if (fc->windowfunctionColumnList().size() > 0)
        hasWndCols = true;
+      // MCOL-5476 Currently support function with only one argument for group by list.
+      if (fc->simpleColumnList().size() == 1)
+        hasFuncColsWithOneArgument = true;
    }
    else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
    {
@@ -1291,6 +1297,13 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
      {
        jobInfo.expressionVec.push_back(tupleKey);
      }
+
+      if (hasFuncColsWithOneArgument)
+      {
+        FunctionColumnInfo fcInfo(fcInfo.associatedColumnOid = fc->simpleColumnList().front()->oid(),
+                                  fc->functionName());
+        jobInfo.functionColumnMap.insert({tupleKey, fcInfo});
+      }
    }

    // add to project list
@@ -1704,7 +1717,7 @@ void parseExecutionPlan(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobS
  }

  // special case, select without a table, like: select 1;
  if (jobInfo.constantCol == CONST_COL_ONLY)  // XXX: WITH ROLLUP
    return;

  // If there are no filters (select * from table;) then add one simple scan
@@ -75,67 +75,9 @@ using namespace querytele;

namespace
{
-struct cmpTuple
-{
-  bool operator()(boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*> a,
-                  boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*> b) const
-  {
-    uint32_t keya = boost::get<0>(a);
-    uint32_t keyb = boost::get<0>(b);
-    int opa;
-    int opb;
-    mcsv1sdk::mcsv1_UDAF* pUDAFa;
-    mcsv1sdk::mcsv1_UDAF* pUDAFb;
-
-    // If key is less than
-    if (keya < keyb)
-      return true;
-    if (keya == keyb)
-    {
-      // test Op
-      opa = boost::get<1>(a);
-      opb = boost::get<1>(b);
-      if (opa < opb)
-        return true;
-      if (opa == opb)
-      {
-        // look at the UDAF object
-        pUDAFa = boost::get<2>(a);
-        pUDAFb = boost::get<2>(b);
-        if (pUDAFa < pUDAFb)
-          return true;
-        if (pUDAFa == pUDAFb)
-        {
-          std::vector<uint32_t>* paramKeysa = boost::get<3>(a);
-          std::vector<uint32_t>* paramKeysb = boost::get<3>(b);
-          if (paramKeysa == NULL || paramKeysb == NULL)
-            return false;
-          if (paramKeysa->size() < paramKeysb->size())
-            return true;
-          if (paramKeysa->size() == paramKeysb->size())
-          {
-            for (uint64_t i = 0; i < paramKeysa->size(); ++i)
-            {
-              if ((*paramKeysa)[i] < (*paramKeysb)[i])
-                return true;
-            }
-          }
-        }
-      }
-    }
-    return false;
-  }
-};
-
typedef vector<std::pair<Row::Pointer, uint64_t>> RowBucket;
typedef vector<RowBucket> RowBucketVec;

-// The AGG_MAP type is used to maintain a list of aggregate functions in order to
-// detect duplicates. Since all UDAF have the same op type (ROWAGG_UDAF), we add in
-// the function pointer in order to ensure uniqueness.
-typedef map<boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>, uint64_t, cmpTuple>
-    AGG_MAP;
-
inline RowAggFunctionType functionIdMap(int planFuncId)
{
  switch (planFuncId)
@@ -1190,13 +1132,31 @@ void TupleAggregateStep::prep1PhaseAggregate(JobInfo& jobInfo, vector<RowGroup>&
      }
      else
      {
+        uint32_t foundTupleKey{0};
+        if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, groupbyMap, key, foundTupleKey))
+        {
+          oidsAgg.push_back(oidsProj[colProj]);
+          keysAgg.push_back(key);
+          scaleAgg.push_back(scaleProj[colProj]);
+          precisionAgg.push_back(precisionProj[colProj]);
+          typeAgg.push_back(typeProj[colProj]);
+          csNumAgg.push_back(csNumProj[colProj]);
+          widthAgg.push_back(width[colProj]);
+          // Update key.
+          key = foundTupleKey;
+          ++outIdx;
+          continue;
+        }
+        else
+        {
        Message::Args args;
        args.add(keyName(i, key, jobInfo));
        string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
        cerr << "prep1PhaseAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[key].fId
             << ", alias=" << jobInfo.keyInfo->tupleKeyVec[key].fTable
             << ", view=" << jobInfo.keyInfo->tupleKeyVec[key].fView << ", function=" << (int)aggOp << endl;
        throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
+        }
      }
    }

@@ -2245,81 +2205,108 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
      // not a direct hit -- a returned column is not already in the RG from PMs
      else
      {
+        uint32_t foundTupleKey{0};
+        if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
+        {
+          AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
+              foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
+          colAgg = it->second;
+          oidsAggDist.push_back(oidsAgg[colAgg]);
+          keysAggDist.push_back(keysAgg[colAgg]);
+          scaleAggDist.push_back(scaleAgg[colAgg]);
+          precisionAggDist.push_back(precisionAgg[colAgg]);
+          typeAggDist.push_back(typeAgg[colAgg]);
+          csNumAggDist.push_back(csNumAgg[colAgg]);
+          uint32_t width = widthAgg[colAgg];
+          if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
+          {
+            TupleInfo ti = getTupleInfo(retKey, jobInfo);
+
+            if (ti.width > width)
+              width = ti.width;
+          }
+          widthAggDist.push_back(width);
+
+          // Update the `retKey` to specify that this column is a duplicate.
+          retKey = foundTupleKey;
+        }
+        else
+        {
        bool returnColMissing = true;

        // check if a SUM or COUNT covered by AVG
        if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
        {
          it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
                                                 udafc ? udafc->getContext().getParamKeys() : NULL));

          if (it != aggFuncMap.end())
          {
            // false alarm
            returnColMissing = false;

            colAgg = it->second;

            if (aggOp == ROWAGG_SUM)
            {
              oidsAggDist.push_back(oidsAgg[colAgg]);
              keysAggDist.push_back(retKey);
              csNumAggDist.push_back(8);
              wideDecimalOrLongDouble(colAgg, typeAgg[colAgg], precisionAgg, scaleAgg, widthAgg,
                                      typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
            }
            else
            {
              // leave the count() to avg
              aggOp = ROWAGG_COUNT_NO_OP;

              oidsAggDist.push_back(oidsAgg[colAgg]);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(0);

              if (isUnsigned(typeAgg[colAgg]))
              {
                typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
                precisionAggDist.push_back(20);
              }
              else
              {
                typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
                precisionAggDist.push_back(19);
              }
              csNumAggDist.push_back(8);
              widthAggDist.push_back(bigIntWidth);
            }
          }
        }
        else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
                 jobInfo.expressionVec.end())
        {
          // a function on aggregation
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggDist.push_back(ti.oid);
          keysAggDist.push_back(retKey);
          scaleAggDist.push_back(ti.scale);
          precisionAggDist.push_back(ti.precision);
          typeAggDist.push_back(ti.dtype);
          csNumAggDist.push_back(ti.csNum);
          widthAggDist.push_back(ti.width);

          returnColMissing = false;
        }
        else if (aggOp == ROWAGG_CONSTANT)
        {
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggDist.push_back(ti.oid);
          keysAggDist.push_back(retKey);
          scaleAggDist.push_back(ti.scale);
          precisionAggDist.push_back(ti.precision);
          typeAggDist.push_back(ti.dtype);
          csNumAggDist.push_back(ti.csNum);
          widthAggDist.push_back(ti.width);

          returnColMissing = false;
        }

#if 0
        else if (aggOp == ROWAGG_GROUP_CONCAT || aggOp == ROWAGG_JSON_ARRAY)
@@ -2337,63 +2324,64 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(JobInfo& jobInfo, vector<Ro
        }

#endif
        else if (jobInfo.groupConcatInfo.columns().find(retKey) !=
                 jobInfo.groupConcatInfo.columns().end())
        {
          // TODO: columns only for group_concat do not needed in result set.
          for (uint64_t k = 0; k < keysProj.size(); k++)
          {
            if (retKey == keysProj[k])
            {
              oidsAggDist.push_back(oidsProj[k]);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(scaleProj[k] >> 8);
              precisionAggDist.push_back(precisionProj[k]);
              typeAggDist.push_back(typeProj[k]);
              csNumAggDist.push_back(csNumProj[k]);
              widthAggDist.push_back(widthProj[k]);

              returnColMissing = false;
              break;
            }
          }
        }
        else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
        {
          // skip window columns/expression, which are computed later
          for (uint64_t k = 0; k < keysProj.size(); k++)
          {
            if (retKey == keysProj[k])
            {
              oidsAggDist.push_back(oidsProj[k]);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(scaleProj[k] >> 8);
              precisionAggDist.push_back(precisionProj[k]);
              typeAggDist.push_back(typeProj[k]);
              csNumAggDist.push_back(csNumProj[k]);
              widthAggDist.push_back(widthProj[k]);

              returnColMissing = false;
              break;
            }
          }
        }

        if (returnColMissing)
        {
          Message::Args args;
          args.add(keyName(outIdx, retKey, jobInfo));
          string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
          cerr << "prep1PhaseDistinctAggregate: " << emsg
               << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
               << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
               << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
               << endl;
          throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
        }
      }  // else
    }    // switch
+    }
  }

  // update groupby vector if the groupby column is a returned column
@@ -3179,14 +3167,14 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>
        colAggPm++;
      }

      // PM: put the count column for avg next to the sum
      // let fall through to add a count column for average function
      if (aggOp != ROWAGG_AVG)
        break;
      // The AVG aggregation has a special treatment everywhere.
      // This is so because AVG(column) is a SUM(column)/COUNT(column)
      // and these aggregations can be utilized by AVG, if present.
      /* fall through */

    case ROWAGG_COUNT_ASTERISK:
    case ROWAGG_COUNT_COL_NAME:
@@ -3439,99 +3427,120 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>
      // not a direct hit -- a returned column is not already in the RG from PMs
      else
      {
+        // MCOL-5476.
+        uint32_t foundTupleKey{0};
+        if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
+        {
+          AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
+              foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
+          colPm = it->second;
+          oidsAggUm.push_back(oidsAggPm[colPm]);
+          keysAggUm.push_back(retKey);
+          scaleAggUm.push_back(scaleAggPm[colPm]);
+          precisionAggUm.push_back(precisionAggPm[colPm]);
+          typeAggUm.push_back(typeAggPm[colPm]);
+          csNumAggUm.push_back(csNumAggPm[colPm]);
+          widthAggUm.push_back(widthAggPm[colPm]);
+          // Update the `retKey` to specify that this column is a duplicate.
+          retKey = foundTupleKey;
+        }
+        else
+        {
        bool returnColMissing = true;

        // check if a SUM or COUNT covered by AVG
        if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
        {
          it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
                                                 udafc ? udafc->getContext().getParamKeys() : NULL));

          if (it != aggFuncMap.end())
          {
            // false alarm
            returnColMissing = false;

            colPm = it->second;

            if (aggOp == ROWAGG_SUM)
            {
              wideDecimalOrLongDouble(colPm, typeAggPm[colPm], precisionAggPm, scaleAggPm, widthAggPm,
                                      typeAggUm, scaleAggUm, precisionAggUm, widthAggUm);

              oidsAggUm.push_back(oidsAggPm[colPm]);
              keysAggUm.push_back(retKey);
              csNumAggUm.push_back(8);
            }
            else
            {
              // leave the count() to avg
              aggOp = ROWAGG_COUNT_NO_OP;

              colPm++;
              oidsAggUm.push_back(oidsAggPm[colPm]);
              keysAggUm.push_back(retKey);
              scaleAggUm.push_back(0);
              precisionAggUm.push_back(19);
              typeAggUm.push_back(CalpontSystemCatalog::UBIGINT);
              csNumAggUm.push_back(8);
              widthAggUm.push_back(bigIntWidth);
            }
          }
        }
        else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
                 jobInfo.expressionVec.end())
        {
          // a function on aggregation
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggUm.push_back(ti.oid);
          keysAggUm.push_back(retKey);
          scaleAggUm.push_back(ti.scale);
          precisionAggUm.push_back(ti.precision);
          typeAggUm.push_back(ti.dtype);
          csNumAggUm.push_back(ti.csNum);
          widthAggUm.push_back(ti.width);

          returnColMissing = false;
        }
        else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
        {
          // an window function
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggUm.push_back(ti.oid);
          keysAggUm.push_back(retKey);
          scaleAggUm.push_back(ti.scale);
          precisionAggUm.push_back(ti.precision);
          typeAggUm.push_back(ti.dtype);
          csNumAggUm.push_back(ti.csNum);
          widthAggUm.push_back(ti.width);

          returnColMissing = false;
        }
        else if (aggOp == ROWAGG_CONSTANT)
        {
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggUm.push_back(ti.oid);
          keysAggUm.push_back(retKey);
          scaleAggUm.push_back(ti.scale);
          precisionAggUm.push_back(ti.precision);
          typeAggUm.push_back(ti.dtype);
          csNumAggUm.push_back(ti.csNum);
          widthAggUm.push_back(ti.width);

          returnColMissing = false;
        }

        if (returnColMissing)
        {
          Message::Args args;
          args.add(keyName(outIdx, retKey, jobInfo));
          string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
-          cerr << "prep2PhasesAggregate: " << emsg << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
+          cerr << "prep2PhasesAggregate: " << emsg
+               << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
               << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
               << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
               << endl;
          throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
        }
+        }
      }

@@ -3713,7 +3722,8 @@ void TupleAggregateStep::prep2PhasesAggregate(JobInfo& jobInfo, vector<RowGroup>

  RowGroup aggRgUm(oidsAggUm.size(), posAggUm, oidsAggUm, keysAggUm, typeAggUm, csNumAggUm, scaleAggUm,
                   precisionAggUm, jobInfo.stringTableThreshold);
-  SP_ROWAGG_UM_t rowAggUm(new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit, false));
+  SP_ROWAGG_UM_t rowAggUm(
+      new RowAggregationUMP2(groupByUm, functionVecUm, jobInfo.rm, jobInfo.umMemLimit, false));
  rowAggUm->timeZone(jobInfo.timeZone);
  rowgroups.push_back(aggRgUm);
  aggregators.push_back(rowAggUm);
@@ -4505,109 +4515,130 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(JobInfo& jobInfo, vector<R
      // not a direct hit -- a returned column is not already in the RG from PMs
      else
      {
+        // MCOL-5476.
+        uint32_t foundTupleKey{0};
+        if (tryToFindEqualFunctionColumnByTupleKey(jobInfo, aggFuncMap, retKey, foundTupleKey))
+        {
+          AGG_MAP::iterator it = aggFuncMap.find(boost::make_tuple(
+              foundTupleKey, aggOp, pUDAFFunc, udafc ? udafc->getContext().getParamKeys() : NULL));
+          colUm = it->second;
+          oidsAggDist.push_back(oidsAggUm[colUm]);
+          keysAggDist.push_back(keysAggUm[colUm]);
+          scaleAggDist.push_back(scaleAggUm[colUm]);
+          precisionAggDist.push_back(precisionAggUm[colUm]);
+          typeAggDist.push_back(typeAggUm[colUm]);
+          csNumAggDist.push_back(csNumAggUm[colUm]);
+          widthAggDist.push_back(widthAggUm[colUm]);
+          // Update the `retKey` to specify that this column is a duplicate.
+          retKey = foundTupleKey;
+        }
+        else
+        {
+          // here
        bool returnColMissing = true;

        // check if a SUM or COUNT covered by AVG
        if (aggOp == ROWAGG_SUM || aggOp == ROWAGG_COUNT_COL_NAME)
        {
          it = aggFuncMap.find(boost::make_tuple(returnedColVec[i].first, ROWAGG_AVG, pUDAFFunc,
                                                 udafc ? udafc->getContext().getParamKeys() : NULL));

          if (it != aggFuncMap.end())
          {
            // false alarm
            returnColMissing = false;

            colUm = it->second;

            if (aggOp == ROWAGG_SUM)
            {
              oidsAggDist.push_back(oidsAggUm[colUm]);
              keysAggDist.push_back(retKey);
              csNumAggDist.push_back(8);
              wideDecimalOrLongDouble(colUm, typeAggUm[colUm], precisionAggUm, scaleAggUm, widthAggUm,
                                      typeAggDist, scaleAggDist, precisionAggDist, widthAggDist);
            }
            else
            {
              // leave the count() to avg
              aggOp = ROWAGG_COUNT_NO_OP;

              oidsAggDist.push_back(oidsAggUm[colUm]);
              keysAggDist.push_back(retKey);
              scaleAggDist.push_back(0);
              if (isUnsigned(typeAggUm[colUm]))
              {
                precisionAggDist.push_back(20);
                typeAggDist.push_back(CalpontSystemCatalog::UBIGINT);
              }
              else
              {
                precisionAggDist.push_back(19);
                typeAggDist.push_back(CalpontSystemCatalog::BIGINT);
              }
              csNumAggDist.push_back(8);
              widthAggDist.push_back(bigIntWidth);
            }
          }
        }
        else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), retKey) !=
                 jobInfo.expressionVec.end())
        {
          // a function on aggregation
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggDist.push_back(ti.oid);
          keysAggDist.push_back(retKey);
          scaleAggDist.push_back(ti.scale);
          precisionAggDist.push_back(ti.precision);
          typeAggDist.push_back(ti.dtype);
          csNumAggDist.push_back(ti.csNum);
          widthAggDist.push_back(ti.width);

          returnColMissing = false;
        }
        else if (jobInfo.windowSet.find(retKey) != jobInfo.windowSet.end())
        {
          // a window function
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggDist.push_back(ti.oid);
          keysAggDist.push_back(retKey);
          scaleAggDist.push_back(ti.scale);
          precisionAggDist.push_back(ti.precision);
          typeAggDist.push_back(ti.dtype);
          csNumAggDist.push_back(ti.csNum);
          widthAggDist.push_back(ti.width);

          returnColMissing = false;
        }
        else if (aggOp == ROWAGG_CONSTANT)
        {
          TupleInfo ti = getTupleInfo(retKey, jobInfo);
          oidsAggDist.push_back(ti.oid);
          keysAggDist.push_back(retKey);
          scaleAggDist.push_back(ti.scale);
          precisionAggDist.push_back(ti.precision);
          typeAggDist.push_back(ti.dtype);
          csNumAggDist.push_back(ti.csNum);
          widthAggDist.push_back(ti.width);

          returnColMissing = false;
        }

        if (returnColMissing)
        {
          Message::Args args;
          args.add(keyName(outIdx, retKey, jobInfo));
          string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
          cerr << "prep2PhasesDistinctAggregate: " << emsg
               << " oid=" << (int)jobInfo.keyInfo->tupleKeyVec[retKey].fId
               << ", alias=" << jobInfo.keyInfo->tupleKeyVec[retKey].fTable
               << ", view=" << jobInfo.keyInfo->tupleKeyVec[retKey].fView << ", function=" << (int)aggOp
               << endl;
          throw IDBExcept(emsg, ERR_NOT_GROUPBY_EXPRESSION);
        }
      }  // else not a direct hit
+      }
    }  // else not a DISTINCT

    // update groupby vector if the groupby column is a returned column
    if (returnedColVec[i].second == 0)
@@ -5942,4 +5973,44 @@ void TupleAggregateStep::formatMiniStats()
  fMiniInfo += oss.str();
}

+uint32_t TupleAggregateStep::getTupleKeyFromTuple(
+    const boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>& tuple)
+{
+  return tuple.get<0>();
+}
+
+uint32_t TupleAggregateStep::getTupleKeyFromTuple(uint32_t key)
+{
+  return key;
+}
+
+template <class GroupByMap>
+bool TupleAggregateStep::tryToFindEqualFunctionColumnByTupleKey(JobInfo& jobInfo, GroupByMap& groupByMap,
+                                                                const uint32_t tupleKey, uint32_t& foundKey)
+{
+  auto funcMapIt = jobInfo.functionColumnMap.find(tupleKey);
+  if (funcMapIt != jobInfo.functionColumnMap.end())
+  {
+    const auto& rFunctionInfo = funcMapIt->second;
+    // Try to match given `tupleKey` in `groupByMap`.
+    for (const auto& groupByMapPair : groupByMap)
+    {
+      const auto currentTupleKey = getTupleKeyFromTuple(groupByMapPair.first);
+      auto currentFuncMapIt = jobInfo.functionColumnMap.find(currentTupleKey);
+      // Skip if the keys are the same.
+      if (currentFuncMapIt != jobInfo.functionColumnMap.end() && currentTupleKey != tupleKey)
+      {
+        const auto& lFunctionInfo = currentFuncMapIt->second;
+        // Oid and function name should be the same.
+        if (lFunctionInfo.associatedColumnOid == rFunctionInfo.associatedColumnOid &&
+            lFunctionInfo.functionName == rFunctionInfo.functionName)
+        {
+          foundKey = currentTupleKey;
+          return true;
+        }
+      }
+    }
+  }
+  return false;
+}
}  // namespace joblist
@@ -32,6 +32,64 @@ namespace joblist
// forward reference
struct JobInfo;

+struct cmpTuple
+{
+  bool operator()(boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*> a,
+                  boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*> b) const
+  {
+    uint32_t keya = boost::get<0>(a);
+    uint32_t keyb = boost::get<0>(b);
+    int opa;
+    int opb;
+    mcsv1sdk::mcsv1_UDAF* pUDAFa;
+    mcsv1sdk::mcsv1_UDAF* pUDAFb;
+
+    // If key is less than
+    if (keya < keyb)
+      return true;
+    if (keya == keyb)
+    {
+      // test Op
+      opa = boost::get<1>(a);
+      opb = boost::get<1>(b);
+      if (opa < opb)
+        return true;
+      if (opa == opb)
+      {
+        // look at the UDAF object
+        pUDAFa = boost::get<2>(a);
+        pUDAFb = boost::get<2>(b);
+        if (pUDAFa < pUDAFb)
+          return true;
+        if (pUDAFa == pUDAFb)
+        {
+          std::vector<uint32_t>* paramKeysa = boost::get<3>(a);
+          std::vector<uint32_t>* paramKeysb = boost::get<3>(b);
+          if (paramKeysa == NULL || paramKeysb == NULL)
+            return false;
+          if (paramKeysa->size() < paramKeysb->size())
+            return true;
+          if (paramKeysa->size() == paramKeysb->size())
+          {
+            for (uint64_t i = 0; i < paramKeysa->size(); ++i)
+            {
+              if ((*paramKeysa)[i] < (*paramKeysb)[i])
+                return true;
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+};
+
+// The AGG_MAP type is used to maintain a list of aggregate functions in order to
+// detect duplicates. Since all UDAF have the same op type (ROWAGG_UDAF), we add in
+// the function pointer in order to ensure uniqueness.
+using AGG_MAP =
+    map<boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>, uint64_t, cmpTuple>;
+
/** @brief class TupleAggregateStep
 *
 */
@@ -105,6 +163,13 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep
  void pruneAuxColumns();
  void formatMiniStats();
  void printCalTrace();
+  template <class GroupByMap>
+  static bool tryToFindEqualFunctionColumnByTupleKey(JobInfo& jobInfo, GroupByMap& groupByMap,
+                                                     const uint32_t tupleKey, uint32_t& foundTypleKey);
+  // This functions are workaround for the function above. For some reason different parts of the code with same
+  // semantics use different containers.
+  static uint32_t getTupleKeyFromTuple(const boost::tuple<uint32_t, int, mcsv1sdk::mcsv1_UDAF*, std::vector<uint32_t>*>& tuple);
+  static uint32_t getTupleKeyFromTuple(uint32_t key);

  boost::shared_ptr<execplan::CalpontSystemCatalog> fCatalog;
  uint64_t fRowsReturned;
@@ -226,4 +291,3 @@ class TupleAggregateStep : public JobStep, public TupleDeliveryStep
};

}  // namespace joblist
-
mysql-test/columnstore/bugfixes/mcol-5476.result (new file, 71 lines)
@@ -0,0 +1,71 @@
+DROP DATABASE IF EXISTS `mcol-5476`;
+CREATE DATABASE `mcol-5476`;
+USE `mcol-5476`;
+create table t1 (a int, b int) engine=columnstore;
+insert into t1 values (1, 1), (2, 1), (3, 1), (4, 2), (5, 2);
+select sum(a), abs(b), abs(b) from t1 group by abs(b), abs(b);
+sum(a) abs(b) abs(b)
+6 1 1
+9 2 2
+select sum(a), abs(b), abs(b) from t1 group by abs(b);
+sum(a) abs(b) abs(b)
+6 1 1
+9 2 2
+select sum(distinct a), abs(b), abs(b) from t1 group by abs(b), abs(b);
+sum(distinct a) abs(b) abs(b)
+6 1 1
+9 2 2
+select sum(distinct a), abs(b), abs(b) from t1 group by abs(b);
+sum(distinct a) abs(b) abs(b)
+6 1 1
+9 2 2
+create table t2 (a int, b int, c varchar(20)) engine=columnstore;
+insert into t2 values (1, 1, "abc"), (2, 1, "abc"), (1, 2, "abcd"), (3, 2, "abcd");
+select sum(a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c);
+sum(a) abs(b) length(c) abs(b) length(c)
+3 1 3 1 3
+4 2 4 2 4
+select sum(a), abs(b), abs(b), length(c), length(c) from t2 group by abs(b), length(c);
+sum(a) abs(b) abs(b) length(c) length(c)
+3 1 1 3 3
+4 2 2 4 4
+select sum(a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), abs(b), length(c), length(c);
+sum(a) abs(b) length(c) abs(b) length(c)
+3 1 3 1 3
+4 2 4 2 4
+select sum(a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c), length(c), abs(b);
+sum(a) abs(b) length(c) abs(b) length(c)
+3 1 3 1 3
+4 2 4 2 4
+select sum(distinct a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c) order by abs(b);
+sum(distinct a) abs(b) length(c) abs(b) length(c)
+3 1 3 1 3
+4 2 4 2 4
+select sum(distinct a), abs(b), abs(b), length(c), length(c) from t2 group by abs(b), length(c) order by abs(b);
+sum(distinct a) abs(b) abs(b) length(c) length(c)
+3 1 1 3 3
+4 2 2 4 4
+select sum(distinct a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), abs(b), length(c), length(c);
+sum(distinct a) abs(b) length(c) abs(b) length(c)
+3 1 3 1 3
+4 2 4 2 4
+select sum(distinct a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c), length(c), abs(b);
+sum(distinct a) abs(b) length(c) abs(b) length(c)
+3 1 3 1 3
+4 2 4 2 4
+select sum(distinct t1.a), abs(t2.b), abs(t2.b) from t1 join t2 on t1.a = t2.a group by abs(t2.b);
+sum(distinct t1.a) abs(t2.b) abs(t2.b)
+3 1 1
+4 2 2
+select sum(t1.a), abs(t2.b), abs(t2.b) from t1 join t2 on t1.a = t2.a group by abs(t2.b);
+sum(t1.a) abs(t2.b) abs(t2.b)
+3 1 1
+4 2 2
+create table t3 (a datetime, b int) engine=columnstore;
+insert into t3 values ("2007-01-30 21:31:07", 1), ("2007-01-30 21:31:07", 3), ("2007-01-29 21:31:07", 1), ("2007-01-29 21:31:07", 2);
+select distinct DAYOFWEEK(a) as C1, DAYOFWEEK(a) as C2, SUM(b) from t3 group by DAYOFWEEK(a), DAYOFWEEK(a);
+C1 C2 SUM(b)
+2 2 3
+3 3 4
+DROP TABLE t1, t2, t3;
+DROP DATABASE `mcol-5476`;
mysql-test/columnstore/bugfixes/mcol-5476.test (new file, 59 lines)
@@ -0,0 +1,59 @@
+-- source ../include/have_columnstore.inc
+
+--disable_warnings
+DROP DATABASE IF EXISTS `mcol-5476`;
+--enable_warnings
+CREATE DATABASE `mcol-5476`;
+USE `mcol-5476`;
+
+create table t1 (a int, b int) engine=columnstore;
+insert into t1 values (1, 1), (2, 1), (3, 1), (4, 2), (5, 2);
+#prep2aggregate
+sorted_result;
+select sum(a), abs(b), abs(b) from t1 group by abs(b), abs(b);
+sorted_result;
+select sum(a), abs(b), abs(b) from t1 group by abs(b);
+#prep2distinctaggregate
+sorted_result;
+select sum(distinct a), abs(b), abs(b) from t1 group by abs(b), abs(b);
+sorted_result;
+select sum(distinct a), abs(b), abs(b) from t1 group by abs(b);
+
+
+create table t2 (a int, b int, c varchar(20)) engine=columnstore;
+insert into t2 values (1, 1, "abc"), (2, 1, "abc"), (1, 2, "abcd"), (3, 2, "abcd");
+#prep2aggregate
+sorted_result;
+select sum(a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c);
+sorted_result;
+select sum(a), abs(b), abs(b), length(c), length(c) from t2 group by abs(b), length(c);
+sorted_result;
+select sum(a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), abs(b), length(c), length(c);
+sorted_result;
+select sum(a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c), length(c), abs(b);
+#prep2distinctaggregate
+sorted_result;
+select sum(distinct a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c) order by abs(b);
+sorted_result;
+select sum(distinct a), abs(b), abs(b), length(c), length(c) from t2 group by abs(b), length(c) order by abs(b);
+sorted_result;
+select sum(distinct a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), abs(b), length(c), length(c);
+sorted_result;
+select sum(distinct a), abs(b), length(c), abs(b), length(c) from t2 group by abs(b), length(c), length(c), abs(b);
+
+#Joins
+#prep1distinctaggregate
+sorted_result;
+select sum(distinct t1.a), abs(t2.b), abs(t2.b) from t1 join t2 on t1.a = t2.a group by abs(t2.b);
+#prep1aggregate
+sorted_result;
+select sum(t1.a), abs(t2.b), abs(t2.b) from t1 join t2 on t1.a = t2.a group by abs(t2.b);
+
+#User test case
+create table t3 (a datetime, b int) engine=columnstore;
+insert into t3 values ("2007-01-30 21:31:07", 1), ("2007-01-30 21:31:07", 3), ("2007-01-29 21:31:07", 1), ("2007-01-29 21:31:07", 2);
+sorted_result;
+select distinct DAYOFWEEK(a) as C1, DAYOFWEEK(a) as C2, SUM(b) from t3 group by DAYOFWEEK(a), DAYOFWEEK(a);
+
+DROP TABLE t1, t2, t3;
+DROP DATABASE `mcol-5476`;