You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			531 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			531 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
|    Copyright (c) 2016-2020 MariaDB
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //  $Id: subquerystep.cpp 6370 2010-03-18 02:58:09Z xlou $
 | |
| 
 | |
| #include <iostream>
 | |
| // #define NDEBUG
 | |
| #include <cassert>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/scoped_array.hpp>
 | |
| 
 | |
| #include <boost/scoped_ptr.hpp>
 | |
| #include <boost/shared_ptr.hpp>
 | |
| #include <boost/thread.hpp>
 | |
| #include <boost/uuid/uuid_io.hpp>
 | |
| using namespace boost;
 | |
| 
 | |
| #include "parsetree.h"
 | |
| #include "logicoperator.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "rowgroup.h"
 | |
| using namespace rowgroup;
 | |
| 
 | |
| #include "errorids.h"
 | |
| #include "exceptclasses.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "querytele.h"
 | |
| using namespace querytele;
 | |
| 
 | |
| #include "funcexp.h"
 | |
| 
 | |
| #include "jobstep.h"
 | |
| #include "jlf_common.h"
 | |
| #include "jlf_tuplejoblist.h"
 | |
| #include "expressionstep.h"
 | |
| #include "subquerystep.h"
 | |
| using namespace joblist;
 | |
| 
 | |
| namespace joblist
 | |
