You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1590 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1590 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
|    Copyright (c) 2016-2020 MariaDB
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //  $Id: windowfunctionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
 | |
| 
 | |
| // #define NDEBUG
 | |
| #include <cassert>
 | |
| #include <sstream>
 | |
| #include <iomanip>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/algorithm/string.hpp>  //  to_upper_copy
 | |
| #include <boost/shared_ptr.hpp>
 | |
| 
 | |
| #include <boost/thread.hpp>
 | |
| #include <boost/uuid/uuid_io.hpp>
 | |
| using namespace boost;
 | |
| 
 | |
| #include "atomicops.h"
 | |
| using namespace atomicops;
 | |
| 
 | |
| #include "loggingid.h"
 | |
| #include "errorcodes.h"
 | |
| #include "idberrorinfo.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "configcpp.h"
 | |
| using namespace config;
 | |
| 
 | |
| #include "calpontselectexecutionplan.h"
 | |
| #include "calpontsystemcatalog.h"
 | |
| #include "aggregatecolumn.h"
 | |
| #include "arithmeticcolumn.h"
 | |
| #include "constantcolumn.h"
 | |
| #include "functioncolumn.h"
 | |
| #include "pseudocolumn.h"
 | |
| #include "simplefilter.h"
 | |
| #include "windowfunctioncolumn.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "../../utils/windowfunction/windowfunction.h"
 | |
| #include "../../utils/windowfunction/windowfunctiontype.h"
 | |
| #include "../../utils/windowfunction/framebound.h"
 | |
| #include "../../utils/windowfunction/frameboundrange.h"
 | |
| #include "../../utils/windowfunction/frameboundrow.h"
 | |
| #include "../../utils/windowfunction/windowframe.h"
 | |
| using namespace windowfunction;
 | |
| 
 | |
| #include "rowgroup.h"
 | |
| using namespace rowgroup;
 | |
| 
 | |
| using namespace ordering;
 | |
| 
 | |
| #include "funcexp.h"
 | |
| using namespace funcexp;
 | |
| 
 | |
| #include "querytele.h"
 | |
| using namespace querytele;
 | |
| 
 | |
| #include "jlf_common.h"
 | |
| #include "jobstep.h"
 | |
| #include "windowfunctionstep.h"
 | |
| using namespace joblist;
 | |
| 
 | |
| #include "checks.h"
 | |
| 
 | |
| namespace
 | |
