You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			634 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			634 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
|    Copyright (C) 2019 MariaDB Corporation
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //  $Id: subquerytransformer.cpp 6406 2010-03-26 19:18:37Z xlou $
 | |
| 
 | |
| #include <iostream>
 | |
| //#define NDEBUG
 | |
| #include <cassert>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/scoped_ptr.hpp>
 | |
| #include <boost/shared_ptr.hpp>
 | |
| #include <boost/algorithm/string/case_conv.hpp>
 | |
| using namespace boost;
 | |
| 
 | |
| #include "aggregatecolumn.h"
 | |
| #include "windowfunctioncolumn.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "rowgroup.h"
 | |
| using namespace rowgroup;
 | |
| 
 | |
| #include "errorids.h"
 | |
| #include "exceptclasses.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "jobstep.h"
 | |
| #include "jlf_common.h"
 | |
| #include "jlf_tuplejoblist.h"
 | |
| #include "distributedenginecomm.h"
 | |
| #include "expressionstep.h"
 | |
| #include "tuplehashjoin.h"
 | |
| #include "subquerystep.h"
 | |
| #include "subquerytransformer.h"
 | |
| 
 | |
| namespace joblist
 | |
| {
 | |
| SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err)
 | |
|  : fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
 | |
| {
 | |
| }
 | |
| 
 | |
| SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err, const string& view)
 | |
|  : fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
 | |
| {
 | |
|   fVtable.view(view);
 | |
| }
 | |
| 
 | |
| SubQueryTransformer::SubQueryTransformer(JobInfo* jobInfo, SErrorInfo& err, const string& alias,
 | |
|                                          const string& view)
 | |
|  : fOutJobInfo(jobInfo), fSubJobInfo(NULL), fErrorInfo(err)
 | |
| {
 | |
|   fVtable.alias(alias);
 | |
|   fVtable.view(view);
 | |
| }
 | |
| 
 | |
| SubQueryTransformer::~SubQueryTransformer()
 | |
| {
 | |
|   // OK to delete NULL ptr
 | |
|   delete fSubJobInfo;
 | |
|   fSubJobInfo = NULL;
 | |
| }
 | |
| 
 | |
| SJSTEP& SubQueryTransformer::makeSubQueryStep(execplan::CalpontSelectExecutionPlan* csep,
 | |
|                                               bool subInFromClause)
 | |
