1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

MCOL-1201 manual rebase with develop. Obsoletes branch MCOL-1201

This commit is contained in:
David Hall
2018-05-11 09:50:10 -05:00
parent e79daac8a9
commit 51df837b4e
30 changed files with 2255 additions and 1196 deletions

View File

@@ -98,36 +98,6 @@ AggregateColumn::AggregateColumn(const uint32_t sessionID):
{
}
AggregateColumn::AggregateColumn(const AggOp aggOp, ReturnedColumn* parm, const uint32_t sessionID):
ReturnedColumn(sessionID),
fAggOp(aggOp),
fAsc(false),
fData(aggOp + "(" + parm->data() + ")")
{
fFunctionParms.reset(parm);
}
AggregateColumn::AggregateColumn(const AggOp aggOp, const string& content, const uint32_t sessionID):
ReturnedColumn(sessionID),
fAggOp(aggOp),
fAsc(false),
fData(aggOp + "(" + content + ")")
{
// TODO: need to handle distinct
fFunctionParms.reset(new ArithmeticColumn(content));
}
// deprecated constructor. use function name as string
AggregateColumn::AggregateColumn(const std::string& functionName, ReturnedColumn* parm, const uint32_t sessionID):
ReturnedColumn(sessionID),
fFunctionName(functionName),
fAggOp(NOOP),
fAsc(false),
fData(functionName + "(" + parm->data() + ")")
{
fFunctionParms.reset(parm);
}
// deprecated constructor. use function name as string
AggregateColumn::AggregateColumn(const string& functionName, const string& content, const uint32_t sessionID):
ReturnedColumn(sessionID),
@@ -137,20 +107,21 @@ AggregateColumn::AggregateColumn(const string& functionName, const string& conte
fData(functionName + "(" + content + ")")
{
// TODO: need to handle distinct
fFunctionParms.reset(new ArithmeticColumn(content));
SRCP srcp(new ArithmeticColumn(content));
fAggParms.push_back(srcp);
}
AggregateColumn::AggregateColumn( const AggregateColumn& rhs, const uint32_t sessionID ):
ReturnedColumn(rhs, sessionID),
fFunctionName (rhs.fFunctionName),
fAggOp(rhs.fAggOp),
fFunctionParms(rhs.fFunctionParms),
fTableAlias(rhs.tableAlias()),
fAsc(rhs.asc()),
fData(rhs.data()),
fConstCol(rhs.fConstCol)
{
fAlias = rhs.alias();
fAggParms = rhs.fAggParms;
}
/**
@@ -166,10 +137,14 @@ const string AggregateColumn::toString() const
if (fAlias.length() > 0) output << "/Alias: " << fAlias << endl;
if (fFunctionParms == 0)
output << "No arguments" << endl;
if (fAggParms.size() == 0)
output << "No arguments";
else
output << *fFunctionParms << endl;
for (uint32_t i = 0; i < fAggParms.size(); ++i)
{
output << *(fAggParms[i]) << " ";
}
output << endl;
if (fConstCol)
output << *fConstCol;
@@ -191,10 +166,11 @@ void AggregateColumn::serialize(messageqcpp::ByteStream& b) const
b << fFunctionName;
b << static_cast<uint8_t>(fAggOp);
if (fFunctionParms == 0)
b << (uint8_t) ObjectReader::NULL_CLASS;
else
fFunctionParms->serialize(b);
b << static_cast<uint32_t>(fAggParms.size());
for (uint32_t i = 0; i < fAggParms.size(); ++i)
{
fAggParms[i]->serialize(b);
}
b << static_cast<uint32_t>(fGroupByColList.size());
@@ -219,20 +195,26 @@ void AggregateColumn::serialize(messageqcpp::ByteStream& b) const
void AggregateColumn::unserialize(messageqcpp::ByteStream& b)
{
ObjectReader::checkType(b, ObjectReader::AGGREGATECOLUMN);
fGroupByColList.erase(fGroupByColList.begin(), fGroupByColList.end());
fProjectColList.erase(fProjectColList.begin(), fProjectColList.end());
ReturnedColumn::unserialize(b);
b >> fFunctionName;
b >> fAggOp;
//delete fFunctionParms;
fFunctionParms.reset(
dynamic_cast<ReturnedColumn*>(ObjectReader::createTreeNode(b)));
messageqcpp::ByteStream::quadbyte size;
messageqcpp::ByteStream::quadbyte i;
ReturnedColumn* rc;
ObjectReader::checkType(b, ObjectReader::AGGREGATECOLUMN);
fGroupByColList.erase(fGroupByColList.begin(), fGroupByColList.end());
fProjectColList.erase(fProjectColList.begin(), fProjectColList.end());
fAggParms.erase(fAggParms.begin(), fAggParms.end());
ReturnedColumn::unserialize(b);
b >> fFunctionName;
b >> fAggOp;
b >> size;
for (i = 0; i < size; i++)
{
rc = dynamic_cast<ReturnedColumn*>(ObjectReader::createTreeNode(b));
SRCP srcp(rc);
fAggParms.push_back(srcp);
}
b >> size;
for (i = 0; i < size; i++)
@@ -261,6 +243,7 @@ void AggregateColumn::unserialize(messageqcpp::ByteStream& b)
bool AggregateColumn::operator==(const AggregateColumn& t) const
{
const ReturnedColumn* rc1, *rc2;
AggParms::const_iterator it, it2;
rc1 = static_cast<const ReturnedColumn*>(this);
rc2 = static_cast<const ReturnedColumn*>(&t);
@@ -277,16 +260,18 @@ bool AggregateColumn::operator==(const AggregateColumn& t) const
if (fAggOp != t.fAggOp)
return false;
if (fFunctionParms.get() != NULL && t.fFunctionParms.get() != NULL)
if (aggParms().size() != t.aggParms().size())
{
if (*fFunctionParms.get() != t.fFunctionParms.get())
return false;
}
for (it = fAggParms.begin(), it2 = t.fAggParms.begin();
it != fAggParms.end();
++it, ++it2)
{
if (**it != **it2)
return false;
}
else if (fFunctionParms.get() != NULL || t.fFunctionParms.get() != NULL)
return false;
//if (fAlias != t.fAlias)
// return false;
if (fTableAlias != t.fTableAlias)
return false;
@@ -645,3 +630,4 @@ AggregateColumn::AggOp AggregateColumn::agname2num(const string& agname)
}
} // namespace execplan

View File

@@ -40,6 +40,8 @@ class ByteStream;
namespace execplan
{
typedef std::vector<execplan::SRCP> AggParms;
/**
* @brief A class to represent a aggregate return column
*
@@ -74,7 +76,8 @@ public:
BIT_OR,
BIT_XOR,
GROUP_CONCAT,
UDAF
UDAF,
MULTI_PARM
};
/**
@@ -94,21 +97,6 @@ public:
*/
AggregateColumn(const uint32_t sessionID);
/**
* ctor
*/
AggregateColumn(const AggOp aggop, ReturnedColumn* parm, const uint32_t sessionID = 0);
/**
* ctor
*/
AggregateColumn(const AggOp aggop, const std::string& content, const uint32_t sessionID = 0);
/**
* ctor
*/
AggregateColumn(const std::string& functionName, ReturnedColumn* parm, const uint32_t sessionID = 0);
/**
* ctor
*/
@@ -155,24 +143,27 @@ public:
fAggOp = aggOp;
}
/** get function parms
*
* set the function parms from this object
*/
virtual const SRCP functionParms() const
virtual AggParms& aggParms()
{
return fFunctionParms;
return fAggParms;
}
virtual const AggParms& aggParms() const
{
return fAggParms;
}
/** set function parms
*
* set the function parms for this object
*/
virtual void functionParms(const SRCP& functionParms)
virtual void aggParms(const AggParms& parms)
{
fFunctionParms = functionParms;
fAggParms = parms;
}
/** return a copy of this pointer
*
* deep copy of this pointer and return the copy
@@ -325,9 +316,10 @@ protected:
uint8_t fAggOp;
/**
* A ReturnedColumn objects that are the arguments to this function
* ReturnedColumn objects that are the arguments to this
* function
*/
SRCP fFunctionParms;
AggParms fAggParms;
/** table alias
* A string to represent table alias name which contains this column

View File

@@ -56,6 +56,17 @@ using namespace rowgroup;
namespace joblist
{
ExpressionStep::ExpressionStep() :
fExpressionFilter(NULL),
fExpressionId(-1),
fVarBinOK(false),
fSelectFilter(false),
fAssociatedJoinId(0),
fDoJoin(false),
fVirtual(false)
{
}
ExpressionStep::ExpressionStep(const JobInfo& jobInfo) :
JobStep(jobInfo),
fExpressionFilter(NULL),
@@ -68,7 +79,6 @@ ExpressionStep::ExpressionStep(const JobInfo& jobInfo) :
{
}
ExpressionStep::ExpressionStep(const ExpressionStep& rhs) :
JobStep(rhs),
fExpression(rhs.expression()),

View File

@@ -50,6 +50,7 @@ class ExpressionStep : public JobStep
{
public:
// constructors
ExpressionStep();
ExpressionStep(const JobInfo&);
// destructor constructors
virtual ~ExpressionStep();

View File

@@ -78,7 +78,7 @@ void GroupConcatInfo::prepGroupConcat(JobInfo& jobInfo)
while (i != jobInfo.groupConcatCols.end())
{
GroupConcatColumn* gcc = dynamic_cast<GroupConcatColumn*>(i->get());
const RowColumn* rcp = dynamic_cast<const RowColumn*>(gcc->functionParms().get());
const RowColumn* rcp = dynamic_cast<const RowColumn*>(gcc->aggParms()[0].get());
SP_GroupConcat groupConcat(new GroupConcat);
groupConcat->fSeparator = gcc->separator();

View File

@@ -18,7 +18,6 @@
// $Id: joblistfactory.cpp 9632 2013-06-18 22:18:20Z xlou $
#include <iostream>
#include <stack>
#include <iterator>
@@ -870,7 +869,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
if (gcc != NULL)
{
srcp = gcc->functionParms();
srcp = gcc->aggParms()[0];
const RowColumn* rcp = dynamic_cast<const RowColumn*>(srcp.get());
const vector<SRCP>& cols = rcp->columnVec();
@@ -891,21 +890,55 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
continue;
}
#if 0
// MCOL-1201 Add support for multi-parameter UDAnF
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(retCols[i].get());
if (udafc != NULL)
{
srcp = udafc->aggParms()[0];
const RowColumn* rcp = dynamic_cast<const RowColumn*>(srcp.get());
const vector<SRCP>& cols = rcp->columnVec();
for (vector<SRCP>::const_iterator j = cols.begin(); j != cols.end(); j++)
{
srcp = *j;
if (dynamic_cast<const ConstantColumn*>(srcp.get()) == NULL)
retCols.push_back(srcp);
// Do we need this?
const ArithmeticColumn* ac = dynamic_cast<const ArithmeticColumn*>(srcp.get());
const FunctionColumn* fc = dynamic_cast<const FunctionColumn*>(srcp.get());
if (ac != NULL || fc != NULL)
{
// bug 3728, make a dummy expression step for each expression.
scoped_ptr<ExpressionStep> es(new ExpressionStep(jobInfo));
es->expression(srcp, jobInfo);
}
}
continue;
}
#endif
srcp = retCols[i];
const AggregateColumn* ag = dynamic_cast<const AggregateColumn*>(retCols[i].get());
if (ag != NULL)
srcp = ag->functionParms();
const ArithmeticColumn* ac = dynamic_cast<const ArithmeticColumn*>(srcp.get());
const FunctionColumn* fc = dynamic_cast<const FunctionColumn*>(srcp.get());
if (ac != NULL || fc != NULL)
// bug 3728 Make a dummy expression for srcp if it is an
// expression. This is needed to fill in some stuff.
// Note that es.expression does nothing if the item is not an expression.
if (ag == NULL)
{
// bug 3728, make a dummy expression step for each expression.
scoped_ptr<ExpressionStep> es(new ExpressionStep(jobInfo));
es->expression(srcp, jobInfo);
// Not an aggregate. Make a dummy expression for the item
ExpressionStep es;
es.expression(srcp, jobInfo);
}
else
{
// MCOL-1201 multi-argument aggregate. make a dummy expression
// step for each argument that is an expression.
for (uint32_t i = 0; i < ag->aggParms().size(); ++i)
{
srcp = ag->aggParms()[i];
ExpressionStep es;
es.expression(srcp, jobInfo);
}
}
}
@@ -915,17 +948,18 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
{
srcp = retCols[i];
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
AggregateColumn* aggc = dynamic_cast<AggregateColumn*>(srcp.get());
bool doDistinct = (csep->distinct() && csep->groupByCols().empty());
uint32_t tupleKey = -1;
string alias;
string view;
// returned column could be groupby column, a simplecoulumn not a agregatecolumn
// returned column could be groupby column, a simplecoulumn not an aggregatecolumn
int op = 0;
CalpontSystemCatalog::OID dictOid = 0;
CalpontSystemCatalog::ColType ct, aggCt;
if (sc == NULL)
if (aggc)
{
GroupConcatColumn* gcc = dynamic_cast<GroupConcatColumn*>(retCols[i].get());
@@ -939,7 +973,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
tupleKey = ti.key;
jobInfo.returnedColVec.push_back(make_pair(tupleKey, gcc->aggOp()));
// not a tokenOnly column. Mark all the columns involved
srcp = gcc->functionParms();
srcp = gcc->aggParms()[0];
const RowColumn* rowCol = dynamic_cast<const RowColumn*>(srcp.get());
if (rowCol)
@@ -963,186 +997,353 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
continue;
}
AggregateColumn* ac = dynamic_cast<AggregateColumn*>(retCols[i].get());
if (ac != NULL)
else
{
srcp = ac->functionParms();
sc = dynamic_cast<const SimpleColumn*>(srcp.get());
// Aggregate column not group concat
AggParms& aggParms = aggc->aggParms();
if (ac->constCol().get() != NULL)
for (uint32_t parm = 0; parm < aggParms.size(); ++parm)
{
// replace the aggregate on constant with a count(*)
SRCP clone;
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
if (udafc)
if (aggc->constCol().get() != NULL)
{
clone.reset(new UDAFColumn(*udafc, ac->sessionID()));
// replace the aggregate on constant with a count(*)
SRCP clone;
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(aggc);
if (udafc)
{
clone.reset(new UDAFColumn(*udafc, aggc->sessionID()));
}
else
{
clone.reset(new AggregateColumn(*aggc, aggc->sessionID()));
}
jobInfo.constAggregate.insert(make_pair(i, clone));
aggc->aggOp(AggregateColumn::COUNT_ASTERISK);
aggc->distinct(false);
}
srcp = aggParms[parm];
sc = dynamic_cast<const SimpleColumn*>(srcp.get());
if (parm == 0)
{
op = aggc->aggOp();
}
else
{
clone.reset(new AggregateColumn(*ac, ac->sessionID()));
op = AggregateColumn::MULTI_PARM;
}
doDistinct = aggc->distinct();
if (aggParms.size() == 1)
{
// Set the col type based on the single parm.
// Changing col type based on a parm if multiple parms
// doesn't really make sense.
updateAggregateColType(aggc, srcp, op, jobInfo);
}
aggCt = aggc->resultType();
// As of bug3695, make sure varbinary is not used in aggregation.
// TODO: allow for UDAF
if (sc != NULL && sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY)
throw runtime_error ("VARBINARY in aggregate function is not supported.");
// Project the parm columns or expressions
if (sc != NULL)
{
CalpontSystemCatalog::OID retOid = sc->oid();
CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
alias = extractTableAlias(sc);
view = sc->viewName();
if (!sc->schemaName().empty())
{
ct = sc->colType();
//XXX use this before connector sets colType in sc correctly.
if (sc->isInfiniDB() && dynamic_cast<const PseudoColumn*>(sc) == NULL)
ct = jobInfo.csc->colType(sc->oid());
//X
dictOid = isDictCol(ct);
}
else
{
retOid = (tblOid + 1) + sc->colPosition();
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")];
}
TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias));
tupleKey = ti.key;
// this is a string column
if (dictOid > 0)
{
map<uint32_t, bool>::iterator findit = jobInfo.tokenOnly.find(tupleKey);
// if the column has never seen, and the op is count: possible need count only.
if (AggregateColumn::COUNT == op || AggregateColumn::COUNT_ASTERISK == op)
{
if (findit == jobInfo.tokenOnly.end())
jobInfo.tokenOnly[tupleKey] = true;
}
// if aggregate other than count, token is not enough.
else if (op != 0 || doDistinct)
{
jobInfo.tokenOnly[tupleKey] = false;
}
findit = jobInfo.tokenOnly.find(tupleKey);
if (!(findit != jobInfo.tokenOnly.end() && findit->second == true))
{
dictMap[tupleKey] = dictOid;
jobInfo.keyInfo->dictOidToColOid[dictOid] = retOid;
ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias);
jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key;
}
}
}
else
{
const ArithmeticColumn* ac = NULL;
const FunctionColumn* fc = NULL;
const WindowFunctionColumn* wc = NULL;
bool hasAggCols = false;
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
{
if (ac->aggColumnList().size() > 0)
hasAggCols = true;
}
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
{
if (fc->aggColumnList().size() > 0)
hasAggCols = true;
}
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
{
std::ostringstream errmsg;
errmsg << "Invalid aggregate function nesting.";
cerr << boldStart << errmsg.str() << boldStop << endl;
throw logic_error(errmsg.str());
}
else if (dynamic_cast<const ConstantColumn*>(srcp.get()) != NULL)
{
}
else if ((wc = dynamic_cast<const WindowFunctionColumn*>(srcp.get())) == NULL)
{
std::ostringstream errmsg;
errmsg << "doAggProject: unsupported column: " << typeid(*(srcp.get())).name();
cerr << boldStart << errmsg.str() << boldStop << endl;
throw logic_error(errmsg.str());
}
uint64_t eid = srcp.get()->expressionId();
ct = srcp.get()->resultType();
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
tupleKey = ti.key;
if (hasAggCols)
jobInfo.expressionVec.push_back(tupleKey);
}
jobInfo.constAggregate.insert(make_pair(i, clone));
ac->aggOp(AggregateColumn::COUNT_ASTERISK);
ac->distinct(false);
}
// add to project list
vector<uint32_t>::iterator keyIt = find(projectKeys.begin(), projectKeys.end(), tupleKey);
op = ac->aggOp();
doDistinct = ac->distinct();
updateAggregateColType(ac, srcp, op, jobInfo);
aggCt = ac->resultType();
if (keyIt == projectKeys.end())
{
RetColsVector::iterator it = pcv.end();
// As of bug3695, make sure varbinary is not used in aggregation.
if (sc != NULL && sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY)
throw runtime_error ("VARBINARY in aggregate function is not supported.");
}
}
if (doDistinct)
it = pcv.insert(pcv.begin() + lastGroupByPos++, srcp);
else
it = pcv.insert(pcv.end(), srcp);
// simple column selected or aggregated
if (sc != NULL)
{
// one column only need project once
CalpontSystemCatalog::OID retOid = sc->oid();
CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
alias = extractTableAlias(sc);
view = sc->viewName();
projectKeys.insert(projectKeys.begin() + distance(pcv.begin(), it), tupleKey);
}
else if (doDistinct) // @bug4250, move forward distinct column if necessary.
{
uint32_t pos = distance(projectKeys.begin(), keyIt);
if (!sc->schemaName().empty())
{
ct = sc->colType();
if (pos >= lastGroupByPos)
{
pcv[pos] = pcv[lastGroupByPos];
pcv[lastGroupByPos] = srcp;
projectKeys[pos] = projectKeys[lastGroupByPos];
projectKeys[lastGroupByPos] = tupleKey;
lastGroupByPos++;
}
}
//XXX use this before connector sets colType in sc correctly.
if (sc->isInfiniDB() && dynamic_cast<const PseudoColumn*>(sc) == NULL)
ct = jobInfo.csc->colType(sc->oid());
if (doDistinct && dictOid > 0)
tupleKey = jobInfo.keyInfo->dictKeyMap[tupleKey];
//X
dictOid = isDictCol(ct);
}
else
{
retOid = (tblOid + 1) + sc->colPosition();
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")];
}
// remember the columns to be returned
jobInfo.returnedColVec.push_back(make_pair(tupleKey, op));
TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias));
tupleKey = ti.key;
if (op == AggregateColumn::AVG || op == AggregateColumn::DISTINCT_AVG)
jobInfo.scaleOfAvg[tupleKey] = (ct.scale << 8) + aggCt.scale;
// this is a string column
if (dictOid > 0)
{
map<uint32_t, bool>::iterator findit = jobInfo.tokenOnly.find(tupleKey);
// if the column has never seen, and the op is count: possible need count only.
if (AggregateColumn::COUNT == op || AggregateColumn::COUNT_ASTERISK == op)
{
if (findit == jobInfo.tokenOnly.end())
jobInfo.tokenOnly[tupleKey] = true;
}
// if aggregate other than count, token is not enough.
else if (op != 0 || doDistinct)
{
jobInfo.tokenOnly[tupleKey] = false;
}
findit = jobInfo.tokenOnly.find(tupleKey);
if (!(findit != jobInfo.tokenOnly.end() && findit->second == true))
{
dictMap[tupleKey] = dictOid;
jobInfo.keyInfo->dictOidToColOid[dictOid] = retOid;
ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias);
jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key;
// bug 1499 distinct processing, save unique distinct columns
if (doDistinct &&
(jobInfo.distinctColVec.end() ==
find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey)))
{
jobInfo.distinctColVec.push_back(tupleKey);
}
}
}
}
else
{
const ArithmeticColumn* ac = NULL;
const FunctionColumn* fc = NULL;
const WindowFunctionColumn* wc = NULL;
bool hasAggCols = false;
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
// Not an Aggregate
// simple column selected
if (sc != NULL)
{
if (ac->aggColumnList().size() > 0)
hasAggCols = true;
// one column only need project once
CalpontSystemCatalog::OID retOid = sc->oid();
CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
alias = extractTableAlias(sc);
view = sc->viewName();
if (!sc->schemaName().empty())
{
ct = sc->colType();
//XXX use this before connector sets colType in sc correctly.
if (sc->isInfiniDB() && dynamic_cast<const PseudoColumn*>(sc) == NULL)
ct = jobInfo.csc->colType(sc->oid());
//X
dictOid = isDictCol(ct);
}
else
{
retOid = (tblOid + 1) + sc->colPosition();
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")];
}
TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias));
tupleKey = ti.key;
// this is a string column
if (dictOid > 0)
{
map<uint32_t, bool>::iterator findit = jobInfo.tokenOnly.find(tupleKey);
// if the column has never seen, and the op is count: possible need count only.
if (AggregateColumn::COUNT == op || AggregateColumn::COUNT_ASTERISK == op)
{
if (findit == jobInfo.tokenOnly.end())
jobInfo.tokenOnly[tupleKey] = true;
}
// if aggregate other than count, token is not enough.
else if (op != 0 || doDistinct)
{
jobInfo.tokenOnly[tupleKey] = false;
}
findit = jobInfo.tokenOnly.find(tupleKey);
if (!(findit != jobInfo.tokenOnly.end() && findit->second == true))
{
dictMap[tupleKey] = dictOid;
jobInfo.keyInfo->dictOidToColOid[dictOid] = retOid;
ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias);
jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key;
}
}
}
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
{
if (fc->aggColumnList().size() > 0)
hasAggCols = true;
}
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
{
std::ostringstream errmsg;
errmsg << "Invalid aggregate function nesting.";
cerr << boldStart << errmsg.str() << boldStop << endl;
throw logic_error(errmsg.str());
}
else if ((wc = dynamic_cast<const WindowFunctionColumn*>(srcp.get())) == NULL)
{
std::ostringstream errmsg;
errmsg << "doAggProject: unsupported column: " << typeid(*(srcp.get())).name();
cerr << boldStart << errmsg.str() << boldStop << endl;
throw logic_error(errmsg.str());
}
uint64_t eid = srcp.get()->expressionId();
ct = srcp.get()->resultType();
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
tupleKey = ti.key;
if (hasAggCols)
jobInfo.expressionVec.push_back(tupleKey);
}
// add to project list
vector<uint32_t>::iterator keyIt = find(projectKeys.begin(), projectKeys.end(), tupleKey);
if (keyIt == projectKeys.end())
{
RetColsVector::iterator it = pcv.end();
if (doDistinct)
it = pcv.insert(pcv.begin() + lastGroupByPos++, srcp);
else
it = pcv.insert(pcv.end(), srcp);
projectKeys.insert(projectKeys.begin() + distance(pcv.begin(), it), tupleKey);
}
else if (doDistinct) // @bug4250, move forward distinct column if necessary.
{
uint32_t pos = distance(projectKeys.begin(), keyIt);
if (pos >= lastGroupByPos)
{
pcv[pos] = pcv[lastGroupByPos];
pcv[lastGroupByPos] = srcp;
projectKeys[pos] = projectKeys[lastGroupByPos];
projectKeys[lastGroupByPos] = tupleKey;
lastGroupByPos++;
const ArithmeticColumn* ac = NULL;
const FunctionColumn* fc = NULL;
const WindowFunctionColumn* wc = NULL;
bool hasAggCols = false;
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
{
if (ac->aggColumnList().size() > 0)
hasAggCols = true;
}
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
{
if (fc->aggColumnList().size() > 0)
hasAggCols = true;
}
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
{
std::ostringstream errmsg;
errmsg << "Invalid aggregate function nesting.";
cerr << boldStart << errmsg.str() << boldStop << endl;
throw logic_error(errmsg.str());
}
else if (dynamic_cast<const ConstantColumn*>(srcp.get()) != NULL)
{
}
else if ((wc = dynamic_cast<const WindowFunctionColumn*>(srcp.get())) == NULL)
{
std::ostringstream errmsg;
errmsg << "doAggProject: unsupported column: " << typeid(*(srcp.get())).name();
cerr << boldStart << errmsg.str() << boldStop << endl;
throw logic_error(errmsg.str());
}
uint64_t eid = srcp.get()->expressionId();
ct = srcp.get()->resultType();
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
tupleKey = ti.key;
if (hasAggCols)
jobInfo.expressionVec.push_back(tupleKey);
}
}
if (doDistinct && dictOid > 0)
tupleKey = jobInfo.keyInfo->dictKeyMap[tupleKey];
// add to project list
vector<uint32_t>::iterator keyIt = find(projectKeys.begin(), projectKeys.end(), tupleKey);
// remember the columns to be returned
jobInfo.returnedColVec.push_back(make_pair(tupleKey, op));
if (keyIt == projectKeys.end())
{
RetColsVector::iterator it = pcv.end();
if (op == AggregateColumn::AVG || op == AggregateColumn::DISTINCT_AVG)
jobInfo.scaleOfAvg[tupleKey] = (ct.scale << 8) + aggCt.scale;
if (doDistinct)
it = pcv.insert(pcv.begin() + lastGroupByPos++, srcp);
else
it = pcv.insert(pcv.end(), srcp);
// bug 1499 distinct processing, save unique distinct columns
if (doDistinct &&
(jobInfo.distinctColVec.end() ==
find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey)))
{
jobInfo.distinctColVec.push_back(tupleKey);
projectKeys.insert(projectKeys.begin() + distance(pcv.begin(), it), tupleKey);
}
else if (doDistinct) // @bug4250, move forward distinct column if necessary.
{
uint32_t pos = distance(projectKeys.begin(), keyIt);
if (pos >= lastGroupByPos)
{
pcv[pos] = pcv[lastGroupByPos];
pcv[lastGroupByPos] = srcp;
projectKeys[pos] = projectKeys[lastGroupByPos];
projectKeys[lastGroupByPos] = tupleKey;
lastGroupByPos++;
}
}
if (doDistinct && dictOid > 0)
tupleKey = jobInfo.keyInfo->dictKeyMap[tupleKey];
// remember the columns to be returned
jobInfo.returnedColVec.push_back(make_pair(tupleKey, op));
if (op == AggregateColumn::AVG || op == AggregateColumn::DISTINCT_AVG)
jobInfo.scaleOfAvg[tupleKey] = (ct.scale << 8) + aggCt.scale;
// bug 1499 distinct processing, save unique distinct columns
if (doDistinct &&
(jobInfo.distinctColVec.end() ==
find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey)))
{
jobInfo.distinctColVec.push_back(tupleKey);
}
}
}

View File

@@ -164,6 +164,9 @@ inline RowAggFunctionType functionIdMap(int planFuncId)
case AggregateColumn::UDAF:
return ROWAGG_UDAF;
case AggregateColumn::MULTI_PARM:
return ROWAGG_MULTI_PARM;
default:
return ROWAGG_FUNCT_UNDEFINE;
}
@@ -1302,7 +1305,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
throw logic_error("(1)prep1PhaseAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
@@ -1468,7 +1471,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (!udafFuncCol)
{
throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(2)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
pUDAFFunc = udafFuncCol->fUDAFContext.getFunction();
@@ -1483,6 +1486,17 @@ void TupleAggregateStep::prep1PhaseAggregate(
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(key);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
widthAgg.push_back(width[colProj]);
}
break;
default:
{
ostringstream emsg;
@@ -1560,7 +1574,7 @@ void TupleAggregateStep::prep1PhaseAggregate(
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(3)prep1PhaseAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVec[i]->fAuxColumnIndex = lastCol++;
@@ -1675,7 +1689,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
// the groupby columns are put in front, even not a returned column
// sum and count(column name) are omitted, if avg present
{
// project only uniq oids, but they may be repeated in aggregation
// project only unique oids, but they may be repeated in aggregation
// collect the projected column info, prepare for aggregation
map<uint32_t, int> projColPosMap;
@@ -1848,7 +1862,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
throw logic_error("(1)prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
@@ -2043,7 +2057,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (!udafFuncCol)
{
throw logic_error("prep1PhaseDistinctAggregate A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(2)prep1PhaseDistinctAggregate A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
@@ -2065,6 +2079,18 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAgg.push_back(oidsProj[colProj]);
keysAgg.push_back(aggKey);
scaleAgg.push_back(scaleProj[colProj]);
precisionAgg.push_back(precisionProj[colProj]);
typeAgg.push_back(typeProj[colProj]);
widthAgg.push_back(widthProj[colProj]);
++colAgg;
}
break;
default:
{
ostringstream emsg;
@@ -2111,7 +2137,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
groupByNoDist.push_back(groupby);
aggFuncMap.insert(make_pair(boost::make_tuple(keysAgg[i], 0, pUDAFFunc), i));
}
projColsUDAFIndex = 0;
// locate the return column position in aggregated rowgroup
for (uint64_t i = 0; i < returnedColVec.size(); i++)
{
@@ -2121,6 +2148,14 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
int colAgg = -1;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
if (udafc)
pUDAFFunc = udafc->getContext().getFunction();
}
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
jobInfo.distinctColVec.end() )
{
@@ -2432,11 +2467,37 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
new RowAggFunctionCol(
ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, i, dupGroupbyIndex)));
}
// update the aggregate function vector
else
{
SP_ROWAGG_FUNC_t funct(new RowAggFunctionCol(aggOp, stats, colAgg, i));
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colAgg, i));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colAgg, i));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
funct->fAuxColumnIndex = colAgg;
@@ -2549,7 +2610,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate(
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(4)prep1PhaseDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVec2[i]->fAuxColumnIndex = lastCol++;
@@ -2893,7 +2954,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
// the groupby columns are put in front, even not a returned column
// sum and count(column name) are omitted, if avg present
{
// project only uniq oids, but they may be repeated in aggregation
// project only unique oids, but they may be repeated in aggregation
// collect the projected column info, prepare for aggregation
vector<uint32_t> width;
map<uint32_t, int> projColPosMap;
@@ -3036,12 +3097,11 @@ void TupleAggregateStep::prep2PhasesAggregate(
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
throw logic_error("(1)prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
@@ -3240,7 +3300,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(2)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
oidsAggPm.push_back(oidsProj[colProj]);
@@ -3261,6 +3321,18 @@ void TupleAggregateStep::prep2PhasesAggregate(
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
colAggPm++;
}
break;
default:
{
ostringstream emsg;
@@ -3278,11 +3350,16 @@ void TupleAggregateStep::prep2PhasesAggregate(
// add back sum or count(column name) if omitted due to avg column
// put count(column name) column to the end, if it is for avg only
{
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
// check if the count column for AVG is also a returned column,
// if so, replace the "-1" to actual position in returned vec.
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap;
AGG_MAP aggDupFuncMap;
projColsUDAFIndex = 0;
// copy over the groupby vector
// update the outputColumnIndex if returned
for (uint64_t i = 0; i < groupByPm.size(); i++)
@@ -3299,7 +3376,14 @@ void TupleAggregateStep::prep2PhasesAggregate(
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
int colPm = -1;
if (aggOp == ROWAGG_MULTI_PARM)
{
// Skip on UM: Extra parms for an aggregate have no work on the UM
++multiParms;
continue;
}
// Is this a UDAF? use the function as part of the key.
mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL;
if (aggOp == ROWAGG_UDAF)
@@ -3452,20 +3536,36 @@ void TupleAggregateStep::prep2PhasesAggregate(
functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, i, dupGroupbyIndex)));
}
// update the aggregate function vector
else
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, i));
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colPm, i-multiParms));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, i));
funct.reset(new RowAggFunctionCol(aggOp, stats, colPm, i-multiParms));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
@@ -3517,7 +3617,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
}
// there is avg(k), but no count(k) in the select list
uint64_t lastCol = returnedColVec.size();
uint64_t lastCol = returnedColVec.size() - multiParms;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{
@@ -3545,7 +3645,7 @@ void TupleAggregateStep::prep2PhasesAggregate(
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(4)prep2PhasesAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVecUm[i]->fAuxColumnIndex = lastCol++;
@@ -3691,6 +3791,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
vector<SP_ROWAGG_GRPBY_t> groupByPm, groupByUm, groupByNoDist;
vector<SP_ROWAGG_FUNC_t> functionVecPm, functionNoDistVec, functionVecUm;
list<uint32_t> multiParmIndexes;
uint32_t bigIntWidth = sizeof(int64_t);
map<pair<uint32_t, int>, uint64_t> avgFuncDistMap;
@@ -3702,7 +3803,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// the groupby columns are put in front, even not a returned column
// sum and count(column name) are omitted, if avg present
{
// project only uniq oids, but they may be repeated in aggregation
// project only unique oids, but they may be repeated in aggregation
// collect the projected column info, prepare for aggregation
vector<uint32_t> width;
map<uint32_t, int> projColPosMap;
@@ -3856,7 +3957,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (it == jobInfo.projectionCols.end())
{
throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
throw logic_error("(1)prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
@@ -4050,7 +4151,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(2)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
// Return column
@@ -4072,6 +4173,19 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
break;
}
case ROWAGG_MULTI_PARM:
{
oidsAggPm.push_back(oidsProj[colProj]);
keysAggPm.push_back(aggKey);
scaleAggPm.push_back(scaleProj[colProj]);
precisionAggPm.push_back(precisionProj[colProj]);
typeAggPm.push_back(typeProj[colProj]);
widthAggPm.push_back(width[colProj]);
multiParmIndexes.push_back(colAggPm);
colAggPm++;
}
break;
default:
{
ostringstream emsg;
@@ -4093,12 +4207,23 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
groupByUm.push_back(groupby);
}
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
for (uint32_t idx = 0; idx < functionVecPm.size(); idx++)
{
SP_ROWAGG_FUNC_t funct;
SP_ROWAGG_FUNC_t funcPm = functionVecPm[idx];
// UDAF support
if (funcPm->fAggFunction == ROWAGG_MULTI_PARM)
{
// Multi-Parm is not used on the UM
++multiParms;
continue;
}
if (funcPm->fAggFunction == ROWAGG_UDAF)
{
RowUDAFFunctionCol* udafFuncCol = dynamic_cast<RowUDAFFunctionCol*>(funcPm.get());
@@ -4106,7 +4231,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
udafFuncCol->fUDAFContext,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex));
udafFuncCol->fAuxColumnIndex-multiParms));
functionNoDistVec.push_back(funct);
}
else
@@ -4116,18 +4241,25 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
funcPm->fStatsFunction,
funcPm->fOutputColumnIndex,
funcPm->fOutputColumnIndex,
funcPm->fAuxColumnIndex));
funcPm->fAuxColumnIndex-multiParms));
functionNoDistVec.push_back(funct);
}
}
posAggUm = posAggPm;
oidsAggUm = oidsAggPm;
keysAggUm = keysAggPm;
scaleAggUm = scaleAggPm;
precisionAggUm = precisionAggPm;
widthAggUm = widthAggPm;
typeAggUm = typeAggPm;
// Copy over the PM arrays to the UM. Skip any that are a multi-parm entry.
for (uint32_t idx = 0; idx < oidsAggPm.size(); ++idx)
{
if (find (multiParmIndexes.begin(), multiParmIndexes.end(), idx ) != multiParmIndexes.end())
{
continue;
}
oidsAggUm.push_back(oidsAggPm[idx]);
keysAggUm.push_back(keysAggPm[idx]);
scaleAggUm.push_back(scaleAggPm[idx]);
precisionAggUm.push_back(precisionAggPm[idx]);
widthAggUm.push_back(widthAggPm[idx]);
typeAggUm.push_back(typeAggPm[idx]);
}
}
@@ -4137,6 +4269,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// add back sum or count(column name) if omitted due to avg column
// put count(column name) column to the end, if it is for avg only
{
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
// check if the count column for AVG is also a returned column,
// if so, replace the "-1" to actual position in returned vec.
map<uint32_t, SP_ROWAGG_FUNC_t> avgFuncMap, avgDistFuncMap;
@@ -4159,6 +4295,21 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
RowAggFunctionType stats = statsFuncIdMap(returnedColVec[i].second);
int colUm = -1;
if (aggOp == ROWAGG_MULTI_PARM)
{
// Skip on UM: Extra parms for an aggregate have no work on the UM
++multiParms;
continue;
}
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
if (udafc)
pUDAFFunc = udafc->getContext().getFunction();
}
if (find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), retKey) !=
jobInfo.distinctColVec.end() )
{
@@ -4285,7 +4436,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (it != aggFuncMap.end())
{
colUm = it->second;
colUm = it->second - multiParms;
oidsAggDist.push_back(oidsAggUm[colUm]);
keysAggDist.push_back(keysAggUm[colUm]);
scaleAggDist.push_back(scaleAggUm[colUm]);
@@ -4309,7 +4460,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
// false alarm
returnColMissing = false;
colUm = it->second;
colUm = it->second - multiParms;
if (aggOp == ROWAGG_SUM)
{
@@ -4412,21 +4563,36 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
functionVecUm.push_back(SP_ROWAGG_FUNC_t(new RowAggFunctionCol(
ROWAGG_DUP_FUNCT, ROWAGG_FUNCT_UNDEFINE, -1, i, dupGroupbyIndex)));
}
// update the aggregate function vector
else
{
// update the aggregate function vector
SP_ROWAGG_FUNC_t funct;
if (aggOp == ROWAGG_UDAF)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(jobInfo.projectionCols[i].get());
pUDAFFunc = udafc->getContext().getFunction();
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, i));
std::vector<SRCP>::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex;
for (; it != jobInfo.projectionCols.end(); it++)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>((*it).get());
projColsUDAFIndex++;
if (udafc)
{
pUDAFFunc = udafc->getContext().getFunction();
// Create a RowAggFunctionCol (UDAF subtype) with the context.
funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colUm, i-multiParms));
break;
}
}
if (it == jobInfo.projectionCols.end())
{
throw logic_error("(3)prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s");
}
}
else
{
funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, i));
funct.reset(new RowAggFunctionCol(aggOp, stats, colUm, i-multiParms));
}
if (aggOp == ROWAGG_COUNT_NO_OP)
@@ -4480,7 +4646,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
}
// there is avg(k), but no count(k) in the select list
uint64_t lastCol = returnedColVec.size();
uint64_t lastCol = returnedColVec.size() - multiParms;
for (map<uint32_t, SP_ROWAGG_FUNC_t>::iterator k = avgFuncMap.begin(); k != avgFuncMap.end(); k++)
{
@@ -4540,7 +4706,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
if (!udafFuncCol)
{
throw logic_error("(9)A UDAF function is called but there's no RowUDAFFunctionCol");
throw logic_error("(4)prep2PhasesDistinctAggregate: A UDAF function is called but there's no RowUDAFFunctionCol");
}
functionVecUm[i]->fAuxColumnIndex = lastCol++;
@@ -4687,6 +4853,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
SP_ROWAGG_GRPBY_t groupby(new RowAggGroupByCol(j, k));
groupBySub.push_back(groupby);
// Keep a count of the parms after the first for any aggregate.
// These will be skipped and the count needs to be subtracted
// from where the aux column will be.
int64_t multiParms = 0;
// tricky part : 2 function vectors
// -- dummy function vector for sub-aggregator, which does distinct only
// -- aggregate function on this distinct column for rowAggDist
@@ -4694,6 +4865,11 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
for (uint64_t k = 0; k < returnedColVec.size(); k++)
{
if (functionIdMap(returnedColVec[i].second) == ROWAGG_MULTI_PARM)
{
++multiParms;
continue;
}
if (returnedColVec[k].first != distinctColKey)
continue;
@@ -4715,7 +4891,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
f->fStatsFunction,
groupBySub.size() - 1,
f->fOutputColumnIndex,
f->fAuxColumnIndex));
f->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
}
@@ -4732,9 +4908,15 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
{
vector<SP_ROWAGG_FUNC_t> functionSub1 = functionNoDistVec;
vector<SP_ROWAGG_FUNC_t> functionSub2;
int64_t multiParms = 0;
for (uint64_t k = 0; k < returnedColVec.size(); k++)
{
if (functionIdMap(returnedColVec[k].second) == ROWAGG_MULTI_PARM)
{
++multiParms;
continue;
}
// search non-distinct functions in functionVec
vector<SP_ROWAGG_FUNC_t>::iterator it = functionVecUm.begin();
@@ -4752,7 +4934,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
udafFuncCol->fUDAFContext,
udafFuncCol->fInputColumnIndex,
udafFuncCol->fOutputColumnIndex,
udafFuncCol->fAuxColumnIndex));
udafFuncCol->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
else if (f->fAggFunction == ROWAGG_COUNT_ASTERISK ||
@@ -4773,7 +4955,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate(
f->fStatsFunction,
f->fInputColumnIndex,
f->fOutputColumnIndex,
f->fAuxColumnIndex));
f->fAuxColumnIndex-multiParms));
functionSub2.push_back(funct);
}
}

View File

@@ -4038,6 +4038,10 @@ ParseTree* buildParseTree(Item_func* item, gp_walk_info& gwi, bool& nonSupport)
ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{
// MCOL-1201 For UDAnF multiple parameters
vector<SRCP> selCols;
vector<SRCP> orderCols;
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
gwi.thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
@@ -4054,6 +4058,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
// N.B. argument_count() is the # of formal parms to the agg fcn. InifniDB only supports 1 argument
// TODO: Support more than one parm
#if 0
if (isp->argument_count() != 1 && isp->sum_func() != Item_sum::GROUP_CONCAT_FUNC
&& isp->sum_func() != Item_sum::UDF_SUM_FUNC)
{
@@ -4061,7 +4066,7 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_MUL_ARG_AGG);
return NULL;
}
#endif
AggregateColumn* ac = NULL;
if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
@@ -4084,444 +4089,509 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "Non supported aggregate type on the select clause";
if (ac)
delete ac;
return NULL;
}
// special parsing for group_concat
if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
try
{
Item_func_group_concat* gc = (Item_func_group_concat*)isp;
// special parsing for group_concat
if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
{
Item_func_group_concat* gc = (Item_func_group_concat*)isp;
vector<SRCP> orderCols;
RowColumn* rowCol = new RowColumn();
RowColumn* rowCol = new RowColumn();
vector<SRCP> selCols;
uint32_t select_ctn = gc->count_field();
ReturnedColumn* rc = NULL;
uint32_t select_ctn = gc->count_field();
ReturnedColumn* rc = NULL;
for (uint32_t i = 0; i < select_ctn; i++)
{
rc = buildReturnedColumn(sfitempp[i], gwi, gwi.fatalParseError);
if (!rc || gwi.fatalParseError)
return NULL;
selCols.push_back(SRCP(rc));
}
ORDER** order_item, **end;
for (order_item = gc->get_order(),
end = order_item + gc->order_field(); order_item < end;
order_item++)
{
Item* ord_col = *(*order_item)->item;
if (ord_col->type() == Item::INT_ITEM)
for (uint32_t i = 0; i < select_ctn; i++)
{
Item_int* id = (Item_int*)ord_col;
if (id->val_int() > (int)selCols.size())
{
gwi.fatalParseError = true;
return NULL;
}
rc = selCols[id->val_int() - 1]->clone();
rc->orderPos(id->val_int() - 1);
}
else
{
rc = buildReturnedColumn(ord_col, gwi, gwi.fatalParseError);
rc = buildReturnedColumn(sfitempp[i], gwi, gwi.fatalParseError);
if (!rc || gwi.fatalParseError)
{
if (ac)
delete ac;
return NULL;
}
selCols.push_back(SRCP(rc));
}
// 10.2 TODO: direction is now a tri-state flag
rc->asc((*order_item)->direction == ORDER::ORDER_ASC ? true : false);
orderCols.push_back(SRCP(rc));
}
ORDER** order_item, **end;
rowCol->columnVec(selCols);
(dynamic_cast<GroupConcatColumn*>(ac))->orderCols(orderCols);
parm.reset(rowCol);
if (gc->str_separator())
{
string separator;
separator.assign(gc->str_separator()->ptr(), gc->str_separator()->length());
(dynamic_cast<GroupConcatColumn*>(ac))->separator(separator);
}
}
else
{
for (uint32_t i = 0; i < isp->argument_count(); i++)
{
Item* sfitemp = sfitempp[i];
Item::Type sfitype = sfitemp->type();
switch (sfitype)
for (order_item = gc->get_order(),
end = order_item + gc->order_field(); order_item < end;
order_item++)
{
case Item::FIELD_ITEM:
{
Item_field* ifp = reinterpret_cast<Item_field*>(sfitemp);
SimpleColumn* sc = buildSimpleColumn(ifp, gwi);
Item* ord_col = *(*order_item)->item;
if (!sc)
if (ord_col->type() == Item::INT_ITEM)
{
Item_int* id = (Item_int*)ord_col;
if (id->val_int() > (int)selCols.size())
{
gwi.fatalParseError = true;
break;
if (ac)
delete ac;
return NULL;
}
parm.reset(sc);
gwi.columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(ifp->field_name), parm));
TABLE_LIST* tmp = (ifp->cached_table ? ifp->cached_table : 0);
gwi.tableMap[make_aliastable(sc->schemaName(), sc->tableName(), sc->tableAlias(), sc->isInfiniDB())] = make_pair(1, tmp);
break;
rc = selCols[id->val_int() - 1]->clone();
rc->orderPos(id->val_int() - 1);
}
case Item::INT_ITEM:
case Item::STRING_ITEM:
case Item::REAL_ITEM:
case Item::DECIMAL_ITEM:
else
{
// treat as count(*)
if (ac->aggOp() == AggregateColumn::COUNT)
ac->aggOp(AggregateColumn::COUNT_ASTERISK);
rc = buildReturnedColumn(ord_col, gwi, gwi.fatalParseError);
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
break;
}
case Item::NULL_ITEM:
{
//ac->aggOp(AggregateColumn::COUNT);
parm.reset(new ConstantColumn("", ConstantColumn::NULLDATA));
//ac->functionParms(parm);
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
break;
}
case Item::FUNC_ITEM:
{
Item_func* ifp = (Item_func*)sfitemp;
ReturnedColumn* rc = 0;
// check count(1+1) case
vector <Item_field*> tmpVec;
uint16_t parseInfo = 0;
parse_item(ifp, tmpVec, gwi.fatalParseError, parseInfo);
if (parseInfo & SUB_BIT)
if (!rc || gwi.fatalParseError)
{
gwi.fatalParseError = true;
break;
}
else if (!gwi.fatalParseError &&
!(parseInfo & AGG_BIT) &&
!(parseInfo & AF_BIT) &&
tmpVec.size() == 0)
{
rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
if ((fc && fc->functionParms().empty()) || !fc)
{
//ac->aggOp(AggregateColumn::COUNT_ASTERISK);
ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError);
if (dynamic_cast<ConstantColumn*>(rc))
{
//@bug5229. handle constant function on aggregate argument
ac->constCol(SRCP(rc));
break;
}
}
}
// MySQL carelessly allows correlated aggregate function on the WHERE clause.
// Here is the work around to deal with that inconsistence.
// e.g., SELECT (SELECT t.c FROM t1 AS t WHERE t.b=MAX(t1.b + 0)) FROM t1;
ClauseType clauseType = gwi.clauseType;
if (gwi.clauseType == WHERE)
gwi.clauseType = HAVING;
// @bug 3603. for cases like max(rand()). try to build function first.
if (!rc)
rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
parm.reset(rc);
gwi.clauseType = clauseType;
if (gwi.fatalParseError)
break;
//ac->functionParms(parm);
break;
}
case Item::REF_ITEM:
{
ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError);
if (rc)
{
parm.reset(rc);
//ac->functionParms(parm);
break;
if (ac)
delete ac;
return NULL;
}
}
default:
{
gwi.fatalParseError = true;
//gwi.parseErrorText = "Non-supported Item in Aggregate function";
}
// 10.2 TODO: direction is now a tri-state flag
rc->asc((*order_item)->direction == ORDER::ORDER_ASC ? true : false);
orderCols.push_back(SRCP(rc));
}
if (gwi.fatalParseError)
rowCol->columnVec(selCols);
(dynamic_cast<GroupConcatColumn*>(ac))->orderCols(orderCols);
parm.reset(rowCol);
if (gc->str_separator())
{
if (gwi.parseErrorText.empty())
{
Message::Args args;
if (item->name)
args.add(item->name);
else
args.add("");
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_AGG_ARGS, args);
}
return NULL;
string separator;
separator.assign(gc->str_separator()->ptr(), gc->str_separator()->length());
(dynamic_cast<GroupConcatColumn*>(ac))->separator(separator);
}
}
}
if (parm)
{
ac->functionParms(parm);
if (isp->sum_func() == Item_sum::AVG_FUNC ||
isp->sum_func() == Item_sum::AVG_DISTINCT_FUNC)
{
CalpontSystemCatalog::ColType ct = parm->resultType();
switch (ct.colDataType)
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
ct.colDataType = CalpontSystemCatalog::DECIMAL;
ct.colWidth = 8;
ct.scale += 4;
break;
#if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
ct.colDataType = CalpontSystemCatalog::DOUBLE;
ct.colWidth = 8;
break;
#endif
default:
break;
}
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::COUNT_FUNC ||
isp->sum_func() == Item_sum::COUNT_DISTINCT_FUNC)
{
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::BIGINT;
ct.colWidth = 8;
ct.scale = parm->resultType().scale;
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::SUM_FUNC ||
isp->sum_func() == Item_sum::SUM_DISTINCT_FUNC)
{
CalpontSystemCatalog::ColType ct = parm->resultType();
switch (ct.colDataType)
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
ct.colDataType = CalpontSystemCatalog::BIGINT;
// no break, let fall through
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
ct.colWidth = 8;
break;
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
ct.colDataType = CalpontSystemCatalog::UBIGINT;
ct.colWidth = 8;
break;
#if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
ct.colDataType = CalpontSystemCatalog::DOUBLE;
ct.colWidth = 8;
break;
#endif
default:
break;
}
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::STD_FUNC ||
isp->sum_func() == Item_sum::VARIANCE_FUNC)
{
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::DOUBLE;
ct.colWidth = 8;
ct.scale = 0;
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::SUM_BIT_FUNC)
{
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::BIGINT;
ct.colWidth = 8;
ct.scale = 0;
ct.precision = -16; // borrowed to indicate skip null value check on connector
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
{
//Item_func_group_concat* gc = (Item_func_group_concat*)isp;
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::VARCHAR;
ct.colWidth = isp->max_length;
ct.precision = 0;
ac->resultType(ct);
}
else
{
ac->resultType(parm->resultType());
for (uint32_t i = 0; i < isp->argument_count(); i++)
{
Item* sfitemp = sfitempp[i];
Item::Type sfitype = sfitemp->type();
switch (sfitype)
{
case Item::FIELD_ITEM:
{
Item_field* ifp = reinterpret_cast<Item_field*>(sfitemp);
SimpleColumn* sc = buildSimpleColumn(ifp, gwi);
if (!sc)
{
gwi.fatalParseError = true;
break;
}
parm.reset(sc);
gwi.columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(ifp->field_name), parm));
TABLE_LIST* tmp = (ifp->cached_table ? ifp->cached_table : 0);
gwi.tableMap[make_aliastable(sc->schemaName(), sc->tableName(), sc->tableAlias(), sc->isInfiniDB())] = make_pair(1, tmp);
break;
}
case Item::INT_ITEM:
case Item::STRING_ITEM:
case Item::REAL_ITEM:
case Item::DECIMAL_ITEM:
{
// treat as count(*)
if (ac->aggOp() == AggregateColumn::COUNT)
ac->aggOp(AggregateColumn::COUNT_ASTERISK);
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
break;
}
case Item::NULL_ITEM:
{
parm.reset(new ConstantColumn("", ConstantColumn::NULLDATA));
ac->constCol(SRCP(buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError)));
break;
}
case Item::FUNC_ITEM:
{
Item_func* ifp = (Item_func*)sfitemp;
ReturnedColumn* rc = 0;
// check count(1+1) case
vector <Item_field*> tmpVec;
uint16_t parseInfo = 0;
parse_item(ifp, tmpVec, gwi.fatalParseError, parseInfo);
if (parseInfo & SUB_BIT)
{
gwi.fatalParseError = true;
break;
}
else if (!gwi.fatalParseError &&
!(parseInfo & AGG_BIT) &&
!(parseInfo & AF_BIT) &&
tmpVec.size() == 0)
{
rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
if ((fc && fc->functionParms().empty()) || !fc)
{
//ac->aggOp(AggregateColumn::COUNT_ASTERISK);
ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError);
if (dynamic_cast<ConstantColumn*>(rc))
{
//@bug5229. handle constant function on aggregate argument
ac->constCol(SRCP(rc));
break;
}
}
}
// MySQL carelessly allows correlated aggregate function on the WHERE clause.
// Here is the work around to deal with that inconsistence.
// e.g., SELECT (SELECT t.c FROM t1 AS t WHERE t.b=MAX(t1.b + 0)) FROM t1;
ClauseType clauseType = gwi.clauseType;
if (gwi.clauseType == WHERE)
gwi.clauseType = HAVING;
// @bug 3603. for cases like max(rand()). try to build function first.
if (!rc)
rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
parm.reset(rc);
gwi.clauseType = clauseType;
if (gwi.fatalParseError)
break;
break;
}
case Item::REF_ITEM:
{
ReturnedColumn* rc = buildReturnedColumn(sfitemp, gwi, gwi.fatalParseError);
if (rc)
{
parm.reset(rc);
break;
}
}
default:
{
gwi.fatalParseError = true;
//gwi.parseErrorText = "Non-supported Item in Aggregate function";
}
}
if (gwi.fatalParseError)
{
if (gwi.parseErrorText.empty())
{
Message::Args args;
if (item->name)
args.add(item->name);
else
args.add("");
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_AGG_ARGS, args);
}
if (ac)
delete ac;
return NULL;
}
if (parm)
{
// MCOL-1201 multi-argument aggregate
ac->aggParms().push_back(parm);
}
}
}
}
else
{
ac->resultType(colType_MysqlToIDB(isp));
}
// adjust decimal result type according to internalDecimalScale
if (gwi.internalDecimalScale >= 0 && ac->resultType().colDataType == CalpontSystemCatalog::DECIMAL)
{
CalpontSystemCatalog::ColType ct = ac->resultType();
ct.scale = gwi.internalDecimalScale;
ac->resultType(ct);
}
// check for same aggregate on the select list
ac->expressionId(ci->expressionId++);
if (gwi.clauseType != SELECT)
{
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
// Get result type
// Modified for MCOL-1201 multi-argument aggregate
if (ac->aggParms().size() > 0)
{
if (*ac == gwi.returnedCols[i].get())
ac->expressionId(gwi.returnedCols[i]->expressionId());
}
}
// These are all one parm functions, so we can safely
// use the first parm for result type.
parm = ac->aggParms()[0];
if (isp->sum_func() == Item_sum::AVG_FUNC ||
isp->sum_func() == Item_sum::AVG_DISTINCT_FUNC)
{
CalpontSystemCatalog::ColType ct = parm->resultType();
// @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will
// be applied in ExeMgr. When the ExeMgr fix is available, this checking
// will be taken out.
if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty())
switch (ct.colDataType)
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
ct.colDataType = CalpontSystemCatalog::DECIMAL;
ct.colWidth = 8;
ct.scale += 4;
break;
#if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
ct.colDataType = CalpontSystemCatalog::DOUBLE;
ct.colWidth = 8;
break;
#endif
default:
break;
}
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::COUNT_FUNC ||
isp->sum_func() == Item_sum::COUNT_DISTINCT_FUNC)
{
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::BIGINT;
ct.colWidth = 8;
ct.scale = parm->resultType().scale;
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::SUM_FUNC ||
isp->sum_func() == Item_sum::SUM_DISTINCT_FUNC)
{
CalpontSystemCatalog::ColType ct = parm->resultType();
switch (ct.colDataType)
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
ct.colDataType = CalpontSystemCatalog::BIGINT;
// no break, let fall through
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
ct.colWidth = 8;
break;
case CalpontSystemCatalog::UTINYINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
ct.colDataType = CalpontSystemCatalog::UBIGINT;
ct.colWidth = 8;
break;
#if PROMOTE_FLOAT_TO_DOUBLE_ON_SUM
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
ct.colDataType = CalpontSystemCatalog::DOUBLE;
ct.colWidth = 8;
break;
#endif
default:
break;
}
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::STD_FUNC ||
isp->sum_func() == Item_sum::VARIANCE_FUNC)
{
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::DOUBLE;
ct.colWidth = 8;
ct.scale = 0;
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::SUM_BIT_FUNC)
{
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::BIGINT;
ct.colWidth = 8;
ct.scale = 0;
ct.precision = -16; // borrowed to indicate skip null value check on connector
ac->resultType(ct);
}
else if (isp->sum_func() == Item_sum::GROUP_CONCAT_FUNC)
{
//Item_func_group_concat* gc = (Item_func_group_concat*)isp;
CalpontSystemCatalog::ColType ct;
ct.colDataType = CalpontSystemCatalog::VARCHAR;
ct.colWidth = isp->max_length;
ct.precision = 0;
ac->resultType(ct);
}
else
{
// UDAF result type will be set below.
ac->resultType(parm->resultType());
}
}
else
{
ac->resultType(colType_MysqlToIDB(isp));
}
// adjust decimal result type according to internalDecimalScale
if (gwi.internalDecimalScale >= 0 && ac->resultType().colDataType == CalpontSystemCatalog::DECIMAL)
{
CalpontSystemCatalog::ColType ct = ac->resultType();
ct.scale = gwi.internalDecimalScale;
ac->resultType(ct);
}
// check for same aggregate on the select list
ac->expressionId(ci->expressionId++);
if (gwi.clauseType != SELECT)
{
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
{
if (*ac == gwi.returnedCols[i].get())
ac->expressionId(gwi.returnedCols[i]->expressionId());
}
}
// @bug5977 @note Temporary fix to avoid mysqld crash. The permanent fix will
// be applied in ExeMgr. When the ExeMgr fix is available, this checking
// will be taken out.
if (ac->constCol() && gwi.tbList.empty() && gwi.derivedTbList.empty())
{
gwi.fatalParseError = true;
gwi.parseErrorText = "No project column found for aggregate function";
if (ac)
delete ac;
return NULL;
}
else if (ac->constCol())
{
gwi.count_asterisk_list.push_back(ac);
}
// For UDAF, populate the context and call the UDAF init() function.
// The return type is (should be) set in context by init().
if (isp->sum_func() == Item_sum::UDF_SUM_FUNC)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
if (udafc)
{
mcsv1Context& context = udafc->getContext();
context.setName(isp->func_name());
// Set up the return type defaults for the call to init()
context.setResultType(udafc->resultType().colDataType);
context.setColWidth(udafc->resultType().colWidth);
context.setScale(udafc->resultType().scale);
context.setPrecision(udafc->resultType().precision);
context.setParamCount(udafc->aggParms().size());
ColumnDatum colType;
ColumnDatum colTypes[udafc->aggParms().size()];
// Build the column type vector.
// Modified for MCOL-1201 multi-argument aggregate
for (uint32_t i = 0; i < udafc->aggParms().size(); ++i)
{
const execplan::CalpontSystemCatalog::ColType& resultType
= udafc->aggParms()[i]->resultType();
colType.dataType = resultType.colDataType;
colType.precision = resultType.precision;
colType.scale = resultType.scale;
colTypes[i] = colType;
}
// Call the user supplied init()
mcsv1sdk::mcsv1_UDAF* udaf = context.getFunction();
if (!udaf)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "Aggregate Function " + context.getName() + " doesn't exist in the ColumnStore engine";
if (ac)
delete ac;
return NULL;
}
if (udaf->init(&context, colTypes) == mcsv1_UDAF::ERROR)
{
gwi.fatalParseError = true;
gwi.parseErrorText = udafc->getContext().getErrorMessage();
if (ac)
delete ac;
return NULL;
}
// UDAF_OVER_REQUIRED means that this function is for Window
// Function only. Reject it here in aggregate land.
if (udafc->getContext().getRunFlag(UDAF_OVER_REQUIRED))
{
gwi.fatalParseError = true;
gwi.parseErrorText =
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_WINDOW_FUNC_ONLY,
context.getName());
if (ac)
delete ac;
return NULL;
}
// Set the return type as set in init()
CalpontSystemCatalog::ColType ct;
ct.colDataType = context.getResultType();
ct.colWidth = context.getColWidth();
ct.scale = context.getScale();
ct.precision = context.getPrecision();
udafc->resultType(ct);
}
}
}
catch (std::logic_error e)
{
gwi.fatalParseError = true;
gwi.parseErrorText = "No project column found for aggregate function";
gwi.parseErrorText = "error building Aggregate Function: ";
gwi.parseErrorText += e.what();
if (ac)
delete ac;
return NULL;
}
else if (ac->constCol())
catch (...)
{
gwi.count_asterisk_list.push_back(ac);
gwi.fatalParseError = true;
gwi.parseErrorText = "error building Aggregate Function: Unspecified exception";
if (ac)
delete ac;
return NULL;
}
// For UDAF, populate the context and call the UDAF init() function.
if (isp->sum_func() == Item_sum::UDF_SUM_FUNC)
{
UDAFColumn* udafc = dynamic_cast<UDAFColumn*>(ac);
if (udafc)
{
mcsv1Context& context = udafc->getContext();
context.setName(isp->func_name());
// Set up the return type defaults for the call to init()
context.setResultType(udafc->resultType().colDataType);
context.setColWidth(udafc->resultType().colWidth);
context.setScale(udafc->resultType().scale);
context.setPrecision(udafc->resultType().precision);
COL_TYPES colTypes;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator cmIter;
// Build the column type vector. For now, there is only one
colTypes.push_back(make_pair(udafc->functionParms()->alias(), udafc->functionParms()->resultType().colDataType));
// Call the user supplied init()
if (context.getFunction()->init(&context, colTypes) == mcsv1_UDAF::ERROR)
{
gwi.fatalParseError = true;
gwi.parseErrorText = udafc->getContext().getErrorMessage();
return NULL;
}
if (udafc->getContext().getRunFlag(UDAF_OVER_REQUIRED))
{
gwi.fatalParseError = true;
gwi.parseErrorText =
logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_WINDOW_FUNC_ONLY,
context.getName());
return NULL;
}
// Set the return type as set in init()
CalpontSystemCatalog::ColType ct;
ct.colDataType = context.getResultType();
ct.colWidth = context.getColWidth();
ct.scale = context.getScale();
ct.precision = context.getPrecision();
udafc->resultType(ct);
}
}
return ac;
}
@@ -7843,7 +7913,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
return ER_CHECK_NOT_IMPLEMENTED;
}
(*coliter)->functionParms(minSc);
(*coliter)->aggParms().push_back(minSc);
}
std::vector<FunctionColumn*>::iterator funciter;
@@ -9923,7 +9993,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
return ER_CHECK_NOT_IMPLEMENTED;
}
(*coliter)->functionParms(minSc);
(*coliter)->aggParms().push_back(minSc);
}
std::vector<FunctionColumn*>::iterator funciter;

View File

@@ -781,8 +781,11 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
//double double_val = *(double*)(&value);
//f2->store(double_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
if ((f2->decimals() == DECIMAL_NOT_SPECIFIED && row.getScale(s) > 0)
|| f2->decimals() < row.getScale(s))
{
f2->dec = row.getScale(s);
}
f2->store(dl);
@@ -5275,8 +5278,6 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
execplan::ParseTree* ptIt;
execplan::ReturnedColumn* rcIt;
for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
{
mapiter = ci->tableMap.find(tl->table);

View File

@@ -340,6 +340,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
ac->distinct(item_sum->has_with_distinct());
Window_spec* win_spec = wf->window_spec;
SRCP srcp;
CalpontSystemCatalog::ColType ct; // For return type
// arguments
vector<SRCP> funcParms;
@@ -370,18 +371,25 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
context.setColWidth(rt.colWidth);
context.setScale(rt.scale);
context.setPrecision(rt.precision);
context.setParamCount(funcParms.size());
mcsv1sdk::ColumnDatum colType;
mcsv1sdk::ColumnDatum colTypes[funcParms.size()];
// Turn on the Analytic flag so the function is aware it is being called
// as a Window Function.
context.setContextFlag(CONTEXT_IS_ANALYTIC);
COL_TYPES colTypes;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator cmIter;
// Build the column type vector.
// Modified for MCOL-1201 multi-argument aggregate
for (size_t i = 0; i < funcParms.size(); ++i)
{
colTypes.push_back(make_pair(funcParms[i]->alias(), funcParms[i]->resultType().colDataType));
const execplan::CalpontSystemCatalog::ColType& resultType
= funcParms[i]->resultType();
colType.dataType = resultType.colDataType;
colType.precision = resultType.precision;
colType.scale = resultType.scale;
colTypes[i] = colType;
}
// Call the user supplied init()
@@ -401,7 +409,6 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
}
// Set the return type as set in init()
CalpontSystemCatalog::ColType ct;
ct.colDataType = context.getResultType();
ct.colWidth = context.getColWidth();
ct.scale = context.getScale();
@@ -419,10 +426,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
{
case Item_sum::UDF_SUM_FUNC:
{
uint64_t bIgnoreNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS));
char sIgnoreNulls[18];
sprintf(sIgnoreNulls, "%lu", bIgnoreNulls);
srcp.reset(new ConstantColumn(sIgnoreNulls, (uint64_t)bIgnoreNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT
uint64_t bRespectNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) ? 0 : 1;
char sRespectNulls[18];
sprintf(sRespectNulls, "%lu", bRespectNulls);
srcp.reset(new ConstantColumn(sRespectNulls, (uint64_t)bRespectNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT
funcParms.push_back(srcp);
break;
}
@@ -881,11 +888,13 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
return NULL;
}
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
ac->adjustResultType();
if (item_sum->sum_func() != Item_sum::UDF_SUM_FUNC)
{
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
ac->adjustResultType();
}
ac->expressionId(ci->expressionId++);