1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-10 01:22:48 +03:00
Files
mariadb-columnstore-engine/dbcon/joblist/windowfunctionstep.cpp
Andrew Hutchings 6128293ad3 MCOL-671 Fix TEXT/BLOB single row SELECT WHERE
pDictionaryScan won't work for BLOB/TEXT since it requires searching the
data file and rebuilding the token from matches. The tokens can't be
rebuild correctly due the bits in the token used for block counts. This
patch forces the use of pDictionaryStep instead for WHERE conditions.

In addition this patch adds support for TEXT/BLOB in various parts of
the job step processing. This fixes things like error 202 during an
UPDATE with a join condition on TEXT/BLOB columns.
2017-04-21 11:21:59 +01:00

1517 lines
41 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: windowfunctionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
//#define NDEBUG
#include <cassert>
#include <sstream>
#include <iomanip>
using namespace std;
#include <boost/algorithm/string.hpp> // to_upper_copy
#include <boost/shared_ptr.hpp>
#include <boost/shared_array.hpp>
#include <boost/thread.hpp>
#include <boost/uuid/uuid_io.hpp>
using namespace boost;
#include "atomicops.h"
using namespace atomicops;
#include "loggingid.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
using namespace logging;
#include "configcpp.h"
using namespace config;
#include "calpontselectexecutionplan.h"
#include "calpontsystemcatalog.h"
#include "aggregatecolumn.h"
#include "arithmeticcolumn.h"
#include "constantcolumn.h"
#include "functioncolumn.h"
#include "pseudocolumn.h"
#include "simplefilter.h"
#include "windowfunctioncolumn.h"
using namespace execplan;
#include "../../utils/windowfunction/windowfunction.h"
#include "../../utils/windowfunction/windowfunctiontype.h"
#include "../../utils/windowfunction/framebound.h"
#include "../../utils/windowfunction/frameboundrange.h"
#include "../../utils/windowfunction/frameboundrow.h"
#include "../../utils/windowfunction/windowframe.h"
using namespace windowfunction;
#include "rowgroup.h"
using namespace rowgroup;
using namespace ordering;
#include "funcexp.h"
using namespace funcexp;
#include "querytele.h"
using namespace querytele;
#include "jlf_common.h"
#include "jobstep.h"
#include "windowfunctionstep.h"
using namespace joblist;
namespace
{
string keyName(uint64_t i, uint32_t key, const joblist::JobInfo& jobInfo)
{
string name = jobInfo.projectionCols[i]->alias();
if (name.empty())
{
name = jobInfo.keyInfo->tupleKeyToName[key];
if (jobInfo.keyInfo->tupleKeyVec[key].fId < 100)
name = "Expression/Function";
}
return name = "'" + name + "'";
}
uint64_t getColumnIndex(const SRCP& c, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
{
uint64_t key = getTupleKey(jobInfo, c, true);
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(c.get());
if (sc != NULL && !sc->schemaName().empty())
{
// special handling for dictionary
CalpontSystemCatalog::ColType ct = sc->colType();
//XXX use this before connector sets colType in sc correctly.
// type of pseudo column is set by connector
if (!(dynamic_cast<const PseudoColumn*>(sc)))
ct = jobInfo.csc->colType(sc->oid());
//X
CalpontSystemCatalog::OID dictOid = isDictCol(ct);
string alias(extractTableAlias(sc));
if (dictOid > 0)
{
TupleInfo ti =
setTupleInfo(ct, dictOid, jobInfo, tableOid(sc, jobInfo.csc), sc, alias);
key = ti.key;
}
}
map<uint64_t, uint64_t>::const_iterator j = m.find(key);
if (j == m.end())
{
string name = jobInfo.keyInfo->tupleKeyToName[key];
cerr << name << " is not in tuple, key=" << key << endl;
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
ERR_WF_COLUMN_MISSING);
}
return j->second;
}
}
namespace joblist
{
WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo) :
JobStep(jobInfo),
fRunner(0),
fCatalog(jobInfo.csc),
fRowsReturned(0),
fEndOfResult(false),
fIsSelect(true),
fUseSSMutex(false),
fInputDL(NULL),
fOutputDL(NULL),
fInputIterator(-1),
fOutputIterator(-1),
fFunctionCount(0),
fTotalThreads(1),
fNextIndex(0),
fMemUsage(0),
fRm(jobInfo.rm),
fSessionMemLimit(jobInfo.umMemLimit)
{
fTotalThreads = fRm->windowFunctionThreads();
fExtendedInfo = "WFS: ";
fQtc.stepParms().stepType = StepTeleStats::T_WFS;
}
WindowFunctionStep::~WindowFunctionStep()
{
if (fMemUsage > 0)
fRm->returnMemory(fMemUsage, fSessionMemLimit);
}
void WindowFunctionStep::run()
{
if (fInputJobStepAssociation.outSize() == 0)
throw logic_error("No input data list for window function step.");
fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
if (fInputDL == NULL)
throw logic_error("Input is not a RowGroup data list in window function step.");
fInputIterator = fInputDL->getIterator();
if (fOutputJobStepAssociation.outSize() == 0)
throw logic_error("No output data list for window function step.");
fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
if (fOutputDL == NULL)
throw logic_error("Output of window function step is not a RowGroup data list.");
if (fDelivery == true)
{
fOutputIterator = fOutputDL->getIterator();
}
fRunner = jobstepThreadPool.invoke(Runner(this));
}
void WindowFunctionStep::join()
{
if (fRunner)
jobstepThreadPool.join(fRunner);
}
uint32_t WindowFunctionStep::nextBand(messageqcpp::ByteStream &bs)
{
RGData rgDataOut;
bool more = false;
uint32_t rowCount = 0;
try
{
bs.restart();
more = fOutputDL->next(fOutputIterator, &rgDataOut);
if (more && !cancelled())
{
fRowGroupDelivered.setData(&rgDataOut);
fRowGroupDelivered.serializeRGData(bs);
rowCount = fRowGroupDelivered.getRowCount();
}
else
{
while (more)
more = fOutputDL->next(fOutputIterator, &rgDataOut);
fEndOfResult = true;
}
}
catch (IDBExcept& iex)
{
handleException(iex.what(), iex.errorCode());
while (more)
more = fOutputDL->next(fOutputIterator, &rgDataOut);
fEndOfResult = true;
}
catch(const std::exception& ex)
{
handleException(ex.what(), ERR_IN_DELIVERY);
while (more)
more = fOutputDL->next(fOutputIterator, &rgDataOut);
fEndOfResult = true;
}
catch(...)
{
handleException("WindowFunctionStep caught an unknown exception", ERR_IN_DELIVERY);
while (more)
more = fOutputDL->next(fOutputIterator, &rgDataOut);
fEndOfResult = true;
}
if (fEndOfResult)
{
// send an empty / error band
rgDataOut.reinit(fRowGroupDelivered, 0);
fRowGroupDelivered.setData(&rgDataOut);
fRowGroupDelivered.resetRowGroup(0);
fRowGroupDelivered.setStatus(status());
fRowGroupDelivered.serializeRGData(bs);
}
return rowCount;
}
void WindowFunctionStep::setOutputRowGroup(const RowGroup& rg)
{
idbassert(0);
}
const RowGroup& WindowFunctionStep::getOutputRowGroup() const
{
return fRowGroupOut;
}
const RowGroup& WindowFunctionStep::getDeliveredRowGroup() const
{
return fRowGroupDelivered;
}
void WindowFunctionStep::deliverStringTableRowGroup(bool b)
{
fRowGroupOut.setUseStringTable(b);
fRowGroupDelivered.setUseStringTable(b);
}
bool WindowFunctionStep::deliverStringTableRowGroup() const
{
idbassert(fRowGroupOut.usesStringTable() == fRowGroupDelivered.usesStringTable());
return fRowGroupDelivered.usesStringTable();
}
const string WindowFunctionStep::toString() const
{
ostringstream oss;
oss << "WindowFunctionStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
oss << " in:";
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
oss << fInputJobStepAssociation.outAt(i);
if (fOutputJobStepAssociation.outSize() > 0)
{
oss << " out:";
for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
oss << fOutputJobStepAssociation.outAt(i);
}
return oss.str();
}
void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
{
// window functions in select clause, selected or in expression
jobInfo.windowDels = jobInfo.deliveredCols;
for (RetColsVector::iterator i=jobInfo.windowDels.begin(); i<jobInfo.windowDels.end(); i++)
{
const vector<WindowFunctionColumn*>& wcl = (*i)->windowfunctionColumnList();
RetColsVector wcList;
for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
wcList.push_back(SRCP((*j)->clone()));
if (!wcList.empty())
{
jobInfo.windowExps.push_back(*i);
jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
}
if (dynamic_cast<WindowFunctionColumn*>(i->get()) != NULL)
{
jobInfo.windowCols.push_back(*i);
jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
}
else if (!wcList.empty())
{
jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
{
jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
}
}
}
// window functions in order by clause
const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols();
RetColsVector wcInOrderby;
for (uint64_t i = 0; i < orderByCols.size(); i++)
{
if (orderByCols[i]->orderPos() == (uint64_t)(-1))
{
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get());
const vector<WindowFunctionColumn*>& wcl = orderByCols[i]->windowfunctionColumnList();
RetColsVector wcList;
for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
wcList.push_back(SRCP((*j)->clone()));
if (wc == NULL && wcList.empty())
continue;
// an window function or expression of window functions
wcInOrderby.push_back(orderByCols[i]);
if (!wcList.empty())
{
jobInfo.windowExps.push_back(orderByCols[i]);
jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
}
if (dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get()) != NULL)
{
jobInfo.windowCols.push_back(orderByCols[i]);
jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
}
else if (!wcList.empty())
{
jobInfo.windowCols.insert(
jobInfo.windowCols.end(), wcList.begin(), wcList.end());
for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
{
jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
}
}
}
}
// no window function involved in the query
if (jobInfo.windowCols.empty())
return;
// reconstruct the delivered column list with auxiliary columns
set<uint64_t> colSet;
jobInfo.deliveredCols.resize(0);
for (RetColsVector::iterator i=jobInfo.windowDels.begin(); i<jobInfo.windowDels.end(); i++)
{
jobInfo.deliveredCols.push_back(*i);
uint64_t key = getTupleKey(jobInfo, *i, true);
// TODO: remove duplicates in select clause
colSet.insert(key);
}
// add window columns in orderby
for (RetColsVector::iterator i = wcInOrderby.begin(); i < wcInOrderby.end(); i++)
{
jobInfo.deliveredCols.push_back(*i);
uint64_t key = getTupleKey(jobInfo, *i, true);
colSet.insert(key);
}
// add non-duplicate auxiliary columns
RetColsVector colsInAf;
for (RetColsVector::iterator i=jobInfo.windowCols.begin(); i<jobInfo.windowCols.end(); i++)
{
uint64_t key = getTupleKey(jobInfo, *i, true);
if (colSet.find(key) == colSet.end())
jobInfo.deliveredCols.push_back(*i);
RetColsVector columns = dynamic_cast<WindowFunctionColumn*>(i->get())->getColumnList();
for (RetColsVector::iterator j = columns.begin(); j < columns.end(); j++)
{
if (dynamic_cast<ConstantColumn*>(j->get()) != NULL)
continue;
key = getTupleKey(jobInfo, *j, true);
if (colSet.find(key) == colSet.end())
jobInfo.deliveredCols.push_back(*j);
colSet.insert(key);
}
}
// for handling order by and limit in outer query
jobInfo.wfqLimitStart = csep->limitStart();
jobInfo.wfqLimitCount = csep->limitNum();
csep->limitStart(0);
csep->limitNum(-1);
if (csep->orderByCols().size() > 0)
{
jobInfo.wfqOrderby = csep->orderByCols();
csep->orderByCols().clear();
// add order by columns
for (RetColsVector::iterator i=jobInfo.wfqOrderby.begin(); i<jobInfo.wfqOrderby.end(); i++)
{
if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
continue;
uint64_t key = getTupleKey(jobInfo, *i, true);
if (colSet.find(key) == colSet.end())
jobInfo.deliveredCols.push_back(*i);
colSet.insert(key);
}
}
}
SJSTEP WindowFunctionStep::makeWindowFunctionStep(SJSTEP& step, JobInfo& jobInfo)
{
// create a window function step
WindowFunctionStep* ws = new WindowFunctionStep(jobInfo);
// connect to the feeding step
JobStepAssociation jsa;
AnyDataListSPtr spdl(new AnyDataList());
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
dl->OID(execplan::CNX_VTABLE_ID);
spdl->rowGroupDL(dl);
jsa.outAdd(spdl);
ws->inputAssociation(jsa);
ws->stepId(step->stepId()+1);
step->outputAssociation(jsa);
AnyDataListSPtr spdlOut(new AnyDataList());
RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
dlOut->OID(CNX_VTABLE_ID);
spdlOut->rowGroupDL(dlOut);
JobStepAssociation jsaOut;
jsaOut.outAdd(spdlOut);
ws->outputAssociation(jsaOut);
// configure the rowgroups and index mapping
TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(step.get());
idbassert(ds != NULL);
ws->initialize(ds->getDeliveredRowGroup(), jobInfo);
// restore the original delivery coloumns
jobInfo.deliveredCols = jobInfo.windowDels;
jobInfo.nonConstDelCols.clear();
for (RetColsVector::iterator i=jobInfo.windowDels.begin(); i<jobInfo.windowDels.end(); i++)
{
if (NULL == dynamic_cast<const ConstantColumn*>(i->get()))
jobInfo.nonConstDelCols.push_back(*i);
}
return SJSTEP(ws);
}
void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
{
if (jobInfo.trace) cout << "Input to WindowFunctionStep: " << rg.toString() << endl;
// query type decides the output by dbroot or partition
// @bug 5631. Insert select should be treated as select
fIsSelect = (jobInfo.queryType == "SELECT" ||
jobInfo.queryType == "INSERT_SELECT");
// input row meta data
fRowGroupIn = rg;
fRowGroupIn.initRow(&fRowIn);
// make an input map(id, index)
map<uint64_t, uint64_t> colIndexMap;
uint64_t colCntIn = rg.getColumnCount();
const vector<uint32_t>& pos = rg.getOffsets();
const vector<uint32_t>& oids = rg.getOIDs();
const vector<uint32_t>& keys = rg.getKeys();
const vector<CalpontSystemCatalog::ColDataType>& types = rg.getColTypes();
const vector<uint32_t>& scales = rg.getScale();
const vector<uint32_t>& precisions = rg.getPrecision();
for (uint64_t i = 0; i < colCntIn; i++)
colIndexMap.insert(make_pair(keys[i], i));
// @bug6065, window functions that will update string table
int64_t wfsUpdateStringTable = 0;
for (RetColsVector::iterator i=jobInfo.windowCols.begin(); i<jobInfo.windowCols.end(); i++)
{
// window function type
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index
// @bug6065, window functions that will update string table
{
CalpontSystemCatalog::ColType rt = wc->resultType();
if ((types[ridx] == CalpontSystemCatalog::CHAR ||
types[ridx] == CalpontSystemCatalog::VARCHAR ||
types[ridx] == CalpontSystemCatalog::TEXT) &&
rg.getColumnWidth(ridx) >= jobInfo.stringTableThreshold)
{
wfsUpdateStringTable++;
}
}
vector<int64_t> fields;
fields.push_back(ridx); // result
const RetColsVector& parms = wc->functionParms();
for (uint64_t i = 0; i < parms.size(); i++) // arguments
{
// skip constant column
if (dynamic_cast<const ConstantColumn*>(parms[i].get()) == NULL)
fields.push_back(getColumnIndex(parms[i], colIndexMap, jobInfo));
else
fields.push_back(-1);
}
// partition & order by
const RetColsVector& partitions = wc->partitions();
vector<uint64_t> eqIdx;
vector<uint64_t> peerIdx;
vector<IdbSortSpec> sorts;
for (uint64_t i = 0; i < partitions.size(); i++)
{
// skip constant column
if (dynamic_cast<const ConstantColumn*>(partitions[i].get()) != NULL)
continue;
// get column index
uint64_t idx = getColumnIndex(partitions[i], colIndexMap, jobInfo);
eqIdx.push_back(idx);
sorts.push_back(IdbSortSpec(idx, partitions[i]->asc(), partitions[i]->nullsFirst()));
}
const RetColsVector& orders = wc->orderBy().fOrders;
for (uint64_t i = 0; i < orders.size(); i++)
{
// skip constant column
if (dynamic_cast<const ConstantColumn*>(orders[i].get()) != NULL)
continue;
// get column index
uint64_t idx = getColumnIndex(orders[i], colIndexMap, jobInfo);
peerIdx.push_back(idx);
sorts.push_back(IdbSortSpec(idx, orders[i]->asc(), orders[i]->nullsFirst()));
}
// functors for sorting
boost::shared_ptr<EqualCompData> parts(new EqualCompData(eqIdx, rg));
boost::shared_ptr<OrderByData> orderbys(new OrderByData(sorts, rg));
boost::shared_ptr<EqualCompData> peers(new EqualCompData(peerIdx, rg));
// column type for functor templates
int ct = 0;
// make sure index is in range
if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
ct = types[fields[1]];
// workaround for functions using "within group (order by)" syntax
string fn = boost::to_upper_copy(wc->functionName());
if ( (fn == "MEDIAN" || fn == "PERCENTILE_CONT" || fn == "PERCENTILE_DISC") &&
peerIdx[0] >= 0 && peerIdx[0] < types.size() )
ct = types[peerIdx[0]];
// create the functor based on function name
boost::shared_ptr<WindowFunctionType> func =
WindowFunctionType::makeWindowFunction(fn, ct);
// parse parms after peer and fields are set
// functions may need to set order column index
func->peer(peers);
func->fieldIndex(fields);
func->parseParms(parms);
// window frame
const WF_Frame& frame = wc->orderBy().fFrame;
int frameUnit = (frame.fIsRange) ? WF__FRAME_RANGE : WF__FRAME_ROWS;
if (frame.fStart.fFrame == WF_UNBOUNDED_PRECEDING &&
frame.fEnd.fFrame == WF_UNBOUNDED_FOLLOWING)
frameUnit = WF__FRAME_ROWS;
boost::shared_ptr<FrameBound> upper = parseFrameBound(
frame.fStart, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, true);
boost::shared_ptr<FrameBound> lower = parseFrameBound(
frame.fEnd, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, false);
boost::shared_ptr<WindowFrame> windows(new WindowFrame(frameUnit, upper, lower));
func->frameUnit(frameUnit);
// add to the function list
fFunctions.push_back(boost::shared_ptr<WindowFunction>(
new WindowFunction(func, parts, orderbys, windows, rg, fRowIn)));
fFunctionCount++;
}
// initialize window function expresssions
fExpression = jobInfo.windowExps;
for (RetColsVector::iterator i = fExpression.begin(); i < fExpression.end(); i++)
{
// output index
(*i)->outputIndex(getColumnIndex(*i, colIndexMap, jobInfo));
// map the input indices
const vector<SimpleColumn*>& scols = (*i)->simpleColumnList();
for (vector<SimpleColumn*>::const_iterator j = scols.begin(); j != scols.end(); j++)
{
uint64_t key = getTupleKey(jobInfo, *j);
CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*j)->colType());
if (dictOid > 0)
{
key = jobInfo.keyInfo->dictKeyMap[key];
}
map<uint64_t, uint64_t>::iterator k = colIndexMap.find(key);
if (k != colIndexMap.end())
{
(*j)->inputIndex(k->second);
}
else
{
string name = jobInfo.keyInfo->tupleKeyToName[key];
cerr << name << " is not in tuple, key=" << key << endl;
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
ERR_WF_COLUMN_MISSING);
}
}
ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>((*i).get());
FunctionColumn* fc = dynamic_cast<FunctionColumn*>((*i).get());
if (ac != NULL)
{
updateWindowCols(ac->expression(), colIndexMap, jobInfo);
}
else if (fc != NULL)
{
// RetColsVector wcList = fc->windowfunctionColumnList();
// for (RetColsVector::iterator j = wcList.begin(); j != wcList.end(); j++)
// (*j)->inputIndex(getColumnIndex(*j, colIndexMap, jobInfo));
funcexp::FunctionParm parms = fc->functionParms();
for (vector<execplan::SPTP>::iterator j = parms.begin(); j < parms.end(); j++)
updateWindowCols(j->get(), colIndexMap, jobInfo);
}
}
// order by part
if (jobInfo.wfqOrderby.size() > 0)
{
// query order by
vector<uint64_t> eqIdx;
vector<IdbSortSpec> sorts;
const RetColsVector& orderby = jobInfo.wfqOrderby;
for (uint64_t i = 0; i < orderby.size(); i++)
{
// skip constant column
if (dynamic_cast<const ConstantColumn*>(orderby[i].get()) != NULL)
continue;
// get column index
uint64_t idx = getColumnIndex(orderby[i], colIndexMap, jobInfo);
sorts.push_back(IdbSortSpec(idx, orderby[i]->asc(), orderby[i]->nullsFirst()));
}
fQueryOrderBy.reset(new OrderByData(sorts, rg));
}
// limit part
fQueryLimitStart = jobInfo.wfqLimitStart;
fQueryLimitCount = jobInfo.wfqLimitCount;
// fix the delivered rowgroup data
vector<uint64_t> delColIdx;
for (RetColsVector::iterator i=jobInfo.windowDels.begin(); i<jobInfo.windowDels.end(); i++)
{
// find the none constantant columns in the deliver
// leave constants to annexstep for now.
if (dynamic_cast<const ConstantColumn*>((*i).get()) != NULL)
continue;
delColIdx.push_back(getColumnIndex(*i, colIndexMap, jobInfo));
}
size_t retColCount = delColIdx.size();
vector<uint32_t> pos1;
vector<uint32_t> oids1;
vector<uint32_t> keys1;
vector<uint32_t> scales1;
vector<uint32_t> precisions1;
vector<CalpontSystemCatalog::ColDataType> types1;
pos1.push_back(2);
for (size_t i = 0; i < retColCount; i++)
{
size_t j = delColIdx[i];
pos1.push_back(pos1[i] + (pos[j+1] - pos[j]));
oids1.push_back(oids[j]);
keys1.push_back(keys[j]);
scales1.push_back(scales[j]);
precisions1.push_back(precisions[j]);
types1.push_back(types[j]);
}
fRowGroupDelivered = RowGroup(
retColCount,pos1,oids1,keys1,types1,scales1,precisions1,jobInfo.stringTableThreshold);
if (jobInfo.trace)
cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
if (wfsUpdateStringTable > 1)
fUseSSMutex = true;
fRowGroupOut = fRowGroupDelivered;
}
void WindowFunctionStep::execute()
{
RGData rgData;
Row row;
fRowGroupIn.initRow(&row);
bool more = fInputDL->next(fInputIterator, &rgData);
uint64_t i = 0; // for RowGroup index in the fInRowGroupData
if (traceOn()) dlTimes.setFirstReadTime();
StepTeleStats sts;
sts.query_uuid = fQueryUuid;
sts.step_uuid = fStepUuid;
sts.msg_type = StepTeleStats::ST_START;
sts.total_units_of_work = 1;
postStepStartTele(sts);
try
{
while (more && !cancelled())
{
fRowGroupIn.setData(&rgData);
fRowGroupIn.getRow(0, &row);
uint64_t rowCnt = fRowGroupIn.getRowCount();
if (rowCnt > 0)
{
fInRowGroupData.push_back(rgData);
uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
fMemUsage += memAdd;
for (uint64_t j = 0; j < rowCnt; ++j)
{
if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL)
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
fRows.push_back(RowPosition(i, j));
row.nextRow();
}
//@bug6065, make StringStore::storeString() thread safe, default to false.
rgData.useStoreStringMutex(fUseSSMutex);
// window function does not change row count
fRowsReturned += rowCnt;
i++;
}
more = fInputDL->next(fInputIterator, &rgData);
}
} // try
catch(const IDBExcept &idb) {
handleException(idb.what(), idb.errorCode());
}
catch(const std::exception& ex)
{
handleException(ex.what(), ERR_READ_INPUT_DATALIST);
}
catch(...)
{
handleException("WindowFunctionStep caught an unknown exception", ERR_READ_INPUT_DATALIST);
}
if (traceOn())
dlTimes.setLastReadTime();
// no need for the window function if aborted or result set is empty.
if (cancelled() || fRows.size() == 0)
{
while (more)
more = fInputDL->next(fInputIterator, &rgData);
fOutputDL->endOfInput();
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
dlTimes.setEndOfInputTime();
printCalTrace();
}
return;
}
// got something to work on
try
{
if (fFunctionCount == 1)
{
doFunction();
}
else
{
if (fTotalThreads > fFunctionCount)
fTotalThreads = fFunctionCount;
fFunctionThreads.clear();
fFunctionThreads.reserve(fTotalThreads);
for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
// If cancelled, not all threads are started.
jobstepThreadPool.join(fFunctionThreads);
}
if (!(cancelled()))
{
if (fIsSelect)
doPostProcessForSelect();
else
doPostProcessForDml();
}
}
catch(const std::exception& ex)
{
handleException(ex.what(), ERR_EXECUTE_WINDOW_FUNCTION);
}
catch(...)
{
handleException("WindowFunctionStep caught an unknown exception",
ERR_EXECUTE_WINDOW_FUNCTION);
}
fOutputDL->endOfInput();
sts.msg_type = StepTeleStats::ST_SUMMARY;
sts.total_units_of_work = sts.units_of_work_completed = 1;
sts.rows = fRowsReturned;
postStepSummaryTele(sts);
if (traceOn())
{
dlTimes.setEndOfInputTime();
printCalTrace();
}
return;
}
uint64_t WindowFunctionStep::nextFunctionIndex()
{
uint64_t idx = atomicInc(&fNextIndex);
// return index in the function array
return --idx;
}
void WindowFunctionStep::doFunction()
{
uint64_t i = 0;
try
{
while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled())
{
uint64_t memAdd = fRows.size() * sizeof(RowPosition);
if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
fMemUsage += memAdd;
fFunctions[i]->setCallback(this, i);
(*fFunctions[i].get())();
}
}
catch (IDBExcept& iex)
{
handleException(iex.what(), iex.errorCode());
}
catch(const std::exception& ex)
{
handleException(ex.what(), ERR_EXECUTE_WINDOW_FUNCTION);
}
catch(...)
{
handleException("doFunction caught an unknown exception", ERR_EXECUTE_WINDOW_FUNCTION);
}
}
void WindowFunctionStep::doPostProcessForSelect()
{
FuncExp* fe = funcexp::FuncExp::instance();
boost::shared_array<int> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
Row rowIn, rowOut;
fRowGroupIn.initRow(&rowIn);
fRowGroupOut.initRow(&rowOut);
RGData rgData;
vector<RowPosition>& rowData = *(fFunctions.back()->fRowData.get());
int64_t rowsLeft = rowData.size();
int64_t rowsInRg = 0;
int64_t rgCapacity = 0;
int64_t begin = fQueryLimitStart;
int64_t count = (fQueryLimitCount == (uint64_t) -1) ? rowsLeft : fQueryLimitCount;
int64_t end = begin + count;
end = (end < rowsLeft) ? end : rowsLeft;
rowsLeft = (end > begin) ? (end - begin) : 0;
if (fQueryOrderBy.get() != NULL)
sort(rowData.begin(), rowData.size());
for (int64_t i = begin; i < end; i++)
{
if (rgData.rowData.get() == NULL)
{
rgCapacity = ((rowsLeft > 8192) ? 8192 : rowsLeft);
rowsLeft -= rgCapacity;
rgData.reinit(fRowGroupOut, rgCapacity);
fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setDBRoot(0); // not valid dbroot
fRowGroupOut.getRow(0, &rowOut);
rowsInRg = 0;
}
rowIn.setData(getPointer(rowData[i], fRowGroupIn, rowIn));
// evaluate the window function expressions before apply mapping
if (fExpression.size() > 0)
fe->evaluate(rowIn, fExpression);
applyMapping(mapping, rowIn, &rowOut);
rowOut.nextRow();
rowsInRg++;
if (rowsInRg == rgCapacity)
{
fRowGroupOut.setRowCount(rowsInRg);
fOutputDL->insert(rgData);
rgData.clear();
}
}
}
void WindowFunctionStep::doPostProcessForDml()
{
FuncExp* fe = funcexp::FuncExp::instance();
boost::shared_array<int> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
Row rowIn, rowOut;
fRowGroupIn.initRow(&rowIn);
fRowGroupOut.initRow(&rowOut);
for (vector<RGData>::iterator i = fInRowGroupData.begin();
i < fInRowGroupData.end(); i++)
{
fRowGroupIn.setData(&(*i));
RGData rgData = RGData(fRowGroupIn, fRowGroupIn.getRowCount());
fRowGroupOut.setData(&rgData);
// @bug 5631. reset rowgroup before the data is populated.
fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
fRowGroupIn.getRow(0, &rowIn);
fRowGroupOut.getRow(0, &rowOut);
for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
{
// evaluate the window function expressions before apply mapping
if (fExpression.size() > 0)
fe->evaluate(rowIn, fExpression);
applyMapping(mapping, rowIn, &rowOut);
rowIn.nextRow();
rowOut.nextRow();
}
//fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
//fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
fOutputDL->insert(rgData);
}
}
void WindowFunctionStep::handleException(string errStr, int errCode)
{
cerr << "Exception: " << errStr << endl;
catchHandler(errStr, errCode, fErrorInfo, fSessionId);
}
boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRows(
const execplan::WF_Boundary& b,
const map<uint64_t, uint64_t>& m,
JobInfo& jobInfo)
{
boost::shared_ptr<FrameBound> fb;
if (b.fFrame == WF_CURRENT_ROW)
{
fb.reset(new FrameBoundRow(WF__CURRENT_ROW));
return fb;
}
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
if (cc != NULL)
{
Row dummy;
bool isNull = false;
int val = cc->getIntVal(dummy, isNull);
if (val >= 0 && isNull == false)
{
int type = (b.fFrame==WF_PRECEDING)?WF__CONSTANT_PRECEDING:WF__CONSTANT_FOLLOWING;
fb.reset(new FrameBoundConstantRow(type, val));
}
else
{
string str("NULL");
if (!isNull)
{
ostringstream oss;
oss << val;
str = oss.str();
}
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
ERR_WF_BOUND_OUT_OF_RANGE);
}
}
else
{
int type = (b.fFrame==WF_PRECEDING)?WF__EXPRESSION_PRECEDING:WF__EXPRESSION_FOLLOWING;
uint64_t id = getTupleKey(jobInfo, b.fVal);
uint64_t idx = getColumnIndex(b.fVal, m, jobInfo);
TupleInfo ti = getTupleInfo(id, jobInfo);
switch(ti.dtype)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
{
fb.reset(new FrameBoundExpressionRow<int64_t>(type, id, idx));
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
fb.reset(new FrameBoundExpressionRow<double>(type, id, idx));
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
fb.reset(new FrameBoundExpressionRow<float>(type, id, idx));
break;
}
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
case execplan::CalpontSystemCatalog::UDECIMAL:
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
{
fb.reset(new FrameBoundExpressionRow<uint64_t>(type, id, idx));
break;
}
default:
{
string str = windowfunction::colType2String[ti.dtype];
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
ERR_WF_INVALID_BOUND_TYPE);
break;
}
}
}
return fb;
}
boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRange(const execplan::WF_Boundary& b,
const map<uint64_t, uint64_t>& m,
const vector<SRCP>& o,
JobInfo& jobInfo)
{
boost::shared_ptr<FrameBound> fb;
if (b.fFrame == WF_CURRENT_ROW)
{
fb.reset(new FrameBoundRange(WF__CURRENT_ROW));
return fb;
}
bool isConstant = false;
bool isNull = false;
Row dummy;
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
if (cc != NULL)
{
isConstant = true;
double val = cc->getDoubleVal(dummy, isNull);
if (val < 0 || isNull)
{
string str("NULL");
if (!isNull)
{
ostringstream oss;
oss << val;
str = oss.str();
}
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
ERR_WF_BOUND_OUT_OF_RANGE);
}
}
int type = 0;
vector<uint64_t> ids;
vector<int> index;
ids.push_back(getTupleKey(jobInfo, o[0]));
index.push_back(getColumnIndex(o[0], m, jobInfo));
if (isConstant)
{
type = (b.fFrame==WF_PRECEDING)?WF__CONSTANT_PRECEDING:WF__CONSTANT_FOLLOWING;
ids.push_back(-1); // dummy, n/a for constant
index.push_back(-1); // dummy, n/a for constant
}
else
{
type = (b.fFrame==WF_PRECEDING)?WF__EXPRESSION_PRECEDING:WF__EXPRESSION_FOLLOWING;
ids.push_back(getTupleKey(jobInfo, b.fVal));
index.push_back(getColumnIndex(b.fVal, m, jobInfo));
}
ids.push_back(getTupleKey(jobInfo, b.fBound));
index.push_back(getColumnIndex(b.fBound, m, jobInfo));
FrameBoundRange* fbr = NULL;
TupleInfo ti = getTupleInfo(ids[0], jobInfo);
bool asc = o[0]->asc();
bool nlf = o[0]->nullsFirst();
switch(ti.dtype)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DECIMAL:
{
if (isConstant)
{
int64_t v = cc->getIntVal(dummy, isNull);
fbr = new FrameBoundConstantRange<int64_t>(type, asc, nlf, &v);
fbr->isZero((v == 0));
}
else
{
fbr = new FrameBoundExpressionRange<int64_t>(type, asc, nlf);
}
break;
}
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
if (isConstant)
{
double v = cc->getDoubleVal(dummy, isNull);
fbr = new FrameBoundConstantRange<double>(type, asc, nlf, &v);
fbr->isZero((v == 0.0));
}
else
{
fbr = new FrameBoundExpressionRange<double>(type, asc, nlf);
}
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::UFLOAT:
{
if (isConstant)
{
float v = cc->getFloatVal(dummy, isNull);
fbr = new FrameBoundConstantRange<float>(type, asc, nlf, &v);
fbr->isZero((v == 0.0));
}
else
{
fbr = new FrameBoundExpressionRange<float>(type, asc, nlf);
}
break;
}
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
case execplan::CalpontSystemCatalog::UDECIMAL:
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::DATETIME:
{
if (isConstant)
{
uint64_t v = cc->getUintVal(dummy, isNull);
fbr = new FrameBoundConstantRange<uint64_t>(type, asc, nlf, &v);
fbr->isZero((v == 0));
}
else
{
fbr = new FrameBoundExpressionRange<uint64_t>(type, asc, nlf);
}
break;
}
default:
{
string str = windowfunction::colType2String[ti.dtype];
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
ERR_WF_INVALID_BOUND_TYPE);
break;
}
}
fbr->setTupleId(ids);
fbr->setIndex(index);
fb.reset(fbr);
return fb;
}
boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBound(const execplan::WF_Boundary& b,
const map<uint64_t, uint64_t>& m,
const vector<SRCP>& o,
const boost::shared_ptr<EqualCompData>& p,
JobInfo& j,
bool rows,
bool s)
{
boost::shared_ptr<FrameBound> fb;
switch(b.fFrame)
{
case WF_UNBOUNDED_PRECEDING:
{
fb.reset(new FrameBound(WF__UNBOUNDED_PRECEDING));
break;
}
case WF_UNBOUNDED_FOLLOWING:
{
fb.reset(new FrameBound(WF__UNBOUNDED_FOLLOWING));
break;
}
case WF_CURRENT_ROW:
case WF_PRECEDING:
case WF_FOLLOWING:
{
if (rows)
{
fb = parseFrameBoundRows(b, m, j);
}
else
{
fb = parseFrameBoundRange(b, m, o, j);
}
break;
}
default: // unknown
{
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_UNKNOWN_BOUND, b.fFrame),
ERR_WF_UNKNOWN_BOUND);
break;
}
}
fb->peer(p);
fb->start(s);
return fb;
}
void WindowFunctionStep::updateWindowCols(ReturnedColumn* rc,
const map<uint64_t, uint64_t>& m,
JobInfo& jobInfo)
{
if (rc == NULL)
return;
ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(rc);
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
SimpleFilter* sf = dynamic_cast<SimpleFilter*>(rc);
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(rc);
if (wc)
{
uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
map<uint64_t, uint64_t>::const_iterator j = m.find(key);
if (j == m.end())
{
string name = jobInfo.keyInfo->tupleKeyToName[key];
cerr << name << " is not in tuple, key=" << key << endl;
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
ERR_WF_COLUMN_MISSING);
}
wc->inputIndex(j->second);
}
else if (ac)
{
updateWindowCols(ac->expression(), m, jobInfo);
}
else if (fc)
{
funcexp::FunctionParm parms = fc->functionParms();
for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
updateWindowCols(i->get(), m, jobInfo);
}
else if (sf)
{
updateWindowCols(sf->lhs(), m, jobInfo);
updateWindowCols(sf->rhs(), m, jobInfo);
}
}
void WindowFunctionStep::updateWindowCols(ParseTree* pt,
const map<uint64_t, uint64_t>& m,
JobInfo& jobInfo)
{
if (pt == NULL)
return;
updateWindowCols(pt->left(), m, jobInfo);
updateWindowCols(pt->right(), m, jobInfo);
TreeNode* tn = pt->data();
ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(tn);
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(tn);
SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(tn);
if (wc)
{
uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
map<uint64_t, uint64_t>::const_iterator j = m.find(key);
if (j == m.end())
{
string name = jobInfo.keyInfo->tupleKeyToName[key];
cerr << name << " is not in tuple, key=" << key << endl;
throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
ERR_WF_COLUMN_MISSING);
}
wc->inputIndex(j->second);
}
else if (ac)
{
updateWindowCols(ac->expression(), m, jobInfo);
}
else if (fc)
{
funcexp::FunctionParm parms = fc->functionParms();
for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
updateWindowCols(i->get(), m, jobInfo);
}
else if (sf)
{
updateWindowCols(sf->lhs(), m, jobInfo);
updateWindowCols(sf->rhs(), m, jobInfo);
}
}
void WindowFunctionStep::sort(std::vector<RowPosition>::iterator v, uint64_t n)
{
// recursive function termination condition.
if (n < 2 || cancelled())
return;
RowPosition p = *(v + n/2); // pivot value
vector<RowPosition>::iterator l = v; // low address
vector<RowPosition>::iterator h = v + (n - 1); // high address
while (l <= h && !cancelled())
{
// Can use while here, but need check boundary and cancel status.
if (fQueryOrderBy->operator()(getPointer(*l), getPointer(p)))
{
l++;
}
else if (fQueryOrderBy->operator()(getPointer(p), getPointer(*h)))
{
h--;
}
else
{
RowPosition t = *l; // temp value for swap
*l++ = *h;
*h-- = t;
}
}
sort(v, distance(v, h) + 1);
sort(l, distance(l, v) + n);
}
void WindowFunctionStep::printCalTrace()
{
time_t t = time (0);
char timeString[50];
ctime_r (&t, timeString);
timeString[strlen (timeString )-1] = '\0';
ostringstream logStr;
logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at "<< timeString
<< "; total rows returned-" << fRowsReturned << endl
<< "\t1st read " << dlTimes.FirstReadTimeString()
<< "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
<< "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
<< "\tJob completion status " << status() << endl;
logEnd(logStr.str().c_str());
fExtendedInfo += logStr.str();
formatMiniStats();
}
void WindowFunctionStep::formatMiniStats()
{
ostringstream oss;
oss << "WFS "
<< "UM "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< "- "
<< JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
<< fRowsReturned << " ";
fMiniInfo += oss.str();
}
} //namespace
// vim:ts=4 sw=4: