1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
mariadb-columnstore-engine/dbcon/joblist/windowfunctionstep.cpp

1602 lines
46 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (c) 2016-2020 MariaDB
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: windowfunctionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
//#define NDEBUG
#include <cassert>
#include <sstream>
#include <iomanip>
using namespace std;
#include <boost/algorithm/string.hpp> // to_upper_copy
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/uuid/uuid_io.hpp>
using namespace boost;
#include "atomicops.h"
using namespace atomicops;
#include "loggingid.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
using namespace logging;
#include "configcpp.h"
using namespace config;
#include "calpontselectexecutionplan.h"
#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "arithmeticcolumn.h"
#include "constantcolumn.h"
#include "functioncolumn.h"
#include "pseudocolumn.h"
#include "simplefilter.h"
#include "windowfunctioncolumn.h"
using namespace execplan;
#include "../../utils/windowfunction/windowfunction.h"
#include "../../utils/windowfunction/windowfunctiontype.h"
#include "../../utils/windowfunction/framebound.h"
#include "../../utils/windowfunction/frameboundrange.h"
#include "../../utils/windowfunction/frameboundrow.h"
#include "../../utils/windowfunction/windowframe.h"
using namespace windowfunction;
#include "rowgroup.h"
using namespace rowgroup;
using namespace ordering;
#include "funcexp.h"
using namespace funcexp;
#include "querytele.h"
using namespace querytele;
#include "jlf_common.h"
#include "jobstep.h"
#include "windowfunctionstep.h"
using namespace joblist;
#include "checks.h"
namespace
{
uint64_t getColumnIndex(const SRCP& c, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
{
uint64_t key = getTupleKey(jobInfo, c, true);
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(c.get());
if (sc != NULL && !sc->schemaName().empty())
{
// special handling for dictionary
CalpontSystemCatalog::ColType ct = sc->colType();
// XXX use this before connector sets colType in sc correctly.
// type of pseudo column is set by connector
if (!(dynamic_cast<const PseudoColumn*>(sc)))
{
ct = jobInfo.csc->colType(sc->oid());
ct.charsetNumber = sc->colType().charsetNumber;
}
// X
CalpontSystemCatalog::OID dictOid = isDictCol(ct);
string alias(extractTableAlias(sc));
if (dictOid > 0)
{
TupleInfo ti = setTupleInfo(ct, dictOid, jobInfo, tableOid(sc, jobInfo.csc), sc, alias);
key = ti.key;
}
}
map<uint64_t, uint64_t>::const_iterator j = m.find(key);
if (j == m.end())
{
string name = jobInfo.keyInfo->tupleKeyToName[key];
cerr << name << " is not in tuple, key=" << key << endl;
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING);
}
return j->second;
}
} // namespace
namespace joblist
{
// Build a window function step from the job context.
// The worker thread count is taken from the resource manager; the data lists
// and iterators stay unset until run() is called.
WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo)
 : JobStep(jobInfo)
 , fRunner(0)
 , fCatalog(jobInfo.csc)
 , fRowsReturned(0)
 , fEndOfResult(false)
 , fIsSelect(true)
 , fUseSSMutex(false)
 , fUseUFMutex(false)
 , fInputDL(nullptr)
 , fOutputDL(nullptr)
 , fInputIterator(-1)
 , fOutputIterator(-1)
 , fFunctionCount(0)
 , fTotalThreads(1)
 , fNextIndex(0)
 , fMemUsage(0)
 , fRm(jobInfo.rm)
 , fSessionMemLimit(jobInfo.umMemLimit)
{
  // the configured thread count replaces the placeholder set in the initializer list
  fTotalThreads = fRm->windowFunctionThreads();

  fExtendedInfo = "WFS: ";
  fQtc.stepParms().stepType = StepTeleStats::T_WFS;
}
// Give back to the resource manager whatever memory this step accounted for.
WindowFunctionStep::~WindowFunctionStep()
{
  if (fMemUsage > 0)
  {
    fRm->returnMemory(fMemUsage, fSessionMemLimit);
  }
}
// Bind the input/output RowGroup data lists and start the step on the job
// step thread pool.
// Throws std::logic_error when an association is empty or does not carry a
// RowGroup data list.
void WindowFunctionStep::run()
{
  // input side
  if (fInputJobStepAssociation.outSize() == 0)
  {
    throw logic_error("No input data list for window function step.");
  }

  fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();

  if (!fInputDL)
  {
    throw logic_error("Input is not a RowGroup data list in window function step.");
  }

  fInputIterator = fInputDL->getIterator();

  // output side
  if (fOutputJobStepAssociation.outSize() == 0)
  {
    throw logic_error("No output data list for window function step.");
  }

  fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();

  if (!fOutputDL)
  {
    throw logic_error("Output of window function step is not a RowGroup data list.");
  }

  // an output iterator is only requested when this step is the delivery step
  if (fDelivery)
    fOutputIterator = fOutputDL->getIterator();

  fRunner = jobstepThreadPool.invoke(Runner(this));
}
// Wait for the runner started by run(); does nothing if no runner was started.
void WindowFunctionStep::join()
{
  if (!fRunner)
    return;

  jobstepThreadPool.join(fRunner);
}
// Serialize the next delivered RowGroup into `bs`.
//
// Returns the number of rows in the band. When the output list is exhausted,
// the step is cancelled, or an exception occurs, the remaining output is
// drained and an empty band carrying the step status is serialized instead
// (the return value is then 0).
uint32_t WindowFunctionStep::nextBand(messageqcpp::ByteStream& bs)
{
  RGData band;
  bool hasMore = false;
  uint32_t delivered = 0;

  // consume whatever is left in the output list and flag the end of the result
  auto drainAndFinish = [&]()
  {
    while (hasMore)
      hasMore = fOutputDL->next(fOutputIterator, &band);

    fEndOfResult = true;
  };

  try
  {
    bs.restart();
    hasMore = fOutputDL->next(fOutputIterator, &band);

    if (!hasMore || cancelled())
    {
      drainAndFinish();
    }
    else
    {
      fRowGroupDelivered.setData(&band);
      fRowGroupDelivered.serializeRGData(bs);
      delivered = fRowGroupDelivered.getRowCount();
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_WF_DATA_SET_TOO_BIG,
                    "WindowFunctionStep::nextBand()");
    drainAndFinish();
  }

  if (fEndOfResult)
  {
    // send an empty / error band
    band.reinit(fRowGroupDelivered, 0);
    fRowGroupDelivered.setData(&band);
    fRowGroupDelivered.resetRowGroup(0);
    fRowGroupDelivered.setStatus(status());
    fRowGroupDelivered.serializeRGData(bs);
  }

  return delivered;
}
// Not supported for this step: calling it always trips the assertion.
void WindowFunctionStep::setOutputRowGroup(const RowGroup& rg)
{
  idbassert(0);
}
// Accessor for the output RowGroup metadata (fRowGroupOut).
const RowGroup& WindowFunctionStep::getOutputRowGroup() const
{
  return fRowGroupOut;
}
// Accessor for the delivered RowGroup metadata (fRowGroupDelivered).
const RowGroup& WindowFunctionStep::getDeliveredRowGroup() const
{
  return fRowGroupDelivered;
}
// Apply the same string-table setting to both the output and the delivered RowGroup.
void WindowFunctionStep::deliverStringTableRowGroup(bool b)
{
  fRowGroupOut.setUseStringTable(b);
  fRowGroupDelivered.setUseStringTable(b);
}
// Report whether the delivered RowGroup uses a string table.
// Asserts that the output and delivered RowGroups agree on the setting.
bool WindowFunctionStep::deliverStringTableRowGroup() const
{
  idbassert(fRowGroupOut.usesStringTable() == fRowGroupDelivered.usesStringTable());

  return fRowGroupDelivered.usesStringTable();
}
// Render a one-line description of the step: session, transaction and step
// ids followed by the input and (if any) output data list associations.
const string WindowFunctionStep::toString() const
{
  ostringstream text;

  text << "WindowFunctionStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId << " in:";

  const unsigned inCount = fInputJobStepAssociation.outSize();

  for (unsigned n = 0; n < inCount; ++n)
    text << fInputJobStepAssociation.outAt(n);

  const unsigned outCount = fOutputJobStepAssociation.outSize();

  if (outCount > 0)
  {
    text << " out:";

    for (unsigned n = 0; n < outCount; ++n)
      text << fOutputJobStepAssociation.outAt(n);
  }

  return text.str();
}
void WindowFunctionStep::AddSimplColumn(const vector<SimpleColumn*>& scs, JobInfo& jobInfo)
{
// append the simple columns if not already projected
set<UniqId> scProjected;
for (RetColsVector::iterator i = jobInfo.projectionCols.begin(); i != jobInfo.projectionCols.end(); i++)
{
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(i->get());
if (sc != NULL)
{
if (sc->schemaName().empty())
sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
scProjected.insert(UniqId(sc));
}
}
for (vector<SimpleColumn*>::const_iterator i = scs.begin(); i != scs.end(); i++)
{
if (scProjected.find(UniqId(*i)) == scProjected.end())
{
jobInfo.windowDels.push_back(SRCP((*i)->clone()));
// MCOL-3343 Enable this if we decide to allow Window Functions to run with
// aggregates with no group by. MariaDB allows this. Nobody else in the world does.
// There will be more work to get it to function if we try this.
// jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
scProjected.insert(UniqId(*i));
}
}
}
// Detect window functions in the plan and prepare jobInfo for a window
// function step.
//
// csep     execution plan; its GROUP BY list may be extended, and its LIMIT
//          and ORDER BY are reset after being copied into jobInfo
// jobInfo  job context; windowDels, windowExps, windowCols, windowSet,
//          deliveredCols, wfqLimitStart, wfqLimitCount and wfqOrderby are
//          written here
//
// Returns early, right after the detection phase, when no window function
// column was found (jobInfo.windowCols is empty).
void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
{
// window functions in select clause, selected or in expression
// windowDels keeps a copy of the original delivered column list
jobInfo.windowDels = jobInfo.deliveredCols;
for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
{
// clone every window function nested inside this delivered column
const vector<WindowFunctionColumn*>& wcl = (*i)->windowfunctionColumnList();
RetColsVector wcList;
for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
wcList.push_back(SRCP((*j)->clone()));
// a column with a non-empty window function list is recorded as a window expression
if (!wcList.empty())
{
jobInfo.windowExps.push_back(*i);
jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
}
// the column itself is a window function: record it directly;
// otherwise record the cloned window functions it contains
if (dynamic_cast<WindowFunctionColumn*>(i->get()) != NULL)
{
jobInfo.windowCols.push_back(*i);
jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
}
else if (!wcList.empty())
{
jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
{
jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
}
}
}
// window functions in order by clause
const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols();
RetColsVector wcInOrderby;
for (uint64_t i = 0; i < orderByCols.size(); i++)
{
// NOTE(review): orderPos() == -1 presumably marks an ORDER BY entry that is
// not a reference to a select-list position — confirm against the connector.
if (orderByCols[i]->orderPos() == (uint64_t)(-1))
{
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get());
const vector<WindowFunctionColumn*>& wcl = orderByCols[i]->windowfunctionColumnList();
RetColsVector wcList;
for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
wcList.push_back(SRCP((*j)->clone()));
if (wc == NULL && wcList.empty())
continue;
// a window function or an expression of window functions
wcInOrderby.push_back(orderByCols[i]);
// same bookkeeping as for the select clause above
if (!wcList.empty())
{
jobInfo.windowExps.push_back(orderByCols[i]);
jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
}
if (dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get()) != NULL)
{
jobInfo.windowCols.push_back(orderByCols[i]);
jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
}
else if (!wcList.empty())
{
jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
{
jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
}
}
}
}
// no window function involved in the query
if (jobInfo.windowCols.empty())
return;
// Add in the non-window side of arithmetic columns and functions
// (AddSimplColumn appends the missing simple columns to jobInfo.windowDels)
for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++)
{
const ArithmeticColumn* ac = dynamic_cast<const ArithmeticColumn*>(jobInfo.windowExps[i].get());
const FunctionColumn* fc = dynamic_cast<const FunctionColumn*>(jobInfo.windowExps[i].get());
if (ac != NULL && ac->windowfunctionColumnList().size() > 0)
{
AddSimplColumn(ac->simpleColumnList(), jobInfo);
}
else if (fc != NULL && fc->windowfunctionColumnList().size() > 0)
{
AddSimplColumn(fc->simpleColumnList(), jobInfo);
}
}
// reconstruct the delivered column list with auxiliary columns
// colSet tracks the tuple keys that are already part of deliveredCols
set<uint64_t> colSet;
jobInfo.deliveredCols.resize(0);
for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
{
jobInfo.deliveredCols.push_back(*i);
uint64_t key = getTupleKey(jobInfo, *i, true);
// TODO: remove duplicates in select clause
colSet.insert(key);
}
// add window columns in orderby
for (RetColsVector::iterator i = wcInOrderby.begin(); i < wcInOrderby.end(); i++)
{
jobInfo.deliveredCols.push_back(*i);
uint64_t key = getTupleKey(jobInfo, *i, true);
colSet.insert(key);
}
// MCOL-3435 We haven't yet checked for aggregate, but we need to know
bool hasAggregation = false;
for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
{
if (dynamic_cast<AggregateColumn*>(jobInfo.deliveredCols[i].get()) != NULL)
{
hasAggregation = true;
break;
}
}
// add non-duplicate auxiliary columns
// NOTE(review): colsInAf is declared but never used in this function.
RetColsVector colsInAf;
for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
{
uint64_t key = getTupleKey(jobInfo, *i, true);
if (colSet.find(key) == colSet.end())
jobInfo.deliveredCols.push_back(*i);
// NOTE(review): the cast result is dereferenced unchecked; every entry pushed
// into windowCols above is a WindowFunctionColumn or a clone of one.
RetColsVector columns = dynamic_cast<WindowFunctionColumn*>(i->get())->getColumnList();
for (RetColsVector::iterator j = columns.begin(); j < columns.end(); j++)
{
// constants are not delivered as auxiliary columns
if (dynamic_cast<ConstantColumn*>(j->get()) != NULL)
continue;
key = getTupleKey(jobInfo, *j, true);
if (colSet.find(key) == colSet.end())
{
jobInfo.deliveredCols.push_back(*j);
// MCOL-3435 Allow Window Functions to run with aggregates with
// no group by by inserting a group by for window parameters.
if (hasAggregation)
{
// If an argument is an AggregateColumn, don't group by it.
if (dynamic_cast<AggregateColumn*>(j->get()) == NULL)
{
// look for an existing GROUP BY column with the same alias
bool bFound = false;
for (std::vector<SRCP>::iterator igpc = csep->groupByCols().begin();
igpc < csep->groupByCols().end(); ++igpc)
{
if ((*igpc)->alias() == (*j)->alias())
{
bFound = true;
break;
}
}
if (!bFound)
{
csep->groupByCols().push_back(*j);
}
}
}
}
colSet.insert(key);
}
}
// for handling order by and limit in outer query
// the plan's own limit is cleared; the saved values live in jobInfo
jobInfo.wfqLimitStart = csep->limitStart();
jobInfo.wfqLimitCount = csep->limitNum();
csep->limitStart(0);
csep->limitNum(-1);
if (csep->orderByCols().size() > 0)
{
// move the ORDER BY list from the plan into jobInfo
jobInfo.wfqOrderby = csep->orderByCols();
csep->orderByCols().clear();
// add order by columns
for (RetColsVector::iterator i = jobInfo.wfqOrderby.begin(); i < jobInfo.wfqOrderby.end(); i++)
{
if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
continue;
uint64_t key = getTupleKey(jobInfo, *i, true);
if (colSet.find(key) == colSet.end())
jobInfo.deliveredCols.push_back(*i);
colSet.insert(key);
}
}
}
// Create a window function step and wire it behind `step`.
//
// step     the feeding step; must be a TupleDeliveryStep. Its output
//          association is replaced by the new step's input association.
// jobInfo  job context; deliveredCols and nonConstDelCols are restored from
//          jobInfo.windowDels once the step is initialized.
//
// Returns the new step. The step is owned by a smart pointer from the moment
// it is created, so it is released if the assertion or initialize() throws.
SJSTEP WindowFunctionStep::makeWindowFunctionStep(SJSTEP& step, JobInfo& jobInfo)
{
  // create a window function step; take ownership right away so that an
  // exception below cannot leak it
  WindowFunctionStep* ws = new WindowFunctionStep(jobInfo);
  SJSTEP wsStep(ws);

  // connect to the feeding step
  JobStepAssociation jsa;
  AnyDataListSPtr spdl(new AnyDataList());
  RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
  dl->OID(execplan::CNX_VTABLE_ID);
  spdl->rowGroupDL(dl);
  jsa.outAdd(spdl);
  ws->inputAssociation(jsa);
  ws->stepId(step->stepId() + 1);
  step->outputAssociation(jsa);

  // output data list of the window function step
  AnyDataListSPtr spdlOut(new AnyDataList());
  RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
  dlOut->OID(CNX_VTABLE_ID);
  spdlOut->rowGroupDL(dlOut);
  JobStepAssociation jsaOut;
  jsaOut.outAdd(spdlOut);
  ws->outputAssociation(jsaOut);

  // configure the rowgroups and index mapping
  TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(step.get());
  idbassert(ds != NULL);
  ws->initialize(ds->getDeliveredRowGroup(), jobInfo);

  // restore the original delivery columns
  jobInfo.deliveredCols = jobInfo.windowDels;
  jobInfo.nonConstDelCols.clear();

  for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
  {
    if (NULL == dynamic_cast<const ConstantColumn*>(i->get()))
      jobInfo.nonConstDelCols.push_back(*i);
  }

  return wsStep;
}
// Configure the step from the feeding step's RowGroup and the window
// information collected in jobInfo.
//
// rg       RowGroup delivered by the feeding step (input layout)
// jobInfo  job context; windowCols, windowExps, windowDels, wfqOrderby and
//          the wfq limit values are read here
//
// Builds one WindowFunction per entry of jobInfo.windowCols, maps the input
// and output indices of the window expressions, sets up the query-level
// ORDER BY / LIMIT, and derives the delivered RowGroup (non-constant columns
// of jobInfo.windowDels).
//
// Throws IDBExcept(ERR_WF_COLUMN_MISSING) when a required column is not part
// of `rg`.
void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
{
  if (jobInfo.trace)
    cout << "Input to WindowFunctionStep: " << rg.toString() << endl;

  // query type decides the output by dbroot or partition
  // @bug 5631. Insert select should be treated as select
  fIsSelect = (jobInfo.queryType == "SELECT" || jobInfo.queryType == "INSERT_SELECT");

  // input row meta data
  fRowGroupIn = rg;
  fRowGroupIn.initRow(&fRowIn);

  // make an input map(id, index)
  map<uint64_t, uint64_t> colIndexMap;
  uint64_t colCntIn = rg.getColumnCount();
  const vector<uint32_t>& pos = rg.getOffsets();
  const vector<uint32_t>& oids = rg.getOIDs();
  const vector<uint32_t>& keys = rg.getKeys();
  const vector<CalpontSystemCatalog::ColDataType>& types = rg.getColTypes();
  const vector<uint32_t>& csNums = rg.getCharsetNumbers();
  const vector<uint32_t>& scales = rg.getScale();
  const vector<uint32_t>& precisions = rg.getPrecision();

  for (uint64_t c = 0; c < colCntIn; c++)
    colIndexMap.insert(make_pair(keys[c], c));

  // @bug6065, window functions that will update string table
  int64_t wfsUpdateStringTable = 0;
  int64_t wfsUserFunctionCount = 0;

  for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
  {
    bool isUDAF = false;

    // window function type
    WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
    uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo);  // result index

    // @bug6065, window functions that will update string table
    if ((types[ridx] == CalpontSystemCatalog::CHAR || types[ridx] == CalpontSystemCatalog::VARCHAR ||
         types[ridx] == CalpontSystemCatalog::TEXT || types[ridx] == CalpontSystemCatalog::VARBINARY ||
         types[ridx] == CalpontSystemCatalog::BLOB) &&
        rg.getColumnWidth(ridx) >= jobInfo.stringTableThreshold)
    {
      ++wfsUpdateStringTable;
    }

    //        if (boost::iequals(wc->functionName(),"UDAF_FUNC")
    if (wc->functionName() == "UDAF_FUNC")
    {
      isUDAF = true;
      ++wfsUserFunctionCount;
    }

    // field indices: [0] is the result column, followed by one entry per
    // argument (-1 for constant arguments)
    vector<int64_t> fields;
    fields.push_back(ridx);  // result
    const RetColsVector& parms = wc->functionParms();

    for (uint64_t p = 0; p < parms.size(); p++)  // arguments
    {
      // skip constant column
      if (dynamic_cast<const ConstantColumn*>(parms[p].get()) == NULL)
        fields.push_back(getColumnIndex(parms[p], colIndexMap, jobInfo));
      else
        fields.push_back(-1);
    }

    // partition & order by
    const RetColsVector& partitions = wc->partitions();
    vector<uint64_t> eqIdx;
    vector<uint64_t> peerIdx;
    vector<IdbSortSpec> sorts;

    for (uint64_t p = 0; p < partitions.size(); p++)
    {
      // skip constant column
      if (dynamic_cast<const ConstantColumn*>(partitions[p].get()) != NULL)
        continue;

      // get column index
      uint64_t idx = getColumnIndex(partitions[p], colIndexMap, jobInfo);
      eqIdx.push_back(idx);
      sorts.push_back(IdbSortSpec(idx, partitions[p]->asc(), partitions[p]->nullsFirst()));
    }

    const RetColsVector& orders = wc->orderBy().fOrders;

    for (uint64_t o = 0; o < orders.size(); o++)
    {
      // skip constant column
      if (dynamic_cast<const ConstantColumn*>(orders[o].get()) != NULL)
        continue;

      // get column index
      uint64_t idx = getColumnIndex(orders[o], colIndexMap, jobInfo);
      peerIdx.push_back(idx);
      sorts.push_back(IdbSortSpec(idx, orders[o]->asc(), orders[o]->nullsFirst()));
    }

    // functors for sorting
    boost::shared_ptr<EqualCompData> parts(new EqualCompData(eqIdx, rg));
    boost::shared_ptr<OrderByData> orderbys(new OrderByData(sorts, rg));
    boost::shared_ptr<EqualCompData> peers(new EqualCompData(peerIdx, rg));

    // column type for functor templates
    int ct = 0;

    if (isUDAF)
    {
      ct = wc->getUDAFContext().getResultType();
    }
    // make sure index is in range
    else if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
      ct = types[fields[1]];

    // workaround for functions using "within group (order by)" syntax
    // peerIdx is empty when every order-by key is a constant, so it must be
    // checked before its first element is read
    string fn = boost::to_upper_copy(wc->functionName());

    if ((fn == "MEDIAN" || fn == "PERCENTILE_CONT" || fn == "PERCENTILE_DISC") && !peerIdx.empty() &&
        utils::is_nonnegative(peerIdx[0]) && peerIdx[0] < types.size())
      ct = types[peerIdx[0]];

    // create the functor based on function name
    boost::shared_ptr<WindowFunctionType> func = WindowFunctionType::makeWindowFunction(fn, ct, wc);

    // parse parms after peer and fields are set
    // functions may need to set order column index
    func->peer(peers);
    func->fieldIndex(fields);
    func->parseParms(parms);

    // window frame
    const WF_Frame& frame = wc->orderBy().fFrame;
    int frameUnit = (frame.fIsRange) ? WF__FRAME_RANGE : WF__FRAME_ROWS;

    // an unbounded frame on both ends is handled as a ROWS frame
    if (frame.fStart.fFrame == WF_UNBOUNDED_PRECEDING && frame.fEnd.fFrame == WF_UNBOUNDED_FOLLOWING)
      frameUnit = WF__FRAME_ROWS;

    boost::shared_ptr<FrameBound> upper =
        parseFrameBound(frame.fStart, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, true);
    boost::shared_ptr<FrameBound> lower =
        parseFrameBound(frame.fEnd, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, false);
    boost::shared_ptr<WindowFrame> windows(new WindowFrame(frameUnit, upper, lower));
    func->frameUnit(frameUnit);

    // add to the function list
    fFunctions.push_back(
        boost::shared_ptr<WindowFunction>(new WindowFunction(func, parts, orderbys, windows, rg, fRowIn)));
    fFunctionCount++;
  }

  // initialize window function expressions
  fExpression = jobInfo.windowExps;

  for (RetColsVector::iterator i = fExpression.begin(); i < fExpression.end(); i++)
  {
    // output index
    (*i)->outputIndex(getColumnIndex(*i, colIndexMap, jobInfo));

    // map the input indices
    const vector<SimpleColumn*>& scols = (*i)->simpleColumnList();

    for (vector<SimpleColumn*>::const_iterator j = scols.begin(); j != scols.end(); j++)
    {
      uint64_t key = getTupleKey(jobInfo, *j);
      CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*j)->colType());

      // dictionary columns are looked up through their dictionary key
      if (dictOid > 0)
      {
        key = jobInfo.keyInfo->dictKeyMap[key];
      }

      map<uint64_t, uint64_t>::iterator k = colIndexMap.find(key);

      if (k != colIndexMap.end())
      {
        (*j)->inputIndex(k->second);
      }
      else
      {
        string name = jobInfo.keyInfo->tupleKeyToName[key];
        cerr << name << " is not in tuple, key=" << key << endl;
        throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
                        ERR_WF_COLUMN_MISSING);
      }
    }

    ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>((*i).get());
    FunctionColumn* fc = dynamic_cast<FunctionColumn*>((*i).get());

    if (ac != NULL)
    {
      updateWindowCols(ac->expression(), colIndexMap, jobInfo);
    }
    else if (fc != NULL)
    {
      //            RetColsVector wcList = fc->windowfunctionColumnList();
      //            for (RetColsVector::iterator j = wcList.begin(); j != wcList.end(); j++)
      //                (*j)->inputIndex(getColumnIndex(*j, colIndexMap, jobInfo));
      funcexp::FunctionParm parms = fc->functionParms();

      for (vector<execplan::SPTP>::iterator j = parms.begin(); j < parms.end(); j++)
        updateWindowCols(j->get(), colIndexMap, jobInfo);
    }
  }

  // order by part
  if (jobInfo.wfqOrderby.size() > 0)
  {
    // query order by
    vector<IdbSortSpec> sorts;
    const RetColsVector& orderby = jobInfo.wfqOrderby;

    for (uint64_t o = 0; o < orderby.size(); o++)
    {
      // skip constant column
      if (dynamic_cast<const ConstantColumn*>(orderby[o].get()) != NULL)
        continue;

      // get column index
      uint64_t idx = getColumnIndex(orderby[o], colIndexMap, jobInfo);
      sorts.push_back(IdbSortSpec(idx, orderby[o]->asc(), orderby[o]->nullsFirst()));
    }

    fQueryOrderBy.reset(new OrderByData(sorts, rg));
  }

  // limit part
  fQueryLimitStart = jobInfo.wfqLimitStart;
  fQueryLimitCount = jobInfo.wfqLimitCount;

  // fix the delivered rowgroup data
  vector<uint64_t> delColIdx;

  for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
  {
    // find the non-constant columns in the delivered list
    // leave constants to annexstep for now.
    if (dynamic_cast<const ConstantColumn*>((*i).get()) != NULL)
      continue;

    delColIdx.push_back(getColumnIndex(*i, colIndexMap, jobInfo));
  }

  size_t retColCount = delColIdx.size();
  vector<uint32_t> pos1;
  vector<uint32_t> oids1;
  vector<uint32_t> keys1;
  vector<uint32_t> scales1;
  vector<uint32_t> precisions1;
  vector<CalpontSystemCatalog::ColDataType> types1;
  vector<uint32_t> csNums1;

  // offsets of the delivered row start at 2; each column keeps its input width
  pos1.push_back(2);

  for (size_t d = 0; d < retColCount; d++)
  {
    size_t j = delColIdx[d];
    pos1.push_back(pos1[d] + (pos[j + 1] - pos[j]));
    oids1.push_back(oids[j]);
    keys1.push_back(keys[j]);
    scales1.push_back(scales[j]);
    precisions1.push_back(precisions[j]);
    types1.push_back(types[j]);
    csNums1.push_back(csNums[j]);
  }

  fRowGroupDelivered = RowGroup(retColCount, pos1, oids1, keys1, types1, csNums1, scales1, precisions1,
                                jobInfo.stringTableThreshold);

  if (jobInfo.trace)
    cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;

  // the mutexes are only requested when more than one function is affected
  if (wfsUpdateStringTable > 1)
    fUseSSMutex = true;

  if (wfsUserFunctionCount > 1)
    fUseUFMutex = true;

  fRowGroupOut = fRowGroupDelivered;
}
void WindowFunctionStep::execute()
{
RGData rgData;
Row row;
fRowGroupIn.initRow(&row);
bool more = fInputDL->next(fInputIterator, &rgData);
uint64_t i = 0; // for RowGroup index in the fInRowGroupData
if (traceOn())
dlTimes.setFirstReadTime();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_START;
sts.total_units_of_work = 1;
postStepStartTele(sts);
try
{
while (more && !cancelled())
{
fRowGroupIn.setData(&rgData);
fRowGroupIn.getRow(0, &row);
uint64_t rowCnt = fRowGroupIn.getRowCount();
if (rowCnt > 0)
{
fInRowGroupData.push_back(rgData);
uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
fMemUsage += memAdd;
for (uint64_t j = 0; j < rowCnt; ++j)
{
if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL)
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
fRows.push_back(RowPosition(i, j));
row.nextRow();
}
//@bug6065, make StringStore::storeString() thread safe, default to false.
rgData.useStoreStringMutex(fUseSSMutex);
// For the User Data of UDAnF
rgData.useUserDataMutex(fUseUFMutex);
// window function does not change row count
fRowsReturned += rowCnt;
i++;
}
more = fInputDL->next(fInputIterator, &rgData);
}
} // try
catch (...)
{
handleException(std::current_exception(), logging::ERR_READ_INPUT_DATALIST,
logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::execute()");
}
if (traceOn())
dlTimes.setLastReadTime();
// no need for the window function if aborted or result set is empty.
if (cancelled() || fRows.size() == 0)
{
while (more)
more = fInputDL->next(fInputIterator, &rgData);
fOutputDL->endOfInput();
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
dlTimes.setEndOfInputTime();
printCalTrace();
}
return;
}
// got something to work on
try
{
if (fFunctionCount == 1)
{
doFunction();
}
else
{
if (fTotalThreads > fFunctionCount)
fTotalThreads = fFunctionCount;
fFunctionThreads.clear();
fFunctionThreads.reserve(fTotalThreads);
for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
// If cancelled, not all threads are started.
jobstepThreadPool.join(fFunctionThreads);
}
if (!(cancelled()))
{
if (fIsSelect)
doPostProcessForSelect();
else
doPostProcessForDml();
}
}
catch (...)
{
handleException(std::current_exception(), logging::ERR_EXECUTE_WINDOW_FUNCTION,
logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::execute()");
}
fOutputDL->endOfInput();
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
dlTimes.setEndOfInputTime();
printCalTrace();
}
return;
}
// Hand out the next index into the function array.
// The shared counter is bumped with atomicInc(); the value returned is one
// less than the counter value that atomicInc() reports.
uint64_t WindowFunctionStep::nextFunctionIndex()
{
  const uint64_t bumped = atomicInc(&fNextIndex);

  return bumped - 1;
}
// Worker loop: keep claiming function indices and evaluating the matching
// window function until the indices run out or the step is cancelled.
// Memory for one RowPosition per row is reserved for each function; any
// exception is routed to handleException().
void WindowFunctionStep::doFunction()
{
  try
  {
    for (;;)
    {
      const uint64_t idx = nextFunctionIndex();

      if (idx >= fFunctionCount || cancelled())
        break;

      const uint64_t memAdd = fRows.size() * sizeof(RowPosition);

      if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
        throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);

      fMemUsage += memAdd;

      fFunctions[idx]->setCallback(this, idx);
      (*fFunctions[idx].get())();
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_EXECUTE_WINDOW_FUNCTION,
                    logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::doFunction()");
  }
}
void WindowFunctionStep::doPostProcessForSelect()
{
FuncExp* fe = funcexp::FuncExp::instance();
std::shared_ptr<int[]> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
Row rowIn, rowOut;
fRowGroupIn.initRow(&rowIn);
fRowGroupOut.initRow(&rowOut);
RGData rgData;
vector<RowPosition>& rowData = *(fFunctions.back()->fRowData.get());
int64_t rowsLeft = rowData.size();
int64_t rowsInRg = 0;
int64_t rgCapacity = 0;
int64_t begin = fQueryLimitStart;
int64_t count = (fQueryLimitCount == (uint64_t)-1) ? rowsLeft : fQueryLimitCount;
int64_t end = begin + count;
end = (end < rowsLeft) ? end : rowsLeft;
rowsLeft = (end > begin) ? (end - begin) : 0;
if (fQueryOrderBy.get() != NULL)
sort(rowData.begin(), rowData.size());
for (int64_t i = begin; i < end; i++)
{
if (!rgData.hasRowData())
{
rgCapacity = ((rowsLeft > 8192) ? 8192 : rowsLeft);
rowsLeft -= rgCapacity;
rgData.reinit(fRowGroupOut, rgCapacity);
fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setDBRoot(0); // not valid dbroot
fRowGroupOut.getRow(0, &rowOut);
rowsInRg = 0;
}
rowIn.setData(getPointer(rowData[i], fRowGroupIn, rowIn));
// evaluate the window function expressions before apply mapping
if (fExpression.size() > 0)
fe->evaluate(rowIn, fExpression);
applyMapping(mapping, rowIn, &rowOut);
rowOut.nextRow();
rowsInRg++;
if (rowsInRg == rgCapacity)
{
fRowGroupOut.setRowCount(rowsInRg);
fOutputDL->insert(rgData);
rgData.clear();
}
}
}
void WindowFunctionStep::doPostProcessForDml()
{
FuncExp* fe = funcexp::FuncExp::instance();
std::shared_ptr<int[]> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
Row rowIn, rowOut;
fRowGroupIn.initRow(&rowIn);
fRowGroupOut.initRow(&rowOut);
for (vector<RGData>::iterator i = fInRowGroupData.begin(); i < fInRowGroupData.end(); i++)
{
fRowGroupIn.setData(&(*i));
RGData rgData = RGData(fRowGroupIn, fRowGroupIn.getRowCount());
fRowGroupOut.setData(&rgData);
// @bug 5631. reset rowgroup before the data is populated.
fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
fRowGroupIn.getRow(0, &rowIn);
fRowGroupOut.getRow(0, &rowOut);
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
{
// evaluate the window function expressions before apply mapping
if (fExpression.size() > 0)
fe->evaluate(rowIn, fExpression);
applyMapping(mapping, rowIn, &rowOut);
rowIn.nextRow();
rowOut.nextRow();
}
// fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
// fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
fOutputDL->insert(rgData);
}
}
// Build the frame bound object for one boundary of a ROWS frame.
//
// b        boundary description (frame kind and offset value)
// m        map from tuple key to input column index
// jobInfo  job context used to resolve the offset expression
//
// CURRENT ROW yields a FrameBoundRow. A constant offset yields a
// FrameBoundConstantRow. Any other offset yields a FrameBoundExpressionRow
// templated on the storage type chosen from the offset column's data type.
//
// Throws IDBExcept(ERR_WF_BOUND_OUT_OF_RANGE) for a NULL or negative constant
// offset, and IDBExcept(ERR_WF_INVALID_BOUND_TYPE) for an offset expression
// whose data type is not handled.
boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRows(const execplan::WF_Boundary& b,
                                                                      const map<uint64_t, uint64_t>& m,
                                                                      JobInfo& jobInfo)
{
  if (b.fFrame == WF_CURRENT_ROW)
    return boost::shared_ptr<FrameBound>(new FrameBoundRow(WF__CURRENT_ROW));

  const bool preceding = (b.fFrame == WF_PRECEDING);
  ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());

  // constant offset
  if (cc != NULL)
  {
    Row dummy;
    bool isNull = false;
    int val = cc->getIntVal(dummy, isNull);

    if (isNull || val < 0)
    {
      string str("NULL");

      if (!isNull)
      {
        ostringstream oss;
        oss << val;
        str = oss.str();
      }

      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
                      ERR_WF_BOUND_OUT_OF_RANGE);
    }

    const int constType = preceding ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
    return boost::shared_ptr<FrameBound>(new FrameBoundConstantRow(constType, val));
  }

  // offset given by an expression: read it from the input row
  const int type = preceding ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
  uint64_t id = getTupleKey(jobInfo, b.fVal);
  uint64_t idx = getColumnIndex(b.fVal, m, jobInfo);
  TupleInfo ti = getTupleInfo(id, jobInfo);

  boost::shared_ptr<FrameBound> bound;

  switch (ti.dtype)
  {
    case execplan::CalpontSystemCatalog::TINYINT:
    case execplan::CalpontSystemCatalog::SMALLINT:
    case execplan::CalpontSystemCatalog::MEDINT:
    case execplan::CalpontSystemCatalog::INT:
    case execplan::CalpontSystemCatalog::BIGINT:
    case execplan::CalpontSystemCatalog::DECIMAL:
      bound.reset(new FrameBoundExpressionRow<int64_t>(type, id, idx));
      break;

    case execplan::CalpontSystemCatalog::DOUBLE:
    case execplan::CalpontSystemCatalog::UDOUBLE:
      bound.reset(new FrameBoundExpressionRow<double>(type, id, idx));
      break;

    case execplan::CalpontSystemCatalog::FLOAT:
    case execplan::CalpontSystemCatalog::UFLOAT:
      bound.reset(new FrameBoundExpressionRow<float>(type, id, idx));
      break;

    case execplan::CalpontSystemCatalog::UTINYINT:
    case execplan::CalpontSystemCatalog::USMALLINT:
    case execplan::CalpontSystemCatalog::UMEDINT:
    case execplan::CalpontSystemCatalog::UINT:
    case execplan::CalpontSystemCatalog::UBIGINT:
    case execplan::CalpontSystemCatalog::UDECIMAL:
    case execplan::CalpontSystemCatalog::DATE:
    case execplan::CalpontSystemCatalog::DATETIME:
    case execplan::CalpontSystemCatalog::TIME:
    case execplan::CalpontSystemCatalog::TIMESTAMP:
      bound.reset(new FrameBoundExpressionRow<uint64_t>(type, id, idx));
      break;

    default:
    {
      string str = windowfunction::colType2String[ti.dtype];
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
                      ERR_WF_INVALID_BOUND_TYPE);
    }
  }

  return bound;
}
// Builds the frame bound object for one boundary of a RANGE window frame.
//
//   b        boundary descriptor; this function reads b.fFrame, b.fVal and b.fBound.
//   m        lookup map handed to getColumnIndex() to resolve column positions.
//   o        ORDER BY columns of the window; only o[0] is consulted.
//            NOTE(review): o[0] is used without an emptiness check -- this assumes
//            a RANGE frame with an offset always comes with an ORDER BY column;
//            confirm against the callers.
//   jobInfo  job context passed through to the key / index / tuple-info helpers.
//
// Returns the newly created bound.
// Throws IDBExcept(ERR_WF_BOUND_OUT_OF_RANGE) when a constant offset is NULL or
// negative, and IDBExcept(ERR_WF_INVALID_BOUND_TYPE) when the data type of the
// first ORDER BY column is not one of the types handled in the switch below.
boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRange(const execplan::WF_Boundary& b,
                                                                       const map<uint64_t, uint64_t>& m,
                                                                       const vector<SRCP>& o,
                                                                       JobInfo& jobInfo)
{
  // CURRENT ROW needs neither an offset nor any column bookkeeping.
  if (b.fFrame == WF_CURRENT_ROW)
    return boost::shared_ptr<FrameBound>(new FrameBoundRange(WF__CURRENT_ROW));

  bool isConstant = false;
  bool isNull = false;
  Row dummy;  // placeholder row handed to the constant column's value accessors
  ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());

  if (cc != NULL)
  {
    isConstant = true;

    // A constant offset is validated up front: it must be neither NULL nor negative.
    double val = cc->getDoubleVal(dummy, isNull);

    if (val < 0 || isNull)
    {
      string str("NULL");

      if (!isNull)
      {
        ostringstream oss;
        oss << val;
        str = oss.str();
      }

      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
                      ERR_WF_BOUND_OUT_OF_RANGE);
    }
  }

  // Both vectors are filled in the same order:
  //   [0] first ORDER BY column, [1] offset expression (dummy for a constant), [2] b.fBound.
  int type = 0;
  vector<uint64_t> ids;
  vector<int> index;
  ids.push_back(getTupleKey(jobInfo, o[0]));
  index.push_back(getColumnIndex(o[0], m, jobInfo));

  if (isConstant)
  {
    type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
    ids.push_back(static_cast<uint64_t>(-1));  // dummy, n/a for constant
    index.push_back(-1);                       // dummy, n/a for constant
  }
  else
  {
    type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
    ids.push_back(getTupleKey(jobInfo, b.fVal));
    index.push_back(getColumnIndex(b.fVal, m, jobInfo));
  }

  ids.push_back(getTupleKey(jobInfo, b.fBound));
  index.push_back(getColumnIndex(b.fBound, m, jobInfo));

  // The bound is owned by a smart pointer from the moment it is allocated, so
  // it is released if any of the setters below throws.
  boost::shared_ptr<FrameBoundRange> fbr;
  TupleInfo ti = getTupleInfo(ids[0], jobInfo);
  bool asc = o[0]->asc();
  bool nlf = o[0]->nullsFirst();

  // The value type of the bound follows the data type of the first ORDER BY column.
  switch (ti.dtype)
  {
    case execplan::CalpontSystemCatalog::TINYINT:
    case execplan::CalpontSystemCatalog::SMALLINT:
    case execplan::CalpontSystemCatalog::MEDINT:
    case execplan::CalpontSystemCatalog::INT:
    case execplan::CalpontSystemCatalog::BIGINT:
    case execplan::CalpontSystemCatalog::DECIMAL:
    {
      if (isConstant)
      {
        int64_t v = cc->getIntVal(dummy, isNull);
        fbr.reset(new FrameBoundConstantRange<int64_t>(type, asc, nlf, &v));
        fbr->isZero((v == 0));  // record whether the constant offset is zero
      }
      else
      {
        fbr.reset(new FrameBoundExpressionRange<int64_t>(type, asc, nlf));
      }

      break;
    }

    case execplan::CalpontSystemCatalog::DOUBLE:
    case execplan::CalpontSystemCatalog::UDOUBLE:
    {
      if (isConstant)
      {
        double v = cc->getDoubleVal(dummy, isNull);
        fbr.reset(new FrameBoundConstantRange<double>(type, asc, nlf, &v));
        fbr->isZero((v == 0.0));
      }
      else
      {
        fbr.reset(new FrameBoundExpressionRange<double>(type, asc, nlf));
      }

      break;
    }

    case execplan::CalpontSystemCatalog::FLOAT:
    case execplan::CalpontSystemCatalog::UFLOAT:
    {
      if (isConstant)
      {
        float v = cc->getFloatVal(dummy, isNull);
        fbr.reset(new FrameBoundConstantRange<float>(type, asc, nlf, &v));
        fbr->isZero((v == 0.0));
      }
      else
      {
        fbr.reset(new FrameBoundExpressionRange<float>(type, asc, nlf));
      }

      break;
    }

    case execplan::CalpontSystemCatalog::UTINYINT:
    case execplan::CalpontSystemCatalog::USMALLINT:
    case execplan::CalpontSystemCatalog::UMEDINT:
    case execplan::CalpontSystemCatalog::UINT:
    case execplan::CalpontSystemCatalog::UBIGINT:
    case execplan::CalpontSystemCatalog::UDECIMAL:
    case execplan::CalpontSystemCatalog::DATE:
    case execplan::CalpontSystemCatalog::DATETIME:
    case execplan::CalpontSystemCatalog::TIME:
    case execplan::CalpontSystemCatalog::TIMESTAMP:
    {
      if (isConstant)
      {
        uint64_t v = cc->getUintVal(dummy, isNull);
        fbr.reset(new FrameBoundConstantRange<uint64_t>(type, asc, nlf, &v));
        fbr->isZero((v == 0));
      }
      else
      {
        fbr.reset(new FrameBoundExpressionRange<uint64_t>(type, asc, nlf));
      }

      break;
    }

    default:
    {
      // Any other ORDER BY column type cannot be used for a RANGE offset.
      string str = windowfunction::colType2String[ti.dtype];
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
                      ERR_WF_INVALID_BOUND_TYPE);
    }
  }

  fbr->setTupleId(ids);
  fbr->setIndex(index);
  return fbr;
}
// Creates the frame bound described by b and attaches the peer comparator and
// the start flag to it.
//
//   b     boundary descriptor; b.fFrame selects the kind of bound.
//   m, o  forwarded to the ROWS / RANGE specific parsers.
//   p     peer comparator, installed on the result through peer().
//   j     job context, forwarded to the specific parsers.
//   rows  true: parse with parseFrameBoundRows(); false: with parseFrameBoundRange().
//   s     value passed to start() on the result (presumably true for the frame's
//         starting bound -- confirm against the caller).
//
// Throws IDBExcept(ERR_WF_UNKNOWN_BOUND) when b.fFrame is none of the handled kinds.
boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBound(const execplan::WF_Boundary& b,
                                                                  const map<uint64_t, uint64_t>& m,
                                                                  const vector<SRCP>& o,
                                                                  const boost::shared_ptr<EqualCompData>& p,
                                                                  JobInfo& j, bool rows, bool s)
{
  boost::shared_ptr<FrameBound> bound;

  if (b.fFrame == WF_UNBOUNDED_PRECEDING)
  {
    bound.reset(new FrameBound(WF__UNBOUNDED_PRECEDING));
  }
  else if (b.fFrame == WF_UNBOUNDED_FOLLOWING)
  {
    bound.reset(new FrameBound(WF__UNBOUNDED_FOLLOWING));
  }
  else if (b.fFrame == WF_CURRENT_ROW || b.fFrame == WF_PRECEDING || b.fFrame == WF_FOLLOWING)
  {
    // These kinds depend on whether the frame is expressed in ROWS or in RANGE.
    bound = rows ? parseFrameBoundRows(b, m, j) : parseFrameBoundRange(b, m, o, j);
  }
  else
  {
    // unknown
    throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_UNKNOWN_BOUND, b.fFrame),
                    ERR_WF_UNKNOWN_BOUND);
  }

  bound->peer(p);
  bound->start(s);
  return bound;
}
// Visits the expression rooted at rc and, for each WindowFunctionColumn reached,
// sets its input index to the value that m associates with the column's
// expression tuple key.
//
//   rc       column to inspect; NULL is ignored.
//   m        map searched with the tuple key; the mapped value becomes the input index.
//   jobInfo  job context used to derive tuple keys and key names.
//
// Arithmetic columns, function parameters and both sides of a simple filter are
// visited recursively. Throws IDBExcept(ERR_WF_COLUMN_MISSING), after writing a
// diagnostic line to cerr, when a window function's key is absent from m.
void WindowFunctionStep::updateWindowCols(ReturnedColumn* rc, const map<uint64_t, uint64_t>& m,
                                          JobInfo& jobInfo)
{
  if (rc == NULL)
    return;

  // A window function column is a leaf for this walk: resolve it and stop.
  if (WindowFunctionColumn* winCol = dynamic_cast<WindowFunctionColumn*>(rc))
  {
    uint64_t tupleKey = getExpTupleKey(jobInfo, winCol->expressionId());
    map<uint64_t, uint64_t>::const_iterator found = m.find(tupleKey);

    if (found == m.end())
    {
      string name = jobInfo.keyInfo->tupleKeyToName[tupleKey];
      cerr << name << " is not in tuple, key=" << tupleKey << endl;
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING);
    }

    winCol->inputIndex(found->second);
    return;
  }

  if (ArithmeticColumn* arithCol = dynamic_cast<ArithmeticColumn*>(rc))
  {
    updateWindowCols(arithCol->expression(), m, jobInfo);
    return;
  }

  if (FunctionColumn* funcCol = dynamic_cast<FunctionColumn*>(rc))
  {
    funcexp::FunctionParm parms = funcCol->functionParms();

    for (size_t k = 0; k < parms.size(); ++k)
      updateWindowCols(parms[k].get(), m, jobInfo);

    return;
  }

  if (SimpleFilter* filter = dynamic_cast<SimpleFilter*>(rc))
  {
    updateWindowCols(filter->lhs(), m, jobInfo);
    updateWindowCols(filter->rhs(), m, jobInfo);
  }
}
// Parse-tree flavour of updateWindowCols: visits the left and right subtrees
// first, then the node's own data. For each WindowFunctionColumn reached, the
// input index is set to the value that m associates with the column's
// expression tuple key.
//
//   pt       tree to inspect; NULL is ignored.
//   m        map searched with the tuple key; the mapped value becomes the input index.
//   jobInfo  job context used to derive tuple keys and key names.
//
// Throws IDBExcept(ERR_WF_COLUMN_MISSING), after writing a diagnostic line to
// cerr, when a window function's key is absent from m.
void WindowFunctionStep::updateWindowCols(ParseTree* pt, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
{
  if (pt == NULL)
    return;

  // Children are handled before the node itself.
  updateWindowCols(pt->left(), m, jobInfo);
  updateWindowCols(pt->right(), m, jobInfo);

  TreeNode* node = pt->data();

  if (WindowFunctionColumn* winCol = dynamic_cast<WindowFunctionColumn*>(node))
  {
    uint64_t tupleKey = getExpTupleKey(jobInfo, winCol->expressionId());
    map<uint64_t, uint64_t>::const_iterator found = m.find(tupleKey);

    if (found == m.end())
    {
      string name = jobInfo.keyInfo->tupleKeyToName[tupleKey];
      cerr << name << " is not in tuple, key=" << tupleKey << endl;
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING);
    }

    winCol->inputIndex(found->second);
    return;
  }

  if (ArithmeticColumn* arithCol = dynamic_cast<ArithmeticColumn*>(node))
  {
    updateWindowCols(arithCol->expression(), m, jobInfo);
    return;
  }

  if (FunctionColumn* funcCol = dynamic_cast<FunctionColumn*>(node))
  {
    funcexp::FunctionParm parms = funcCol->functionParms();

    for (size_t k = 0; k < parms.size(); ++k)
      updateWindowCols(parms[k].get(), m, jobInfo);

    return;
  }

  if (SimpleFilter* filter = dynamic_cast<SimpleFilter*>(node))
  {
    updateWindowCols(filter->lhs(), m, jobInfo);
    updateWindowCols(filter->rhs(), m, jobInfo);
  }
}
void WindowFunctionStep::sort(std::vector<RowPosition>::iterator v, uint64_t n)
{
// recursive function termination condition.
if (n < 2 || cancelled())
return;
RowPosition p = *(v + n / 2); // pivot value
vector<RowPosition>::iterator l = v; // low address
vector<RowPosition>::iterator h = v + (n - 1); // high address
while (l <= h && !cancelled())
{
// Can use while here, but need check boundary and cancel status.
if (fQueryOrderBy->operator()(getPointer(*l), getPointer(p)))
{
l++;
}
else if (fQueryOrderBy->operator()(getPointer(p), getPointer(*h)))
{
h--;
}
else
{
RowPosition t = *l; // temp value for swap
*l++ = *h;
*h-- = t;
}
}
sort(v, std::distance(v, h) + 1);
sort(l, std::distance(l, v) + n);
}
// Builds the end-of-step trace text (session, step id, completion time, rows
// returned, first-read / end-of-input times, runtime, step UUID and job status),
// passes it to logEnd(), appends it to fExtendedInfo and then calls
// formatMiniStats().
void WindowFunctionStep::printCalTrace()
{
  time_t t = time(0);
  char timeString[50] = {'\0'};

  if (ctime_r(&t, timeString) == NULL)
  {
    // Conversion failed: make sure an empty, terminated string is logged
    // rather than whatever the buffer happens to contain.
    timeString[0] = '\0';
  }
  else
  {
    // ctime_r() terminates its text with a newline; drop it so the timestamp
    // stays on the same log line.
    const size_t len = strlen(timeString);

    if (len > 0 && timeString[len - 1] == '\n')
      timeString[len - 1] = '\0';
  }

  ostringstream logStr;
  logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
         << "; total rows returned-" << fRowsReturned << endl
         << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
         << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
         << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
         << "\tJob completion status " << status() << endl;
  logEnd(logStr.str().c_str());
  fExtendedInfo += logStr.str();
  formatMiniStats();
}
void WindowFunctionStep::formatMiniStats()
{
ostringstream oss;
oss << "WFS "
<< "UM "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
<< fRowsReturned << " ";
fMiniInfo += oss.str();
}
} // namespace joblist