| {
 | |
|     // Setup job info, job list and error status relation.
 | |
|   fSubJobInfo = new JobInfo(fOutJobInfo->rm);
 | |
|   fSubJobInfo->sessionId = fOutJobInfo->sessionId;
 | |
|   fSubJobInfo->txnId = fOutJobInfo->txnId;
 | |
|   fSubJobInfo->verId = fOutJobInfo->verId;
 | |
|   fSubJobInfo->statementId = fOutJobInfo->statementId;
 | |
|   fSubJobInfo->queryType = fOutJobInfo->queryType;
 | |
|   fSubJobInfo->csc = fOutJobInfo->csc;
 | |
|   fSubJobInfo->trace = fOutJobInfo->trace;
 | |
|   fSubJobInfo->traceFlags = fOutJobInfo->traceFlags;
 | |
|   fSubJobInfo->isExeMgr = fOutJobInfo->isExeMgr;
 | |
|   fSubJobInfo->subLevel = fOutJobInfo->subLevel + 1;
 | |
|   fSubJobInfo->keyInfo = fOutJobInfo->keyInfo;
 | |
|   fSubJobInfo->stringScanThreshold = fOutJobInfo->stringScanThreshold;
 | |
|   fSubJobInfo->tryTuples = true;
 | |
|   fSubJobInfo->errorInfo = fErrorInfo;
 | |
|   fOutJobInfo->subNum++;
 | |
|   fSubJobInfo->subCount = fOutJobInfo->subCount;
 | |
|   fSubJobInfo->subId = ++(*(fSubJobInfo->subCount));
 | |
|   fSubJobInfo->pJobInfo = fOutJobInfo;
 | |
|   fSubJobList.reset(new TupleJobList(true));
 | |
|   fSubJobList->priority(csep->priority());
 | |
|   fSubJobInfo->projectingTableOID = fSubJobList->projectingTableOIDPtr();
 | |
|   fSubJobInfo->jobListPtr = fSubJobList.get();
 | |
|   fSubJobInfo->stringTableThreshold = fOutJobInfo->stringTableThreshold;
 | |
|   fSubJobInfo->localQuery = fOutJobInfo->localQuery;
 | |
|   fSubJobInfo->uuid = fOutJobInfo->uuid;
 | |
|   fSubJobInfo->timeZone = fOutJobInfo->timeZone;
 | |
| 
 | |
|   fOutJobInfo->jobListPtr->addSubqueryJobList(fSubJobList);
 | |
| 
 | |
|   fSubJobInfo->smallSideLimit = fOutJobInfo->smallSideLimit;
 | |
|   fSubJobInfo->largeSideLimit = fOutJobInfo->largeSideLimit;
 | |
|   fSubJobInfo->smallSideUsage = fOutJobInfo->smallSideUsage;
 | |
|   fSubJobInfo->partitionSize = fOutJobInfo->partitionSize;
 | |
|   fSubJobInfo->umMemLimit = fOutJobInfo->umMemLimit;
 | |
|   fSubJobInfo->isDML = fOutJobInfo->isDML;
 | |
|   fSubJobInfo->orderByThreads = csep->orderByThreads();
 | |
| 
 | |
| 
 | |
|   // Update v-table's alias.
 | |
|   fVtable.name("$sub");
 | |
| 
 | |
|   if (fVtable.alias().empty())
 | |
|   {
 | |
|     ostringstream oss;
 | |
|     oss << "$sub_" << fSubJobInfo->subId << "_" << fSubJobInfo->subLevel << "_" << fOutJobInfo->subNum;
 | |
|     fVtable.alias(oss.str());
 | |
|   }
 | |
| 
 | |
|   fSubJobInfo->subAlias = fVtable.alias();  //@bug5844, unique alias for sub
 | |
| 
 | |
|   // Make the jobsteps out of the execution plan.
 | |
|   JobStepVector querySteps;
 | |
|   JobStepVector projectSteps;
 | |
|   DeliveredTableMap deliverySteps;
 | |
| 
 | |
|   if (csep->unionVec().size() == 0)
 | |
|     makeJobSteps(csep, *fSubJobInfo, querySteps, projectSteps, deliverySteps);
 | |
|   else if (subInFromClause)
 | |
|     makeUnionJobSteps(csep, *fSubJobInfo, querySteps, projectSteps, deliverySteps);
 | |
|   else
 | |
|     throw IDBExcept(ERR_UNION_IN_SUBQUERY);
 | |
| 
 | |
|   if (fSubJobInfo->trace)
 | |
|   {
 | |
|     ostringstream oss;
 | |
|     oss << boldStart << "\nsubquery " << fSubJobInfo->subLevel << "." << fOutJobInfo->subNum
 | |
|         << " steps:" << boldStop << endl;
 | |
|     ostream_iterator<JobStepVector::value_type> oIter(oss, "\n");
 | |
|     copy(querySteps.begin(), querySteps.end(), oIter);
 | |
|     cout << oss.str();
 | |
|   }
 | |
| 
 | |
|   // Add steps to the joblist.
 | |
|   fSubJobList->addQuery(querySteps);
 | |
|   fSubJobList->addDelivery(deliverySteps);
 | |
|   fSubJobList->putEngineComm(DistributedEngineComm::instance(fOutJobInfo->rm));
 | |
|   csep->setDynamicParseTreeVec(fSubJobInfo->dynamicParseTreeVec);
 | |
| 
 | |
|   // Get the correlated steps
 | |
|   fCorrelatedSteps = fSubJobInfo->correlateSteps;
 | |
|   fSubReturnedCols = fSubJobInfo->deliveredCols;
 | |
| 
 | |
|   // Convert subquery to step.
 | |
|   SubQueryStep* sqs = new SubQueryStep(*fSubJobInfo);
 | |
|   sqs->tableOid(fVtable.tableOid());
 | |
|   sqs->alias(fVtable.alias());
 | |
|   sqs->subJoblist(fSubJobList);
 | |
|   sqs->setOutputRowGroup(fSubJobList->getOutputRowGroup());
 | |
|   AnyDataListSPtr spdl(new AnyDataList());
 | |
|   RowGroupDL* dl = new RowGroupDL(1, fSubJobInfo->fifoSize);
 | |
|   spdl->rowGroupDL(dl);
 | |
|   dl->OID(fVtable.tableOid());
 | |
|   JobStepAssociation jsa;
 | |
|   jsa.outAdd(spdl);
 | |
|   (querySteps.back())->outputAssociation(jsa);
 | |
|   sqs->outputAssociation(jsa);
 | |
|   fSubQueryStep.reset(sqs);
 | |
| 
 | |
|   // Update the v-table columns and rowgroup
 | |
|   vector<uint32_t> pos;
 | |
|   vector<uint32_t> oids;
 | |
|   vector<uint32_t> keys;
 | |
|   vector<uint32_t> scale;
 | |
|   vector<uint32_t> precision;
 | |
|   vector<CalpontSystemCatalog::ColDataType> types;
 | |
|   vector<uint32_t> csNums;
 | |
|   pos.push_back(2);
 | |
| 
 | |
|   CalpontSystemCatalog::OID tblOid = fVtable.tableOid();
 | |
|   string tableName = fVtable.name();
 | |
|   string alias = fVtable.alias();
 | |
|   const RowGroup& rg = fSubJobList->getOutputRowGroup();
 | |
|   Row row;
 | |
|   rg.initRow(&row);
 | |
|   uint64_t outputCols =
 | |
|       rg.getColumnCount() < fSubReturnedCols.size() ? rg.getColumnCount() : fSubReturnedCols.size();
 | |
| 
 | |
|   for (uint64_t i = 0; i < outputCols; i++)
 | |
|   {
 | |
|     fVtable.addColumn(fSubReturnedCols[i]);
 | |
|     SimpleColumn* sc = dynamic_cast<SimpleColumn*>(fSubReturnedCols[i].get());
 | |
|     if (sc)
 | |
|     {
 | |
|       fVtable.partitions(sc->partitions());
 | |
|     }
 | |
| 
 | |
|     // make sure the column type is the same as rowgroup
 | |
|     CalpontSystemCatalog::ColType ct = fVtable.columnType(i);
 | |
|     CalpontSystemCatalog::ColDataType colDataTypeInRg = row.getColTypes()[i];
 | |
| 
 | |
|     if (dynamic_cast<AggregateColumn*>(fSubReturnedCols[i].get()) != NULL ||
 | |
|         dynamic_cast<WindowFunctionColumn*>(fSubReturnedCols[i].get()) != NULL)
 | |
|     {
 | |
|       // skip char/varchar/varbinary column because the colWidth in row is fudged.
 | |
|       if (colDataTypeInRg != CalpontSystemCatalog::VARCHAR && colDataTypeInRg != CalpontSystemCatalog::CHAR &&
 | |
|           colDataTypeInRg != CalpontSystemCatalog::VARBINARY &&
 | |
|           colDataTypeInRg != CalpontSystemCatalog::TEXT && colDataTypeInRg != CalpontSystemCatalog::BLOB)
 | |
|       {
 | |
|         ct.colWidth = row.getColumnWidth(i);
 | |
|         ct.colDataType = row.getColTypes()[i];
 | |
|         ct.charsetNumber = row.getCharsetNumber(i);
 | |
|         ct.scale = row.getScale(i);
 | |
| 
 | |
|         if (colDataTypeInRg != CalpontSystemCatalog::FLOAT &&
 | |
|             colDataTypeInRg != CalpontSystemCatalog::UFLOAT &&
 | |
|             colDataTypeInRg != CalpontSystemCatalog::DOUBLE &&
 | |
|             colDataTypeInRg != CalpontSystemCatalog::UDOUBLE &&
 | |
|             colDataTypeInRg != CalpontSystemCatalog::LONGDOUBLE)
 | |
|         {
 | |
|           if (ct.scale != 0 && ct.precision != -1)
 | |
|             ct.colDataType = CalpontSystemCatalog::DECIMAL;
 | |
|         }
 | |
| 
 | |
|         ct.precision = row.getPrecision(i);
 | |
|         fVtable.columnType(ct, i);
 | |
|       }
 | |
|     }
 | |
|     // MySQL timestamp/time/date/datetime type is different from IDB type
 | |
|     else if (colDataTypeInRg == CalpontSystemCatalog::DATE ||
 | |
|              colDataTypeInRg == CalpontSystemCatalog::DATETIME ||
 | |
|              colDataTypeInRg == CalpontSystemCatalog::TIMESTAMP ||
 | |
|              colDataTypeInRg == CalpontSystemCatalog::TIME)
 | |
|     {
 | |
|       ct.colWidth = row.getColumnWidth(i);
 | |
|       ct.colDataType = row.getColTypes()[i];
 | |
|       ct.scale = row.getScale(i);
 | |
|       ct.precision = row.getPrecision(i);
 | |
|       fVtable.columnType(ct, i);
 | |
|     }
 | |
| 
 | |
|     // build tuple info to export to outer query
 | |
|     TupleInfo ti(setTupleInfo(fVtable.columnType(i), fVtable.columnOid(i), *fOutJobInfo, tblOid,
 | |
|                               fVtable.columns()[i].get(), alias));
 | |
| 
 | |
|     if (i < rg.getColumnCount())
 | |
|     {
 | |
|       pos.push_back(pos.back() + ti.width);
 | |
|       oids.push_back(ti.oid);
 | |
|       keys.push_back(ti.key);
 | |
|       types.push_back(ti.dtype);
 | |
|       csNums.push_back(ti.csNum);
 | |
|       scale.push_back(ti.scale);
 | |
|       precision.push_back(ti.precision);
 | |
|     }
 | |
| 
 | |
|     fOutJobInfo->vtableColTypes[UniqId(fVtable.columnOid(i), fVtable.alias(), "", "", execplan::Partitions())] =
 | |
|         fVtable.columnType(i);
 | |
|   }
 | |
| 
 | |
|   RowGroup rg1(oids.size(), pos, oids, keys, types, csNums, scale, precision, csep->stringTableThreshold());
 | |
|   rg1.setUseStringTable(rg.usesStringTable());
 | |
| 
 | |
|   dynamic_cast<SubQueryStep*>(fSubQueryStep.get())->setOutputRowGroup(rg1);
 | |
| 
 | |
|   return fSubQueryStep;
 | |
| }
 | |
