You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-3343 Handle windowfunction <arithmetic op> simple column
This commit is contained in:
@@ -386,7 +386,6 @@ void WindowFunctionColumn::adjustResultType()
|
|||||||
fResultType.colDataType = CalpontSystemCatalog::LONGDOUBLE;
|
fResultType.colDataType = CalpontSystemCatalog::LONGDOUBLE;
|
||||||
fResultType.colWidth = sizeof(long double);
|
fResultType.colWidth = sizeof(long double);
|
||||||
fResultType.precision = -1;
|
fResultType.precision = -1;
|
||||||
fResultType.scale = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -386,19 +386,57 @@ CalpontSystemCatalog::OID tableOid(const SimpleColumn* sc, boost::shared_ptr<Cal
|
|||||||
return p.objnum;
|
return p.objnum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t getTupleKey(JobInfo& jobInfo,
|
||||||
uint32_t getTupleKey(const JobInfo& jobInfo,
|
const execplan::SimpleColumn* sc,
|
||||||
const execplan::SimpleColumn* sc)
|
bool add)
|
||||||
{
|
{
|
||||||
|
int key = -1;
|
||||||
const PseudoColumn* pc = dynamic_cast<const execplan::PseudoColumn*>(sc);
|
const PseudoColumn* pc = dynamic_cast<const execplan::PseudoColumn*>(sc);
|
||||||
uint32_t pseudoType = (pc) ? pc->pseudoType() : execplan::PSEUDO_UNKNOWN;
|
uint32_t pseudoType = (pc) ? pc->pseudoType() : execplan::PSEUDO_UNKNOWN;
|
||||||
return getTupleKey_(jobInfo, sc->oid(), sc->columnName(), extractTableAlias(sc),
|
if (sc == NULL)
|
||||||
sc->schemaName(), sc->viewName(),
|
{
|
||||||
((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0),
|
return -1;
|
||||||
pseudoType, (sc->isInfiniDB() ? 0 : 1));
|
}
|
||||||
|
|
||||||
|
if (add)
|
||||||
|
{
|
||||||
|
// setTupleInfo first if add is true, ok if already set.
|
||||||
|
if (sc->schemaName().empty())
|
||||||
|
{
|
||||||
|
SimpleColumn tmp(*sc, jobInfo.sessionId);
|
||||||
|
tmp.oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
|
||||||
|
key = getTupleKey(jobInfo, &tmp); // sub-query should be there
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CalpontSystemCatalog::ColType ct = sc->colType();
|
||||||
|
string alias(extractTableAlias(sc));
|
||||||
|
CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
|
||||||
|
TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc, alias));
|
||||||
|
key = ti.key;
|
||||||
|
|
||||||
|
CalpontSystemCatalog::OID dictOid = isDictCol(ct);
|
||||||
|
|
||||||
|
if (dictOid > 0)
|
||||||
|
{
|
||||||
|
ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias);
|
||||||
|
jobInfo.keyInfo->dictKeyMap[key] = ti.key;
|
||||||
|
key = ti.key;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// TupleInfo is expected to be set already
|
||||||
|
return getTupleKey_(jobInfo, sc->oid(), sc->columnName(), extractTableAlias(sc),
|
||||||
|
sc->schemaName(), sc->viewName(),
|
||||||
|
((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0),
|
||||||
|
pseudoType, (sc->isInfiniDB() ? 0 : 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add)
|
uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add)
|
||||||
{
|
{
|
||||||
int key = -1;
|
int key = -1;
|
||||||
@@ -608,7 +646,7 @@ uint32_t getExpTupleKey(const JobInfo& jobInfo, uint64_t eid, bool cr)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void addAggregateColumn(AggregateColumn* agc, int idx, RetColsVector& vec, JobInfo& jobInfo)
|
void addAggregateColumn(ReturnedColumn* agc, int idx, RetColsVector& vec, JobInfo& jobInfo)
|
||||||
{
|
{
|
||||||
uint32_t eid = agc->expressionId();
|
uint32_t eid = agc->expressionId();
|
||||||
setExpTupleInfo(agc->resultType(), eid, agc->alias(), jobInfo);
|
setExpTupleInfo(agc->resultType(), eid, agc->alias(), jobInfo);
|
||||||
|
@@ -401,8 +401,9 @@ execplan::CalpontSystemCatalog::OID tableOid(const execplan::SimpleColumn* sc,
|
|||||||
/** @brief Returns the unique ID to be used in tupleInfo
|
/** @brief Returns the unique ID to be used in tupleInfo
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
uint32_t getTupleKey(const JobInfo& jobInfo,
|
uint32_t getTupleKey(JobInfo& jobInfo,
|
||||||
const execplan::SimpleColumn* sc);
|
const execplan::SimpleColumn* sc,
|
||||||
|
bool add = false);
|
||||||
uint32_t getTableKey(const JobInfo& jobInfo,
|
uint32_t getTableKey(const JobInfo& jobInfo,
|
||||||
execplan::CalpontSystemCatalog::OID tableOid,
|
execplan::CalpontSystemCatalog::OID tableOid,
|
||||||
const std::string& alias,
|
const std::string& alias,
|
||||||
@@ -464,7 +465,7 @@ TupleInfo setExpTupleInfo(const execplan::ReturnedColumn* rc, JobInfo& jobInfo);
|
|||||||
/** @brief add an aggregate column info
|
/** @brief add an aggregate column info
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
void addAggregateColumn(execplan::AggregateColumn*, int, RetColsVector&, JobInfo&);
|
void addAggregateColumn(execplan::ReturnedColumn*, int, RetColsVector&, JobInfo&);
|
||||||
|
|
||||||
void makeJobSteps(execplan::CalpontSelectExecutionPlan* csep, JobInfo& jobInfo,
|
void makeJobSteps(execplan::CalpontSelectExecutionPlan* csep, JobInfo& jobInfo,
|
||||||
JobStepVector& querySteps, JobStepVector& projectSteps,
|
JobStepVector& querySteps, JobStepVector& projectSteps,
|
||||||
|
@@ -274,9 +274,11 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
|||||||
|
|
||||||
if (retCols[i]->windowfunctionColumnList().size() > 0)
|
if (retCols[i]->windowfunctionColumnList().size() > 0)
|
||||||
jobInfo.expressionVec.push_back(key);
|
jobInfo.expressionVec.push_back(key);
|
||||||
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) ==
|
else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key)
|
||||||
jobInfo.expressionVec.end())
|
== jobInfo.expressionVec.end())
|
||||||
|
{
|
||||||
jobInfo.returnedExpressions.push_back(sjstep);
|
jobInfo.returnedExpressions.push_back(sjstep);
|
||||||
|
}
|
||||||
|
|
||||||
//put place hold column in projection list
|
//put place hold column in projection list
|
||||||
jobInfo.pjColList.push_back(ti);
|
jobInfo.pjColList.push_back(ti);
|
||||||
@@ -1047,16 +1049,21 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
|||||||
const FunctionColumn* fc = NULL;
|
const FunctionColumn* fc = NULL;
|
||||||
const WindowFunctionColumn* wc = NULL;
|
const WindowFunctionColumn* wc = NULL;
|
||||||
bool hasAggCols = false;
|
bool hasAggCols = false;
|
||||||
|
bool hasWndCols = false;
|
||||||
|
|
||||||
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
|
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
|
||||||
{
|
{
|
||||||
if (ac->aggColumnList().size() > 0)
|
if (ac->aggColumnList().size() > 0)
|
||||||
hasAggCols = true;
|
hasAggCols = true;
|
||||||
|
if (ac->windowfunctionColumnList().size() > 0)
|
||||||
|
hasWndCols = true;
|
||||||
}
|
}
|
||||||
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
|
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
|
||||||
{
|
{
|
||||||
if (fc->aggColumnList().size() > 0)
|
if (fc->aggColumnList().size() > 0)
|
||||||
hasAggCols = true;
|
hasAggCols = true;
|
||||||
|
if (fc->windowfunctionColumnList().size() > 0)
|
||||||
|
hasWndCols = true;
|
||||||
}
|
}
|
||||||
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
|
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
|
||||||
{
|
{
|
||||||
@@ -1081,7 +1088,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
|||||||
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
|
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
|
||||||
tupleKey = ti.key;
|
tupleKey = ti.key;
|
||||||
|
|
||||||
if (hasAggCols)
|
if (hasAggCols && !hasWndCols)
|
||||||
jobInfo.expressionVec.push_back(tupleKey);
|
jobInfo.expressionVec.push_back(tupleKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1195,16 +1202,21 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
|||||||
const FunctionColumn* fc = NULL;
|
const FunctionColumn* fc = NULL;
|
||||||
const WindowFunctionColumn* wc = NULL;
|
const WindowFunctionColumn* wc = NULL;
|
||||||
bool hasAggCols = false;
|
bool hasAggCols = false;
|
||||||
|
bool hasWndCols = false;
|
||||||
|
|
||||||
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
|
if ((ac = dynamic_cast<const ArithmeticColumn*>(srcp.get())) != NULL)
|
||||||
{
|
{
|
||||||
if (ac->aggColumnList().size() > 0)
|
if (ac->aggColumnList().size() > 0)
|
||||||
hasAggCols = true;
|
hasAggCols = true;
|
||||||
|
if (ac->windowfunctionColumnList().size() > 0)
|
||||||
|
hasWndCols = true;
|
||||||
}
|
}
|
||||||
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
|
else if ((fc = dynamic_cast<const FunctionColumn*>(srcp.get())) != NULL)
|
||||||
{
|
{
|
||||||
if (fc->aggColumnList().size() > 0)
|
if (fc->aggColumnList().size() > 0)
|
||||||
hasAggCols = true;
|
hasAggCols = true;
|
||||||
|
if (fc->windowfunctionColumnList().size() > 0)
|
||||||
|
hasWndCols = true;
|
||||||
}
|
}
|
||||||
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
|
else if (dynamic_cast<const AggregateColumn*>(srcp.get()) != NULL)
|
||||||
{
|
{
|
||||||
@@ -1229,7 +1241,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
|||||||
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
|
TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo));
|
||||||
tupleKey = ti.key;
|
tupleKey = ti.key;
|
||||||
|
|
||||||
if (hasAggCols)
|
if (hasAggCols && !hasWndCols)
|
||||||
jobInfo.expressionVec.push_back(tupleKey);
|
jobInfo.expressionVec.push_back(tupleKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -5011,7 +5011,8 @@ void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInf
|
|||||||
uint64_t eid = -1;
|
uint64_t eid = -1;
|
||||||
|
|
||||||
if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) &&
|
if (((ac = dynamic_cast<ArithmeticColumn*>(it->get())) != NULL) &&
|
||||||
(ac->aggColumnList().size() > 0))
|
(ac->aggColumnList().size() > 0) &&
|
||||||
|
(ac->windowfunctionColumnList().size() == 0))
|
||||||
{
|
{
|
||||||
const vector<SimpleColumn*>& scols = ac->simpleColumnList();
|
const vector<SimpleColumn*>& scols = ac->simpleColumnList();
|
||||||
simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end());
|
simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end());
|
||||||
@@ -5020,7 +5021,8 @@ void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInf
|
|||||||
expressionVec.push_back(*it);
|
expressionVec.push_back(*it);
|
||||||
}
|
}
|
||||||
else if (((fc = dynamic_cast<FunctionColumn*>(it->get())) != NULL) &&
|
else if (((fc = dynamic_cast<FunctionColumn*>(it->get())) != NULL) &&
|
||||||
(fc->aggColumnList().size() > 0))
|
(fc->aggColumnList().size() > 0) &&
|
||||||
|
(fc->windowfunctionColumnList().size() == 0))
|
||||||
{
|
{
|
||||||
const vector<SimpleColumn*>& sCols = fc->simpleColumnList();
|
const vector<SimpleColumn*>& sCols = fc->simpleColumnList();
|
||||||
simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end());
|
simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end());
|
||||||
|
@@ -320,6 +320,38 @@ const string WindowFunctionStep::toString() const
|
|||||||
return oss.str();
|
return oss.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WindowFunctionStep::AddSimplColumn(const vector<SimpleColumn*>& scs,
|
||||||
|
JobInfo& jobInfo)
|
||||||
|
{
|
||||||
|
// append the simple columns if not already projected
|
||||||
|
set<UniqId> scProjected;
|
||||||
|
|
||||||
|
for (RetColsVector::iterator i = jobInfo.projectionCols.begin();
|
||||||
|
i != jobInfo.projectionCols.end();
|
||||||
|
i++)
|
||||||
|
{
|
||||||
|
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(i->get());
|
||||||
|
|
||||||
|
if (sc != NULL)
|
||||||
|
{
|
||||||
|
if (sc->schemaName().empty())
|
||||||
|
sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
|
||||||
|
|
||||||
|
scProjected.insert(UniqId(sc));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (vector<SimpleColumn*>::const_iterator i = scs.begin(); i != scs.end(); i++)
|
||||||
|
{
|
||||||
|
if (scProjected.find(UniqId(*i)) == scProjected.end())
|
||||||
|
{
|
||||||
|
jobInfo.windowDels.push_back(SRCP((*i)->clone()));
|
||||||
|
jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
|
||||||
|
scProjected.insert(UniqId(*i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
|
void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
|
||||||
{
|
{
|
||||||
// window functions in select clause, selected or in expression
|
// window functions in select clause, selected or in expression
|
||||||
@@ -404,6 +436,23 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J
|
|||||||
if (jobInfo.windowCols.empty())
|
if (jobInfo.windowCols.empty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// Add in the non-window side of arithmetic columns and functions
|
||||||
|
for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++)
|
||||||
|
{
|
||||||
|
const ArithmeticColumn* ac =
|
||||||
|
dynamic_cast<const ArithmeticColumn*>(jobInfo.windowExps[i].get());
|
||||||
|
const FunctionColumn* fc =
|
||||||
|
dynamic_cast<const FunctionColumn*>(jobInfo.windowExps[i].get());
|
||||||
|
|
||||||
|
if (ac != NULL && ac->windowfunctionColumnList().size() > 0)
|
||||||
|
{
|
||||||
|
AddSimplColumn(ac->simpleColumnList(), jobInfo);
|
||||||
|
}
|
||||||
|
else if (fc != NULL && fc->windowfunctionColumnList().size() > 0)
|
||||||
|
{
|
||||||
|
AddSimplColumn(fc->simpleColumnList(), jobInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
// reconstruct the delivered column list with auxiliary columns
|
// reconstruct the delivered column list with auxiliary columns
|
||||||
set<uint64_t> colSet;
|
set<uint64_t> colSet;
|
||||||
jobInfo.deliveredCols.resize(0);
|
jobInfo.deliveredCols.resize(0);
|
||||||
@@ -445,7 +494,10 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J
|
|||||||
key = getTupleKey(jobInfo, *j, true);
|
key = getTupleKey(jobInfo, *j, true);
|
||||||
|
|
||||||
if (colSet.find(key) == colSet.end())
|
if (colSet.find(key) == colSet.end())
|
||||||
|
{
|
||||||
jobInfo.deliveredCols.push_back(*j);
|
jobInfo.deliveredCols.push_back(*j);
|
||||||
|
jobInfo.windowSet.insert(getTupleKey(jobInfo, *j, true));
|
||||||
|
}
|
||||||
|
|
||||||
colSet.insert(key);
|
colSet.insert(key);
|
||||||
}
|
}
|
||||||
|
@@ -148,6 +148,8 @@ private:
|
|||||||
void formatMiniStats();
|
void formatMiniStats();
|
||||||
void printCalTrace();
|
void printCalTrace();
|
||||||
|
|
||||||
|
static void AddSimplColumn(const vector<SimpleColumn*>& scs, JobInfo& jobInfo);
|
||||||
|
|
||||||
class Runner
|
class Runner
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
Reference in New Issue
Block a user