/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: joblistfactory.cpp 9632 2013-06-18 22:18:20Z xlou $ #include #include #include #include //#define NDEBUG #include #include #include #include #include using namespace std; #include #include #include using namespace boost; #include "joblistfactory.h" #include "calpontexecutionplan.h" #include "calpontselectexecutionplan.h" #include "mcsanalyzetableexecutionplan.h" #include "calpontsystemcatalog.h" #include "dbrm.h" #include "filter.h" #include "simplefilter.h" #include "constantfilter.h" #include "existsfilter.h" #include "selectfilter.h" #include "returnedcolumn.h" #include "aggregatecolumn.h" #include "windowfunctioncolumn.h" #include "arithmeticcolumn.h" #include "constantcolumn.h" #include "functioncolumn.h" #include "groupconcatcolumn.h" #include "pseudocolumn.h" #include "simplecolumn.h" #include "rowcolumn.h" #include "treenodeimpl.h" #include "udafcolumn.h" using namespace execplan; #include "configcpp.h" using namespace config; #include "messagelog.h" using namespace logging; #include "elementtype.h" #include "joblist.h" #include "jobstep.h" #include "primitivestep.h" #include "jl_logger.h" #include "jlf_execplantojoblist.h" #include "rowaggregation.h" #include "tuplehashjoin.h" #include "tupleunion.h" #include "expressionstep.h" #include "tupleconstantstep.h" #include "tuplehavingstep.h" #include "windowfunctionstep.h" #include "tupleannexstep.h" #include "jlf_common.h" #include "jlf_graphics.h" #include "jlf_subquery.h" #include "jlf_tuplejoblist.h" #include "rowgroup.h" using namespace rowgroup; #include "mcsv1_udaf.h" #ifdef __clang__ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wpotentially-evaluated-expression" // for warnings on typeid :expression with side effects will be evaluated despite being used as an operand to // 'typeid' #endif namespace { using namespace joblist; void projectSimpleColumn(const SimpleColumn* sc, JobStepVector& jsv, JobInfo& jobInfo) { if (sc == NULL) throw logic_error("projectSimpleColumn: sc is null"); CalpontSystemCatalog::OID oid = sc->oid(); CalpontSystemCatalog::OID tbl_oid = tableOid(sc, jobInfo.csc); string alias(extractTableAlias(sc)); string view(sc->viewName()); CalpontSystemCatalog::OID dictOid = 0; CalpontSystemCatalog::ColType ct; pColStep* pcs = NULL; pDictionaryStep* pds = NULL; bool tokenOnly = false; TupleInfo ti; if (!sc->schemaName().empty()) { SJSTEP sjstep; // always tuples after release 3.0 // if (!jobInfo.tryTuples) // jobInfo.tables.insert(make_table(sc->schemaName(), sc->tableName())); // if (jobInfo.trace) // cout << "doProject Emit pCol for SimpleColumn " << oid << endl; const PseudoColumn* pc = dynamic_cast(sc); ct = sc->colType(); // XXX use this before connector sets colType in sc correctly. // type of pseudo column is set by connector if (sc->isColumnStore() && !pc) { ct = jobInfo.csc->colType(sc->oid()); ct.charsetNumber = sc->colType().charsetNumber; } // X if (pc == NULL) pcs = new pColStep(oid, tbl_oid, ct, jobInfo); else pcs = new PseudoColStep(oid, tbl_oid, pc->pseudoType(), ct, jobInfo); pcs->alias(alias); pcs->view(view); pcs->name(sc->columnName()); pcs->cardinality(sc->cardinality()); // pcs->setOrderRids(true); sjstep.reset(pcs); jsv.push_back(sjstep); dictOid = isDictCol(ct); ti = setTupleInfo(ct, oid, jobInfo, tbl_oid, sc, alias); pcs->tupleId(ti.key); if (dictOid > 0 && jobInfo.hasAggregation) { map::iterator it = jobInfo.tokenOnly.find(getTupleKey(jobInfo, sc)); if (it != jobInfo.tokenOnly.end()) tokenOnly = it->second; } if (dictOid > 0 && !tokenOnly) { // This is a double-step step // if (jobInfo.trace) // cout << "doProject Emit pGetSignature for SimpleColumn " << dictOid << // endl; pds = new pDictionaryStep(dictOid, tbl_oid, ct, jobInfo); jobInfo.keyInfo->dictOidToColOid[dictOid] = oid; pds->alias(alias); pds->view(view); pds->name(sc->columnName()); pds->cardinality(sc->cardinality()); // pds->setOrderRids(true); // Associate these two linked steps JobStepAssociation outJs; AnyDataListSPtr spdl1(new AnyDataList()); RowGroupDL* dl1 = new RowGroupDL(1, jobInfo.fifoSize); spdl1->rowGroupDL(dl1); dl1->OID(oid); // not a tokenOnly column setTupleInfo(ct, dictOid, jobInfo, tbl_oid, sc, alias); jobInfo.tokenOnly[getTupleKey(jobInfo, sc)] = false; outJs.outAdd(spdl1); pcs->outputAssociation(outJs); pds->inputAssociation(outJs); sjstep.reset(pds); jsv.push_back(sjstep); oid = dictOid; // dictionary column ti = setTupleInfo(ct, oid, jobInfo, tbl_oid, sc, alias); pds->tupleId(ti.key); jobInfo.keyInfo->dictKeyMap[pcs->tupleId()] = ti.key; } } else // must be vtable mode { oid = (tbl_oid + 1) + sc->colPosition(); ct = jobInfo.vtableColTypes[UniqId(oid, alias, "", "")]; ti = setTupleInfo(ct, oid, jobInfo, tbl_oid, sc, alias); } if (dictOid > 0 && tokenOnly) { // scale is not used by string columns // borrow it to indicate token is used in projection, not the real string. ti.scale = 8; } jobInfo.pjColList.push_back(ti); } const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo) { JobStepVector jsv; SJSTEP sjstep; for (unsigned i = 0; i < retCols.size(); i++) { const SimpleColumn* sc = dynamic_cast(retCols[i].get()); const WindowFunctionColumn* wc = NULL; if (sc != NULL) { projectSimpleColumn(sc, jsv, jobInfo); } else if ((wc = dynamic_cast(retCols[i].get())) != NULL) { // put place hold column in projection list uint64_t eid = wc->expressionId(); CalpontSystemCatalog::ColType ct = wc->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, retCols[i].get()->alias(), jobInfo)); jobInfo.pjColList.push_back(ti); } else { const ArithmeticColumn* ac = NULL; const FunctionColumn* fc = NULL; const ConstantColumn* cc = NULL; const RollupMarkColumn* mc = NULL; uint64_t eid = -1; CalpontSystemCatalog::ColType ct; ExpressionStep* es = new ExpressionStep(jobInfo); es->expression(retCols[i], jobInfo); sjstep.reset(es); if ((ac = dynamic_cast(retCols[i].get())) != NULL) { eid = ac->expressionId(); ct = ac->resultType(); } else if ((fc = dynamic_cast(retCols[i].get())) != NULL) { eid = fc->expressionId(); ct = fc->resultType(); } else if ((cc = dynamic_cast(retCols[i].get())) != NULL) { eid = cc->expressionId(); ct = cc->resultType(); } else if ((mc = dynamic_cast(retCols[i].get())) != NULL) { eid = mc->expressionId(); ct = mc->resultType(); } else { std::ostringstream errmsg; errmsg << "doProject: unhandled returned column: " << typeid(*retCols[i]).name(); cerr << boldStart << errmsg.str() << boldStop << endl; throw logic_error(errmsg.str()); } // set expression tuple Info TupleInfo ti(setExpTupleInfo(ct, eid, retCols[i].get()->alias(), jobInfo)); uint32_t key = ti.key; if (retCols[i]->windowfunctionColumnList().size() > 0) jobInfo.expressionVec.push_back(key); else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) == jobInfo.expressionVec.end()) { jobInfo.returnedExpressions.push_back(sjstep); } // put place hold column in projection list jobInfo.pjColList.push_back(ti); } } return jsv; } void checkHavingClause(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { TupleHavingStep* ths = new TupleHavingStep(jobInfo); ths->expressionFilter(csep->having(), jobInfo); jobInfo.havingStep.reset(ths); // simple columns in select clause set scInSelect; for (RetColsVector::iterator i = jobInfo.nonConstCols.begin(); i != jobInfo.nonConstCols.end(); i++) { SimpleColumn* sc = dynamic_cast(i->get()); if (sc != NULL) { if (sc->schemaName().empty()) sc->oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); scInSelect.insert(UniqId(sc)); } } // simple columns in gruop by clause set scInGroupBy; for (RetColsVector::iterator i = csep->groupByCols().begin(); i != csep->groupByCols().end(); i++) { SimpleColumn* sc = dynamic_cast(i->get()); if (sc != NULL) { if (sc->schemaName().empty() && sc->oid() == 0) { if (sc->colPosition() == -1) { // from select subquery SRCP ss = csep->returnedCols()[sc->orderPos()]; (*i) = ss; } else { sc->oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); } } scInGroupBy.insert(UniqId(sc)); } } bool aggInHaving = false; const vector& columns = ths->columns(); for (vector::const_iterator i = columns.begin(); i != columns.end(); i++) { // evaluate aggregate columns in having AggregateColumn* agc = dynamic_cast(*i); if (agc) { addAggregateColumn(agc, -1, jobInfo.nonConstCols, jobInfo); aggInHaving = true; } else { // simple columns used in having and in group by clause must be in rowgroup SimpleColumn* sc = dynamic_cast(*i); if (sc != NULL) { if (sc->schemaName().empty()) sc->oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); UniqId scId(sc); if (scInGroupBy.find(scId) != scInGroupBy.end() && scInSelect.find(scId) == scInSelect.end()) { jobInfo.nonConstCols.push_back(SRCP(sc->clone())); } } } } if (aggInHaving == false) { // treated the same as where clause if no aggregate column in having. jobInfo.havingStep.reset(); // parse the having expression ParseTree* filters = csep->having(); if (filters != 0) { JLF_ExecPlanToJobList::walkTree(filters, jobInfo); } if (!jobInfo.stack.empty()) { idbassert(jobInfo.stack.size() == 1); jobInfo.havingStepVec = jobInfo.stack.top(); jobInfo.stack.pop(); } } } void preProcessFunctionOnAggregation(const vector& scs, const vector& aggs, const vector& wcs, JobInfo& jobInfo) { // append the simple columns if not already projected set scProjected; for (RetColsVector::iterator i = jobInfo.projectionCols.begin(); i != jobInfo.projectionCols.end(); i++) { SimpleColumn* sc = dynamic_cast(i->get()); if (sc != NULL) { if (sc->schemaName().empty()) sc->oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); scProjected.insert(UniqId(sc)); } } for (vector::const_iterator i = scs.begin(); i != scs.end(); i++) { if (scProjected.find(UniqId(*i)) == scProjected.end()) { jobInfo.projectionCols.push_back(SRCP((*i)->clone())); scProjected.insert(UniqId(*i)); } } // append the aggregate columns in arithmetic/function column to the projection list for (vector::const_iterator i = aggs.begin(); i != aggs.end(); i++) { addAggregateColumn(*i, -1, jobInfo.projectionCols, jobInfo); if (wcs.size() > 0) { jobInfo.nonConstDelCols.push_back(SRCP((*i)->clone())); } } } void checkReturnedColumns(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++) { if (NULL == dynamic_cast(jobInfo.deliveredCols[i].get())) jobInfo.nonConstCols.push_back(jobInfo.deliveredCols[i]); } // save the original delivered non constant columns jobInfo.nonConstDelCols = jobInfo.nonConstCols; if (jobInfo.nonConstCols.size() != jobInfo.deliveredCols.size()) { jobInfo.constantCol = CONST_COL_EXIST; // bug 2531, all constant column. if (jobInfo.nonConstCols.size() == 0) { if (csep->columnMap().size() > 0) jobInfo.nonConstCols.push_back((*(csep->columnMap().begin())).second); else jobInfo.constantCol = CONST_COL_ONLY; } } for (uint64_t i = 0; i < jobInfo.nonConstCols.size(); i++) { AggregateColumn* agc = dynamic_cast(jobInfo.nonConstCols[i].get()); if (agc) addAggregateColumn(agc, i, jobInfo.nonConstCols, jobInfo); } if (csep->having() != NULL) checkHavingClause(csep, jobInfo); jobInfo.projectionCols = jobInfo.nonConstCols; for (uint64_t i = 0; i < jobInfo.nonConstCols.size(); i++) { const ArithmeticColumn* ac = dynamic_cast(jobInfo.nonConstCols[i].get()); const FunctionColumn* fc = dynamic_cast(jobInfo.nonConstCols[i].get()); if (ac != NULL && ac->aggColumnList().size() > 0) { jobInfo.nonConstCols[i]->outputIndex(i); preProcessFunctionOnAggregation(ac->simpleColumnList(), ac->aggColumnList(), ac->windowfunctionColumnList(), jobInfo); } else if (fc != NULL && fc->aggColumnList().size() > 0) { jobInfo.nonConstCols[i]->outputIndex(i); preProcessFunctionOnAggregation(fc->simpleColumnList(), fc->aggColumnList(), fc->windowfunctionColumnList(), jobInfo); } } } /* This function is to get a unique non-constant column list for grouping. After sub-query is supported, GROUP BY column can be a column from SELECT or FROM sub-queries, which has empty schema name, and 0 oid (if SELECT). In order to distinguish these columns, data member fSequence is used to indicate the column position in FROM sub-query's select list, the table OID for sub-query vtable is assumed to CNX_VTABLE_ID, the column OIDs for that vtable is caculated based on this table OID and column position. The data member fOrderPos is used to indicate the column position in the outer select clause, this value is set to -1 if the column is not selected (implicit group by). For select sub-query, the fSequence is not set, so orderPos is used to locate the column. */ void checkGroupByCols(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { // order by columns may be not in the select and [group by] clause const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols(); for (uint64_t i = 0; i < orderByCols.size(); i++) { if (orderByCols[i]->orderPos() == (uint64_t)(-1)) { // @bug 4531, skip window functions, should be already added. if (dynamic_cast(orderByCols[i].get()) != NULL || orderByCols[i]->windowfunctionColumnList().size() > 0) continue; jobInfo.deliveredCols.push_back(orderByCols[i]); // @bug 3025 // Append the non-aggregate orderby column to group by, if there is group by clause. // Duplicates will be removed by next if block. if (csep->groupByCols().size() > 0) { // Not an aggregate column and not an expression of aggregation. if (dynamic_cast(orderByCols[i].get()) == NULL && orderByCols[i]->aggColumnList().empty()) csep->groupByCols().push_back(orderByCols[i]); } } } if (csep->groupByCols().size() > 0) { set colInGroupBy; RetColsVector uniqGbCols; for (RetColsVector::iterator i = csep->groupByCols().begin(); i != csep->groupByCols().end(); i++) { // skip constant columns if (dynamic_cast(i->get()) != NULL) { if (csep->withRollup()) { throw runtime_error("constant GROUP BY columns are not supported when WITH ROLLUP is used"); } continue; } ReturnedColumn* rc = i->get(); SimpleColumn* sc = dynamic_cast(rc); bool selectSubquery = false; if (sc && sc->schemaName().empty() && sc->oid() == 0) { if (sc->colPosition() == -1) { // from select subquery // sc->orderPos() should NOT be -1 because it is a SELECT sub-query. SRCP ss = csep->returnedCols()[sc->orderPos()]; (*i) = ss; selectSubquery = true; // At this point whatever sc pointed to is invalid // update the rc and sc rc = ss.get(); sc = dynamic_cast(rc); } else { sc->oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); } } UniqId col; if (sc) col = UniqId(sc); else col = UniqId(rc->expressionId(), rc->alias(), "", ""); if (colInGroupBy.find(col) == colInGroupBy.end() || selectSubquery) { colInGroupBy.insert(col); uniqGbCols.push_back(*i); } } if (csep->groupByCols().size() != uniqGbCols.size()) (csep)->groupByCols(uniqGbCols); } } void checkAggregation(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { checkGroupByCols(csep, jobInfo); checkReturnedColumns(csep, jobInfo); RetColsVector& retCols = jobInfo.projectionCols; jobInfo.hasDistinct = csep->distinct(); // DISTINCT with window functions must be done in tupleannexstep if (csep->distinct() == true && jobInfo.windowDels.size() == 0) { jobInfo.hasAggregation = true; } else if (csep->groupByCols().size() > 0) { // groupby without aggregate functions is supported. jobInfo.hasAggregation = true; } else { for (uint64_t i = 0; i < retCols.size(); i++) { if (dynamic_cast(retCols[i].get()) != NULL) { jobInfo.hasAggregation = true; break; } } } } void updateAggregateColType(AggregateColumn* ac, const SRCP& srcp, int op, JobInfo& jobInfo) { CalpontSystemCatalog::ColType ct; const SimpleColumn* sc = dynamic_cast(srcp.get()); const ArithmeticColumn* ar = NULL; const FunctionColumn* fc = NULL; if (sc != NULL) ct = sc->resultType(); else if ((ar = dynamic_cast(srcp.get())) != NULL) ct = ar->resultType(); else if ((fc = dynamic_cast(srcp.get())) != NULL) ct = fc->resultType(); if (op == AggregateColumn::STDDEV_POP || op == AggregateColumn::STDDEV_SAMP || op == AggregateColumn::VAR_POP || op == AggregateColumn::VAR_SAMP) { ct.colWidth = sizeof(double); ct.colDataType = CalpontSystemCatalog::DOUBLE; ct.scale = 0; ct.precision = -1; } else if (op == AggregateColumn::UDAF) { UDAFColumn* udafc = dynamic_cast(ac); if (udafc) { mcsv1sdk::mcsv1Context& udafContext = udafc->getContext(); ct.colDataType = udafContext.getResultType(); ct.colWidth = udafContext.getColWidth(); ct.scale = udafContext.getScale(); ct.precision = udafContext.getPrecision(); } else { ct = ac->resultType(); } } else { ct = ac->resultType(); } ac->resultType(ct); // update the original if this aggregate column is cloned from function on aggregation pair::iterator, multimap::iterator> range = jobInfo.cloneAggregateColMap.equal_range(ac); for (multimap::iterator i = range.first; i != range.second; ++i) (i->second)->resultType(ct); } const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { vector projectKeys; // projected column keys -- unique RetColsVector pcv; // projected column vector -- may have duplicates // add the groupby cols in the front part of the project column vector (pcv) const CalpontSelectExecutionPlan::GroupByColumnList& groupByCols = csep->groupByCols(); uint64_t lastGroupByPos = 0; jobInfo.hasRollup = csep->withRollup(); for (uint64_t i = 0; i < groupByCols.size(); i++) { pcv.push_back(groupByCols[i]); lastGroupByPos++; const SimpleColumn* sc = dynamic_cast(groupByCols[i].get()); const ArithmeticColumn* ac = NULL; const FunctionColumn* fc = NULL; const RollupMarkColumn* mc = NULL; if (sc != NULL) { CalpontSystemCatalog::OID gbOid = sc->oid(); CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); CalpontSystemCatalog::OID dictOid = 0; CalpontSystemCatalog::ColType ct; string alias(extractTableAlias(sc)); string view(sc->viewName()); if (!sc->schemaName().empty()) { ct = sc->colType(); // XXX use this before connector sets colType in sc correctly. if (sc->isColumnStore() && dynamic_cast(sc) == NULL) { ct = jobInfo.csc->colType(sc->oid()); ct.charsetNumber = sc->colType().charsetNumber; } // X dictOid = isDictCol(ct); } else { gbOid = (tblOid + 1) + sc->colPosition(); ct = jobInfo.vtableColTypes[UniqId(gbOid, alias, "", "")]; } // As of bug3695, make sure varbinary is not used in group by. if (ct.colDataType == CalpontSystemCatalog::VARBINARY) throw runtime_error("VARBINARY in group by is not supported."); TupleInfo ti(setTupleInfo(ct, gbOid, jobInfo, tblOid, sc, alias)); uint32_t tupleKey = ti.key; if (find(projectKeys.begin(), projectKeys.end(), tupleKey) == projectKeys.end()) projectKeys.push_back(tupleKey); // for dictionary columns, replace the token oid with string oid if (dictOid > 0) { jobInfo.tokenOnly[tupleKey] = false; ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias); jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key; tupleKey = ti.key; } jobInfo.groupByColVec.push_back(tupleKey); } else if ((ac = dynamic_cast(groupByCols[i].get())) != NULL) { uint64_t eid = ac->expressionId(); CalpontSystemCatalog::ColType ct = ac->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, ac->alias(), jobInfo)); uint32_t tupleKey = ti.key; jobInfo.groupByColVec.push_back(tupleKey); if (find(projectKeys.begin(), projectKeys.end(), tupleKey) == projectKeys.end()) projectKeys.push_back(tupleKey); } else if ((fc = dynamic_cast(groupByCols[i].get())) != NULL) { uint64_t eid = fc->expressionId(); CalpontSystemCatalog::ColType ct = fc->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, fc->alias(), jobInfo)); uint32_t tupleKey = ti.key; jobInfo.groupByColVec.push_back(tupleKey); if (find(projectKeys.begin(), projectKeys.end(), tupleKey) == projectKeys.end()) projectKeys.push_back(tupleKey); } else if ((mc = dynamic_cast(groupByCols[i].get())) != NULL) { uint64_t eid = mc->expressionId(); CalpontSystemCatalog::ColType ct = mc->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, mc->alias(), jobInfo)); uint32_t tupleKey = ti.key; jobInfo.groupByColVec.push_back(tupleKey); if (find(projectKeys.begin(), projectKeys.end(), tupleKey) == projectKeys.end()) { projectKeys.push_back(tupleKey); } } else { std::ostringstream errmsg; errmsg << "doAggProject: unsupported group by column: " << typeid(*groupByCols[i]).name(); cerr << boldStart << errmsg.str() << boldStop << endl; throw logic_error(errmsg.str()); } } // process the returned columns RetColsVector& retCols = jobInfo.projectionCols; SRCP srcp; for (uint64_t i = 0; i < retCols.size(); i++) { GroupConcatColumn* gcc = dynamic_cast(retCols[i].get()); if (gcc != NULL) { srcp = gcc->aggParms()[0]; const RowColumn* rcp = dynamic_cast(srcp.get()); const vector& cols = rcp->columnVec(); for (vector::const_iterator j = cols.begin(); j != cols.end(); j++) { if (dynamic_cast(j->get()) == NULL) retCols.push_back(*j); } vector& orderCols = gcc->orderCols(); for (vector::iterator k = orderCols.begin(); k != orderCols.end(); k++) { if (dynamic_cast(k->get()) == NULL) retCols.push_back(*k); } continue; } #if 0 // MCOL-1201 Add support for multi-parameter UDAnF UDAFColumn* udafc = dynamic_cast(retCols[i].get()); if (udafc != NULL) { srcp = udafc->aggParms()[0]; const RowColumn* rcp = dynamic_cast(srcp.get()); const vector& cols = rcp->columnVec(); for (vector::const_iterator j = cols.begin(); j != cols.end(); j++) { srcp = *j; if (dynamic_cast(srcp.get()) == NULL) retCols.push_back(srcp); // Do we need this? const ArithmeticColumn* ac = dynamic_cast(srcp.get()); const FunctionColumn* fc = dynamic_cast(srcp.get()); if (ac != NULL || fc != NULL) { // bug 3728, make a dummy expression step for each expression. scoped_ptr es(new ExpressionStep(jobInfo)); es->expression(srcp, jobInfo); } } continue; } #endif srcp = retCols[i]; const AggregateColumn* ag = dynamic_cast(retCols[i].get()); // bug 3728 Make a dummy expression for srcp if it is an // expression. This is needed to fill in some stuff. // Note that es.expression does nothing if the item is not an expression. if (ag == NULL) { // Not an aggregate. Make a dummy expression for the item ExpressionStep es; es.expression(srcp, jobInfo); } else { // MCOL-1201 multi-argument aggregate. make a dummy expression // step for each argument that is an expression. for (uint32_t i = 0; i < ag->aggParms().size(); ++i) { srcp = ag->aggParms()[i]; ExpressionStep es; es.expression(srcp, jobInfo); } } } map dictMap; // bug 1853, the tupleKey - dictoid map for (uint64_t i = 0; i < retCols.size(); i++) { srcp = retCols[i]; const SimpleColumn* sc = dynamic_cast(srcp.get()); AggregateColumn* aggc = dynamic_cast(srcp.get()); bool doDistinct = (csep->distinct() && csep->groupByCols().empty()); // Use this instead of the above line to mimic MariaDB's sql_mode = 'ONLY_FULL_GROUP_BY' // bool doDistinct = (csep->distinct() && // csep->groupByCols().empty() && // !jobInfo.hasAggregation); uint32_t tupleKey = -1; string alias; string view; // returned column could be groupby column, a simplecoulumn not an aggregatecolumn int op = 0; CalpontSystemCatalog::OID dictOid = 0; CalpontSystemCatalog::ColType ct, aggCt; if (aggc) { GroupConcatColumn* gcc = dynamic_cast(retCols[i].get()); if (gcc != NULL) { if (jobInfo.hasRollup) { throw runtime_error( "GROUP_CONCAT and JSONARRAYAGG aggregations are not supported when WITH ROLLUP modifier is " "used"); } jobInfo.groupConcatCols.push_back(retCols[i]); uint64_t eid = gcc->expressionId(); ct = gcc->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, gcc->alias(), jobInfo)); tupleKey = ti.key; jobInfo.returnedColVec.push_back(make_pair(tupleKey, gcc->aggOp())); // not a tokenOnly column. Mark all the columns involved srcp = gcc->aggParms()[0]; const RowColumn* rowCol = dynamic_cast(srcp.get()); if (rowCol) { const std::vector& cols = rowCol->columnVec(); for (vector::const_iterator j = cols.begin(); j != cols.end(); j++) { sc = dynamic_cast(j->get()); if (sc) { CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); alias = extractTableAlias(sc); ct = sc->colType(); TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc, alias)); jobInfo.tokenOnly[ti.key] = false; } } } continue; } else { // Aggregate column not group concat AggParms& aggParms = aggc->aggParms(); for (uint32_t parm = 0; parm < aggParms.size(); ++parm) { // Only do the optimization of converting to count(*) if // there is only one parameter. if (aggParms.size() == 1 && aggc->constCol().get() != NULL) { // replace the aggregate on constant with a count(*) SRCP clone; UDAFColumn* udafc = dynamic_cast(aggc); if (udafc) { clone.reset(new UDAFColumn(*udafc, aggc->sessionID())); } else { clone.reset(new AggregateColumn(*aggc, aggc->sessionID())); } jobInfo.constAggregate.insert(make_pair(i, clone)); aggc->aggOp(AggregateColumn::COUNT_ASTERISK); aggc->distinct(false); } srcp = aggParms[parm]; sc = dynamic_cast(srcp.get()); if (parm == 0) { op = aggc->aggOp(); } else { op = AggregateColumn::MULTI_PARM; } doDistinct = aggc->distinct(); if (aggParms.size() == 1) { // Set the col type based on the single parm. // Changing col type based on a parm if multiple parms // doesn't really make sense. if (op != AggregateColumn::SUM && op != AggregateColumn::DISTINCT_SUM && op != AggregateColumn::AVG && op != AggregateColumn::DISTINCT_AVG && op != AggregateColumn::BIT_AND && op != AggregateColumn::BIT_OR && op != AggregateColumn::BIT_XOR) { updateAggregateColType(aggc, srcp, op, jobInfo); } } aggCt = aggc->resultType(); // As of bug3695, make sure varbinary is not used in aggregation. // TODO: allow for UDAF if (sc != NULL && sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY) throw runtime_error("VARBINARY in aggregate function is not supported."); // Project the parm columns or expressions if (sc != NULL) { CalpontSystemCatalog::OID retOid = sc->oid(); CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); alias = extractTableAlias(sc); view = sc->viewName(); if (!sc->schemaName().empty()) { ct = sc->colType(); // XXX use this before connector sets colType in sc correctly. if (sc->isColumnStore() && dynamic_cast(sc) == NULL) { ct = jobInfo.csc->colType(sc->oid()); ct.charsetNumber = sc->colType().charsetNumber; } // X dictOid = isDictCol(ct); } else { retOid = (tblOid + 1) + sc->colPosition(); ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")]; } TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias)); tupleKey = ti.key; // this is a string column if (dictOid > 0) { map::iterator findit = jobInfo.tokenOnly.find(tupleKey); // if the column has never seen, and the op is count: possible need count only. if (AggregateColumn::COUNT == op || AggregateColumn::COUNT_ASTERISK == op) { if (findit == jobInfo.tokenOnly.end()) jobInfo.tokenOnly[tupleKey] = true; } // if aggregate other than count, token is not enough. else if (op != 0 || doDistinct) { jobInfo.tokenOnly[tupleKey] = false; } findit = jobInfo.tokenOnly.find(tupleKey); if (!(findit != jobInfo.tokenOnly.end() && findit->second == true)) { dictMap[tupleKey] = dictOid; jobInfo.keyInfo->dictOidToColOid[dictOid] = retOid; ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias); jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key; } } } else { const ArithmeticColumn* ac = NULL; const FunctionColumn* fc = NULL; const WindowFunctionColumn* wc = NULL; bool hasAggCols = false; bool hasWndCols = false; if ((ac = dynamic_cast(srcp.get())) != NULL) { if (ac->aggColumnList().size() > 0) hasAggCols = true; if (ac->windowfunctionColumnList().size() > 0) hasWndCols = true; } else if ((fc = dynamic_cast(srcp.get())) != NULL) { if (fc->aggColumnList().size() > 0) hasAggCols = true; if (fc->windowfunctionColumnList().size() > 0) hasWndCols = true; } else if (dynamic_cast(srcp.get()) != NULL) { std::ostringstream errmsg; errmsg << "Invalid aggregate function nesting."; cerr << boldStart << errmsg.str() << boldStop << endl; throw logic_error(errmsg.str()); } else if (dynamic_cast(srcp.get()) != NULL) { } else if ((wc = dynamic_cast(srcp.get())) == NULL) { std::ostringstream errmsg; errmsg << "doAggProject: unsupported column: " << typeid(*(srcp.get())).name(); cerr << boldStart << errmsg.str() << boldStop << endl; throw logic_error(errmsg.str()); } uint64_t eid = srcp.get()->expressionId(); ct = srcp.get()->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo)); tupleKey = ti.key; if (hasAggCols && !hasWndCols) jobInfo.expressionVec.push_back(tupleKey); } // add to project list vector::iterator keyIt = find(projectKeys.begin(), projectKeys.end(), tupleKey); if (keyIt == projectKeys.end()) { RetColsVector::iterator it = pcv.end(); if (doDistinct) it = pcv.insert(pcv.begin() + lastGroupByPos++, srcp); else it = pcv.insert(pcv.end(), srcp); projectKeys.insert(projectKeys.begin() + std::distance(pcv.begin(), it), tupleKey); } else if (doDistinct) // @bug4250, move forward distinct column if necessary. { uint32_t pos = std::distance(projectKeys.begin(), keyIt); if (pos >= lastGroupByPos) { pcv[pos] = pcv[lastGroupByPos]; pcv[lastGroupByPos] = srcp; projectKeys[pos] = projectKeys[lastGroupByPos]; projectKeys[lastGroupByPos] = tupleKey; lastGroupByPos++; } } if (doDistinct && dictOid > 0) tupleKey = jobInfo.keyInfo->dictKeyMap[tupleKey]; // remember the columns to be returned jobInfo.returnedColVec.push_back(make_pair(tupleKey, op)); // bug 1499 distinct processing, save unique distinct columns if (doDistinct && (jobInfo.distinctColVec.end() == find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey))) { jobInfo.distinctColVec.push_back(tupleKey); } } } } else { // Not an Aggregate // simple column selected if (sc != NULL) { // one column only need project once CalpontSystemCatalog::OID retOid = sc->oid(); CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); alias = extractTableAlias(sc); view = sc->viewName(); if (!sc->schemaName().empty()) { ct = sc->colType(); // XXX use this before connector sets colType in sc correctly. if (sc->isColumnStore() && dynamic_cast(sc) == NULL) { ct = jobInfo.csc->colType(sc->oid()); ct.charsetNumber = sc->colType().charsetNumber; } // X dictOid = isDictCol(ct); } else { retOid = (tblOid + 1) + sc->colPosition(); ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")]; } TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias)); tupleKey = ti.key; // this is a string column if (dictOid > 0) { map::iterator findit = jobInfo.tokenOnly.find(tupleKey); // if the column has never seen, and the op is count: possible need count only. if (AggregateColumn::COUNT == op || AggregateColumn::COUNT_ASTERISK == op) { if (findit == jobInfo.tokenOnly.end()) jobInfo.tokenOnly[tupleKey] = true; } // if aggregate other than count, token is not enough. else if (op != 0 || doDistinct) { jobInfo.tokenOnly[tupleKey] = false; } findit = jobInfo.tokenOnly.find(tupleKey); if (!(findit != jobInfo.tokenOnly.end() && findit->second == true)) { dictMap[tupleKey] = dictOid; jobInfo.keyInfo->dictOidToColOid[dictOid] = retOid; ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias); jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key; } } } else { const ArithmeticColumn* ac = NULL; const FunctionColumn* fc = NULL; const WindowFunctionColumn* wc = NULL; bool hasAggCols = false; bool hasWndCols = false; bool hasFuncColsWithOneArgument = false; if ((ac = dynamic_cast(srcp.get())) != NULL) { if (ac->aggColumnList().size() > 0) hasAggCols = true; if (ac->windowfunctionColumnList().size() > 0) hasWndCols = true; } else if (dynamic_cast(srcp.get()) != NULL) { } else if ((fc = dynamic_cast(srcp.get())) != NULL) { if (fc->aggColumnList().size() > 0) hasAggCols = true; if (fc->windowfunctionColumnList().size() > 0) hasWndCols = true; // MCOL-5476 Currently support function with only one argument for group by list. if (fc->simpleColumnList().size() == 1) hasFuncColsWithOneArgument = true; } else if (dynamic_cast(srcp.get()) != NULL) { std::ostringstream errmsg; errmsg << "Invalid aggregate function nesting."; cerr << boldStart << errmsg.str() << boldStop << endl; throw logic_error(errmsg.str()); } else if (dynamic_cast(srcp.get()) != NULL) { } else if ((wc = dynamic_cast(srcp.get())) == NULL) { std::ostringstream errmsg; errmsg << "doAggProject: unsupported column: " << typeid(*(srcp.get())).name(); cerr << boldStart << errmsg.str() << boldStop << endl; throw logic_error(errmsg.str()); } uint64_t eid = srcp.get()->expressionId(); ct = srcp.get()->resultType(); TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo)); tupleKey = ti.key; if (hasAggCols && !hasWndCols) { jobInfo.expressionVec.push_back(tupleKey); } if (hasFuncColsWithOneArgument) { FunctionColumnInfo fcInfo(fcInfo.associatedColumnOid = fc->simpleColumnList().front()->oid(), fc->functionName()); jobInfo.functionColumnMap.insert({tupleKey, fcInfo}); } } // add to project list vector::iterator keyIt = find(projectKeys.begin(), projectKeys.end(), tupleKey); if (keyIt == projectKeys.end()) { RetColsVector::iterator it = pcv.end(); if (doDistinct) it = pcv.insert(pcv.begin() + lastGroupByPos++, srcp); else it = pcv.insert(pcv.end(), srcp); projectKeys.insert(projectKeys.begin() + std::distance(pcv.begin(), it), tupleKey); } else if (doDistinct) // @bug4250, move forward distinct column if necessary. { uint32_t pos = std::distance(projectKeys.begin(), keyIt); if (pos >= lastGroupByPos) { pcv[pos] = pcv[lastGroupByPos]; pcv[lastGroupByPos] = srcp; projectKeys[pos] = projectKeys[lastGroupByPos]; projectKeys[lastGroupByPos] = tupleKey; lastGroupByPos++; } } if (doDistinct && dictOid > 0) tupleKey = jobInfo.keyInfo->dictKeyMap[tupleKey]; // remember the columns to be returned jobInfo.returnedColVec.push_back(make_pair(tupleKey, op)); // bug 1499 distinct processing, save unique distinct columns if (doDistinct && (jobInfo.distinctColVec.end() == find(jobInfo.distinctColVec.begin(), jobInfo.distinctColVec.end(), tupleKey))) { jobInfo.distinctColVec.push_back(tupleKey); } } } // for dictionary columns not count only, replace the token oid with string oid for (vector >::iterator it = jobInfo.returnedColVec.begin(); it != jobInfo.returnedColVec.end(); it++) { // if the column is a dictionary column and not count only bool tokenOnly = false; map::iterator i = jobInfo.tokenOnly.find(it->first); if (i != jobInfo.tokenOnly.end()) tokenOnly = i->second; if (dictMap.find(it->first) != dictMap.end() && !tokenOnly) { uint32_t tupleKey = jobInfo.keyInfo->dictKeyMap[it->first]; int op = it->second; *it = make_pair(tupleKey, op); } } return doProject(pcv, jobInfo); } template class Uniqer { private: typedef typename T::mapped_type Mt_; class Pred { public: Pred(const Mt_& retCol) : fRetCol(retCol) { } bool operator()(const Mt_ rc) const { return fRetCol->sameColumn(rc.get()); } private: const Mt_& fRetCol; }; public: void operator()(typename T::value_type mapItem) { Pred pred(mapItem.second); RetColsVector::iterator iter; iter = find_if(fRetColsVec.begin(), fRetColsVec.end(), pred); if (iter == fRetColsVec.end()) { // Add this ReturnedColumn fRetColsVec.push_back(mapItem.second); } } RetColsVector fRetColsVec; }; uint16_t numberSteps(JobStepVector& steps, uint16_t stepNo, uint32_t flags) { JobStepVector::iterator iter = steps.begin(); JobStepVector::iterator end = steps.end(); while (iter != end) { // don't number the delimiters // if (dynamic_cast(iter->get()) != NULL) //{ // ++iter; // continue; //} JobStep* pJobStep = iter->get(); pJobStep->stepId(stepNo); pJobStep->setTraceFlags(flags); stepNo++; ++iter; } return stepNo; } void changePcolStepToPcolScan(JobStepVector::iterator& it, JobStepVector::iterator& end) { // make sure no pseudo column is a scan column idbassert(dynamic_cast(it->get()) == NULL); pColStep* colStep = dynamic_cast(it->get()); pColScanStep* scanStep = 0; // Might be a pDictionaryScan step if (colStep) { scanStep = new pColScanStep(*colStep); } else { // If we have a pDictionaryScan-pColStep duo, then change the pColStep if (typeid(*(it->get())) == typeid(pDictionaryScan) && std::distance(it, end) > 1 && typeid(*((it + 1)->get())) == typeid(pColStep)) { ++it; colStep = dynamic_cast(it->get()); scanStep = new pColScanStep(*colStep); } } if (scanStep) { it->reset(scanStep); } } // optimize filter order // perform none string filters first because string filter joins the tokens. void optimizeFilterOrder(JobStepVector& qsv) { // move all none string filters uint64_t pdsPos = 0; // int64_t orbranch = 0; for (; pdsPos < qsv.size(); ++pdsPos) { // skip the or branches // OrDelimiterLhs* lhs = dynamic_cast(qsv[pdsPos].get()); // if (lhs != NULL) // { // orbranch++; // continue; // } // // if (orbranch > 0) // { // UnionStep* us = dynamic_cast(qsv[pdsPos].get()); // if (us) // orbranch--; // } // else { pDictionaryScan* pds = dynamic_cast(qsv[pdsPos].get()); if (pds) break; } } // no pDictionaryScan step if (pdsPos >= qsv.size()) return; // get the filter steps that are not in or branches vector pcolIdVec; JobStepVector pcolStepVec; // orbranch = 0; for (uint64_t i = pdsPos; i < qsv.size(); ++i) { // OrDelimiterLhs* lhs = dynamic_cast(qsv[pdsPos].get()); // if (lhs != NULL) // { // orbranch++; // continue; // } // if (orbranch > 0) // { // UnionStep* us = dynamic_cast(qsv[pdsPos].get()); // if (us) // orbranch--; // } // else { pColStep* pcol = dynamic_cast(qsv[i].get()); if (pcol != NULL && pcol->filterCount() > 0) pcolIdVec.push_back(i); } } for (vector::reverse_iterator r = pcolIdVec.rbegin(); r < pcolIdVec.rend(); ++r) { pcolStepVec.push_back(qsv[*r]); qsv.erase(qsv.begin() + (*r)); } qsv.insert(qsv.begin() + pdsPos, pcolStepVec.rbegin(), pcolStepVec.rend()); } void exceptionHandler(JobList* joblist, const JobInfo& jobInfo, const string& logMsg, logging::LOG_TYPE logLevel = LOG_TYPE_ERROR) { cerr << "### JobListFactory ses:" << jobInfo.sessionId << " caught: " << logMsg << endl; Message::Args args; args.add(logMsg); jobInfo.logger->logMessage(logLevel, LogMakeJobList, args, LoggingID(5, jobInfo.sessionId, jobInfo.txnId, 0)); // dummy delivery map, workaround for (qb == 2) in main.cpp DeliveredTableMap dtm; SJSTEP dummyStep; dtm[0] = dummyStep; joblist->addDelivery(dtm); } void parseExecutionPlan(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobStepVector& querySteps, JobStepVector& projectSteps, DeliveredTableMap& deliverySteps) { ParseTree* filters = csep->filters(); jobInfo.deliveredCols = csep->returnedCols(); if (filters != 0) { JLF_ExecPlanToJobList::walkTree(filters, jobInfo); } if (jobInfo.trace) cout << endl << "Stack: " << endl; if (!jobInfo.stack.empty()) { idbassert(jobInfo.stack.size() == 1); querySteps = jobInfo.stack.top(); jobInfo.stack.pop(); // do some filter order optimization optimizeFilterOrder(querySteps); } if (jobInfo.selectAndFromSubs.size() > 0) { querySteps.insert(querySteps.begin(), jobInfo.selectAndFromSubs.begin(), jobInfo.selectAndFromSubs.end()); } // bug4531, window function support WindowFunctionStep::checkWindowFunction(csep, jobInfo); // bug3391, move forward the aggregation check for no aggregate having clause. checkAggregation(csep, jobInfo); // include filters in having clause, if any. if (jobInfo.havingStepVec.size() > 0) querySteps.insert(querySteps.begin(), jobInfo.havingStepVec.begin(), jobInfo.havingStepVec.end()); // Need to change the leading pColStep to a pColScanStep // Keep a list of the (table OIDs,alias) that we've already processed for @bug 598 self-join set seenTableIds; // Stack of seenTables to make sure the left-hand side and right-hand have the same content stack > seenTableStack; if (!querySteps.empty()) { JobStepVector::iterator iter = querySteps.begin(); JobStepVector::iterator end = querySteps.end(); for (; iter != end; ++iter) { idbassert(iter->get()); // As of bug3695, make sure varbinary is not used in filters. if (typeid(*(iter->get())) == typeid(pColStep)) { // only pcolsteps, no pcolscan yet. pColStep* pcol = dynamic_cast(iter->get()); if (pcol->colType().colDataType == CalpontSystemCatalog::VARBINARY) { if (pcol->filterCount() != 1) throw runtime_error("VARBINARY in filter or function is not supported."); // error out if the filter is not "is null" or "is not null" // should block "= null" and "!= null" ??? messageqcpp::ByteStream filter = pcol->filterString(); uint8_t op = 0; filter >> op; bool nullOp = (op == COMPARE_EQ || op == COMPARE_NE || op == COMPARE_NIL); filter >> op; // skip roundFlag uint64_t value = 0; filter >> value; nullOp = nullOp && (value == 0xfffffffffffffffeULL); if (nullOp == false) throw runtime_error("VARBINARY in filter or function is not supported."); } } // // save the current seentable for right-hand side // if (typeid(*(iter->get())) == typeid(OrDelimiterLhs)) // { // seenTableStack.push(seenTableIds); // continue; // } // // // restore the seentable // else if (typeid(*(iter->get())) == typeid(OrDelimiterRhs)) // { // seenTableIds = seenTableStack.top(); // seenTableStack.pop(); // continue; // } if (typeid(*(iter->get())) == typeid(pColStep)) { pColStep* colStep = dynamic_cast(iter->get()); string alias(colStep->alias()); string view(colStep->view()); // If this is the first time we've seen this table or alias uint32_t tableId = 0; tableId = getTableKey(jobInfo, colStep->tupleId()); if (seenTableIds.find(tableId) == seenTableIds.end()) changePcolStepToPcolScan(iter, end); // Mark this OID as seen seenTableIds.insert(tableId); } } } // build the project steps if (jobInfo.deliveredCols.empty()) { throw logic_error("No delivery column."); } // if any aggregate columns if (jobInfo.hasAggregation == true) { projectSteps = doAggProject(csep, jobInfo); } else { projectSteps = doProject(jobInfo.nonConstCols, jobInfo); } // bug3736, have jobInfo include the column map info. const CalpontSelectExecutionPlan::ColumnMap& retCols = csep->columnMap(); CalpontSelectExecutionPlan::ColumnMap::const_iterator i = retCols.begin(); for (; i != retCols.end(); i++) { SimpleColumn* sc = dynamic_cast(i->second.get()); if (sc && !sc->schemaName().empty()) { CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); CalpontSystemCatalog::ColType ct = sc->colType(); // XXX use this before connector sets colType in sc correctly. if (sc->isColumnStore() && dynamic_cast(sc) == NULL) { ct = jobInfo.csc->colType(sc->oid()); ct.charsetNumber = sc->colType().charsetNumber; } // X string alias(extractTableAlias(sc)); TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc, alias)); uint32_t colKey = ti.key; uint32_t tblKey = getTableKey(jobInfo, colKey); jobInfo.columnMap[tblKey].push_back(colKey); if (jobInfo.tableColMap.find(tblKey) == jobInfo.tableColMap.end()) jobInfo.tableColMap[tblKey] = i->second; } } // special case, select without a table, like: select 1; if (jobInfo.constantCol == CONST_COL_ONLY) // XXX: WITH ROLLUP return; // If there are no filters (select * from table;) then add one simple scan // TODO: more work here... // @bug 497 fix. populate a map of tableoid for querysteps. tablescan // cols whose table does not belong to the map typedef set tableIDMap_t; tableIDMap_t tableIDMap; JobStepVector::iterator qsiter = querySteps.begin(); JobStepVector::iterator qsend = querySteps.end(); uint32_t tableId = 0; while (qsiter != qsend) { JobStep* js = qsiter->get(); if (js->tupleId() != (uint64_t)-1) tableId = getTableKey(jobInfo, js->tupleId()); tableIDMap.insert(tableId); ++qsiter; } JobStepVector::iterator jsiter = projectSteps.begin(); JobStepVector::iterator jsend = projectSteps.end(); while (jsiter != jsend) { JobStep* js = jsiter->get(); if (js->tupleId() != (uint64_t)-1) tableId = getTableKey(jobInfo, js->tupleId()); else tableId = getTableKey(jobInfo, js); if (typeid(*(jsiter->get())) == typeid(pColStep) && tableIDMap.find(tableId) == tableIDMap.end()) { SJSTEP step0 = *jsiter; pColStep* colStep = dynamic_cast(step0.get()); pColScanStep* scanStep = new pColScanStep(*colStep); // clear out any output association so we get a nice, new one during association scanStep->outputAssociation(JobStepAssociation()); step0.reset(scanStep); querySteps.push_back(step0); js = step0.get(); tableId = getTableKey(jobInfo, js->tupleId()); tableIDMap.insert(tableId); } ++jsiter; } } // v-table mode void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobStepVector& querySteps, JobStepVector& projectSteps, DeliveredTableMap& deliverySteps) { // special case for outer query order by limit -- return all if (jobInfo.subId == 0 && csep->hasOrderBy() && !csep->specHandlerProcessed()) { jobInfo.limitCount = (uint64_t)-1; } // support order by and limit in sub-query/union or // GROUP BY handler processed outer query order else if (csep->orderByCols().size() > 0) { addOrderByAndLimit(csep, jobInfo); } // limit without order by in any query else { jobInfo.limitStart = csep->limitStart(); jobInfo.limitCount = csep->limitNum(); } // Bug 2123. Added overrideLargeSideEstimate parm below. True if the query was written // with a hint telling us to skip the estimatation process for determining the large side // table and instead use the table order in the from clause. associateTupleJobSteps(querySteps, projectSteps, deliverySteps, jobInfo, csep->overrideLargeSideEstimate()); uint16_t stepNo = jobInfo.subId * 10000; numberSteps(querySteps, stepNo, jobInfo.traceFlags); // SJSTEP ds = deliverySteps.begin()->second; idbassert(deliverySteps.begin()->second.get()); // ds->stepId(stepNo); // ds->setTraceFlags(jobInfo.traceFlags); } void makeAnalyzeTableJobSteps(MCSAnalyzeTableExecutionPlan* caep, JobInfo& jobInfo, JobStepVector& querySteps, DeliveredTableMap& deliverySteps) { JobStepVector projectSteps; const auto& retCols = caep->returnedCols(); if (retCols.size() == 0) return; // This is strange, but to generate a valid `out rowgroup` in AnnexStep we have to initialize // `nonConstDelCols`, otherwise we will get an empty result in ExeMgr. Why don't just use `in // row group`? jobInfo.nonConstDelCols = retCols; // Iterate over `returned columns` and create a `pColStep` for each. for (uint32_t i = 0; i < retCols.size(); i++) { const SimpleColumn* sc = dynamic_cast(retCols[i].get()); CalpontSystemCatalog::OID oid = sc->oid(); CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); CalpontSystemCatalog::ColType colType; if (!sc->schemaName().empty()) { SJSTEP sjstep; colType = sc->colType(); auto* pcs = new pColStep(oid, tblOid, colType, jobInfo); pcs->alias(extractTableAlias(sc)); pcs->view(sc->viewName()); pcs->name(sc->columnName()); pcs->cardinality(sc->cardinality()); auto ti = setTupleInfo(colType, oid, jobInfo, tblOid, sc, pcs->alias()); pcs->tupleId(ti.key); sjstep.reset(pcs); projectSteps.push_back(sjstep); } } if (projectSteps.size() == 0) return; // Transform the first `pColStep` to `pColScanStep`. SJSTEP firstStep = projectSteps.front(); pColStep* colStep = static_cast(firstStep.get()); // Create a `pColScanStep` using first `pColStep`. pColScanStep* scanStep = new pColScanStep(*colStep); scanStep->outputAssociation(JobStepAssociation()); // Update first step. firstStep.reset(scanStep); // Create tuple BPS step. TupleBPS* tbps = new TupleBPS(*scanStep, jobInfo); tbps->setFirstStepType(SCAN); // One `filter` step is scan step. tbps->setBPP(scanStep); tbps->setStepCount(); vector pos; vector oids; vector keys; vector scale; vector precision; vector types; vector csNums; pos.push_back(2); bool passThruCreated = false; for (JobStepVector::iterator it = projectSteps.begin(); it != projectSteps.end(); it++) { JobStep* js = it->get(); auto* colStep = static_cast(js); // TODO: Hoist the condition branch from the cycle, it will look ugly, but probaby faster. if (UNLIKELY(!passThruCreated)) { PassThruStep* pts = new PassThruStep(*colStep); passThruCreated = true; pts->alias(colStep->alias()); pts->view(colStep->view()); pts->name(colStep->name()); pts->tupleId(colStep->tupleId()); tbps->setProjectBPP(pts, NULL); } else { tbps->setProjectBPP(it->get(), NULL); } TupleInfo ti(getTupleInfo(colStep->tupleId(), jobInfo)); pos.push_back(pos.back() + ti.width); oids.push_back(ti.oid); keys.push_back(ti.key); types.push_back(ti.dtype); csNums.push_back(ti.csNum); scale.push_back(ti.scale); precision.push_back(ti.precision); } RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, 20); SJSTEP sjsp; sjsp.reset(tbps); // Add tuple BPS step to query steps. querySteps.push_back(sjsp); // Delivery step. SJSTEP annexStep; auto* tas = new TupleAnnexStep(jobInfo); annexStep.reset(tas); // Create input `RowGroupDL`. RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize); dlIn->OID(CNX_VTABLE_ID); AnyDataListSPtr spdlIn(new AnyDataList()); spdlIn->rowGroupDL(dlIn); JobStepAssociation jsaIn; jsaIn.outAdd(spdlIn); // Create output `RowGroupDL`. RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize); dlOut->OID(CNX_VTABLE_ID); AnyDataListSPtr spdlOut(new AnyDataList()); spdlOut->rowGroupDL(dlOut); JobStepAssociation jsaOut; jsaOut.outAdd(spdlOut); // Set input and output. tbps->setOutputRowGroup(rg); tbps->outputAssociation(jsaIn); annexStep->inputAssociation(jsaIn); annexStep->outputAssociation(jsaOut); // Initialize. tas->initialize(rg, jobInfo); // Add `annexStep` to delivery steps and to query steps. deliverySteps[CNX_VTABLE_ID] = annexStep; querySteps.push_back(annexStep); if (jobInfo.trace) { std::cout << "TupleBPS created: " << std::endl; std::cout << tbps->toString() << std::endl; std::cout << "Result row group: " << std::endl; std::cout << rg.toString() << std::endl; } } } // namespace namespace joblist { void makeJobSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobStepVector& querySteps, JobStepVector& projectSteps, DeliveredTableMap& deliverySteps) { // v-table mode, switch to tuple methods and return the tuple joblist. //@Bug 1958 Build table list only for tryTuples. const CalpontSelectExecutionPlan::SelectList& fromSubquery = csep->derivedTableList(); int i = 0; for (CalpontSelectExecutionPlan::TableList::const_iterator it = csep->tableList().begin(); it != csep->tableList().end(); it++) { CalpontSystemCatalog::OID oid; if (it->schema.empty()) oid = doFromSubquery(fromSubquery[i++].get(), it->alias, it->view, jobInfo); else if (it->fisColumnStore) oid = jobInfo.csc->tableRID(*it).objnum; else oid = 0; uint32_t tableUid = makeTableKey(jobInfo, oid, it->table, it->alias, it->schema, it->view); jobInfo.tableList.push_back(tableUid); } // add select suqueries preprocessSelectSubquery(csep, jobInfo); // semi-join may appear in having clause if (csep->having() != NULL) preprocessHavingClause(csep, jobInfo); // parse plan and make jobstep list parseExecutionPlan(csep, jobInfo, querySteps, projectSteps, deliverySteps); makeVtableModeSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps); } void makeUnionJobSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobStepVector& querySteps, JobStepVector&, DeliveredTableMap& deliverySteps) { CalpontSelectExecutionPlan::SelectList& selectVec = csep->unionVec(); uint8_t distinctUnionNum = csep->distinctUnionNum(); RetColsVector unionRetCols = csep->returnedCols(); JobStepVector unionFeeders; for (CalpontSelectExecutionPlan::SelectList::iterator cit = selectVec.begin(); cit != selectVec.end(); cit++) { // @bug4848, enhance and unify limit handling. SJSTEP sub = doUnionSub(cit->get(), jobInfo); querySteps.push_back(sub); unionFeeders.push_back(sub); } jobInfo.deliveredCols = unionRetCols; SJSTEP unionStep(unionQueries(unionFeeders, distinctUnionNum, jobInfo)); querySteps.push_back(unionStep); uint16_t stepNo = jobInfo.subId * 10000; numberSteps(querySteps, stepNo, jobInfo.traceFlags); deliverySteps[execplan::CNX_VTABLE_ID] = unionStep; } } // namespace joblist namespace { void handleException(std::exception_ptr e, JobList* jl, JobInfo& jobInfo, unsigned& errCode, string& emsg) { try { std::rethrow_exception(e); } catch (IDBExcept& iex) { jobInfo.errorInfo->errCode = iex.errorCode(); errCode = iex.errorCode(); exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG); emsg = iex.what(); } catch (const std::exception& ex) { jobInfo.errorInfo->errCode = makeJobListErr; errCode = makeJobListErr; exceptionHandler(jl, jobInfo, ex.what()); emsg = ex.what(); } catch (...) { jobInfo.errorInfo->errCode = makeJobListErr; errCode = makeJobListErr; exceptionHandler(jl, jobInfo, "an exception"); emsg = "An unknown internal joblist error"; } delete jl; jl = nullptr; } SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, const PrimitiveServerThreadPools& primitiveServerThreadPools, bool isExeMgr, unsigned& errCode, string& emsg) { // TODO: This part requires a proper refactoring, we have to move common methods from // `CalpontSelectExecutionPlan` to the base class. I have no idea what's a point of // `CalpontExecutionPlan` as base class without any meaningful virtual functions and common // fields. The main thing I concern about - to make a huge refactoring of this before a week // of release, because there is no time to test it and I do not want to introduce an errors in // the existing code. Hope will make it on the next iteration of statistics development. CalpontSelectExecutionPlan* csep = dynamic_cast(cplan); if (csep != nullptr) { boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID()); static config::Config* sysConfig = config::Config::makeConfig(); int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str()); // We have to go ahead and create JobList now so we can store the joblist's // projectTableOID pointer in JobInfo for use during jobstep creation. SErrorInfo errorInfo(new ErrorInfo()); boost::shared_ptr keyInfo(new TupleKeyInfo); boost::shared_ptr subCount(new int); *subCount = 0; JobList* jl = new TupleJobList(isExeMgr); jl->setPMsConfigured(pmsConfigured); jl->priority(csep->priority()); jl->errorInfo(errorInfo); rm->setTraceFlags(csep->traceFlags()); // Stuff a util struct with some stuff we always need JobInfo jobInfo(rm); jobInfo.sessionId = csep->sessionID(); jobInfo.txnId = csep->txnID(); jobInfo.verId = csep->verID(); jobInfo.statementId = csep->statementID(); jobInfo.queryType = csep->queryType(); jobInfo.csc = csc; jobInfo.primitiveServerThreadPools = primitiveServerThreadPools; // TODO: clean up the vestiges of the bool trace jobInfo.trace = csep->traceOn(); jobInfo.traceFlags = csep->traceFlags(); jobInfo.isExeMgr = isExeMgr; // jobInfo.tryTuples = tryTuples; // always tuples after release 3.0 jobInfo.stringScanThreshold = csep->stringScanThreshold(); jobInfo.errorInfo = errorInfo; jobInfo.keyInfo = keyInfo; jobInfo.subCount = subCount; jobInfo.projectingTableOID = jl->projectingTableOIDPtr(); jobInfo.jobListPtr = jl; jobInfo.stringTableThreshold = csep->stringTableThreshold(); jobInfo.localQuery = csep->localQuery(); jobInfo.uuid = csep->uuid(); jobInfo.timeZone = csep->timeZone(); /* disk-based join vars */ jobInfo.smallSideLimit = csep->djsSmallSideLimit(); jobInfo.largeSideLimit = csep->djsLargeSideLimit(); jobInfo.partitionSize = csep->djsPartitionSize(); jobInfo.djsMaxPartitionTreeDepth = csep->djsMaxPartitionTreeDepth(); jobInfo.djsForceRun = csep->djsForceRun(); jobInfo.maxPmJoinResultCount = csep->maxPmJoinResultCount(); jobInfo.umMemLimit.reset(new int64_t); *(jobInfo.umMemLimit) = csep->umMemLimit(); jobInfo.isDML = csep->isDML(); jobInfo.smallSideUsage.reset(new int64_t); *jobInfo.smallSideUsage = 0; // set fifoSize to 1 for CalpontSystemCatalog query if (csep->sessionID() & 0x80000000) jobInfo.fifoSize = 1; else if (csep->traceOn()) cout << (*csep) << endl; try { JobStepVector querySteps; JobStepVector projectSteps; DeliveredTableMap deliverySteps; if (csep->unionVec().size() == 0) makeJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps); else makeUnionJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps); uint16_t stepNo = numberSteps(querySteps, 0, jobInfo.traceFlags); stepNo = numberSteps(projectSteps, stepNo, jobInfo.traceFlags); struct timeval stTime; if (jobInfo.trace) { ostringstream oss; oss << endl; oss << endl << "job parms: " << endl; oss << "maxBuckets = " << jobInfo.maxBuckets << ", maxElems = " << jobInfo.maxElems << ", flushInterval = " << jobInfo.flushInterval << ", fifoSize = " << jobInfo.fifoSize << endl; oss << "UUID: " << jobInfo.uuid << endl; oss << endl << "job filter steps: " << endl; ostream_iterator oIter(oss, "\n"); copy(querySteps.begin(), querySteps.end(), oIter); oss << endl << "job project steps: " << endl; copy(projectSteps.begin(), projectSteps.end(), oIter); oss << endl << "job delivery steps: " << endl; DeliveredTableMap::iterator dsi = deliverySteps.begin(); while (dsi != deliverySteps.end()) { oss << dynamic_cast(dsi->second.get()) << endl; ++dsi; } oss << endl; gettimeofday(&stTime, 0); struct tm tmbuf; localtime_r(&stTime.tv_sec, &tmbuf); ostringstream tms; tms << setfill('0') << setw(4) << (tmbuf.tm_year + 1900) << setw(2) << (tmbuf.tm_mon + 1) << setw(2) << (tmbuf.tm_mday) << setw(2) << (tmbuf.tm_hour) << setw(2) << (tmbuf.tm_min) << setw(2) << (tmbuf.tm_sec) << setw(6) << (stTime.tv_usec); string tmstr(tms.str()); string jsrname("jobstep." + tmstr + ".dot"); ofstream dotFile(jsrname.c_str()); jlf_graphics::writeDotCmds(dotFile, querySteps, projectSteps); char timestamp[80]; ctime_r((const time_t*)&stTime.tv_sec, timestamp); oss << "runtime updates: start at " << timestamp; cout << oss.str(); Message::Args args; args.add(oss.str()); jobInfo.logger->logMessage(LOG_TYPE_DEBUG, LogSQLTrace, args, LoggingID(5, jobInfo.sessionId, jobInfo.txnId, 0)); cout << flush; } else { gettimeofday(&stTime, 0); } // Finish initializing the JobList object jl->addQuery(querySteps); jl->addProject(projectSteps); jl->addDelivery(deliverySteps); csep->setDynamicParseTreeVec(jobInfo.dynamicParseTreeVec); dynamic_cast(jl)->setDeliveryFlag(true); } catch (IDBExcept& iex) { jobInfo.errorInfo->errCode = iex.errorCode(); errCode = iex.errorCode(); exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG); emsg = iex.what(); goto bailout; } catch (const std::exception& ex) { jobInfo.errorInfo->errCode = makeJobListErr; errCode = makeJobListErr; exceptionHandler(jl, jobInfo, ex.what()); emsg = ex.what(); goto bailout; } catch (...) { jobInfo.errorInfo->errCode = makeJobListErr; errCode = makeJobListErr; exceptionHandler(jl, jobInfo, "an exception"); emsg = "An unknown internal joblist error"; goto bailout; } goto done; bailout: delete jl; jl = 0; if (emsg.empty()) emsg = "An unknown internal joblist error"; done: SJLP jlp(jl); return jlp; } else { auto* caep = dynamic_cast(cplan); JobList* jl = nullptr; if (caep == nullptr) { SJLP jlp(jl); std::cerr << "Ivalid execution plan" << std::endl; return jlp; } jl = new TupleJobList(isExeMgr); boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(caep->sessionID()); static config::Config* sysConfig = config::Config::makeConfig(); uint32_t pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str()); SErrorInfo errorInfo(new ErrorInfo()); boost::shared_ptr keyInfo(new TupleKeyInfo); boost::shared_ptr subCount(new int); *subCount = 0; jl->setPMsConfigured(pmsConfigured); jl->priority(caep->priority()); jl->errorInfo(errorInfo); JobInfo jobInfo(rm); jobInfo.sessionId = caep->sessionID(); jobInfo.txnId = caep->txnID(); jobInfo.verId = caep->verID(); jobInfo.statementId = caep->statementID(); jobInfo.csc = csc; jobInfo.trace = caep->traceOn(); jobInfo.isExeMgr = isExeMgr; // TODO: Implement it when we have a dict column. jobInfo.stringScanThreshold = 20; jobInfo.errorInfo = errorInfo; jobInfo.keyInfo = keyInfo; jobInfo.subCount = subCount; jobInfo.projectingTableOID = jl->projectingTableOIDPtr(); jobInfo.jobListPtr = jl; jobInfo.localQuery = caep->localQuery(); jobInfo.timeZone = caep->timeZone(); try { JobStepVector querySteps; DeliveredTableMap deliverySteps; // Parse exe plan and create a jobstesp from it. makeAnalyzeTableJobSteps(caep, jobInfo, querySteps, deliverySteps); if (!querySteps.size()) { delete jl; // Indicates that query steps is empty. errCode = logging::statisticsJobListEmpty; jl = nullptr; goto out; } numberSteps(querySteps, 0, jobInfo.traceFlags); jl->addQuery(querySteps); jl->addDelivery(deliverySteps); dynamic_cast(jl)->setDeliveryFlag(true); } catch (...) { handleException(std::current_exception(), jl, jobInfo, errCode, emsg); } out: SJLP jlp(jl); return jlp; } } } // namespace namespace joblist { /* static */ SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* rm, const PrimitiveServerThreadPools& primitiveServerThreadPools, bool tryTuple, bool isExeMgr) { SJLP ret; string emsg; unsigned errCode = 0; ret = makeJobList_(cplan, rm, primitiveServerThreadPools, isExeMgr, errCode, emsg); if (!ret) { ret.reset(new TupleJobList(isExeMgr)); SErrorInfo errorInfo(new ErrorInfo); errorInfo->errCode = errCode; errorInfo->errMsg = emsg; ret->errorInfo(errorInfo); } return ret; } } // namespace joblist #ifdef __clang__ #pragma clang diagnostic pop #endif