| {
 | |
| SubQueryStep::SubQueryStep(const JobInfo& jobInfo) : JobStep(jobInfo), fRowsReturned(0)
 | |
| {
 | |
|   fExtendedInfo = "SQS: ";
 | |
|   fQtc.stepParms().stepType = StepTeleStats::T_SQS;
 | |
| }
 | |
| 
 | |
| SubQueryStep::~SubQueryStep()
 | |
| {
 | |
| }
 | |
| 
 | |
| void SubQueryStep::run()
 | |
| {
 | |
|   fSubJobList->doQuery();
 | |
| }
 | |
| 
 | |
| void SubQueryStep::join()
 | |
| {
 | |
|   if (fRunner)
 | |
|     fRunner->join();
 | |
| }
 | |
| 
 | |
| void SubQueryStep::abort()
 | |
| {
 | |
|   JobStep::abort();
 | |
|   fSubJobList->abort();
 | |
| }
 | |
| 
 | |
| const string SubQueryStep::toString() const
 | |
| {
 | |
|   ostringstream oss;
 | |
|   oss << "SubQueryStep    ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
 | |
| 
 | |
|   if (fOutputJobStepAssociation.outSize() > 0)
 | |
|   {
 | |
|     oss << " out:";
 | |
| 
 | |
|     for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
 | |
|       oss << fOutputJobStepAssociation.outAt(i);
 | |
|   }
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| /*
 | |
| void SubQueryStep::printCalTrace()
 | |
| {
 | |
|         time_t t = time (0);
 | |
|         char timeString[50];
 | |
|         ctime_r (&t, timeString);
 | |
|         timeString[strlen (timeString )-1] = '\0';
 | |
|         ostringstream logStr;
 | |
|         logStr  << "ses:" << fSessionId << " st: " << fStepId << " finished at "<< timeString
 | |
|                         << "; total rows returned-" << fRowsReturned << endl
 | |
|                         << "\t1st read " << dlTimes.FirstReadTimeString()
 | |
|                         << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-"
 | |
|                         << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
 | |
|                         << "s;\n\tJob completion status " << status() << endl;
 | |
|         logEnd(logStr.str().c_str());
 | |
|         fExtendedInfo += logStr.str();
 | |
|         formatMiniStats();
 | |
| }
 | |
| 
 | |
| 
 | |
| void SubQueryStep::formatMiniStats()
 | |
| {
 | |
|         ostringstream oss;
 | |
|         oss << "SQS "
 | |
|                 << "UM "
 | |
|                 << "- "
 | |
|                 << "- "
 | |
|                 << "- "
 | |
|                 << "- "
 | |
|                 << "- "
 | |
|                 << "- "
 | |
|                 << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
 | |
|                 << fRowsReturned << " ";
 | |
|         fMiniInfo += oss.str();
 | |
| }
 | |
| */
 | |
| 
 | |
| SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
 | |
|  : JobStep(jobInfo)
 | |
|  , fTableOid(s->tableOid())
 | |
|  , fSubStep(s)
 | |
|  , fRowsInput(0)
 | |
|  , fRowsReturned(0)
 | |
|  , fEndOfResult(false)
 | |
|  , fInputIterator(0)
 | |
|  , fOutputIterator(0)
 | |
|  , fRunner(0)
 | |
| {
 | |
|   fExtendedInfo = "SAS: ";
 | |
|   fAlias = s->alias();
 | |
|   fView = s->view();
 | |
|   fInputJobStepAssociation = s->outputAssociation();
 | |
|   fRowGroupIn = dynamic_cast<SubQueryStep*>(s.get())->getOutputRowGroup();
 | |
|   setOutputRowGroup(fRowGroupIn);
 | |
| }
 | |
| 
 | |
| SubAdapterStep::~SubAdapterStep()
 | |
| {
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::abort()
 | |
| {
 | |
|   JobStep::abort();
 | |
| 
 | |
|   if (fSubStep)
 | |
|     fSubStep->abort();
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::run()
 | |
| {
 | |
|   if (fInputJobStepAssociation.outSize() == 0)
 | |
|     throw logic_error("No input data list for subquery adapter step.");
 | |
| 
 | |
|   fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();
 | |
| 
 | |
|   if (fInputDL == NULL)
 | |
|     throw logic_error("Input is not a RowGroup data list.");
 | |
| 
 | |
|   fInputIterator = fInputDL->getIterator();
 | |
| 
 | |
|   if (fOutputJobStepAssociation.outSize() == 0)
 | |
|     throw logic_error("No output data list for non-delivery subquery adapter step.");
 | |
| 
 | |
|   fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();
 | |
| 
 | |
|   if (fOutputDL == NULL)
 | |
|     throw logic_error("Output is not a RowGroup data list.");
 | |
| 
 | |
|   if (fDelivery)
 | |
|     fOutputIterator = fOutputDL->getIterator();
 | |
| 
 | |
|   fRunner = jobstepThreadPool.invoke(Runner(this));
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::join()
 | |
| {
 | |
|   if (fRunner)
 | |
|     jobstepThreadPool.join(fRunner);
 | |
| }
 | |
| 
 | |
| uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs)
 | |
| {
 | |
|   RGData rgDataOut;
 | |
|   bool more = false;
 | |
|   uint32_t rowCount = 0;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     bs.restart();
 | |
| 
 | |
|     more = fOutputDL->next(fOutputIterator, &rgDataOut);
 | |
| 
 | |
|     if (!more || cancelled())
 | |
|     {
 | |
|       //@bug4459.
 | |
|       while (more)
 | |
|         more = fOutputDL->next(fOutputIterator, &rgDataOut);
 | |
| 
 | |
|       fEndOfResult = true;
 | |
|     }
 | |
| 
 | |
|     if (more && !fEndOfResult)
 | |
|     {
 | |
|       fRowGroupDeliver.setData(&rgDataOut);
 | |
|       fRowGroupDeliver.serializeRGData(bs);
 | |
|       rowCount = fRowGroupDeliver.getRowCount();
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_ALWAYS_CRITICAL,
 | |
|                     "SubAdapterStep::nextBand()");
 | |
|     while (more)
 | |
|       more = fOutputDL->next(fOutputIterator, &rgDataOut);
 | |
|     fEndOfResult = true;
 | |
|   }
 | |
| 
 | |
|   if (fEndOfResult)
 | |
|   {
 | |
|     // send an empty / error band
 | |
|     RGData rgData(fRowGroupDeliver, 0U);
 | |
|     fRowGroupDeliver.setData(&rgData);
 | |
|     fRowGroupDeliver.resetRowGroup(0);
 | |
|     fRowGroupDeliver.setStatus(status());
 | |
|     fRowGroupDeliver.serializeRGData(bs);
 | |
|   }
 | |
| 
 | |
|   return rowCount;
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::setFeRowGroup(const rowgroup::RowGroup& rg)
 | |
| {
 | |
|   fRowGroupFe = rg;
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
 | |
| {
 | |
|   fRowGroupOut = fRowGroupDeliver = rg;
 | |
| 
 | |
|   if (fRowGroupFe.getColumnCount() == 0)
 | |
|     fIndexMap = makeMapping(fRowGroupIn, fRowGroupOut);
 | |
|   else
 | |
|     fIndexMap = makeMapping(fRowGroupFe, fRowGroupOut);
 | |
| 
 | |
|   checkDupOutputColumns();
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::checkDupOutputColumns()
 | |
| {
 | |
|   map<uint32_t, uint32_t> keymap;  // map<unique col key, col index in the row group>
 | |
|   fDupColumns.clear();
 | |
|   const vector<uint32_t>& keys = fRowGroupDeliver.getKeys();
 | |
| 
 | |
|   for (uint32_t i = 0; i < keys.size(); i++)
 | |
|   {
 | |
|     map<uint32_t, uint32_t>::iterator j = keymap.find(keys[i]);
 | |
| 
 | |
|     if (j == keymap.end())
 | |
|       keymap.insert(make_pair(keys[i], i));  // map key to col index
 | |
|     else
 | |
|       fDupColumns.push_back(make_pair(i, j->second));  // dest/src index pair
 | |
|   }
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::dupOutputColumns(Row& row)
 | |
| {
 | |
|   for (uint64_t i = 0; i < fDupColumns.size(); i++)
 | |
|     row.copyField(fDupColumns[i].first, fDupColumns[i].second);
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::outputRow(Row& rowIn, Row& rowOut)
 | |
| {
 | |
|   applyMapping(fIndexMap, rowIn, &rowOut);
 | |
| 
 | |
|   if (fDupColumns.size() > 0)
 | |
|     dupOutputColumns(rowOut);
 | |
| 
 | |
|   fRowGroupOut.incRowCount();
 | |
|   rowOut.nextRow();
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::deliverStringTableRowGroup(bool b)
 | |
| {
 | |
|   fRowGroupOut.setUseStringTable(b);
 | |
|   fRowGroupDeliver.setUseStringTable(b);
 | |
| }
 | |
| 
 | |
| bool SubAdapterStep::deliverStringTableRowGroup() const
 | |
| {
 | |
|   idbassert(fRowGroupOut.usesStringTable() == fRowGroupDeliver.usesStringTable());
 | |
|   return fRowGroupDeliver.usesStringTable();
 | |
| }
 | |
| 
 | |
| const string SubAdapterStep::toString() const
 | |
| {
 | |
|   ostringstream oss;
 | |
|   oss << "SubAdapterStep  ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
 | |
| 
 | |
|   if (fInputJobStepAssociation.outSize() > 0)
 | |
|     oss << fInputJobStepAssociation.outAt(0);
 | |
| 
 | |
|   if (fOutputJobStepAssociation.outSize() > 0)
 | |
|     oss << fOutputJobStepAssociation.outAt(0);
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::execute()
 | |
| {
 | |
|   RGData rgDataIn;
 | |
|   RGData rgDataOut;
 | |
|   Row rowIn;
 | |
|   Row rowFe;
 | |
|   Row rowOut;
 | |
|   fRowGroupIn.initRow(&rowIn);
 | |
|   fRowGroupOut.initRow(&rowOut);
 | |
| 
 | |
|   RGData rowFeData;
 | |
|   bool usesFE = false;
 | |
| 
 | |
|   if (fRowGroupFe.getColumnCount() > 0)
 | |
|   {
 | |
|     usesFE = true;
 | |
|     fRowGroupFe.initRow(&rowFe, true);
 | |
|     rowFeData = RGData(fRowGroupFe, 1);
 | |
|     fRowGroupFe.setData(&rowFeData);
 | |
|     fRowGroupFe.getRow(0, &rowFe);
 | |
|   }
 | |
| 
 | |
|   bool more = false;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_START, 1);
 | |
|     postStepStartTele(sts);
 | |
| 
 | |
|     fSubStep->run();
 | |
| 
 | |
|     more = fInputDL->next(fInputIterator, &rgDataIn);
 | |
| 
 | |
|     if (traceOn())
 | |
|       dlTimes.setFirstReadTime();
 | |
| 
 | |
|     while (more && !cancelled())
 | |
|     {
 | |
|       fRowGroupIn.setData(&rgDataIn);
 | |
|       rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
 | |
|       fRowGroupOut.setData(&rgDataOut);
 | |
|       fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
 | |
| 
 | |
|       fRowGroupIn.getRow(0, &rowIn);
 | |
|       fRowGroupOut.getRow(0, &rowOut);
 | |
| 
 | |
|       fRowsInput += fRowGroupIn.getRowCount();
 | |
| 
 | |
|       for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
 | |
|       {
 | |
|         if (fExpression.get() == NULL)
 | |
|         {
 | |
|           outputRow(rowIn, rowOut);
 | |
|         }
 | |
|         else if (!usesFE)
 | |
|         {
 | |
|           if (fExpression->evaluate(&rowIn))
 | |
|           {
 | |
|             outputRow(rowIn, rowOut);
 | |
|           }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|           copyRow(rowIn, &rowFe, rowIn.getColumnCount());
 | |
| 
 | |
|           // memcpy(rowFe.getData(), rowIn.getData(), rowIn.getSize());
 | |
|           if (fExpression->evaluate(&rowFe))
 | |
|           {
 | |
|             outputRow(rowFe, rowOut);
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         rowIn.nextRow();
 | |
|       }
 | |
| 
 | |
|       if (fRowGroupOut.getRowCount() > 0)
 | |
|       {
 | |
|         fRowsReturned += fRowGroupOut.getRowCount();
 | |
|         fOutputDL->insert(rgDataOut);
 | |
|       }
 | |
| 
 | |
|       more = fInputDL->next(fInputIterator, &rgDataIn);
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
 | |
|                     "SubAdapterStep::execute()");
 | |
|   }
 | |
| 
 | |
|   if (cancelled())
 | |
|     while (more)
 | |
|       more = fInputDL->next(fInputIterator, &rgDataIn);
 | |
| 
 | |
|   if (traceOn())
 | |
|   {
 | |
|     dlTimes.setLastReadTime();
 | |
|     dlTimes.setEndOfInputTime();
 | |
|     printCalTrace();
 | |
|   }
 | |
| 
 | |
|   StepTeleStats sts(fQueryUuid, fStepUuid, StepTeleStats::ST_SUMMARY, 1, 1, fRowsReturned);
 | |
|   postStepSummaryTele(sts);
 | |
| 
 | |
|   // Bug 3136, let mini stats to be formatted if traceOn.
 | |
|   fOutputDL->endOfInput();
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::addExpression(const JobStepVector& exps, JobInfo& jobInfo)
 | |
| {
 | |
|   // maps key to the index in the RG
 | |
|   map<uint32_t, uint32_t> keyToIndexMap;
 | |
|   const vector<uint32_t>& keys = fRowGroupIn.getKeys();
 | |
| 
 | |
|   for (size_t i = 0; i < keys.size(); i++)
 | |
|     keyToIndexMap[keys[i]] = i;
 | |
| 
 | |
|   // combine the expression to one parse tree
 | |
|   ParseTree* filter = NULL;
 | |
| 
 | |
|   for (JobStepVector::const_iterator it = exps.begin(); it != exps.end(); it++)
 | |
|   {
 | |
|     ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
 | |
|     idbassert(e);
 | |
| 
 | |
|     e->updateInputIndex(keyToIndexMap, jobInfo);
 | |
| 
 | |
|     if (filter != NULL)
 | |
|     {
 | |
|       ParseTree* left = filter;
 | |
|       ParseTree* right = new ParseTree();
 | |
|       right->copyTree(*(e->expressionFilter()));
 | |
|       filter = new ParseTree(new LogicOperator("and"));
 | |
|       filter->left(left);
 | |
|       filter->right(right);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       filter = new ParseTree();
 | |
|       filter->copyTree(*(e->expressionFilter()));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // add to the expression wrapper
 | |
|   if (fExpression.get() == NULL)
 | |
|     fExpression.reset(new funcexp::FuncExpWrapper());
 | |
| 
 | |
|   fExpression->addFilter(boost::shared_ptr<execplan::ParseTree>(filter));
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::addExpression(const vector<SRCP>& exps)
 | |
| {
 | |
|   // add to the function wrapper
 | |
|   if (fExpression.get() == NULL)
 | |
|     fExpression.reset(new funcexp::FuncExpWrapper());
 | |
| 
 | |
|   for (vector<SRCP>::const_iterator i = exps.begin(); i != exps.end(); i++)
 | |
|     fExpression->addReturnedColumn(*i);
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::addFcnJoinExp(const vector<SRCP>& exps)
 | |
| {
 | |
|   addExpression(exps);
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::printCalTrace()
 | |
| {
 | |
|   time_t t = time(0);
 | |
|   char timeString[50];
 | |
|   ctime_r(&t, timeString);
 | |
|   timeString[strlen(timeString) - 1] = '\0';
 | |
|   ostringstream logStr;
 | |
|   logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
 | |
|          << "; total rows input-" << fRowsInput << "; total rows returned-" << fRowsReturned << endl
 | |
|          << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
 | |
|          << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
 | |
|          << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
 | |
|          << "\tJob completion status " << status() << endl;
 | |
|   logEnd(logStr.str().c_str());
 | |
|   fExtendedInfo += logStr.str();
 | |
|   formatMiniStats();
 | |
| }
 | |
| 
 | |
| void SubAdapterStep::formatMiniStats()
 | |
| {
 | |
|   /*
 | |
|       ostringstream oss;
 | |
|       oss << "SAS "
 | |
|               << "UM "
 | |
|               << "- "
 | |
|               << "- "
 | |
|               << "- "
 | |
|               << "- "
 | |
|               << "- "
 | |
|               << "- "
 | |
|               << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
 | |
|               << fRowsReturned << " ";
 | |
|       fMiniInfo += oss.str();
 | |
|   */
 | |
| }
 | |
| 
 | |
| }  // namespace joblist
 |