/* Copyright (C) 2014 InfiniDB, Inc. Copyright (c) 2016-2020 MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: windowfunctionstep.cpp 9681 2013-07-11 22:58:05Z xlou $ //#define NDEBUG #include #include #include using namespace std; #include // to_upper_copy #include #include #include using namespace boost; #include "atomicops.h" using namespace atomicops; #include "loggingid.h" #include "errorcodes.h" #include "idberrorinfo.h" using namespace logging; #include "configcpp.h" using namespace config; #include "calpontselectexecutionplan.h" #include "calpontsystemcatalog.h" #include "aggregatecolumn.h" #include "arithmeticcolumn.h" #include "constantcolumn.h" #include "functioncolumn.h" #include "pseudocolumn.h" #include "simplefilter.h" #include "windowfunctioncolumn.h" using namespace execplan; #include "../../utils/windowfunction/windowfunction.h" #include "../../utils/windowfunction/windowfunctiontype.h" #include "../../utils/windowfunction/framebound.h" #include "../../utils/windowfunction/frameboundrange.h" #include "../../utils/windowfunction/frameboundrow.h" #include "../../utils/windowfunction/windowframe.h" using namespace windowfunction; #include "rowgroup.h" using namespace rowgroup; using namespace ordering; #include "funcexp.h" using namespace funcexp; #include "querytele.h" using namespace querytele; #include "jlf_common.h" #include "jobstep.h" #include 
"windowfunctionstep.h" using namespace joblist; #include "checks.h" namespace { uint64_t getColumnIndex(const SRCP& c, const map& m, JobInfo& jobInfo) { uint64_t key = getTupleKey(jobInfo, c, true); const SimpleColumn* sc = dynamic_cast(c.get()); if (sc != NULL && !sc->schemaName().empty()) { // special handling for dictionary CalpontSystemCatalog::ColType ct = sc->colType(); // XXX use this before connector sets colType in sc correctly. // type of pseudo column is set by connector if (!(dynamic_cast(sc))) { ct = jobInfo.csc->colType(sc->oid()); ct.charsetNumber = sc->colType().charsetNumber; } // X CalpontSystemCatalog::OID dictOid = isDictCol(ct); string alias(extractTableAlias(sc)); if (dictOid > 0) { TupleInfo ti = setTupleInfo(ct, dictOid, jobInfo, tableOid(sc, jobInfo.csc), sc, alias); key = ti.key; } } map::const_iterator j = m.find(key); if (j == m.end()) { string name = jobInfo.keyInfo->tupleKeyToName[key]; cerr << name << " is not in tuple, key=" << key << endl; throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING); } return j->second; } } // namespace namespace joblist { WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) : JobStep(jobInfo) , fRunner(0) , fCatalog(jobInfo.csc) , fRowsReturned(0) , fEndOfResult(false) , fIsSelect(true) , fUseSSMutex(false) , fUseUFMutex(false) , fInputDL(NULL) , fOutputDL(NULL) , fInputIterator(-1) , fOutputIterator(-1) , fFunctionCount(0) , fTotalThreads(1) , fNextIndex(0) , fMemUsage(0) , fRm(jobInfo.rm) , fSessionMemLimit(jobInfo.umMemLimit) { fTotalThreads = fRm->windowFunctionThreads(); fExtendedInfo = "WFS: "; fQtc.stepParms().stepType = StepTeleStats::T_WFS; } WindowFunctionStep::~WindowFunctionStep() { if (fMemUsage > 0) fRm->returnMemory(fMemUsage, fSessionMemLimit); } void WindowFunctionStep::run() { if (fInputJobStepAssociation.outSize() == 0) throw logic_error("No input data list for window function step."); fInputDL = 
fInputJobStepAssociation.outAt(0)->rowGroupDL(); if (fInputDL == NULL) throw logic_error("Input is not a RowGroup data list in window function step."); fInputIterator = fInputDL->getIterator(); if (fOutputJobStepAssociation.outSize() == 0) throw logic_error("No output data list for window function step."); fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL(); if (fOutputDL == NULL) throw logic_error("Output of window function step is not a RowGroup data list."); if (fDelivery == true) { fOutputIterator = fOutputDL->getIterator(); } fRunner = jobstepThreadPool.invoke(Runner(this)); } void WindowFunctionStep::join() { if (fRunner) jobstepThreadPool.join(fRunner); } uint32_t WindowFunctionStep::nextBand(messageqcpp::ByteStream& bs) { RGData rgDataOut; bool more = false; uint32_t rowCount = 0; try { bs.restart(); more = fOutputDL->next(fOutputIterator, &rgDataOut); if (more && !cancelled()) { fRowGroupDelivered.setData(&rgDataOut); fRowGroupDelivered.serializeRGData(bs); rowCount = fRowGroupDelivered.getRowCount(); } else { while (more) more = fOutputDL->next(fOutputIterator, &rgDataOut); fEndOfResult = true; } } catch (...) 
{ handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::nextBand()"); while (more) more = fOutputDL->next(fOutputIterator, &rgDataOut); fEndOfResult = true; } if (fEndOfResult) { // send an empty / error band rgDataOut.reinit(fRowGroupDelivered, 0); fRowGroupDelivered.setData(&rgDataOut); fRowGroupDelivered.resetRowGroup(0); fRowGroupDelivered.setStatus(status()); fRowGroupDelivered.serializeRGData(bs); } return rowCount; } void WindowFunctionStep::setOutputRowGroup(const RowGroup& rg) { idbassert(0); } const RowGroup& WindowFunctionStep::getOutputRowGroup() const { return fRowGroupOut; } const RowGroup& WindowFunctionStep::getDeliveredRowGroup() const { return fRowGroupDelivered; } void WindowFunctionStep::deliverStringTableRowGroup(bool b) { fRowGroupOut.setUseStringTable(b); fRowGroupDelivered.setUseStringTable(b); } bool WindowFunctionStep::deliverStringTableRowGroup() const { idbassert(fRowGroupOut.usesStringTable() == fRowGroupDelivered.usesStringTable()); return fRowGroupDelivered.usesStringTable(); } const string WindowFunctionStep::toString() const { ostringstream oss; oss << "WindowFunctionStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; oss << " in:"; for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++) oss << fInputJobStepAssociation.outAt(i); if (fOutputJobStepAssociation.outSize() > 0) { oss << " out:"; for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++) oss << fOutputJobStepAssociation.outAt(i); } return oss.str(); } void WindowFunctionStep::AddSimplColumn(const vector& scs, JobInfo& jobInfo) { // append the simple columns if not already projected set scProjected; for (RetColsVector::iterator i = jobInfo.projectionCols.begin(); i != jobInfo.projectionCols.end(); i++) { SimpleColumn* sc = dynamic_cast(i->get()); if (sc != NULL) { if (sc->schemaName().empty()) sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); 
scProjected.insert(UniqId(sc)); } } for (vector::const_iterator i = scs.begin(); i != scs.end(); i++) { if (scProjected.find(UniqId(*i)) == scProjected.end()) { jobInfo.windowDels.push_back(SRCP((*i)->clone())); // MCOL-3343 Enable this if we decide to allow Window Functions to run with // aggregates with no group by. MariaDB allows this. Nobody else in the world does. // There will be more work to get it to function if we try this. // jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true)); scProjected.insert(UniqId(*i)); } } } void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { // window functions in select clause, selected or in expression jobInfo.windowDels = jobInfo.deliveredCols; for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++) { const vector& wcl = (*i)->windowfunctionColumnList(); RetColsVector wcList; for (vector::const_iterator j = wcl.begin(); j != wcl.end(); j++) wcList.push_back(SRCP((*j)->clone())); if (!wcList.empty()) { jobInfo.windowExps.push_back(*i); jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true)); } if (dynamic_cast(i->get()) != NULL) { jobInfo.windowCols.push_back(*i); jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true)); } else if (!wcList.empty()) { jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end()); for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++) { jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true)); } } } // window functions in order by clause const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols(); RetColsVector wcInOrderby; for (uint64_t i = 0; i < orderByCols.size(); i++) { if (orderByCols[i]->orderPos() == (uint64_t)(-1)) { WindowFunctionColumn* wc = dynamic_cast(orderByCols[i].get()); const vector& wcl = orderByCols[i]->windowfunctionColumnList(); RetColsVector wcList; for (vector::const_iterator j = wcl.begin(); j != wcl.end(); j++) 
wcList.push_back(SRCP((*j)->clone())); if (wc == NULL && wcList.empty()) continue; // an window function or expression of window functions wcInOrderby.push_back(orderByCols[i]); if (!wcList.empty()) { jobInfo.windowExps.push_back(orderByCols[i]); jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true)); } if (dynamic_cast(orderByCols[i].get()) != NULL) { jobInfo.windowCols.push_back(orderByCols[i]); jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true)); } else if (!wcList.empty()) { jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end()); for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++) { jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true)); } } } } // no window function involved in the query if (jobInfo.windowCols.empty()) return; // Add in the non-window side of arithmetic columns and functions for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++) { const ArithmeticColumn* ac = dynamic_cast(jobInfo.windowExps[i].get()); const FunctionColumn* fc = dynamic_cast(jobInfo.windowExps[i].get()); if (ac != NULL && ac->windowfunctionColumnList().size() > 0) { AddSimplColumn(ac->simpleColumnList(), jobInfo); } else if (fc != NULL && fc->windowfunctionColumnList().size() > 0) { AddSimplColumn(fc->simpleColumnList(), jobInfo); } } // reconstruct the delivered column list with auxiliary columns set colSet; jobInfo.deliveredCols.resize(0); for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++) { jobInfo.deliveredCols.push_back(*i); uint64_t key = getTupleKey(jobInfo, *i, true); // TODO: remove duplicates in select clause colSet.insert(key); } // add window columns in orderby for (RetColsVector::iterator i = wcInOrderby.begin(); i < wcInOrderby.end(); i++) { jobInfo.deliveredCols.push_back(*i); uint64_t key = getTupleKey(jobInfo, *i, true); colSet.insert(key); } // MCOL-3435 We haven't yet checked for aggregate, but we need to know bool 
hasAggregation = false; for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++) { if (dynamic_cast(jobInfo.deliveredCols[i].get()) != NULL) { hasAggregation = true; break; } } // add non-duplicate auxiliary columns RetColsVector colsInAf; for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++) { uint64_t key = getTupleKey(jobInfo, *i, true); if (colSet.find(key) == colSet.end()) jobInfo.deliveredCols.push_back(*i); RetColsVector columns = dynamic_cast(i->get())->getColumnList(); for (RetColsVector::iterator j = columns.begin(); j < columns.end(); j++) { if (dynamic_cast(j->get()) != NULL) continue; key = getTupleKey(jobInfo, *j, true); if (colSet.find(key) == colSet.end()) { jobInfo.deliveredCols.push_back(*j); // MCOL-3435 Allow Window Functions to run with aggregates with // no group by by inserting a group by for window parameters. if (hasAggregation) { // If an argument is an AggregateColumn, don't group by it. if (dynamic_cast(j->get()) == NULL) { bool bFound = false; for (std::vector::iterator igpc = csep->groupByCols().begin(); igpc < csep->groupByCols().end(); ++igpc) { if ((*igpc)->alias() == (*j)->alias()) { bFound = true; break; } } if (!bFound) { csep->groupByCols().push_back(*j); } } } } colSet.insert(key); } } // for handling order by and limit in outer query jobInfo.wfqLimitStart = csep->limitStart(); jobInfo.wfqLimitCount = csep->limitNum(); csep->limitStart(0); csep->limitNum(-1); if (csep->orderByCols().size() > 0) { jobInfo.wfqOrderby = csep->orderByCols(); csep->orderByCols().clear(); // add order by columns for (RetColsVector::iterator i = jobInfo.wfqOrderby.begin(); i < jobInfo.wfqOrderby.end(); i++) { if (dynamic_cast(i->get()) != NULL) continue; uint64_t key = getTupleKey(jobInfo, *i, true); if (colSet.find(key) == colSet.end()) jobInfo.deliveredCols.push_back(*i); colSet.insert(key); } } } SJSTEP WindowFunctionStep::makeWindowFunctionStep(SJSTEP& step, JobInfo& jobInfo) { // create a window 
function step WindowFunctionStep* ws = new WindowFunctionStep(jobInfo); // connect to the feeding step JobStepAssociation jsa; AnyDataListSPtr spdl(new AnyDataList()); RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize); dl->OID(execplan::CNX_VTABLE_ID); spdl->rowGroupDL(dl); jsa.outAdd(spdl); ws->inputAssociation(jsa); ws->stepId(step->stepId() + 1); step->outputAssociation(jsa); AnyDataListSPtr spdlOut(new AnyDataList()); RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize); dlOut->OID(CNX_VTABLE_ID); spdlOut->rowGroupDL(dlOut); JobStepAssociation jsaOut; jsaOut.outAdd(spdlOut); ws->outputAssociation(jsaOut); // configure the rowgroups and index mapping TupleDeliveryStep* ds = dynamic_cast(step.get()); idbassert(ds != NULL); ws->initialize(ds->getDeliveredRowGroup(), jobInfo); // restore the original delivery coloumns jobInfo.deliveredCols = jobInfo.windowDels; jobInfo.nonConstDelCols.clear(); for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++) { if (NULL == dynamic_cast(i->get())) jobInfo.nonConstDelCols.push_back(*i); } return SJSTEP(ws); } void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo) { if (jobInfo.trace) cout << "Input to WindowFunctionStep: " << rg.toString() << endl; // query type decides the output by dbroot or partition // @bug 5631. 
Insert select should be treated as select fIsSelect = (jobInfo.queryType == "SELECT" || jobInfo.queryType == "INSERT_SELECT"); // input row meta data fRowGroupIn = rg; fRowGroupIn.initRow(&fRowIn); // make an input map(id, index) map colIndexMap; uint64_t colCntIn = rg.getColumnCount(); const vector& pos = rg.getOffsets(); const vector& oids = rg.getOIDs(); const vector& keys = rg.getKeys(); const vector& types = rg.getColTypes(); const vector& csNums = rg.getCharsetNumbers(); const vector& scales = rg.getScale(); const vector& precisions = rg.getPrecision(); for (uint64_t i = 0; i < colCntIn; i++) colIndexMap.insert(make_pair(keys[i], i)); // @bug6065, window functions that will update string table int64_t wfsUpdateStringTable = 0; int64_t wfsUserFunctionCount = 0; for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++) { bool isUDAF = false; // window function type WindowFunctionColumn* wc = dynamic_cast(i->get()); uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index // @bug6065, window functions that will update string table { CalpontSystemCatalog::ColType rt = wc->resultType(); if ((types[ridx] == CalpontSystemCatalog::CHAR || types[ridx] == CalpontSystemCatalog::VARCHAR || types[ridx] == CalpontSystemCatalog::TEXT || types[ridx] == CalpontSystemCatalog::VARBINARY || types[ridx] == CalpontSystemCatalog::BLOB) && rg.getColumnWidth(ridx) >= jobInfo.stringTableThreshold) { ++wfsUpdateStringTable; } } // if (boost::iequals(wc->functionName(),"UDAF_FUNC") if (wc->functionName() == "UDAF_FUNC") { isUDAF = true; ++wfsUserFunctionCount; } vector fields; fields.push_back(ridx); // result const RetColsVector& parms = wc->functionParms(); for (uint64_t i = 0; i < parms.size(); i++) // arguments { // skip constant column if (dynamic_cast(parms[i].get()) == NULL) fields.push_back(getColumnIndex(parms[i], colIndexMap, jobInfo)); else fields.push_back(-1); } // partition & order by const RetColsVector& partitions = 
wc->partitions(); vector eqIdx; vector peerIdx; vector sorts; for (uint64_t i = 0; i < partitions.size(); i++) { // skip constant column if (dynamic_cast(partitions[i].get()) != NULL) continue; // get column index uint64_t idx = getColumnIndex(partitions[i], colIndexMap, jobInfo); eqIdx.push_back(idx); sorts.push_back(IdbSortSpec(idx, partitions[i]->asc(), partitions[i]->nullsFirst())); } const RetColsVector& orders = wc->orderBy().fOrders; for (uint64_t i = 0; i < orders.size(); i++) { // skip constant column if (dynamic_cast(orders[i].get()) != NULL) continue; // get column index uint64_t idx = getColumnIndex(orders[i], colIndexMap, jobInfo); peerIdx.push_back(idx); sorts.push_back(IdbSortSpec(idx, orders[i]->asc(), orders[i]->nullsFirst())); } // functors for sorting boost::shared_ptr parts(new EqualCompData(eqIdx, rg)); boost::shared_ptr orderbys(new OrderByData(sorts, rg)); boost::shared_ptr peers(new EqualCompData(peerIdx, rg)); // column type for functor templates int ct = 0; if (isUDAF) { ct = wc->getUDAFContext().getResultType(); } // make sure index is in range else if (fields.size() > 1 && fields[1] >= 0 && static_cast(fields[1]) < types.size()) ct = types[fields[1]]; // workaround for functions using "within group (order by)" syntax string fn = boost::to_upper_copy(wc->functionName()); if ((fn == "MEDIAN" || fn == "PERCENTILE_CONT" || fn == "PERCENTILE_DISC") && utils::is_nonnegative(peerIdx[0]) && peerIdx[0] < types.size()) ct = types[peerIdx[0]]; // create the functor based on function name boost::shared_ptr func = WindowFunctionType::makeWindowFunction(fn, ct, wc); // parse parms after peer and fields are set // functions may need to set order column index func->peer(peers); func->fieldIndex(fields); func->parseParms(parms); // window frame const WF_Frame& frame = wc->orderBy().fFrame; int frameUnit = (frame.fIsRange) ? 
WF__FRAME_RANGE : WF__FRAME_ROWS; if (frame.fStart.fFrame == WF_UNBOUNDED_PRECEDING && frame.fEnd.fFrame == WF_UNBOUNDED_FOLLOWING) frameUnit = WF__FRAME_ROWS; boost::shared_ptr upper = parseFrameBound(frame.fStart, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, true); boost::shared_ptr lower = parseFrameBound(frame.fEnd, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, false); boost::shared_ptr windows(new WindowFrame(frameUnit, upper, lower)); func->frameUnit(frameUnit); // add to the function list fFunctions.push_back( boost::shared_ptr(new WindowFunction(func, parts, orderbys, windows, rg, fRowIn))); fFunctionCount++; } // initialize window function expresssions fExpression = jobInfo.windowExps; for (RetColsVector::iterator i = fExpression.begin(); i < fExpression.end(); i++) { // output index (*i)->outputIndex(getColumnIndex(*i, colIndexMap, jobInfo)); // map the input indices const vector& scols = (*i)->simpleColumnList(); for (vector::const_iterator j = scols.begin(); j != scols.end(); j++) { uint64_t key = getTupleKey(jobInfo, *j); CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*j)->colType()); if (dictOid > 0) { key = jobInfo.keyInfo->dictKeyMap[key]; } map::iterator k = colIndexMap.find(key); if (k != colIndexMap.end()) { (*j)->inputIndex(k->second); } else { string name = jobInfo.keyInfo->tupleKeyToName[key]; cerr << name << " is not in tuple, key=" << key << endl; throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING); } } ArithmeticColumn* ac = dynamic_cast((*i).get()); FunctionColumn* fc = dynamic_cast((*i).get()); if (ac != NULL) { updateWindowCols(ac->expression(), colIndexMap, jobInfo); } else if (fc != NULL) { // RetColsVector wcList = fc->windowfunctionColumnList(); // for (RetColsVector::iterator j = wcList.begin(); j != wcList.end(); j++) // (*j)->inputIndex(getColumnIndex(*j, colIndexMap, jobInfo)); funcexp::FunctionParm parms = fc->functionParms(); for (vector::iterator j = 
parms.begin(); j < parms.end(); j++) updateWindowCols(j->get(), colIndexMap, jobInfo); } } // order by part if (jobInfo.wfqOrderby.size() > 0) { // query order by vector eqIdx; vector sorts; const RetColsVector& orderby = jobInfo.wfqOrderby; for (uint64_t i = 0; i < orderby.size(); i++) { // skip constant column if (dynamic_cast(orderby[i].get()) != NULL) continue; // get column index uint64_t idx = getColumnIndex(orderby[i], colIndexMap, jobInfo); sorts.push_back(IdbSortSpec(idx, orderby[i]->asc(), orderby[i]->nullsFirst())); } fQueryOrderBy.reset(new OrderByData(sorts, rg)); } // limit part fQueryLimitStart = jobInfo.wfqLimitStart; fQueryLimitCount = jobInfo.wfqLimitCount; // fix the delivered rowgroup data vector delColIdx; for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++) { // find the none constantant columns in the deliver // leave constants to annexstep for now. if (dynamic_cast((*i).get()) != NULL) continue; delColIdx.push_back(getColumnIndex(*i, colIndexMap, jobInfo)); } size_t retColCount = delColIdx.size(); vector pos1; vector oids1; vector keys1; vector scales1; vector precisions1; vector types1; vector csNums1; pos1.push_back(2); for (size_t i = 0; i < retColCount; i++) { size_t j = delColIdx[i]; pos1.push_back(pos1[i] + (pos[j + 1] - pos[j])); oids1.push_back(oids[j]); keys1.push_back(keys[j]); scales1.push_back(scales[j]); precisions1.push_back(precisions[j]); types1.push_back(types[j]); csNums1.push_back(csNums[j]); } fRowGroupDelivered = RowGroup(retColCount, pos1, oids1, keys1, types1, csNums1, scales1, precisions1, jobInfo.stringTableThreshold); if (jobInfo.trace) cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl; if (wfsUpdateStringTable > 1) fUseSSMutex = true; if (wfsUserFunctionCount > 1) fUseUFMutex = true; fRowGroupOut = fRowGroupDelivered; } void WindowFunctionStep::execute() { RGData rgData; Row row; fRowGroupIn.initRow(&row); bool more = fInputDL->next(fInputIterator, 
&rgData); uint64_t i = 0; // for RowGroup index in the fInRowGroupData if (traceOn()) dlTimes.setFirstReadTime(); StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; sts.msg_type = StepTeleStats::ST_START; sts.total_units_of_work = 1; postStepStartTele(sts); try { while (more && !cancelled()) { fRowGroupIn.setData(&rgData); fRowGroupIn.getRow(0, &row); uint64_t rowCnt = fRowGroupIn.getRowCount(); if (rowCnt > 0) { fInRowGroupData.push_back(rgData); uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition); if (fRm->getMemory(memAdd, fSessionMemLimit) == false) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); fMemUsage += memAdd; for (uint64_t j = 0; j < rowCnt; ++j) { if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); fRows.push_back(RowPosition(i, j)); row.nextRow(); } //@bug6065, make StringStore::storeString() thread safe, default to false. rgData.useStoreStringMutex(fUseSSMutex); // For the User Data of UDAnF rgData.useUserDataMutex(fUseUFMutex); // window function does not change row count fRowsReturned += rowCnt; i++; } more = fInputDL->next(fInputIterator, &rgData); } } // try catch (...) { handleException(std::current_exception(), logging::ERR_READ_INPUT_DATALIST, logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::execute()"); } if (traceOn()) dlTimes.setLastReadTime(); // no need for the window function if aborted or result set is empty. 
if (cancelled() || fRows.size() == 0) { while (more) more = fInputDL->next(fInputIterator, &rgData); fOutputDL->endOfInput(); sts.msg_type = StepTeleStats::ST_SUMMARY; sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); if (traceOn()) { dlTimes.setEndOfInputTime(); printCalTrace(); } return; } // got something to work on try { if (fFunctionCount == 1) { doFunction(); } else { if (fTotalThreads > fFunctionCount) fTotalThreads = fFunctionCount; fFunctionThreads.clear(); fFunctionThreads.reserve(fTotalThreads); for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++) fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this))); // If cancelled, not all threads are started. jobstepThreadPool.join(fFunctionThreads); } if (!(cancelled())) { if (fIsSelect) doPostProcessForSelect(); else doPostProcessForDml(); } } catch (...) { handleException(std::current_exception(), logging::ERR_EXECUTE_WINDOW_FUNCTION, logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::execute()"); } fOutputDL->endOfInput(); sts.msg_type = StepTeleStats::ST_SUMMARY; sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); if (traceOn()) { dlTimes.setEndOfInputTime(); printCalTrace(); } return; } uint64_t WindowFunctionStep::nextFunctionIndex() { uint64_t idx = atomicInc(&fNextIndex); // return index in the function array return --idx; } void WindowFunctionStep::doFunction() { uint64_t i = 0; try { while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled()) { uint64_t memAdd = fRows.size() * sizeof(RowPosition); if (fRm->getMemory(memAdd, fSessionMemLimit) == false) throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG); fMemUsage += memAdd; fFunctions[i]->setCallback(this, i); (*fFunctions[i].get())(); } } catch (...) 
{ handleException(std::current_exception(), logging::ERR_EXECUTE_WINDOW_FUNCTION, logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::doFunction()"); } } void WindowFunctionStep::doPostProcessForSelect() { FuncExp* fe = funcexp::FuncExp::instance(); std::shared_ptr mapping = makeMapping(fRowGroupIn, fRowGroupOut); Row rowIn, rowOut; fRowGroupIn.initRow(&rowIn); fRowGroupOut.initRow(&rowOut); RGData rgData; vector& rowData = *(fFunctions.back()->fRowData.get()); int64_t rowsLeft = rowData.size(); int64_t rowsInRg = 0; int64_t rgCapacity = 0; int64_t begin = fQueryLimitStart; int64_t count = (fQueryLimitCount == (uint64_t)-1) ? rowsLeft : fQueryLimitCount; int64_t end = begin + count; end = (end < rowsLeft) ? end : rowsLeft; rowsLeft = (end > begin) ? (end - begin) : 0; if (fQueryOrderBy.get() != NULL) sort(rowData.begin(), rowData.size()); for (int64_t i = begin; i < end; i++) { if (!rgData.hasRowData()) { rgCapacity = ((rowsLeft > 8192) ? 8192 : rowsLeft); rowsLeft -= rgCapacity; rgData.reinit(fRowGroupOut, rgCapacity); fRowGroupOut.setData(&rgData); fRowGroupOut.resetRowGroup(0); fRowGroupOut.setDBRoot(0); // not valid dbroot fRowGroupOut.getRow(0, &rowOut); rowsInRg = 0; } rowIn.setData(getPointer(rowData[i], fRowGroupIn, rowIn)); // evaluate the window function expressions before apply mapping if (fExpression.size() > 0) fe->evaluate(rowIn, fExpression); applyMapping(mapping, rowIn, &rowOut); rowOut.nextRow(); rowsInRg++; if (rowsInRg == rgCapacity) { fRowGroupOut.setRowCount(rowsInRg); fOutputDL->insert(rgData); rgData.clear(); } } } void WindowFunctionStep::doPostProcessForDml() { FuncExp* fe = funcexp::FuncExp::instance(); std::shared_ptr mapping = makeMapping(fRowGroupIn, fRowGroupOut); Row rowIn, rowOut; fRowGroupIn.initRow(&rowIn); fRowGroupOut.initRow(&rowOut); for (vector::iterator i = fInRowGroupData.begin(); i < fInRowGroupData.end(); i++) { fRowGroupIn.setData(&(*i)); RGData rgData = RGData(fRowGroupIn, fRowGroupIn.getRowCount()); 
fRowGroupOut.setData(&rgData); // @bug 5631. reset rowgroup before the data is populated. fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid()); fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot()); fRowGroupOut.setRowCount(fRowGroupIn.getRowCount()); fRowGroupIn.getRow(0, &rowIn); fRowGroupOut.getRow(0, &rowOut); for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i) { // evaluate the window function expressions before apply mapping if (fExpression.size() > 0) fe->evaluate(rowIn, fExpression); applyMapping(mapping, rowIn, &rowOut); rowIn.nextRow(); rowOut.nextRow(); } // fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid()); // fRowGroupOut.setRowCount(fRowGroupIn.getRowCount()); fOutputDL->insert(rgData); } } boost::shared_ptr WindowFunctionStep::parseFrameBoundRows(const execplan::WF_Boundary& b, const map& m, JobInfo& jobInfo) { boost::shared_ptr fb; if (b.fFrame == WF_CURRENT_ROW) { fb.reset(new FrameBoundRow(WF__CURRENT_ROW)); return fb; } ConstantColumn* cc = dynamic_cast(b.fVal.get()); if (cc != NULL) { Row dummy; bool isNull = false; int val = cc->getIntVal(dummy, isNull); if (val >= 0 && isNull == false) { int type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING; fb.reset(new FrameBoundConstantRow(type, val)); } else { string str("NULL"); if (!isNull) { ostringstream oss; oss << val; str = oss.str(); } throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str), ERR_WF_BOUND_OUT_OF_RANGE); } } else { int type = (b.fFrame == WF_PRECEDING) ? 
WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
    uint64_t id = getTupleKey(jobInfo, b.fVal);
    uint64_t idx = getColumnIndex(b.fVal, m, jobInfo);
    TupleInfo ti = getTupleInfo(id, jobInfo);

    switch (ti.dtype)
    {
      case execplan::CalpontSystemCatalog::TINYINT:
      case execplan::CalpontSystemCatalog::SMALLINT:
      case execplan::CalpontSystemCatalog::MEDINT:
      case execplan::CalpontSystemCatalog::INT:
      case execplan::CalpontSystemCatalog::BIGINT:
      case execplan::CalpontSystemCatalog::DECIMAL:
      {
        fb.reset(new FrameBoundExpressionRow(type, id, idx));
        break;
      }

      case execplan::CalpontSystemCatalog::DOUBLE:
      case execplan::CalpontSystemCatalog::UDOUBLE:
      {
        fb.reset(new FrameBoundExpressionRow(type, id, idx));
        break;
      }

      case execplan::CalpontSystemCatalog::FLOAT:
      case execplan::CalpontSystemCatalog::UFLOAT:
      {
        fb.reset(new FrameBoundExpressionRow(type, id, idx));
        break;
      }

      case execplan::CalpontSystemCatalog::UTINYINT:
      case execplan::CalpontSystemCatalog::USMALLINT:
      case execplan::CalpontSystemCatalog::UMEDINT:
      case execplan::CalpontSystemCatalog::UINT:
      case execplan::CalpontSystemCatalog::UBIGINT:
      case execplan::CalpontSystemCatalog::UDECIMAL:
      case execplan::CalpontSystemCatalog::DATE:
      case execplan::CalpontSystemCatalog::DATETIME:
      case execplan::CalpontSystemCatalog::TIME:
      case execplan::CalpontSystemCatalog::TIMESTAMP:
      {
        fb.reset(new FrameBoundExpressionRow(type, id, idx));
        break;
      }

      default:
      {
        string str = windowfunction::colType2String[ti.dtype];
        throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
                        ERR_WF_INVALID_BOUND_TYPE);
        break;
      }
    }
  }

  return fb;
}

// Build the frame bound object for boundary `b` on the range path; this is the
// function parseFrameBound() calls when its `rows` flag is false.
//
//   b       - boundary description; b.fFrame, b.fVal and b.fBound are read.
//   m       - lookup from tuple key to column index, consumed by
//             getColumnIndex().
//   o       - order-by columns; only o[0] is read here.
//   jobInfo - job context handed to the tuple-key / tuple-info helpers.
//
// Returns the newly created bound wrapped in a shared_ptr.
//
// Throws IDBExcept with ERR_WF_BOUND_OUT_OF_RANGE when a constant bound value is
// negative or NULL, and with ERR_WF_INVALID_BOUND_TYPE when the data type of
// o[0] is not one of the cases handled below. getColumnIndex() additionally
// throws ERR_WF_COLUMN_MISSING when a key is absent from `m`.
//
// NOTE(review): o[0] is accessed without checking that `o` is non-empty;
// presumably callers only reach this path with an ORDER BY column - confirm.
boost::shared_ptr WindowFunctionStep::parseFrameBoundRange(const execplan::WF_Boundary& b,
                                                           const map& m,
                                                           const vector& o,
                                                           JobInfo& jobInfo)
{
  boost::shared_ptr fb;

  // CURRENT ROW needs no value expression or column bookkeeping.
  if (b.fFrame == WF_CURRENT_ROW)
  {
    fb.reset(new FrameBoundRange(WF__CURRENT_ROW));
    return fb;
  }

  bool isConstant = false;
  bool isNull = false;
  Row dummy;  // placeholder row passed to the constant column's getters
  ConstantColumn* cc = dynamic_cast(b.fVal.get());

  if (cc != NULL)
  {
    isConstant = true;

    // Validate the constant once as a double, whatever type it is read as later.
    double val = cc->getDoubleVal(dummy, isNull);

    if (val < 0 || isNull)
    {
      // The error text carries the offending value, or "NULL" for a null bound.
      string str("NULL");

      if (!isNull)
      {
        ostringstream oss;
        oss << val;
        str = oss.str();
      }

      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
                      ERR_WF_BOUND_OUT_OF_RANGE);
    }
  }

  // ids / index are filled in parallel with three entries each:
  //   [0] the first order-by column (o[0]),
  //   [1] the bound value expression (b.fVal), or -1 when it is a constant,
  //   [2] b.fBound.
  int type = 0;
  vector ids;
  vector index;
  ids.push_back(getTupleKey(jobInfo, o[0]));
  index.push_back(getColumnIndex(o[0], m, jobInfo));

  if (isConstant)
  {
    type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
    ids.push_back(-1);    // dummy, n/a for constant
    index.push_back(-1);  // dummy, n/a for constant
  }
  else
  {
    type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
    ids.push_back(getTupleKey(jobInfo, b.fVal));
    index.push_back(getColumnIndex(b.fVal, m, jobInfo));
  }

  ids.push_back(getTupleKey(jobInfo, b.fBound));
  index.push_back(getColumnIndex(b.fBound, m, jobInfo));

  // NOTE(review): fbr stays a raw pointer until fb.reset(fbr) at the end of the
  // function; if isZero()/setTupleId()/setIndex() can throw, the object would
  // leak - consider handing it to fb right after construction.
  FrameBoundRange* fbr = NULL;

  // The bound class is selected by the data type of the first order-by column
  // (ids[0]); sort direction and NULL placement are taken from o[0] as well.
  TupleInfo ti = getTupleInfo(ids[0], jobInfo);
  bool asc = o[0]->asc();
  bool nlf = o[0]->nullsFirst();

  // In every constant branch the value is re-read with the getter matching the
  // type group, handed to the bound by address, and a zero value is flagged
  // through isZero().
  // NOTE(review): &v points at a block-local variable; presumably the
  // constructor copies the value rather than keeping the pointer - confirm.
  switch (ti.dtype)
  {
    case execplan::CalpontSystemCatalog::TINYINT:
    case execplan::CalpontSystemCatalog::SMALLINT:
    case execplan::CalpontSystemCatalog::MEDINT:
    case execplan::CalpontSystemCatalog::INT:
    case execplan::CalpontSystemCatalog::BIGINT:
    case execplan::CalpontSystemCatalog::DECIMAL:
    {
      if (isConstant)
      {
        int64_t v = cc->getIntVal(dummy, isNull);
        fbr = new FrameBoundConstantRange(type, asc, nlf, &v);
        fbr->isZero((v == 0));
      }
      else
      {
        fbr = new FrameBoundExpressionRange(type, asc, nlf);
      }

      break;
    }

    case execplan::CalpontSystemCatalog::DOUBLE:
    case execplan::CalpontSystemCatalog::UDOUBLE:
    {
      if (isConstant)
      {
        double v = cc->getDoubleVal(dummy, isNull);
        fbr = new FrameBoundConstantRange(type, asc, nlf, &v);
        fbr->isZero((v == 0.0));
      }
      else
      {
        fbr = new FrameBoundExpressionRange(type, asc, nlf);
      }

      break;
    }

    case execplan::CalpontSystemCatalog::FLOAT:
    case execplan::CalpontSystemCatalog::UFLOAT:
    {
      if (isConstant)
      {
        float v = cc->getFloatVal(dummy, isNull);
        fbr = new FrameBoundConstantRange(type, asc, nlf, &v);
        fbr->isZero((v == 0.0));
      }
      else
      {
        fbr = new FrameBoundExpressionRange(type, asc, nlf);
      }

      break;
    }

    case execplan::CalpontSystemCatalog::UTINYINT:
    case execplan::CalpontSystemCatalog::USMALLINT:
    case execplan::CalpontSystemCatalog::UMEDINT:
    case execplan::CalpontSystemCatalog::UINT:
    case execplan::CalpontSystemCatalog::UBIGINT:
    case execplan::CalpontSystemCatalog::UDECIMAL:
    case execplan::CalpontSystemCatalog::DATE:
    case execplan::CalpontSystemCatalog::DATETIME:
    case execplan::CalpontSystemCatalog::TIME:
    case execplan::CalpontSystemCatalog::TIMESTAMP:
    {
      if (isConstant)
      {
        uint64_t v = cc->getUintVal(dummy, isNull);
        fbr = new FrameBoundConstantRange(type, asc, nlf, &v);
        fbr->isZero((v == 0));
      }
      else
      {
        fbr = new FrameBoundExpressionRange(type, asc, nlf);
      }

      break;
    }

    default:
    {
      // Any other order-by data type is rejected; the message names the type.
      string str = windowfunction::colType2String[ti.dtype];
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
                      ERR_WF_INVALID_BOUND_TYPE);
      break;
    }
  }

  // Every non-throwing path above has assigned fbr by this point.
  fbr->setTupleId(ids);
  fbr->setIndex(index);
  fb.reset(fbr);

  return fb;
}

// Translate one window-frame boundary into a FrameBound object.
//
//   b    - boundary; b.fFrame selects which kind of bound is created.
//   m    - forwarded to parseFrameBoundRows() / parseFrameBoundRange().
//   o    - forwarded to parseFrameBoundRange() only.
//   p    - passed to the new bound's peer() setter.
//   j    - job context, forwarded to the two parse helpers.
//   rows - true: use parseFrameBoundRows(); false: use parseFrameBoundRange().
//   s    - passed to the new bound's start() setter.
//
// Returns the configured bound. Throws IDBExcept with ERR_WF_UNKNOWN_BOUND when
// b.fFrame is none of the handled values; exceptions from the two parse helpers
// propagate unchanged.
boost::shared_ptr WindowFunctionStep::parseFrameBound(const execplan::WF_Boundary& b,
                                                      const map& m,
                                                      const vector& o,
                                                      const boost::shared_ptr& p,
                                                      JobInfo& j, bool rows, bool s)
{
  boost::shared_ptr fb;

  switch (b.fFrame)
  {
    case WF_UNBOUNDED_PRECEDING:
    {
      fb.reset(new FrameBound(WF__UNBOUNDED_PRECEDING));
      break;
    }

    case WF_UNBOUNDED_FOLLOWING:
    {
      fb.reset(new FrameBound(WF__UNBOUNDED_FOLLOWING));
      break;
    }

    // These three kinds are built by the rows- or range-specific helper.
    case WF_CURRENT_ROW:
    case WF_PRECEDING:
    case WF_FOLLOWING:
    {
      if (rows)
      {
        fb = parseFrameBoundRows(b, m, j);
      }
      else
      {
        fb = parseFrameBoundRange(b, m, o, j);
      }

      break;
    }

    default:  // unknown
    {
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_UNKNOWN_BOUND, b.fFrame),
                      ERR_WF_UNKNOWN_BOUND);
      break;
    }
  }

  // Settings applied to every kind of bound.
  fb->peer(p);
  fb->start(s);

  return fb;
}

// Walk the expression rooted at `rc` and, for each WindowFunctionColumn reached,
// set its input index to the value stored in `m` under the column's expression
// tuple key.
//
//   rc      - node to inspect; NULL is accepted and ignored.
//   m       - lookup from tuple key to index.
//   jobInfo - job context used to derive the key and, on failure, its name.
//
// Node handling, by the first dynamic_cast that succeeds in the chain below:
//   WindowFunctionColumn - resolve its key in `m` and call inputIndex();
//   ArithmeticColumn     - recurse into expression();
//   FunctionColumn       - recurse into each function parameter;
//   SimpleFilter         - recurse into lhs() and rhs().
// Nodes of any other type are left untouched.
//
// Throws IDBExcept with ERR_WF_COLUMN_MISSING (after printing a diagnostic to
// cerr) when the window function's key is not present in `m`.
//
// NOTE(review): the dispatch body is repeated verbatim in the ParseTree
// overload; a shared helper would remove the duplication.
void WindowFunctionStep::updateWindowCols(ReturnedColumn* rc, const map& m, JobInfo& jobInfo)
{
  if (rc == NULL)
    return;

  ArithmeticColumn* ac = dynamic_cast(rc);
  FunctionColumn* fc = dynamic_cast(rc);
  SimpleFilter* sf = dynamic_cast(rc);
  WindowFunctionColumn* wc = dynamic_cast(rc);

  if (wc)
  {
    uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
    map::const_iterator j = m.find(key);

    if (j == m.end())
    {
      string name = jobInfo.keyInfo->tupleKeyToName[key];
      cerr << name << " is not in tuple, key=" << key << endl;
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
                      ERR_WF_COLUMN_MISSING);
    }

    wc->inputIndex(j->second);
  }
  else if (ac)
  {
    updateWindowCols(ac->expression(), m, jobInfo);
  }
  else if (fc)
  {
    // Works on a local copy of the parameter list returned by functionParms().
    funcexp::FunctionParm parms = fc->functionParms();

    for (vector::iterator i = parms.begin(); i < parms.end(); i++)
      updateWindowCols(i->get(), m, jobInfo);
  }
  else if (sf)
  {
    updateWindowCols(sf->lhs(), m, jobInfo);
    updateWindowCols(sf->rhs(), m, jobInfo);
  }
}

// ParseTree overload: visit the left and right subtrees first, then apply the
// same per-node handling as the ReturnedColumn overload to this node's data().
//
//   pt      - tree node to process; NULL is accepted and ignored.
//   m       - lookup from tuple key to index.
//   jobInfo - job context used to derive the key and, on failure, its name.
//
// Throws IDBExcept with ERR_WF_COLUMN_MISSING (after printing a diagnostic to
// cerr) when a window function's key is not present in `m`.
void WindowFunctionStep::updateWindowCols(ParseTree* pt, const map& m, JobInfo& jobInfo)
{
  if (pt == NULL)
    return;

  // Children are handled before the node itself.
  updateWindowCols(pt->left(), m, jobInfo);
  updateWindowCols(pt->right(), m, jobInfo);

  TreeNode* tn = pt->data();
  ArithmeticColumn* ac = dynamic_cast(tn);
  FunctionColumn* fc = dynamic_cast(tn);
  SimpleFilter* sf = dynamic_cast(tn);
  WindowFunctionColumn* wc = dynamic_cast(tn);

  if (wc)
  {
    uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
    map::const_iterator j = m.find(key);

    if (j == m.end())
    {
      string name = jobInfo.keyInfo->tupleKeyToName[key];
      cerr << name << " is not in tuple, key=" << key << endl;
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
                      ERR_WF_COLUMN_MISSING);
    }

    wc->inputIndex(j->second);
  }
  else if (ac)
  {
    updateWindowCols(ac->expression(), m, jobInfo);
  }
  else if (fc)
  {
    // Works on a local copy of the parameter list returned by functionParms().
    funcexp::FunctionParm parms = fc->functionParms();

    for (vector::iterator i = parms.begin(); i < parms.end(); i++)
      updateWindowCols(i->get(), m, jobInfo);
  }
  else if (sf)
  {
    updateWindowCols(sf->lhs(), m, jobInfo);
    updateWindowCols(sf->rhs(), m, jobInfo);
  }
}

// Sort, in place, the `n` RowPosition entries starting at `v`, using
// fQueryOrderBy as the comparison. Recursive, quicksort-style: the range is
// partitioned around its middle element and each side is then sorted.
//
//   v - iterator to the first entry of the range.
//   n - number of entries in the range.
//
// Stops early once cancelled() reports true, in which case the range may be
// left only partially ordered.
void WindowFunctionStep::sort(std::vector::iterator v, uint64_t n)
{
  // recursive function termination condition.
  if (n < 2 || cancelled())
    return;

  RowPosition p = *(v + n / 2);        // pivot value
  vector::iterator l = v;              // low address
  vector::iterator h = v + (n - 1);    // high address

  while (l <= h && !cancelled())
  {
    // Can use while here, but need check boundary and cancel status.
    if (fQueryOrderBy->operator()(getPointer(*l), getPointer(p)))
    {
      // comparator(*l, pivot) holds: *l stays on the low side.
      l++;
    }
    else if (fQueryOrderBy->operator()(getPointer(p), getPointer(*h)))
    {
      // comparator(pivot, *h) holds: *h stays on the high side.
      h--;
    }
    else
    {
      // Neither entry can stay where it is: exchange them and step both cursors.
      RowPosition t = *l;  // temp value for swap
      *l++ = *h;
      *h-- = t;
    }
  }

  // Low partition is [v, h]; high partition is [l, v + n).
  sort(v, std::distance(v, h) + 1);
  sort(l, std::distance(l, v) + n);
}

// Compose the end-of-step trace text (session id, step id, finish time, rows
// returned, first-read / end-of-input timings, step UUID and completion status),
// pass it to logEnd(), append it to fExtendedInfo, and then call
// formatMiniStats().
void WindowFunctionStep::printCalTrace()
{
  time_t t = time(0);
  char timeString[50];
  // NOTE(review): the return value of ctime_r() is not checked before the
  // buffer is used.
  ctime_r(&t, timeString);
  // Drop the last character of the formatted time (ctime_r() output ends with
  // a newline).
  timeString[strlen(timeString) - 1] = '\0';
  ostringstream logStr;
  logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
         << "; total rows returned-" << fRowsReturned << endl
         << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
         << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
         << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
         << "\tJob completion status " << status() << endl;
  logEnd(logStr.str().c_str());
  fExtendedInfo += logStr.str();
  formatMiniStats();
}

// Append this step's summary to fMiniInfo: the literals "WFS" and "UM", six "-"
// placeholders, the JSTimeStamp::tsdiffstr() of end-of-input versus first-read
// time, and the number of rows returned, each followed by a single space.
void WindowFunctionStep::formatMiniStats()
{
  ostringstream oss;
  oss << "WFS "
      << "UM "
      << "- "
      << "- "
      << "- "
      << "- "
      << "- "
      << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
      << fRowsReturned << " ";
  fMiniInfo += oss.str();
}

}  // namespace joblist