| {
 | |
| uint64_t getColumnIndex(const SRCP& c, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
 | |
| {
 | |
|   uint64_t key = getTupleKey(jobInfo, c, true);
 | |
|   const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(c.get());
 | |
| 
 | |
|   if (sc != NULL && !sc->schemaName().empty())
 | |
|   {
 | |
|     // special handling for dictionary
 | |
|     CalpontSystemCatalog::ColType ct = sc->colType();
 | |
| 
 | |
|     // XXX use this before connector sets colType in sc correctly.
 | |
|     //    type of pseudo column is set by connector
 | |
|     if (!(dynamic_cast<const PseudoColumn*>(sc)))
 | |
|     {
 | |
|       ct = jobInfo.csc->colType(sc->oid());
 | |
|       ct.charsetNumber = sc->colType().charsetNumber;
 | |
|     }
 | |
| 
 | |
|     // X
 | |
|     CalpontSystemCatalog::OID dictOid = isDictCol(ct);
 | |
|     string alias(extractTableAlias(sc));
 | |
| 
 | |
|     if (dictOid > 0)
 | |
|     {
 | |
|       TupleInfo ti = setTupleInfo(ct, dictOid, jobInfo, tableOid(sc, jobInfo.csc), sc, alias);
 | |
|       key = ti.key;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   map<uint64_t, uint64_t>::const_iterator j = m.find(key);
 | |
| 
 | |
|   if (j == m.end())
 | |
|   {
 | |
|     string name = jobInfo.keyInfo->tupleKeyToName[key];
 | |
|     cerr << name << " is not in tuple, key=" << key << endl;
 | |
|     throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING);
 | |
|   }
 | |
| 
 | |
|   return j->second;
 | |
| }
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| namespace joblist
 | |
| {
 | |
| WindowFunctionStep::WindowFunctionStep(const JobInfo& jobInfo)
 | |
|  : JobStep(jobInfo)
 | |
|  , fRunner(0)
 | |
|  , fCatalog(jobInfo.csc)
 | |
|  , fRowsReturned(0)
 | |
|  , fEndOfResult(false)
 | |
|  , fIsSelect(true)
 | |
|  , fUseSSMutex(false)
 | |
|  , fUseUFMutex(false)
 | |
|  , fInputDL(NULL)
 | |
|  , fOutputDL(NULL)
 | |
|  , fInputIterator(-1)
 | |
|  , fOutputIterator(-1)
 | |
|  , fFunctionCount(0)
 | |
|  , fTotalThreads(1)
 | |
|  , fNextIndex(0)
 | |
|  , fMemUsage(0)
 | |
|  , fRm(jobInfo.rm)
 | |
|  , fSessionMemLimit(jobInfo.umMemLimit)
 | |
| {
 | |
|   fTotalThreads = fRm->windowFunctionThreads();
 | |
|   fExtendedInfo = "WFS: ";
 | |
|   fQtc.stepParms().stepType = StepTeleStats::T_WFS;
 | |
| }
 | |
| 
 | |
| WindowFunctionStep::~WindowFunctionStep()
 | |
| {
 | |
|   if (fMemUsage > 0)
 | |
|     fRm->returnMemory(fMemUsage, fSessionMemLimit);
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::run()
 | |
| {
 | |
|   if (fInputJobStepAssociation.outSize() == 0)
 | |
|     throw logic_error("No input data list for window function step.");
 | |
| 
 | |
|   fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
 | |
| 
 | |
|   if (fInputDL == NULL)
 | |
|     throw logic_error("Input is not a RowGroup data list in window function step.");
 | |
| 
 | |
|   fInputIterator = fInputDL->getIterator();
 | |
| 
 | |
|   if (fOutputJobStepAssociation.outSize() == 0)
 | |
|     throw logic_error("No output data list for window function step.");
 | |
| 
 | |
|   fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
 | |
| 
 | |
|   if (fOutputDL == NULL)
 | |
|     throw logic_error("Output of window function step is not a RowGroup data list.");
 | |
| 
 | |
|   if (fDelivery == true)
 | |
|   {
 | |
|     fOutputIterator = fOutputDL->getIterator();
 | |
|   }
 | |
| 
 | |
|   fRunner = jobstepThreadPool.invoke(Runner(this));
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::join()
 | |
| {
 | |
|   if (fRunner)
 | |
|     jobstepThreadPool.join(fRunner);
 | |
| }
 | |
| 
 | |
| uint32_t WindowFunctionStep::nextBand(messageqcpp::ByteStream& bs)
 | |
| {
 | |
|   RGData rgDataOut;
 | |
|   bool more = false;
 | |
|   uint32_t rowCount = 0;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     bs.restart();
 | |
| 
 | |
|     more = fOutputDL->next(fOutputIterator, &rgDataOut);
 | |
| 
 | |
|     if (more && !cancelled())
 | |
|     {
 | |
|       fRowGroupDelivered.setData(&rgDataOut);
 | |
|       fRowGroupDelivered.serializeRGData(bs);
 | |
|       rowCount = fRowGroupDelivered.getRowCount();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       while (more)
 | |
|         more = fOutputDL->next(fOutputIterator, &rgDataOut);
 | |
| 
 | |
|       fEndOfResult = true;
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_WF_DATA_SET_TOO_BIG,
 | |
|                     "WindowFunctionStep::nextBand()");
 | |
|     while (more)
 | |
|       more = fOutputDL->next(fOutputIterator, &rgDataOut);
 | |
|     fEndOfResult = true;
 | |
|   }
 | |
| 
 | |
|   if (fEndOfResult)
 | |
|   {
 | |
|     // send an empty / error band
 | |
|     rgDataOut.reinit(fRowGroupDelivered, 0);
 | |
|     fRowGroupDelivered.setData(&rgDataOut);
 | |
|     fRowGroupDelivered.resetRowGroup(0);
 | |
|     fRowGroupDelivered.setStatus(status());
 | |
|     fRowGroupDelivered.serializeRGData(bs);
 | |
|   }
 | |
| 
 | |
|   return rowCount;
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::setOutputRowGroup(const RowGroup& /*rg*/)
 | |
| {
 | |
|   idbassert(0);
 | |
| }
 | |
| 
 | |
| const RowGroup& WindowFunctionStep::getOutputRowGroup() const
 | |
| {
 | |
|   return fRowGroupOut;
 | |
| }
 | |
| 
 | |
| const RowGroup& WindowFunctionStep::getDeliveredRowGroup() const
 | |
| {
 | |
|   return fRowGroupDelivered;
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::deliverStringTableRowGroup(bool b)
 | |
| {
 | |
|   fRowGroupOut.setUseStringTable(b);
 | |
|   fRowGroupDelivered.setUseStringTable(b);
 | |
| }
 | |
| 
 | |
| bool WindowFunctionStep::deliverStringTableRowGroup() const
 | |
| {
 | |
|   idbassert(fRowGroupOut.usesStringTable() == fRowGroupDelivered.usesStringTable());
 | |
|   return fRowGroupDelivered.usesStringTable();
 | |
| }
 | |
| 
 | |
| const string WindowFunctionStep::toString() const
 | |
| {
 | |
|   ostringstream oss;
 | |
|   oss << "WindowFunctionStep   ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
 | |
| 
 | |
|   oss << " in:";
 | |
| 
 | |
|   for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
 | |
|     oss << fInputJobStepAssociation.outAt(i);
 | |
| 
 | |
|   if (fOutputJobStepAssociation.outSize() > 0)
 | |
|   {
 | |
|     oss << " out:";
 | |
| 
 | |
|     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
 | |
|       oss << fOutputJobStepAssociation.outAt(i);
 | |
|   }
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::AddSimplColumn(const vector<SimpleColumn*>& scs, JobInfo& jobInfo)
 | |
| {
 | |
|   // append the simple columns if not already projected
 | |
|   set<UniqId> scProjected;
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.projectionCols.begin(); i != jobInfo.projectionCols.end(); i++)
 | |
|   {
 | |
|     SimpleColumn* sc = dynamic_cast<SimpleColumn*>(i->get());
 | |
| 
 | |
|     if (sc != NULL)
 | |
|     {
 | |
|       if (sc->schemaName().empty())
 | |
|         sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition());
 | |
| 
 | |
|       scProjected.insert(UniqId(sc));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   for (vector<SimpleColumn*>::const_iterator i = scs.begin(); i != scs.end(); i++)
 | |
|   {
 | |
|     if (scProjected.find(UniqId(*i)) == scProjected.end())
 | |
|     {
 | |
|       jobInfo.windowDels.push_back(SRCP((*i)->clone()));
 | |
|       // MCOL-3343 Enable this if we decide to allow Window Functions to run with
 | |
|       // aggregates with no group by. MariaDB allows this. Nobody else in the world does.
 | |
|       // There will be more work to get it to function if we try this.
 | |
|       //            jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
 | |
|       scProjected.insert(UniqId(*i));
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
 | |
| {
 | |
|   // window functions in select clause, selected or in expression
 | |
|   jobInfo.windowDels = jobInfo.deliveredCols;
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
 | |
|   {
 | |
|     const vector<WindowFunctionColumn*>& wcl = (*i)->windowfunctionColumnList();
 | |
|     RetColsVector wcList;
 | |
| 
 | |
|     for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
 | |
|       wcList.push_back(SRCP((*j)->clone()));
 | |
| 
 | |
|     if (!wcList.empty())
 | |
|     {
 | |
|       jobInfo.windowExps.push_back(*i);
 | |
|       jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
 | |
|     }
 | |
| 
 | |
|     if (dynamic_cast<WindowFunctionColumn*>(i->get()) != NULL)
 | |
|     {
 | |
|       jobInfo.windowCols.push_back(*i);
 | |
|       jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true));
 | |
|     }
 | |
|     else if (!wcList.empty())
 | |
|     {
 | |
|       jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
 | |
| 
 | |
|       for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
 | |
|       {
 | |
|         jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // window functions in order by clause
 | |
|   const CalpontSelectExecutionPlan::OrderByColumnList& orderByCols = csep->orderByCols();
 | |
|   RetColsVector wcInOrderby;
 | |
| 
 | |
|   for (uint64_t i = 0; i < orderByCols.size(); i++)
 | |
|   {
 | |
|     if (orderByCols[i]->orderPos() == (uint64_t)(-1))
 | |
|     {
 | |
|       WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get());
 | |
|       const vector<WindowFunctionColumn*>& wcl = orderByCols[i]->windowfunctionColumnList();
 | |
|       RetColsVector wcList;
 | |
| 
 | |
|       for (vector<WindowFunctionColumn*>::const_iterator j = wcl.begin(); j != wcl.end(); j++)
 | |
|         wcList.push_back(SRCP((*j)->clone()));
 | |
| 
 | |
|       if (wc == NULL && wcList.empty())
 | |
|         continue;
 | |
| 
 | |
|       // an window function or expression of window functions
 | |
|       wcInOrderby.push_back(orderByCols[i]);
 | |
| 
 | |
|       if (!wcList.empty())
 | |
|       {
 | |
|         jobInfo.windowExps.push_back(orderByCols[i]);
 | |
|         jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
 | |
|       }
 | |
| 
 | |
|       if (dynamic_cast<WindowFunctionColumn*>(orderByCols[i].get()) != NULL)
 | |
|       {
 | |
|         jobInfo.windowCols.push_back(orderByCols[i]);
 | |
|         jobInfo.windowSet.insert(getTupleKey(jobInfo, orderByCols[i], true));
 | |
|       }
 | |
|       else if (!wcList.empty())
 | |
|       {
 | |
|         jobInfo.windowCols.insert(jobInfo.windowCols.end(), wcList.begin(), wcList.end());
 | |
| 
 | |
|         for (RetColsVector::const_iterator k = wcList.begin(); k < wcList.end(); k++)
 | |
|         {
 | |
|           jobInfo.windowSet.insert(getTupleKey(jobInfo, *k, true));
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // no window function involved in the query
 | |
|   if (jobInfo.windowCols.empty())
 | |
|     return;
 | |
| 
 | |
|   // Add in the non-window side of arithmetic columns and functions
 | |
|   for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++)
 | |
|   {
 | |
|     const ArithmeticColumn* ac = dynamic_cast<const ArithmeticColumn*>(jobInfo.windowExps[i].get());
 | |
|     const FunctionColumn* fc = dynamic_cast<const FunctionColumn*>(jobInfo.windowExps[i].get());
 | |
| 
 | |
|     if (ac != NULL && ac->windowfunctionColumnList().size() > 0)
 | |
|     {
 | |
|       AddSimplColumn(ac->simpleColumnList(), jobInfo);
 | |
|     }
 | |
|     else if (fc != NULL && fc->windowfunctionColumnList().size() > 0)
 | |
|     {
 | |
|       AddSimplColumn(fc->simpleColumnList(), jobInfo);
 | |
|     }
 | |
|   }
 | |
|   // reconstruct the delivered column list with auxiliary columns
 | |
|   set<uint64_t> colSet;
 | |
|   jobInfo.deliveredCols.resize(0);
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
 | |
|   {
 | |
|     jobInfo.deliveredCols.push_back(*i);
 | |
|     uint64_t key = getTupleKey(jobInfo, *i, true);
 | |
| 
 | |
|     // TODO: remove duplicates in select clause
 | |
|     colSet.insert(key);
 | |
|   }
 | |
| 
 | |
|   // add window columns in orderby
 | |
|   for (RetColsVector::iterator i = wcInOrderby.begin(); i < wcInOrderby.end(); i++)
 | |
|   {
 | |
|     jobInfo.deliveredCols.push_back(*i);
 | |
|     uint64_t key = getTupleKey(jobInfo, *i, true);
 | |
|     colSet.insert(key);
 | |
|   }
 | |
| 
 | |
|   // MCOL-3435 We haven't yet checked for aggregate, but we need to know
 | |
|   bool hasAggregation = false;
 | |
|   for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
 | |
|   {
 | |
|     if (dynamic_cast<AggregateColumn*>(jobInfo.deliveredCols[i].get()) != NULL)
 | |
|     {
 | |
|       hasAggregation = true;
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // add non-duplicate auxiliary columns
 | |
|   RetColsVector colsInAf;
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
 | |
|   {
 | |
|     uint64_t key = getTupleKey(jobInfo, *i, true);
 | |
| 
 | |
|     if (colSet.find(key) == colSet.end())
 | |
|       jobInfo.deliveredCols.push_back(*i);
 | |
| 
 | |
|     RetColsVector columns = dynamic_cast<WindowFunctionColumn*>(i->get())->getColumnList();
 | |
| 
 | |
|     for (RetColsVector::iterator j = columns.begin(); j < columns.end(); j++)
 | |
|     {
 | |
|       if (dynamic_cast<ConstantColumn*>(j->get()) != NULL)
 | |
|         continue;
 | |
| 
 | |
|       key = getTupleKey(jobInfo, *j, true);
 | |
| 
 | |
|       if (colSet.find(key) == colSet.end())
 | |
|       {
 | |
|         jobInfo.deliveredCols.push_back(*j);
 | |
|         // MCOL-3435 Allow Window Functions to run with aggregates with
 | |
|         // no group by by inserting a group by for window parameters.
 | |
|         if (hasAggregation)
 | |
|         {
 | |
|           // If an argument is an AggregateColumn, don't group by it.
 | |
|           if (dynamic_cast<AggregateColumn*>(j->get()) == NULL)
 | |
|           {
 | |
|             bool bFound = false;
 | |
|             for (std::vector<SRCP>::iterator igpc = csep->groupByCols().begin();
 | |
|                  igpc < csep->groupByCols().end(); ++igpc)
 | |
|             {
 | |
|               if ((*igpc)->alias() == (*j)->alias())
 | |
|               {
 | |
|                 bFound = true;
 | |
|                 break;
 | |
|               }
 | |
|             }
 | |
|             if (!bFound)
 | |
|             {
 | |
|               csep->groupByCols().push_back(*j);
 | |
|             }
 | |
|           }
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       colSet.insert(key);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // for handling order by and limit in outer query
 | |
|   jobInfo.wfqLimitStart = csep->limitStart();
 | |
|   jobInfo.wfqLimitCount = csep->limitNum();
 | |
|   csep->limitStart(0);
 | |
|   csep->limitNum(-1);
 | |
| 
 | |
|   if (csep->orderByCols().size() > 0)
 | |
|   {
 | |
|     jobInfo.wfqOrderby = csep->orderByCols();
 | |
|     csep->orderByCols().clear();
 | |
| 
 | |
|     // add order by columns
 | |
|     for (RetColsVector::iterator i = jobInfo.wfqOrderby.begin(); i < jobInfo.wfqOrderby.end(); i++)
 | |
|     {
 | |
|       if (dynamic_cast<ConstantColumn*>(i->get()) != NULL)
 | |
|         continue;
 | |
| 
 | |
|       uint64_t key = getTupleKey(jobInfo, *i, true);
 | |
| 
 | |
|       if (colSet.find(key) == colSet.end())
 | |
|         jobInfo.deliveredCols.push_back(*i);
 | |
| 
 | |
|       colSet.insert(key);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| SJSTEP WindowFunctionStep::makeWindowFunctionStep(SJSTEP& step, JobInfo& jobInfo)
 | |
| {
 | |
|   // create a window function step
 | |
|   WindowFunctionStep* ws = new WindowFunctionStep(jobInfo);
 | |
| 
 | |
|   // connect to the feeding step
 | |
|   JobStepAssociation jsa;
 | |
|   AnyDataListSPtr spdl(new AnyDataList());
 | |
|   RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
 | |
|   dl->OID(execplan::CNX_VTABLE_ID);
 | |
|   spdl->rowGroupDL(dl);
 | |
|   jsa.outAdd(spdl);
 | |
|   ws->inputAssociation(jsa);
 | |
|   ws->stepId(step->stepId() + 1);
 | |
|   step->outputAssociation(jsa);
 | |
| 
 | |
|   AnyDataListSPtr spdlOut(new AnyDataList());
 | |
|   RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
 | |
|   dlOut->OID(CNX_VTABLE_ID);
 | |
|   spdlOut->rowGroupDL(dlOut);
 | |
|   JobStepAssociation jsaOut;
 | |
|   jsaOut.outAdd(spdlOut);
 | |
|   ws->outputAssociation(jsaOut);
 | |
| 
 | |
|   // configure the rowgroups and index mapping
 | |
|   TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(step.get());
 | |
|   idbassert(ds != NULL);
 | |
|   ws->initialize(ds->getDeliveredRowGroup(), jobInfo);
 | |
| 
 | |
|   // restore the original delivery coloumns
 | |
|   jobInfo.deliveredCols = jobInfo.windowDels;
 | |
|   jobInfo.nonConstDelCols.clear();
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
 | |
|   {
 | |
|     if (NULL == dynamic_cast<const ConstantColumn*>(i->get()))
 | |
|       jobInfo.nonConstDelCols.push_back(*i);
 | |
|   }
 | |
| 
 | |
|   return SJSTEP(ws);
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
 | |
| {
 | |
|   if (jobInfo.trace)
 | |
|     cout << "Input to WindowFunctionStep: " << rg.toString() << endl;
 | |
| 
 | |
|   // query type decides the output by dbroot or partition
 | |
|   // @bug 5631. Insert select should be treated as select
 | |
|   fIsSelect = (jobInfo.queryType == "SELECT" || jobInfo.queryType == "INSERT_SELECT");
 | |
| 
 | |
|   // input row meta data
 | |
|   fRowGroupIn = rg;
 | |
|   fRowGroupIn.initRow(&fRowIn);
 | |
| 
 | |
|   // make an input map(id, index)
 | |
|   map<uint64_t, uint64_t> colIndexMap;
 | |
|   uint64_t colCntIn = rg.getColumnCount();
 | |
|   const vector<uint32_t>& pos = rg.getOffsets();
 | |
|   const vector<uint32_t>& oids = rg.getOIDs();
 | |
|   const vector<uint32_t>& keys = rg.getKeys();
 | |
|   const vector<CalpontSystemCatalog::ColDataType>& types = rg.getColTypes();
 | |
|   const vector<uint32_t>& csNums = rg.getCharsetNumbers();
 | |
|   const vector<uint32_t>& scales = rg.getScale();
 | |
|   const vector<uint32_t>& precisions = rg.getPrecision();
 | |
| 
 | |
|   for (uint64_t i = 0; i < colCntIn; i++)
 | |
|     colIndexMap.insert(make_pair(keys[i], i));
 | |
| 
 | |
|   // @bug6065, window functions that will update string table
 | |
|   int64_t wfsUpdateStringTable = 0;
 | |
|   int64_t wfsUserFunctionCount = 0;
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
 | |
|   {
 | |
|     bool isUDAF = false;
 | |
|     // window function type
 | |
|     WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
 | |
|     uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo);  // result index
 | |
|     // @bug6065, window functions that will update string table
 | |
|     {
 | |
|       CalpontSystemCatalog::ColType rt = wc->resultType();
 | |
| 
 | |
|       if ((types[ridx] == CalpontSystemCatalog::CHAR || types[ridx] == CalpontSystemCatalog::VARCHAR ||
 | |
|            types[ridx] == CalpontSystemCatalog::TEXT || types[ridx] == CalpontSystemCatalog::VARBINARY ||
 | |
|            types[ridx] == CalpontSystemCatalog::BLOB) &&
 | |
|           rg.getColumnWidth(ridx) >= jobInfo.stringTableThreshold)
 | |
|       {
 | |
|         ++wfsUpdateStringTable;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     //		if (boost::iequals(wc->functionName(),"UDAF_FUNC")
 | |
|     if (wc->functionName() == "UDAF_FUNC")
 | |
|     {
 | |
|       isUDAF = true;
 | |
|       ++wfsUserFunctionCount;
 | |
|     }
 | |
| 
 | |
|     vector<int64_t> fields;
 | |
|     fields.push_back(ridx);  // result
 | |
|     const RetColsVector& parms = wc->functionParms();
 | |
| 
 | |
|     for (uint64_t i = 0; i < parms.size(); i++)  // arguments
 | |
|     {
 | |
|       // skip constant column
 | |
|       if (dynamic_cast<const ConstantColumn*>(parms[i].get()) == NULL)
 | |
|         fields.push_back(getColumnIndex(parms[i], colIndexMap, jobInfo));
 | |
|       else
 | |
|         fields.push_back(-1);
 | |
|     }
 | |
| 
 | |
|     // partition & order by
 | |
|     const RetColsVector& partitions = wc->partitions();
 | |
|     vector<uint64_t> eqIdx;
 | |
|     vector<uint64_t> peerIdx;
 | |
|     vector<IdbSortSpec> sorts;
 | |
| 
 | |
|     for (uint64_t i = 0; i < partitions.size(); i++)
 | |
|     {
 | |
|       // skip constant column
 | |
|       if (dynamic_cast<const ConstantColumn*>(partitions[i].get()) != NULL)
 | |
|         continue;
 | |
| 
 | |
|       // get column index
 | |
|       uint64_t idx = getColumnIndex(partitions[i], colIndexMap, jobInfo);
 | |
|       eqIdx.push_back(idx);
 | |
|       sorts.push_back(IdbSortSpec(idx, partitions[i]->asc(), partitions[i]->nullsFirst()));
 | |
|     }
 | |
| 
 | |
|     const RetColsVector& orders = wc->orderBy().fOrders;
 | |
| 
 | |
|     for (uint64_t i = 0; i < orders.size(); i++)
 | |
|     {
 | |
|       // skip constant column
 | |
|       if (dynamic_cast<const ConstantColumn*>(orders[i].get()) != NULL)
 | |
|         continue;
 | |
| 
 | |
|       // get column index
 | |
|       uint64_t idx = getColumnIndex(orders[i], colIndexMap, jobInfo);
 | |
|       peerIdx.push_back(idx);
 | |
|       sorts.push_back(IdbSortSpec(idx, orders[i]->asc(), orders[i]->nullsFirst()));
 | |
|     }
 | |
| 
 | |
|     // functors for sorting
 | |
|     boost::shared_ptr<EqualCompData> parts(new EqualCompData(eqIdx, rg));
 | |
|     boost::shared_ptr<OrderByData> orderbys(new OrderByData(sorts, rg));
 | |
|     boost::shared_ptr<EqualCompData> peers(new EqualCompData(peerIdx, rg));
 | |
| 
 | |
|     // column type for functor templates
 | |
|     int ct = 0;
 | |
| 
 | |
|     if (isUDAF)
 | |
|     {
 | |
|       ct = wc->getUDAFContext().getResultType();
 | |
|     }
 | |
|     // make sure index is in range
 | |
|     else if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
 | |
|       ct = types[fields[1]];
 | |
| 
 | |
|     // workaround for functions using "within group (order by)" syntax
 | |
|     string fn = boost::to_upper_copy(wc->functionName());
 | |
| 
 | |
|     if ((fn == "MEDIAN" || fn == "PERCENTILE_CONT" || fn == "PERCENTILE_DISC") &&
 | |
|         utils::is_nonnegative(peerIdx[0]) && peerIdx[0] < types.size())
 | |
|       ct = types[peerIdx[0]];
 | |
| 
 | |
|     // create the functor based on function name
 | |
|     boost::shared_ptr<WindowFunctionType> func = WindowFunctionType::makeWindowFunction(fn, ct, wc);
 | |
| 
 | |
|     // parse parms after peer and fields are set
 | |
|     // functions may need to set order column index
 | |
|     func->peer(peers);
 | |
|     func->fieldIndex(fields);
 | |
|     func->parseParms(parms);
 | |
| 
 | |
|     // window frame
 | |
|     const WF_Frame& frame = wc->orderBy().fFrame;
 | |
|     int frameUnit = (frame.fIsRange) ? WF__FRAME_RANGE : WF__FRAME_ROWS;
 | |
| 
 | |
|     if (frame.fStart.fFrame == WF_UNBOUNDED_PRECEDING && frame.fEnd.fFrame == WF_UNBOUNDED_FOLLOWING)
 | |
|       frameUnit = WF__FRAME_ROWS;
 | |
| 
 | |
|     boost::shared_ptr<FrameBound> upper =
 | |
|         parseFrameBound(frame.fStart, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, true);
 | |
|     boost::shared_ptr<FrameBound> lower =
 | |
|         parseFrameBound(frame.fEnd, colIndexMap, orders, peers, jobInfo, !frame.fIsRange, false);
 | |
|     boost::shared_ptr<WindowFrame> windows(new WindowFrame(frameUnit, upper, lower));
 | |
|     func->frameUnit(frameUnit);
 | |
| 
 | |
|     // add to the function list
 | |
|     fFunctions.push_back(
 | |
|         boost::shared_ptr<WindowFunction>(new WindowFunction(func, parts, orderbys, windows, rg, fRowIn)));
 | |
|     fFunctionCount++;
 | |
|   }
 | |
| 
 | |
|   // initialize window function expresssions
 | |
|   fExpression = jobInfo.windowExps;
 | |
| 
 | |
|   for (RetColsVector::iterator i = fExpression.begin(); i < fExpression.end(); i++)
 | |
|   {
 | |
|     // output index
 | |
|     (*i)->outputIndex(getColumnIndex(*i, colIndexMap, jobInfo));
 | |
| 
 | |
|     // map the input indices
 | |
|     const vector<SimpleColumn*>& scols = (*i)->simpleColumnList();
 | |
| 
 | |
|     for (vector<SimpleColumn*>::const_iterator j = scols.begin(); j != scols.end(); j++)
 | |
|     {
 | |
|       uint64_t key = getTupleKey(jobInfo, *j);
 | |
|       CalpontSystemCatalog::OID dictOid = joblist::isDictCol((*j)->colType());
 | |
| 
 | |
|       if (dictOid > 0)
 | |
|       {
 | |
|         key = jobInfo.keyInfo->dictKeyMap[key];
 | |
|       }
 | |
| 
 | |
|       map<uint64_t, uint64_t>::iterator k = colIndexMap.find(key);
 | |
| 
 | |
|       if (k != colIndexMap.end())
 | |
|       {
 | |
|         (*j)->inputIndex(k->second);
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         string name = jobInfo.keyInfo->tupleKeyToName[key];
 | |
|         cerr << name << " is not in tuple, key=" << key << endl;
 | |
|         throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name),
 | |
|                         ERR_WF_COLUMN_MISSING);
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>((*i).get());
 | |
|     FunctionColumn* fc = dynamic_cast<FunctionColumn*>((*i).get());
 | |
| 
 | |
|     if (ac != NULL)
 | |
|     {
 | |
|       updateWindowCols(ac->expression(), colIndexMap, jobInfo);
 | |
|     }
 | |
|     else if (fc != NULL)
 | |
|     {
 | |
|       //			RetColsVector wcList = fc->windowfunctionColumnList();
 | |
|       //			for (RetColsVector::iterator j = wcList.begin(); j != wcList.end(); j++)
 | |
|       //				(*j)->inputIndex(getColumnIndex(*j, colIndexMap, jobInfo));
 | |
|       funcexp::FunctionParm parms = fc->functionParms();
 | |
| 
 | |
|       for (vector<execplan::SPTP>::iterator j = parms.begin(); j < parms.end(); j++)
 | |
|         updateWindowCols(j->get(), colIndexMap, jobInfo);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // order by part
 | |
|   if (jobInfo.wfqOrderby.size() > 0)
 | |
|   {
 | |
|     // query order by
 | |
|     vector<uint64_t> eqIdx;
 | |
|     vector<IdbSortSpec> sorts;
 | |
|     const RetColsVector& orderby = jobInfo.wfqOrderby;
 | |
| 
 | |
|     for (uint64_t i = 0; i < orderby.size(); i++)
 | |
|     {
 | |
|       // skip constant column
 | |
|       if (dynamic_cast<const ConstantColumn*>(orderby[i].get()) != NULL)
 | |
|         continue;
 | |
| 
 | |
|       // get column index
 | |
|       uint64_t idx = getColumnIndex(orderby[i], colIndexMap, jobInfo);
 | |
|       sorts.push_back(IdbSortSpec(idx, orderby[i]->asc(), orderby[i]->nullsFirst()));
 | |
|     }
 | |
| 
 | |
|     fQueryOrderBy.reset(new OrderByData(sorts, rg));
 | |
|   }
 | |
| 
 | |
|   // limit part
 | |
|   fQueryLimitStart = jobInfo.wfqLimitStart;
 | |
|   fQueryLimitCount = jobInfo.wfqLimitCount;
 | |
| 
 | |
|   // fix the delivered rowgroup data
 | |
|   vector<uint64_t> delColIdx;
 | |
| 
 | |
|   for (RetColsVector::iterator i = jobInfo.windowDels.begin(); i < jobInfo.windowDels.end(); i++)
 | |
|   {
 | |
|     // find the none constantant columns in the deliver
 | |
|     // leave constants to annexstep for now.
 | |
|     if (dynamic_cast<const ConstantColumn*>((*i).get()) != NULL)
 | |
|       continue;
 | |
| 
 | |
|     delColIdx.push_back(getColumnIndex(*i, colIndexMap, jobInfo));
 | |
|   }
 | |
| 
 | |
|   size_t retColCount = delColIdx.size();
 | |
|   vector<uint32_t> pos1;
 | |
|   vector<uint32_t> oids1;
 | |
|   vector<uint32_t> keys1;
 | |
|   vector<uint32_t> scales1;
 | |
|   vector<uint32_t> precisions1;
 | |
|   vector<CalpontSystemCatalog::ColDataType> types1;
 | |
|   vector<uint32_t> csNums1;
 | |
|   pos1.push_back(2);
 | |
| 
 | |
|   for (size_t i = 0; i < retColCount; i++)
 | |
|   {
 | |
|     size_t j = delColIdx[i];
 | |
|     pos1.push_back(pos1[i] + (pos[j + 1] - pos[j]));
 | |
|     oids1.push_back(oids[j]);
 | |
|     keys1.push_back(keys[j]);
 | |
|     scales1.push_back(scales[j]);
 | |
|     precisions1.push_back(precisions[j]);
 | |
|     types1.push_back(types[j]);
 | |
|     csNums1.push_back(csNums[j]);
 | |
|   }
 | |
| 
 | |
|   fRowGroupDelivered = RowGroup(retColCount, pos1, oids1, keys1, types1, csNums1, scales1, precisions1,
 | |
|                                 jobInfo.stringTableThreshold);
 | |
| 
 | |
|   if (jobInfo.trace)
 | |
|     cout << "delivered RG: " << fRowGroupDelivered.toString() << endl << endl;
 | |
| 
 | |
|   if (wfsUpdateStringTable > 1)
 | |
|     fUseSSMutex = true;
 | |
| 
 | |
|   if (wfsUserFunctionCount > 1)
 | |
|     fUseUFMutex = true;
 | |
| 
 | |
|   fRowGroupOut = fRowGroupDelivered;
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::execute()
 | |
| {
 | |
|   RGData rgData;
 | |
|   Row row;
 | |
|   fRowGroupIn.initRow(&row);
 | |
|   bool more = fInputDL->next(fInputIterator, &rgData);
 | |
|   uint64_t i = 0;  // for RowGroup index in the fInRowGroupData
 | |
| 
 | |
|   if (traceOn())
 | |
|     dlTimes.setFirstReadTime();
 | |
| 
 | |
|   {
 | |
|     StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_START, 1);
 | |
|     postStepStartTele(sts);
 | |
|   }
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     while (more && !cancelled())
 | |
|     {
 | |
|       fRowGroupIn.setData(&rgData);
 | |
|       fRowGroupIn.getRow(0, &row);
 | |
|       uint64_t rowCnt = fRowGroupIn.getRowCount();
 | |
| 
 | |
|       if (rowCnt > 0)
 | |
|       {
 | |
|         fInRowGroupData.push_back(rgData);
 | |
|         uint64_t memAdd = fRowGroupIn.getSizeWithStrings() + rowCnt * sizeof(RowPosition);
 | |
| 
 | |
|         if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
 | |
|           throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
 | |
| 
 | |
|         fMemUsage += memAdd;
 | |
| 
 | |
|         for (uint64_t j = 0; j < rowCnt; ++j)
 | |
|         {
 | |
|           if (i > 0x0000FFFFFFFFFFFFULL || j > 0x000000000000FFFFULL)
 | |
|             throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
 | |
| 
 | |
|           fRows.push_back(RowPosition(i, j));
 | |
|           row.nextRow();
 | |
|         }
 | |
| 
 | |
|         //@bug6065, make StringStore::storeString() thread safe, default to false.
 | |
|         rgData.useStoreStringMutex(fUseSSMutex);
 | |
|         // For the User Data of UDAnF
 | |
|         rgData.useUserDataMutex(fUseUFMutex);
 | |
| 
 | |
|         // window function does not change row count
 | |
|         fRowsReturned += rowCnt;
 | |
| 
 | |
|         i++;
 | |
|       }
 | |
| 
 | |
|       more = fInputDL->next(fInputIterator, &rgData);
 | |
|     }
 | |
|   }  // try
 | |
|   catch (...)
 | |
|   {
 | |
|     handleException(std::current_exception(), logging::ERR_READ_INPUT_DATALIST,
 | |
|                     logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::execute()");
 | |
|   }
 | |
| 
 | |
|   if (traceOn())
 | |
|     dlTimes.setLastReadTime();
 | |
| 
 | |
|   // no need for the window function if aborted or result set is empty.
 | |
|   if (cancelled() || fRows.size() == 0)
 | |
|   {
 | |
|     while (more)
 | |
|       more = fInputDL->next(fInputIterator, &rgData);
 | |
| 
 | |
|     fOutputDL->endOfInput();
 | |
| 
 | |
|     StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, 1, fRowsReturned);
 | |
|     postStepSummaryTele(sts);
 | |
| 
 | |
|     if (traceOn())
 | |
|     {
 | |
|       dlTimes.setEndOfInputTime();
 | |
|       printCalTrace();
 | |
|     }
 | |
| 
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   // got something to work on
 | |
|   try
 | |
|   {
 | |
|     if (fFunctionCount == 1)
 | |
|     {
 | |
|       doFunction();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       if (fTotalThreads > fFunctionCount)
 | |
|         fTotalThreads = fFunctionCount;
 | |
| 
 | |
|       fFunctionThreads.clear();
 | |
|       fFunctionThreads.reserve(fTotalThreads);
 | |
| 
 | |
|       for (uint64_t i = 0; i < fTotalThreads && !cancelled(); i++)
 | |
|         fFunctionThreads.push_back(jobstepThreadPool.invoke(WFunction(this)));
 | |
| 
 | |
|       // If cancelled, not all threads are started.
 | |
|       jobstepThreadPool.join(fFunctionThreads);
 | |
|     }
 | |
| 
 | |
|     if (!(cancelled()))
 | |
|     {
 | |
|       if (fIsSelect)
 | |
|         doPostProcessForSelect();
 | |
|       else
 | |
|         doPostProcessForDml();
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     handleException(std::current_exception(), logging::ERR_EXECUTE_WINDOW_FUNCTION,
 | |
|                     logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::execute()");
 | |
|   }
 | |
| 
 | |
|   fOutputDL->endOfInput();
 | |
| 
 | |
|   StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, 1, fRowsReturned);
 | |
|   postStepSummaryTele(sts);
 | |
| 
 | |
|   if (traceOn())
 | |
|   {
 | |
|     dlTimes.setEndOfInputTime();
 | |
|     printCalTrace();
 | |
|   }
 | |
| 
 | |
|   return;
 | |
| }
 | |
| 
 | |
| uint64_t WindowFunctionStep::nextFunctionIndex()
 | |
| {
 | |
|   uint64_t idx = atomicInc(&fNextIndex);
 | |
| 
 | |
|   // return index in the function array
 | |
|   return --idx;
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::doFunction()
 | |
| {
 | |
|   uint64_t i = 0;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     while (((i = nextFunctionIndex()) < fFunctionCount) && !cancelled())
 | |
|     {
 | |
|       uint64_t memAdd = fRows.size() * sizeof(RowPosition);
 | |
| 
 | |
|       if (fRm->getMemory(memAdd, fSessionMemLimit) == false)
 | |
|         throw IDBExcept(ERR_WF_DATA_SET_TOO_BIG);
 | |
| 
 | |
|       fMemUsage += memAdd;
 | |
| 
 | |
|       fFunctions[i]->setCallback(this, i);
 | |
|       (*fFunctions[i].get())();
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     handleException(std::current_exception(), logging::ERR_EXECUTE_WINDOW_FUNCTION,
 | |
|                     logging::ERR_WF_DATA_SET_TOO_BIG, "WindowFunctionStep::doFunction()");
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::doPostProcessForSelect()
 | |
| {
 | |
|   FuncExp* fe = funcexp::FuncExp::instance();
 | |
|   std::shared_ptr<int[]> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
 | |
|   Row rowIn, rowOut;
 | |
|   fRowGroupIn.initRow(&rowIn);
 | |
|   fRowGroupOut.initRow(&rowOut);
 | |
|   RGData rgData;
 | |
|   vector<RowPosition>& rowData = *(fFunctions.back()->fRowData.get());
 | |
|   int64_t rowsLeft = rowData.size();
 | |
|   int64_t rowsInRg = 0;
 | |
|   int64_t rgCapacity = 0;
 | |
| 
 | |
|   int64_t begin = fQueryLimitStart;
 | |
|   int64_t count = (fQueryLimitCount == (uint64_t)-1) ? rowsLeft : fQueryLimitCount;
 | |
|   int64_t end = begin + count;
 | |
|   end = (end < rowsLeft) ? end : rowsLeft;
 | |
|   rowsLeft = (end > begin) ? (end - begin) : 0;
 | |
| 
 | |
|   if (fQueryOrderBy.get() != NULL)
 | |
|     sort(rowData.begin(), rowData.size());
 | |
| 
 | |
|   for (int64_t i = begin; i < end; i++)
 | |
|   {
 | |
|     if (!rgData.hasRowData())
 | |
|     {
 | |
|       rgCapacity = ((rowsLeft > 8192) ? 8192 : rowsLeft);
 | |
|       rowsLeft -= rgCapacity;
 | |
|       rgData.reinit(fRowGroupOut, rgCapacity);
 | |
| 
 | |
|       fRowGroupOut.setData(&rgData);
 | |
|       fRowGroupOut.resetRowGroup(0);
 | |
|       fRowGroupOut.setDBRoot(0);  // not valid dbroot
 | |
|       fRowGroupOut.getRow(0, &rowOut);
 | |
|       rowsInRg = 0;
 | |
|     }
 | |
| 
 | |
|     rowIn.setData(getPointer(rowData[i], fRowGroupIn, rowIn));
 | |
| 
 | |
|     // evaluate the window function expressions before apply mapping
 | |
|     if (fExpression.size() > 0)
 | |
|       fe->evaluate(rowIn, fExpression);
 | |
| 
 | |
|     applyMapping(mapping, rowIn, &rowOut);
 | |
|     rowOut.nextRow();
 | |
|     rowsInRg++;
 | |
| 
 | |
|     if (rowsInRg == rgCapacity)
 | |
|     {
 | |
|       fRowGroupOut.setRowCount(rowsInRg);
 | |
|       fOutputDL->insert(rgData);
 | |
|       rgData.clear();
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::doPostProcessForDml()
 | |
| {
 | |
|   FuncExp* fe = funcexp::FuncExp::instance();
 | |
|   std::shared_ptr<int[]> mapping = makeMapping(fRowGroupIn, fRowGroupOut);
 | |
|   Row rowIn, rowOut;
 | |
|   fRowGroupIn.initRow(&rowIn);
 | |
|   fRowGroupOut.initRow(&rowOut);
 | |
| 
 | |
|   for (vector<RGData>::iterator i = fInRowGroupData.begin(); i < fInRowGroupData.end(); i++)
 | |
|   {
 | |
|     fRowGroupIn.setData(&(*i));
 | |
|     RGData rgData = RGData(fRowGroupIn, fRowGroupIn.getRowCount());
 | |
|     fRowGroupOut.setData(&rgData);
 | |
|     // @bug 5631. reset rowgroup before the data is populated.
 | |
|     fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
 | |
|     fRowGroupOut.setDBRoot(fRowGroupIn.getDBRoot());
 | |
|     fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
 | |
| 
 | |
|     fRowGroupIn.getRow(0, &rowIn);
 | |
|     fRowGroupOut.getRow(0, &rowOut);
 | |
| 
 | |
|     for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
 | |
|     {
 | |
|       // evaluate the window function expressions before apply mapping
 | |
|       if (fExpression.size() > 0)
 | |
|         fe->evaluate(rowIn, fExpression);
 | |
| 
 | |
|       applyMapping(mapping, rowIn, &rowOut);
 | |
|       rowIn.nextRow();
 | |
|       rowOut.nextRow();
 | |
|     }
 | |
| 
 | |
|     // fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
 | |
|     // fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
 | |
| 
 | |
|     fOutputDL->insert(rgData);
 | |
|   }
 | |
| }
 | |
| 
 | |
| boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRows(const execplan::WF_Boundary& b,
 | |
|                                                                       const map<uint64_t, uint64_t>& m,
 | |
|                                                                       JobInfo& jobInfo)
 | |
| {
 | |
|   boost::shared_ptr<FrameBound> fb;
 | |
| 
 | |
|   if (b.fFrame == WF_CURRENT_ROW)
 | |
|   {
 | |
|     fb.reset(new FrameBoundRow(WF__CURRENT_ROW));
 | |
|     return fb;
 | |
|   }
 | |
| 
 | |
|   ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
 | |
| 
 | |
|   if (cc != NULL)
 | |
|   {
 | |
|     Row dummy;
 | |
|     bool isNull = false;
 | |
|     int val = cc->getIntVal(dummy, isNull);
 | |
| 
 | |
|     if (val >= 0 && isNull == false)
 | |
|     {
 | |
|       int type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
 | |
|       fb.reset(new FrameBoundConstantRow(type, val));
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       string str("NULL");
 | |
| 
 | |
|       if (!isNull)
 | |
|       {
 | |
|         ostringstream oss;
 | |
|         oss << val;
 | |
|         str = oss.str();
 | |
|       }
 | |
| 
 | |
|       throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
 | |
|                       ERR_WF_BOUND_OUT_OF_RANGE);
 | |
|     }
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     int type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
 | |
|     uint64_t id = getTupleKey(jobInfo, b.fVal);
 | |
|     uint64_t idx = getColumnIndex(b.fVal, m, jobInfo);
 | |
|     TupleInfo ti = getTupleInfo(id, jobInfo);
 | |
| 
 | |
|     switch (ti.dtype)
 | |
|     {
 | |
|       case execplan::CalpontSystemCatalog::TINYINT:
 | |
|       case execplan::CalpontSystemCatalog::SMALLINT:
 | |
|       case execplan::CalpontSystemCatalog::MEDINT:
 | |
|       case execplan::CalpontSystemCatalog::INT:
 | |
|       case execplan::CalpontSystemCatalog::BIGINT:
 | |
|       case execplan::CalpontSystemCatalog::DECIMAL:
 | |
|       {
 | |
|         fb.reset(new FrameBoundExpressionRow<int64_t>(type, id, idx));
 | |
|         break;
 | |
|       }
 | |
| 
 | |
|       case execplan::CalpontSystemCatalog::DOUBLE:
 | |
|       case execplan::CalpontSystemCatalog::UDOUBLE:
 | |
|       {
 | |
|         fb.reset(new FrameBoundExpressionRow<double>(type, id, idx));
 | |
|         break;
 | |
|       }
 | |
| 
 | |
|       case execplan::CalpontSystemCatalog::FLOAT:
 | |
|       case execplan::CalpontSystemCatalog::UFLOAT:
 | |
|       {
 | |
|         fb.reset(new FrameBoundExpressionRow<float>(type, id, idx));
 | |
|         break;
 | |
|       }
 | |
| 
 | |
|       case execplan::CalpontSystemCatalog::UTINYINT:
 | |
|       case execplan::CalpontSystemCatalog::USMALLINT:
 | |
|       case execplan::CalpontSystemCatalog::UMEDINT:
 | |
|       case execplan::CalpontSystemCatalog::UINT:
 | |
|       case execplan::CalpontSystemCatalog::UBIGINT:
 | |
|       case execplan::CalpontSystemCatalog::UDECIMAL:
 | |
|       case execplan::CalpontSystemCatalog::DATE:
 | |
|       case execplan::CalpontSystemCatalog::DATETIME:
 | |
|       case execplan::CalpontSystemCatalog::TIME:
 | |
|       case execplan::CalpontSystemCatalog::TIMESTAMP:
 | |
|       {
 | |
|         fb.reset(new FrameBoundExpressionRow<uint64_t>(type, id, idx));
 | |
|         break;
 | |
|       }
 | |
| 
 | |
|       default:
 | |
|       {
 | |
|         string str = windowfunction::colType2String[ti.dtype];
 | |
|         throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
 | |
|                         ERR_WF_INVALID_BOUND_TYPE);
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return fb;
 | |
| }
 | |
| 
 | |
| boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBoundRange(const execplan::WF_Boundary& b,
 | |
|                                                                        const map<uint64_t, uint64_t>& m,
 | |
|                                                                        const vector<SRCP>& o,
 | |
|                                                                        JobInfo& jobInfo)
 | |
| {
 | |
|   boost::shared_ptr<FrameBound> fb;
 | |
| 
 | |
|   if (b.fFrame == WF_CURRENT_ROW)
 | |
|   {
 | |
|     fb.reset(new FrameBoundRange(WF__CURRENT_ROW));
 | |
|     return fb;
 | |
|   }
 | |
| 
 | |
|   bool isConstant = false;
 | |
|   bool isNull = false;
 | |
|   Row dummy;
 | |
|   ConstantColumn* cc = dynamic_cast<ConstantColumn*>(b.fVal.get());
 | |
| 
 | |
|   if (cc != NULL)
 | |
|   {
 | |
|     isConstant = true;
 | |
|     double val = cc->getDoubleVal(dummy, isNull);
 | |
| 
 | |
|     if (val < 0 || isNull)
 | |
|     {
 | |
|       string str("NULL");
 | |
| 
 | |
|       if (!isNull)
 | |
|       {
 | |
|         ostringstream oss;
 | |
|         oss << val;
 | |
|         str = oss.str();
 | |
|       }
 | |
| 
 | |
|       throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_BOUND_OUT_OF_RANGE, str),
 | |
|                       ERR_WF_BOUND_OUT_OF_RANGE);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   int type = 0;
 | |
|   vector<uint64_t> ids;
 | |
|   vector<int> index;
 | |
|   ids.push_back(getTupleKey(jobInfo, o[0]));
 | |
|   index.push_back(getColumnIndex(o[0], m, jobInfo));
 | |
| 
 | |
|   if (isConstant)
 | |
|   {
 | |
|     type = (b.fFrame == WF_PRECEDING) ? WF__CONSTANT_PRECEDING : WF__CONSTANT_FOLLOWING;
 | |
|     ids.push_back(-1);    // dummy, n/a for constant
 | |
|     index.push_back(-1);  // dummy, n/a for constant
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     type = (b.fFrame == WF_PRECEDING) ? WF__EXPRESSION_PRECEDING : WF__EXPRESSION_FOLLOWING;
 | |
|     ids.push_back(getTupleKey(jobInfo, b.fVal));
 | |
|     index.push_back(getColumnIndex(b.fVal, m, jobInfo));
 | |
|   }
 | |
| 
 | |
|   ids.push_back(getTupleKey(jobInfo, b.fBound));
 | |
|   index.push_back(getColumnIndex(b.fBound, m, jobInfo));
 | |
| 
 | |
|   FrameBoundRange* fbr = NULL;
 | |
|   TupleInfo ti = getTupleInfo(ids[0], jobInfo);
 | |
|   bool asc = o[0]->asc();
 | |
|   bool nlf = o[0]->nullsFirst();
 | |
| 
 | |
|   switch (ti.dtype)
 | |
|   {
 | |
|     case execplan::CalpontSystemCatalog::TINYINT:
 | |
|     case execplan::CalpontSystemCatalog::SMALLINT:
 | |
|     case execplan::CalpontSystemCatalog::MEDINT:
 | |
|     case execplan::CalpontSystemCatalog::INT:
 | |
|     case execplan::CalpontSystemCatalog::BIGINT:
 | |
|     case execplan::CalpontSystemCatalog::DECIMAL:
 | |
|     {
 | |
|       if (isConstant)
 | |
|       {
 | |
|         int64_t v = cc->getIntVal(dummy, isNull);
 | |
|         fbr = new FrameBoundConstantRange<int64_t>(type, asc, nlf, &v);
 | |
|         fbr->isZero((v == 0));
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         fbr = new FrameBoundExpressionRange<int64_t>(type, asc, nlf);
 | |
|       }
 | |
| 
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     case execplan::CalpontSystemCatalog::DOUBLE:
 | |
|     case execplan::CalpontSystemCatalog::UDOUBLE:
 | |
|     {
 | |
|       if (isConstant)
 | |
|       {
 | |
|         double v = cc->getDoubleVal(dummy, isNull);
 | |
|         fbr = new FrameBoundConstantRange<double>(type, asc, nlf, &v);
 | |
|         fbr->isZero((v == 0.0));
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         fbr = new FrameBoundExpressionRange<double>(type, asc, nlf);
 | |
|       }
 | |
| 
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     case execplan::CalpontSystemCatalog::FLOAT:
 | |
|     case execplan::CalpontSystemCatalog::UFLOAT:
 | |
|     {
 | |
|       if (isConstant)
 | |
|       {
 | |
|         float v = cc->getFloatVal(dummy, isNull);
 | |
|         fbr = new FrameBoundConstantRange<float>(type, asc, nlf, &v);
 | |
|         fbr->isZero((v == 0.0));
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         fbr = new FrameBoundExpressionRange<float>(type, asc, nlf);
 | |
|       }
 | |
| 
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     case execplan::CalpontSystemCatalog::UTINYINT:
 | |
|     case execplan::CalpontSystemCatalog::USMALLINT:
 | |
|     case execplan::CalpontSystemCatalog::UMEDINT:
 | |
|     case execplan::CalpontSystemCatalog::UINT:
 | |
|     case execplan::CalpontSystemCatalog::UBIGINT:
 | |
|     case execplan::CalpontSystemCatalog::UDECIMAL:
 | |
|     case execplan::CalpontSystemCatalog::DATE:
 | |
|     case execplan::CalpontSystemCatalog::DATETIME:
 | |
|     case execplan::CalpontSystemCatalog::TIME:
 | |
|     case execplan::CalpontSystemCatalog::TIMESTAMP:
 | |
|     {
 | |
|       if (isConstant)
 | |
|       {
 | |
|         uint64_t v = cc->getUintVal(dummy, isNull);
 | |
|         fbr = new FrameBoundConstantRange<uint64_t>(type, asc, nlf, &v);
 | |
|         fbr->isZero((v == 0));
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         fbr = new FrameBoundExpressionRange<uint64_t>(type, asc, nlf);
 | |
|       }
 | |
| 
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     default:
 | |
|     {
 | |
|       string str = windowfunction::colType2String[ti.dtype];
 | |
|       throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_INVALID_BOUND_TYPE, str),
 | |
|                       ERR_WF_INVALID_BOUND_TYPE);
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   fbr->setTupleId(ids);
 | |
|   fbr->setIndex(index);
 | |
|   fb.reset(fbr);
 | |
| 
 | |
|   return fb;
 | |
| }
 | |
| 
 | |
| boost::shared_ptr<FrameBound> WindowFunctionStep::parseFrameBound(const execplan::WF_Boundary& b,
 | |
|                                                                   const map<uint64_t, uint64_t>& m,
 | |
|                                                                   const vector<SRCP>& o,
 | |
|                                                                   const boost::shared_ptr<EqualCompData>& p,
 | |
|                                                                   JobInfo& j, bool rows, bool s)
 | |
| {
 | |
|   boost::shared_ptr<FrameBound> fb;
 | |
| 
 | |
|   switch (b.fFrame)
 | |
|   {
 | |
|     case WF_UNBOUNDED_PRECEDING:
 | |
|     {
 | |
|       fb.reset(new FrameBound(WF__UNBOUNDED_PRECEDING));
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     case WF_UNBOUNDED_FOLLOWING:
 | |
|     {
 | |
|       fb.reset(new FrameBound(WF__UNBOUNDED_FOLLOWING));
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     case WF_CURRENT_ROW:
 | |
|     case WF_PRECEDING:
 | |
|     case WF_FOLLOWING:
 | |
|     {
 | |
|       if (rows)
 | |
|       {
 | |
|         fb = parseFrameBoundRows(b, m, j);
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         fb = parseFrameBoundRange(b, m, o, j);
 | |
|       }
 | |
| 
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     default:  //  unknown
 | |
|     {
 | |
|       throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_UNKNOWN_BOUND, b.fFrame),
 | |
|                       ERR_WF_UNKNOWN_BOUND);
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   fb->peer(p);
 | |
|   fb->start(s);
 | |
| 
 | |
|   return fb;
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::updateWindowCols(ReturnedColumn* rc, const map<uint64_t, uint64_t>& m,
 | |
|                                           JobInfo& jobInfo)
 | |
| {
 | |
|   if (rc == NULL)
 | |
|     return;
 | |
| 
 | |
|   ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(rc);
 | |
|   FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
 | |
|   SimpleFilter* sf = dynamic_cast<SimpleFilter*>(rc);
 | |
|   WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(rc);
 | |
| 
 | |
|   if (wc)
 | |
|   {
 | |
|     uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
 | |
|     map<uint64_t, uint64_t>::const_iterator j = m.find(key);
 | |
| 
 | |
|     if (j == m.end())
 | |
|     {
 | |
|       string name = jobInfo.keyInfo->tupleKeyToName[key];
 | |
|       cerr << name << " is not in tuple, key=" << key << endl;
 | |
|       throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING);
 | |
|     }
 | |
| 
 | |
|     wc->inputIndex(j->second);
 | |
|   }
 | |
|   else if (ac)
 | |
|   {
 | |
|     updateWindowCols(ac->expression(), m, jobInfo);
 | |
|   }
 | |
|   else if (fc)
 | |
|   {
 | |
|     funcexp::FunctionParm parms = fc->functionParms();
 | |
| 
 | |
|     for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
 | |
|       updateWindowCols(i->get(), m, jobInfo);
 | |
|   }
 | |
|   else if (sf)
 | |
|   {
 | |
|     updateWindowCols(sf->lhs(), m, jobInfo);
 | |
|     updateWindowCols(sf->rhs(), m, jobInfo);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::updateWindowCols(ParseTree* pt, const map<uint64_t, uint64_t>& m, JobInfo& jobInfo)
 | |
| {
 | |
|   if (pt == NULL)
 | |
|     return;
 | |
| 
 | |
|   updateWindowCols(pt->left(), m, jobInfo);
 | |
|   updateWindowCols(pt->right(), m, jobInfo);
 | |
| 
 | |
|   TreeNode* tn = pt->data();
 | |
|   ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(tn);
 | |
|   FunctionColumn* fc = dynamic_cast<FunctionColumn*>(tn);
 | |
|   SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
 | |
|   WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(tn);
 | |
| 
 | |
|   if (wc)
 | |
|   {
 | |
|     uint64_t key = getExpTupleKey(jobInfo, wc->expressionId());
 | |
|     map<uint64_t, uint64_t>::const_iterator j = m.find(key);
 | |
| 
 | |
|     if (j == m.end())
 | |
|     {
 | |
|       string name = jobInfo.keyInfo->tupleKeyToName[key];
 | |
|       cerr << name << " is not in tuple, key=" << key << endl;
 | |
|       throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_WF_COLUMN_MISSING, name), ERR_WF_COLUMN_MISSING);
 | |
|     }
 | |
| 
 | |
|     wc->inputIndex(j->second);
 | |
|   }
 | |
|   else if (ac)
 | |
|   {
 | |
|     updateWindowCols(ac->expression(), m, jobInfo);
 | |
|   }
 | |
|   else if (fc)
 | |
|   {
 | |
|     funcexp::FunctionParm parms = fc->functionParms();
 | |
| 
 | |
|     for (vector<execplan::SPTP>::iterator i = parms.begin(); i < parms.end(); i++)
 | |
|       updateWindowCols(i->get(), m, jobInfo);
 | |
|   }
 | |
|   else if (sf)
 | |
|   {
 | |
|     updateWindowCols(sf->lhs(), m, jobInfo);
 | |
|     updateWindowCols(sf->rhs(), m, jobInfo);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::sort(std::vector<RowPosition>::iterator v, uint64_t n)
 | |
| {
 | |
|   // recursive function termination condition.
 | |
|   if (n < 2 || cancelled())
 | |
|     return;
 | |
| 
 | |
|   RowPosition p = *(v + n / 2);                   // pivot value
 | |
|   vector<RowPosition>::iterator l = v;            // low   address
 | |
|   vector<RowPosition>::iterator h = v + (n - 1);  // high  address
 | |
| 
 | |
|   while (l <= h && !cancelled())
 | |
|   {
 | |
|     // Can use while here, but need check boundary and cancel status.
 | |
|     if (fQueryOrderBy->operator()(getPointer(*l), getPointer(p)))
 | |
|     {
 | |
|       l++;
 | |
|     }
 | |
|     else if (fQueryOrderBy->operator()(getPointer(p), getPointer(*h)))
 | |
|     {
 | |
|       h--;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       RowPosition t = *l;  // temp value for swap
 | |
|       *l++ = *h;
 | |
|       *h-- = t;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sort(v, std::distance(v, h) + 1);
 | |
|   sort(l, std::distance(l, v) + n);
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::printCalTrace()
 | |
| {
 | |
|   time_t t = time(0);
 | |
|   char timeString[50];
 | |
|   ctime_r(&t, timeString);
 | |
|   timeString[strlen(timeString) - 1] = '\0';
 | |
|   ostringstream logStr;
 | |
|   logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
 | |
|          << "; total rows returned-" << fRowsReturned << endl
 | |
|          << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
 | |
|          << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
 | |
|          << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
 | |
|          << "\tJob completion status " << status() << endl;
 | |
|   logEnd(logStr.str().c_str());
 | |
|   fExtendedInfo += logStr.str();
 | |
|   formatMiniStats();
 | |
| }
 | |
| 
 | |
| void WindowFunctionStep::formatMiniStats()
 | |
| {
 | |
|   ostringstream oss;
 | |
|   oss << "WFS " << "UM " << "- " << "- " << "- " << "- " << "- " << "- "
 | |
|       << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << fRowsReturned
 | |
|       << " ";
 | |
|   fMiniInfo += oss.str();
 | |
| }
 | |
| 
 | |
| }  // namespace joblist
 |