| 
 | |
| void SubQueryTransformer::checkCorrelateInfo(TupleHashJoinStep* thjs, const JobInfo& jobInfo)
 | |
| {
 | |
|   int pos = (thjs->correlatedSide() == 1) ? thjs->sequence2() : thjs->sequence1();
 | |
| 
 | |
|   if (pos == -1 || (size_t)pos >= fVtable.columns().size())
 | |
|   {
 | |
|     uint64_t id = (thjs->correlatedSide() == 1) ? thjs->tupleId2() : thjs->tupleId1();
 | |
|     string alias = jobInfo.keyInfo->tupleKeyVec[id].fTable;
 | |
|     string name = jobInfo.keyInfo->keyName[id];
 | |
| 
 | |
|     if (!name.empty() && alias.length() > 0)
 | |
|       name = alias + "." + name;
 | |
| 
 | |
|     Message::Args args;
 | |
|     args.add(name);
 | |
|     string errMsg(IDBErrorInfo::instance()->errorMsg(ERR_CORRELATE_COL_MISSING, args));
 | |
|     cerr << errMsg << ": " << pos << endl;
 | |
|     throw IDBExcept(errMsg, ERR_CORRELATE_COL_MISSING);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void SubQueryTransformer::updateCorrelateInfo()
 | |
| {
 | |
|   // put vtable into the table list to resolve correlated filters
 | |
|   // Temp fix for @bug3932 until outer join has no dependency on table order.
 | |
|   // Insert at [1], not to mess with OUTER join and hint(INFINIDB_ORDERED -- bug2317).
 | |
|   fOutJobInfo->tableList.insert(
 | |
|       fOutJobInfo->tableList.begin() + 1,
 | |
|       makeTableKey(*fOutJobInfo, fVtable.tableOid(), fVtable.name(), fVtable.alias(), "", fVtable.view(), fVtable.partitions()));
 | |
| 
 | |
|   // tables in outer level
 | |
|   set<uint32_t> outTables;
 | |
| 
 | |
|   for (uint64_t i = 0; i < fOutJobInfo->tableList.size(); i++)
 | |
|     outTables.insert(fOutJobInfo->tableList[i]);
 | |
| 
 | |
|   // tables in subquery level
 | |
|   set<uint32_t> subTables;
 | |
| 
 | |
|   for (uint64_t i = 0; i < fSubJobInfo->tableList.size(); i++)
 | |
|     subTables.insert(fSubJobInfo->tableList[i]);
 | |
| 
 | |
|   // any function joins
 | |
|   fOutJobInfo->functionJoins.insert(fOutJobInfo->functionJoins.end(), fSubJobInfo->functionJoins.begin(),
 | |
|                                     fSubJobInfo->functionJoins.end());
 | |
| 
 | |
|   // Update correlated steps
 | |
|   const map<UniqId, uint32_t>& subMap = fVtable.columnMap();
 | |
| 
 | |
|   for (JobStepVector::iterator i = fCorrelatedSteps.begin(); i != fCorrelatedSteps.end(); i++)
 | |
|   {
 | |
|     TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
 | |
|     ExpressionStep* es = dynamic_cast<ExpressionStep*>(i->get());
 | |
| 
 | |
|     if (thjs)
 | |
|     {
 | |
|       if (thjs->getJoinType() & CORRELATED)
 | |
|       {
 | |
|         checkCorrelateInfo(thjs, *fSubJobInfo);
 | |
|         thjs->setJoinType(thjs->getJoinType() ^ CORRELATED);
 | |
| 
 | |
|         if (thjs->correlatedSide() == 1)
 | |
|         {
 | |
|           int pos = thjs->sequence2();
 | |
|           thjs->tableOid2(fVtable.tableOid());
 | |
|           thjs->oid2(fVtable.columnOid(pos));
 | |
|           thjs->alias2(fVtable.alias());
 | |
|           thjs->view2(fVtable.columns()[pos]->viewName());
 | |
|           thjs->schema2(fVtable.columns()[pos]->schemaName());
 | |
|           thjs->dictOid2(0);
 | |
|           thjs->sequence2(-1);
 | |
|           thjs->joinId(fOutJobInfo->joinNum++);
 | |
| 
 | |
|           CalpontSystemCatalog::ColType ct2 = fVtable.columnType(pos);
 | |
|           TupleInfo ti(setTupleInfo(ct2, thjs->oid2(), *fOutJobInfo, thjs->tableOid2(),
 | |
|                                     fVtable.columns()[pos].get(), thjs->alias2()));
 | |
|           thjs->tupleId2(ti.key);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|           int pos = thjs->sequence1();
 | |
|           thjs->tableOid1(fVtable.tableOid());
 | |
|           thjs->oid1(fVtable.columnOid(pos));
 | |
|           thjs->alias1(fVtable.alias());
 | |
|           thjs->view1(fVtable.columns()[pos]->viewName());
 | |
|           thjs->schema1(fVtable.columns()[pos]->schemaName());
 | |
|           thjs->dictOid1(0);
 | |
|           thjs->sequence1(-1);
 | |
|           thjs->joinId(fOutJobInfo->joinNum++);
 | |
| 
 | |
|           CalpontSystemCatalog::ColType ct1 = fVtable.columnType(pos);
 | |
|           TupleInfo ti(setTupleInfo(ct1, thjs->oid1(), *fOutJobInfo, thjs->tableOid1(),
 | |
|                                     fVtable.columns()[pos].get(), thjs->alias1()));
 | |
|           thjs->tupleId1(ti.key);
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       /*
 | |
|                           else // oid1 == 0, dictionary column
 | |
|                           {
 | |
|                                   CalpontSystemCatalog::ColType dct;
 | |
|                                   dct.colDataType = CalpontSystemCatalog::BIGINT;
 | |
|                                   dct.colWidth = 8;
 | |
|                                   dct.scale = 0;
 | |
|                                   dct.precision = 0;
 | |
|                           }
 | |
|       */
 | |
|     }
 | |
|     else if (es)
 | |
|     {
 | |
|       // already handled by function join
 | |
|       if (es->functionJoin())
 | |
|         continue;
 | |
| 
 | |
|       vector<ReturnedColumn*>& scList = es->columns();
 | |
|       vector<CalpontSystemCatalog::OID>& tableOids = es->tableOids();
 | |
|       vector<string>& aliases = es->aliases();
 | |
|       vector<string>& views = es->views();
 | |
|       vector<execplan::Partitions>& partitions = es->partitionss();
 | |
|       vector<string>& schemas = es->schemas();
 | |
|       vector<uint32_t>& tableKeys = es->tableKeys();
 | |
|       vector<uint32_t>& columnKeys = es->columnKeys();
 | |
| 
 | |
|       // update simple columns
 | |
|       for (uint64_t j = 0; j < scList.size(); j++)
 | |
|       {
 | |
|         SimpleColumn* sc = dynamic_cast<SimpleColumn*>(scList[j]);
 | |
| 
 | |
|         if (sc != NULL)
 | |
|         {
 | |
|           if (subTables.find(tableKeys[j]) != subTables.end())
 | |
|           {
 | |
|             const map<UniqId, uint32_t>::const_iterator k =
 | |
|                 subMap.find(UniqId(sc->oid(), aliases[j], schemas[j], views[j], partitions[j]));
 | |
| 
 | |
|             if (k == subMap.end())
 | |
|               throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
 | |
| 
 | |
|             sc->schemaName("");
 | |
|             sc->tableName(fVtable.name());
 | |
|             sc->tableAlias(fVtable.alias());
 | |
|             sc->viewName(fVtable.view());
 | |
|             sc->oid(fVtable.columnOid(k->second));
 | |
|             sc->columnName(fVtable.columns()[k->second]->columnName());
 | |
| 	    sc->partitions(fVtable.partitions());
 | |
|             const CalpontSystemCatalog::ColType& ct = fVtable.columnType(k->second);
 | |
|             TupleInfo ti = setTupleInfo(ct, sc->oid(), *fOutJobInfo, fVtable.tableOid(), sc, fVtable.alias());
 | |
| 
 | |
|             tableOids[j] = execplan::CNX_VTABLE_ID;
 | |
|             aliases[j] = sc->tableAlias();
 | |
|             views[j] = sc->viewName();
 | |
|             schemas[j] = sc->schemaName();
 | |
|             columnKeys[j] = ti.key;
 | |
|             tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
 | |
| 	    partitions[j] = sc->partitions();
 | |
|           }
 | |
|           else
 | |
|           {
 | |
|             const CalpontSystemCatalog::ColType& ct = fOutJobInfo->keyInfo->colType[columnKeys[j]];
 | |
|             sc->joinInfo(0);
 | |
|             TupleInfo ti = setTupleInfo(ct, sc->oid(), *fOutJobInfo, tableOids[j], sc, aliases[j]);
 | |
| 
 | |
|             columnKeys[j] = ti.key;
 | |
|             tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
 | |
|           }
 | |
|         }
 | |
|         else if (dynamic_cast<WindowFunctionColumn*>(scList[j]) != NULL)
 | |
|         {
 | |
|           // workaround for window function IN/EXISTS subquery
 | |
|           const map<UniqId, uint32_t>::const_iterator k =
 | |
|               subMap.find(UniqId(scList[j]->expressionId(), "", "", "", execplan::Partitions()));
 | |
| 
 | |
|           if (k == subMap.end())
 | |
|             throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
 | |
| 
 | |
|           sc = fVtable.columns()[k->second].get();
 | |
|           es->substitute(j, fVtable.columns()[k->second]);
 | |
|           CalpontSystemCatalog::ColType ct = sc->colType();
 | |
|           string alias(extractTableAlias(sc));
 | |
|           CalpontSystemCatalog::OID tblOid = fVtable.tableOid();
 | |
|           TupleInfo ti(setTupleInfo(ct, sc->oid(), *fOutJobInfo, tblOid, sc, alias));
 | |
| 
 | |
|           tableOids[j] = execplan::CNX_VTABLE_ID;
 | |
|           columnKeys[j] = ti.key;
 | |
|           tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       es->updateColumnOidAlias(*fOutJobInfo);
 | |
| 
 | |
|       // update function join info
 | |
|       boost::shared_ptr<FunctionJoinInfo>& fji = es->functionJoinInfo();
 | |
| 
 | |
|       if (fji && fji->fCorrelatedSide)
 | |
|       {
 | |
|         // replace sub side with a virtual column
 | |
|         int64_t subSide = (fji->fCorrelatedSide == 1) ? 1 : 0;
 | |
|         ReturnedColumn* rc = fji->fExpression[subSide];
 | |
|         SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
 | |
| 
 | |
|         if (sc == NULL)
 | |
|         {
 | |
|           UniqId colId = UniqId(rc->expressionId(), "", "", "", execplan::Partitions());
 | |
|           const map<UniqId, uint32_t>::const_iterator k = subMap.find(colId);
 | |
| 
 | |
|           if (k == subMap.end())
 | |
|             throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
 | |
| 
 | |
|           SSC vc = fVtable.columns()[k->second];
 | |
|           sc = vc.get();
 | |
|         }
 | |
| 
 | |
|         // virtual table info in outer query
 | |
|         TupleInfo ti(
 | |
|             setTupleInfo(sc->colType(), sc->oid(), *fOutJobInfo, fVtable.tableOid(), sc, fVtable.alias()));
 | |
| 
 | |
|         set<uint32_t> cids;
 | |
|         cids.insert(ti.key);
 | |
|         fji->fJoinKey[subSide] = ti.key;
 | |
|         fji->fTableKey[subSide] = ti.tkey;
 | |
|         fji->fColumnKeys[subSide] = cids;
 | |
|         fji->fTableOid[subSide] = fVtable.tableOid();
 | |
|         fji->fAlias[subSide] = fVtable.alias();
 | |
|         fji->fView[subSide] = fVtable.view();
 | |
|         fji->fSchema[subSide] = "";
 | |
| 
 | |
|         // turn off correlated flag
 | |
|         fji->fExpression[fji->fCorrelatedSide - 1]->joinInfo(0);
 | |
|         fji->fJoinType ^= CORRELATED;
 | |
|         fji->fJoinId = 0;
 | |
|       }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       JobStep* j = i->get();
 | |
|       uint32_t tid = -1;
 | |
|       uint64_t cid = j->tupleId();
 | |
| 
 | |
|       if (cid != (uint64_t)-1)
 | |
|         tid = getTableKey(*fOutJobInfo, cid);
 | |
|       else
 | |
|         tid = getTableKey(*fOutJobInfo, j);
 | |
| 
 | |
|       if (outTables.find(tid) == outTables.end())
 | |
|       {
 | |
|         if (subMap.find(UniqId(j->oid(), j->alias(), j->schema(), j->view(), j->partitions(), 0)) != subMap.end())
 | |
|           // throw CorrelateFailExcept();
 | |
|           throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void SubQueryTransformer::run()
 | |
| {
 | |
|   // not to be called for base class
 | |
| }
 | |
| 
 | |
| // ------ SimpleScalarTransformer ------
 | |
| SimpleScalarTransformer::SimpleScalarTransformer(JobInfo* jobInfo, SErrorInfo& status, bool e)
 | |
|  : SubQueryTransformer(jobInfo, status)
 | |
|  , fInputDl(NULL)
 | |
|  , fDlIterator(-1)
 | |
|  , fEmptyResultSet(true)
 | |
|  , fExistFilter(e)
 | |
| {
 | |
| }
 | |
| 
 | |
| SimpleScalarTransformer::SimpleScalarTransformer(const SubQueryTransformer& rhs)
 | |
|  : SubQueryTransformer(rhs.outJobInfo(), rhs.errorInfo())
 | |
|  , fInputDl(NULL)
 | |
|  , fDlIterator(-1)
 | |
|  , fEmptyResultSet(true)
 | |
|  , fExistFilter(false)
 | |
| {
 | |
|   fSubJobList = rhs.subJobList();
 | |
|   fSubQueryStep = rhs.subQueryStep();
 | |
| }
 | |
| 
 | |
| SimpleScalarTransformer::~SimpleScalarTransformer()
 | |
| {
 | |
| }
 | |
| 
 | |
| void SimpleScalarTransformer::run()
 | |
| {
 | |
|   // set up receiver
 | |
|   fRowGroup = dynamic_cast<SubQueryStep*>(fSubQueryStep.get())->getOutputRowGroup();
 | |
|   fRowGroup.initRow(&fRow, true);
 | |
|   fInputDl = fSubQueryStep->outputAssociation().outAt(0)->rowGroupDL();
 | |
|   fDlIterator = fInputDl->getIterator();
 | |
| 
 | |
|   // run the subquery
 | |
|   fSubJobList->doQuery();
 | |
| 
 | |
|   // retrieve the scalar result
 | |
|   getScalarResult();
 | |
| 
 | |
|   // check result count
 | |
|   if (fErrorInfo->errCode == ERR_MORE_THAN_1_ROW)
 | |
|     throw MoreThan1RowExcept();
 | |
| }
 | |
| 
 | |
| void SimpleScalarTransformer::getScalarResult()
 | |
| {
 | |
|   RGData rgData;
 | |
|   bool more;
 | |
| 
 | |
|   more = fInputDl->next(fDlIterator, &rgData);
 | |
| 
 | |
|   while (more)
 | |
|   {
 | |
|     fRowGroup.setData(&rgData);
 | |
| 
 | |
|     // Only need one row for scalar filter
 | |
|     if (fEmptyResultSet && fRowGroup.getRowCount() == 1)
 | |
|     {
 | |
|       fEmptyResultSet = false;
 | |
|       Row row;
 | |
|       fRowGroup.initRow(&row);
 | |
|       fRowGroup.getRow(0, &row);
 | |
|       fRowData.reset(new uint8_t[fRow.getSize()]);
 | |
|       fRow.setData(rowgroup::Row::Pointer(fRowData.get()));
 | |
|       copyRow(row, &fRow);
 | |
| 
 | |
|       // For exist filter, stop the query after one or more rows retrieved.
 | |
|       if (fExistFilter)
 | |
|       {
 | |
|         fErrorInfo->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW);
 | |
|         fErrorInfo->errCode = ERR_MORE_THAN_1_ROW;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     // more than 1 row case:
 | |
|     //   not empty set, and get new rows
 | |
|     //   empty set, but get more than 1 rows
 | |
|     else if (fRowGroup.getRowCount() > 0)
 | |
|     {
 | |
|       // Stop the query after some rows retrieved.
 | |
|       fEmptyResultSet = false;
 | |
|       fErrorInfo->errMsg = IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW);
 | |
|       fErrorInfo->errCode = ERR_MORE_THAN_1_ROW;
 | |
|     }
 | |
| 
 | |
|     // For scalar filter, have to check all blocks to ensure only one row.
 | |
|     if (fErrorInfo->errCode != 0)
 | |
|       while (more)
 | |
|         more = fInputDl->next(fDlIterator, &rgData);
 | |
|     else
 | |
|       more = fInputDl->next(fDlIterator, &rgData);
 | |
|   }
 | |
| }
 | |
| 
 | |
| }  // namespace joblist
 |