You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	The purpose of this changeset is to obtain list of partitions from SELECT_LEX structure and pass it down to joblist and then to CrossEngineStep to pass to InnoDB.
		
			
				
	
	
		
			803 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			803 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //  $Id: expressionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
 | |
| 
 | |
| //#define NDEBUG
 | |
| #include <cassert>
 | |
| #include <sstream>
 | |
| #include <iomanip>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/shared_ptr.hpp>
 | |
| 
 | |
| using namespace boost;
 | |
| 
 | |
| #include "messagequeue.h"
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| #include "loggingid.h"
 | |
| #include "errorcodes.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "calpontsystemcatalog.h"
 | |
| #include "aggregatecolumn.h"
 | |
| #include "arithmeticcolumn.h"
 | |
| #include "constantcolumn.h"
 | |
| #include "functioncolumn.h"
 | |
| #include "pseudocolumn.h"
 | |
| #include "simplecolumn.h"
 | |
| #include "windowfunctioncolumn.h"
 | |
| #include "constantfilter.h"
 | |
| #include "simplefilter.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "jlf_common.h"
 | |
| #include "jlf_tuplejoblist.h"
 | |
| #include "rowgroup.h"
 | |
| using namespace rowgroup;
 | |
| 
 | |
| #include "expressionstep.h"
 | |
| 
 | |
| 
 | |
| namespace joblist
 | |
| {
 | |
| ExpressionStep::ExpressionStep()
 | |
|  : fExpressionFilter(NULL)
 | |
|  , fExpressionId(-1)
 | |
|  , fVarBinOK(false)
 | |
|  , fSelectFilter(false)
 | |
|  , fAssociatedJoinId(0)
 | |
|  , fDoJoin(false)
 | |
|  , fVirtual(false)
 | |
| {
 | |
| }
 | |
| 
 | |
| ExpressionStep::ExpressionStep(const JobInfo& jobInfo)
 | |
|  : JobStep(jobInfo)
 | |
|  , fExpressionFilter(NULL)
 | |
|  , fExpressionId(-1)
 | |
|  , fVarBinOK(false)
 | |
|  , fSelectFilter(false)
 | |
|  , fAssociatedJoinId(0)
 | |
|  , fDoJoin(false)
 | |
|  , fVirtual(false)
 | |
| {
 | |
| }
 | |
| 
 | |
| ExpressionStep::ExpressionStep(const ExpressionStep& rhs)
 | |
|  : JobStep(rhs)
 | |
|  , fExpression(rhs.expression())
 | |
|  , fExpressionFilter(NULL)
 | |
|  , fExpressionId(rhs.expressionId())
 | |
|  , fAliases(rhs.aliases())
 | |
|  , fViews(rhs.views())
 | |
|  , fSchemas(rhs.schemas())
 | |
|  , fPartitionss(rhs.fPartitionss)
 | |
|  , fTableKeys(rhs.tableKeys())
 | |
|  , fColumnKeys(rhs.columnKeys())
 | |
|  , fVarBinOK(rhs.fVarBinOK)
 | |
|  , fSelectFilter(rhs.fSelectFilter)
 | |
|  , fAssociatedJoinId(rhs.fAssociatedJoinId)
 | |
|  , fDoJoin(rhs.fDoJoin)
 | |
|  , fVirtual(rhs.fVirtual)
 | |
| {
 | |
|   if (rhs.expressionFilter() != NULL)
 | |
|     fExpressionFilter = new ParseTree(*(rhs.expressionFilter()));
 | |
| }
 | |
| 
 | |
| ExpressionStep::~ExpressionStep()
 | |
| {
 | |
|   if (fExpressionFilter != NULL)
 | |
|     delete fExpressionFilter;
 | |
| }
 | |
| 
 | |
| void ExpressionStep::run()
 | |
| {
 | |
| }
 | |
| 
 | |
| void ExpressionStep::join()
 | |
| {
 | |
| }
 | |
| 
 | |
| void ExpressionStep::expression(const SRCP exp, JobInfo& jobInfo)
 | |
| {
 | |
|   fExpression = exp;
 | |
| 
 | |
|   // set expression Id
 | |
|   ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(fExpression.get());
 | |
|   FunctionColumn* fc = dynamic_cast<FunctionColumn*>(fExpression.get());
 | |
|   fExpressionId = exp.get()->expressionId();
 | |
| 
 | |
|   if (ac != NULL || fc != NULL)
 | |
|     addColumn(exp.get(), jobInfo);
 | |
| }
 | |
| 
 | |
| void ExpressionStep::expressionFilter(const Filter* filter, JobInfo& jobInfo)
 | |
| {
 | |
|   Filter* f = filter->clone();
 | |
|   fExpressionFilter = new ParseTree(f);
 | |
|   idbassert(fExpressionFilter != NULL);
 | |
| 
 | |
|   if (fExpressionFilter == NULL)
 | |
|   {
 | |
|     std::ostringstream errmsg;
 | |
|     errmsg << "ExpressionStep: Failed to create a new ParseTree";
 | |
|     cerr << boldStart << errmsg.str() << boldStop << endl;
 | |
|     throw runtime_error(errmsg.str());
 | |
|   }
 | |
| 
 | |
|   addFilter(fExpressionFilter, jobInfo);
 | |
| 
 | |
|   // populate the oid vectors
 | |
|   SimpleFilter* sf = dynamic_cast<SimpleFilter*>(f);
 | |
| 
 | |
|   if (sf != NULL && sf->op()->data() == "=")
 | |
|     functionJoinCheck(sf, jobInfo);
 | |
| }
 | |
| 
 | |
| void ExpressionStep::expressionFilter(const ParseTree* filter, JobInfo& jobInfo)
 | |
| {
 | |
|   fExpressionFilter = new ParseTree();
 | |
|   idbassert(fExpressionFilter != NULL);
 | |
| 
 | |
|   if (fExpressionFilter == NULL)
 | |
|   {
 | |
|     std::ostringstream errmsg;
 | |
|     errmsg << "ExpressionStep: Failed to create a new ParseTree";
 | |
|     cerr << boldStart << errmsg.str() << boldStop << endl;
 | |
|     throw runtime_error(errmsg.str());
 | |
|   }
 | |
| 
 | |
|   fExpressionFilter->copyTree(*filter);
 | |
| 
 | |
|   addFilter(fExpressionFilter, jobInfo);
 | |
| }
 | |
| 
 | |
| void ExpressionStep::addSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
 | |
| {
 | |
|   addColumn(sf->lhs(), jobInfo);
 | |
|   addColumn(sf->rhs(), jobInfo);
 | |
| }
 | |
| 
 | |
| void ExpressionStep::addFilter(ParseTree* filter, JobInfo& jobInfo)
 | |
| {
 | |
|   stack<ParseTree*> filterStack;
 | |
| 
 | |
|   while (filter || !filterStack.empty())
 | |
|   {
 | |
|     if (filter != NULL)
 | |
|     {
 | |
|       filterStack.push(filter);
 | |
|       filter = filter->left();
 | |
|     }
 | |
|     else if (!filterStack.empty())
 | |
|     {
 | |
|       filter = filterStack.top();
 | |
|       filterStack.pop();
 | |
| 
 | |
|       TreeNode* tn = filter->data();
 | |
|       filter = filter->right();
 | |
| 
 | |
|       ReturnedColumn* rc = dynamic_cast<ReturnedColumn*>(tn);
 | |
|       SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
 | |
|       ConstantFilter* cf = dynamic_cast<ConstantFilter*>(tn);
 | |
|       Operator* op = dynamic_cast<Operator*>(tn);
 | |
| 
 | |
|       if (rc != NULL)
 | |
|       {
 | |
|         addColumn(rc, jobInfo);
 | |
|       }
 | |
|       else if (sf != NULL)
 | |
|       {
 | |
|         addSimpleFilter(sf, jobInfo);
 | |
|       }
 | |
|       else if (cf != NULL)
 | |
|       {
 | |
|         const ConstantFilter::FilterList& fs = cf->filterList();
 | |
| 
 | |
|         for (ConstantFilter::FilterList::const_iterator i = fs.begin(); i != fs.end(); i++)
 | |
|         {
 | |
|           SimpleFilter* f = dynamic_cast<SimpleFilter*>(i->get());
 | |
| 
 | |
|           if (f != NULL)
 | |
|             addSimpleFilter(f, jobInfo);
 | |
|           else
 | |
|             throw logic_error("unknow filter type in constant filter.");
 | |
|         }
 | |
|       }
 | |
|       else if (op == NULL)
 | |
|       {
 | |
|         throw logic_error("tree node not handled in Expression step.");
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
| #if 0  // have to workaround correlation on exp, the following approach does not work
 | |
|     // extract simple columns from parse tree
 | |
|     vector<SimpleColumn*> scv;
 | |
|     filter->walk(getSimpleCols, &scv);
 | |
| 
 | |
|     // populate the oid vectors
 | |
|     for (vector<SimpleColumn*>::iterator it = scv.begin(); it != scv.end(); it++)
 | |
|         addColumn(*it, jobInfo);
 | |
| 
 | |
|     // aggregate columns
 | |
|     vector<AggregateColumn*> acv;
 | |
|     filter->walk(getAggCols, &acv);
 | |
| 
 | |
|     for (vector<AggregateColumn*>::iterator it = acv.begin(); it != acv.end(); it++)
 | |
|         addColumn(*it, jobInfo);
 | |
| 
 | |
|     // window function columns
 | |
|     vector<WindowFunctionColumn*> wcv;
 | |
|     filter->walk(getWindowFunctionCols, &wcv);
 | |
| 
 | |
|     for (vector<WindowFunctionColumn*>::iterator it = wcv.begin(); it != wcv.end(); it++)
 | |
|         addColumn(*it, jobInfo);
 | |
| 
 | |
| #endif
 | |
| 
 | |
|   return;
 | |
| }
 | |
| 
 | |
| void ExpressionStep::addColumn(ReturnedColumn* rc, JobInfo& jobInfo)
 | |
| {
 | |
|   const vector<SimpleColumn*>* scs = NULL;
 | |
|   const vector<WindowFunctionColumn*>* wcs = NULL;
 | |
|   ArithmeticColumn* ac = NULL;
 | |
|   FunctionColumn* fc = NULL;
 | |
|   SimpleColumn* sc = NULL;
 | |
|   WindowFunctionColumn* wc = NULL;
 | |
| 
 | |
|   // workaround for exp(sc) in (sub) where correlation is set on exp, but not sc.
 | |
|   //     populate it to sc for correct scope resolution
 | |
|   uint64_t correlated = rc->joinInfo();
 | |
| 
 | |
|   if (NULL != (ac = dynamic_cast<ArithmeticColumn*>(rc)))
 | |
|   {
 | |
|     scs = &(ac->simpleColumnList());
 | |
|     wcs = &(ac->windowfunctionColumnList());
 | |
|   }
 | |
|   else if (NULL != (fc = dynamic_cast<FunctionColumn*>(rc)))
 | |
|   {
 | |
|     scs = &(fc->simpleColumnList());
 | |
|     wcs = &(fc->windowfunctionColumnList());
 | |
|     fVarBinOK = ((strcmp(fc->functionName().c_str(), "hex") == 0) ||
 | |
|                  (strcmp(fc->functionName().c_str(), "octet_length") == 0) ||
 | |
|                  (strcmp(fc->functionName().c_str(), "length") == 0));
 | |
|   }
 | |
| 
 | |
|   if (scs != NULL || wcs != NULL)
 | |
|   {
 | |
|     if (scs != NULL)
 | |
|     {
 | |
|       vector<SimpleColumn*>::const_iterator cit = scs->begin();
 | |
|       vector<SimpleColumn*>::const_iterator end = scs->end();
 | |
| 
 | |
|       while (cit != end)
 | |
|       {
 | |
|         SimpleColumn* sc = *cit;
 | |
|         sc->joinInfo(sc->joinInfo() | correlated);
 | |
|         populateColumnInfo(*cit, jobInfo);
 | |
|         ++cit;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (wcs != NULL)
 | |
|     {
 | |
|       vector<WindowFunctionColumn*>::const_iterator cit = wcs->begin();
 | |
|       vector<WindowFunctionColumn*>::const_iterator end = wcs->end();
 | |
| 
 | |
|       while (cit != end)
 | |
|       {
 | |
|         populateColumnInfo(*cit, jobInfo);
 | |
|         ++cit;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   else if (NULL != (sc = dynamic_cast<SimpleColumn*>(rc)))
 | |
|   {
 | |
|     populateColumnInfo(sc, jobInfo);
 | |
|   }
 | |
|   else if (NULL != (wc = dynamic_cast<WindowFunctionColumn*>(rc)))
 | |
|   {
 | |
|     populateColumnInfo(rc, jobInfo);
 | |
|   }
 | |
|   else if (NULL != (dynamic_cast<AggregateColumn*>(rc)))
 | |
|   {
 | |
|     populateColumnInfo(rc, jobInfo);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     ConstantColumn* cc = dynamic_cast<ConstantColumn*>(rc);
 | |
| 
 | |
|     //		idbassert(cc != NULL)
 | |
|     if (cc == NULL)
 | |
|     {
 | |
|       std::ostringstream errmsg;
 | |
|       errmsg << "ExpressionStep: " << typeid(*rc).name() << " in expression.";
 | |
|       cerr << boldStart << errmsg.str() << boldStop << endl;
 | |
|       throw logic_error(errmsg.str());
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void ExpressionStep::populateColumnInfo(ReturnedColumn* rc, JobInfo& jobInfo)
 | |
| {
 | |
|   // As of bug3695, make sure varbinary is not used in function expression.
 | |
|   if ((rc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
 | |
|        rc->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
 | |
|       !fVarBinOK)
 | |
|     throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
 | |
| 
 | |
|   SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
 | |
|   WindowFunctionColumn* wc = NULL;
 | |
|   AggregateColumn* ac = NULL;
 | |
| 
 | |
|   if (NULL != sc)
 | |
|     return populateColumnInfo(sc, jobInfo);
 | |
|   else if (NULL != (wc = dynamic_cast<WindowFunctionColumn*>(rc)))
 | |
|     return populateColumnInfo(wc, jobInfo);
 | |
|   else if (NULL != (ac = dynamic_cast<AggregateColumn*>(rc)))
 | |
|     return populateColumnInfo(ac, jobInfo);
 | |
|   else  // for now only allow simple and windowfunction column, more work to do.
 | |
|     throw runtime_error("Error in parsing expression.");
 | |
| }
 | |
| 
 | |
| void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo)
 | |
| {
 | |
|   // As of bug3695, make sure varbinary is not used in function expression.
 | |
|   if ((sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
 | |
|        sc->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
 | |
|       !fVarBinOK)
 | |
|     throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
 | |
| 
 | |
|   CalpontSystemCatalog::OID tblOid = joblist::tableOid(sc, jobInfo.csc);
 | |
|   string alias = extractTableAlias(sc);
 | |
|   string view = sc->viewName();
 | |
|   string schema = sc->schemaName();
 | |
|   execplan::Partitions part = sc->partitions();
 | |
|   fTableOids.push_back(tblOid);
 | |
|   CalpontSystemCatalog::ColType ct;
 | |
| 
 | |
|   if (schema.empty())
 | |
|   {
 | |
|     sc->oid(tblOid + 1 + sc->colPosition());
 | |
|     ct = sc->resultType();
 | |
|   }
 | |
|   else if (sc->isColumnStore() == false)
 | |
|   {
 | |
|     ct = sc->colType();
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     ct = sc->colType();
 | |
| 
 | |
|     // XXX use this before connector sets colType in sc correctly.
 | |
|     //    type of pseudo column is set by connector
 | |
|     if (dynamic_cast<PseudoColumn*>(sc) == NULL)
 | |
|     {
 | |
|       ct = jobInfo.csc->colType(sc->oid());
 | |
|       ct.charsetNumber = sc->colType().charsetNumber;
 | |
|     }
 | |
|     // X
 | |
|     if (ct.scale == 0)     // keep passed original ct for decimal type
 | |
|       sc->resultType(ct);  // update from mysql type to calpont type
 | |
|   }
 | |
| 
 | |
|   fAliases.push_back(alias);
 | |
|   fViews.push_back(view);
 | |
|   fPartitionss.push_back(part);
 | |
|   fSchemas.push_back(schema);
 | |
|   fTableKeys.push_back(makeTableKey(jobInfo, sc));
 | |
|   fColumns.push_back(sc);
 | |
| 
 | |
|   TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc, alias));
 | |
|   fColumnKeys.push_back(ti.key);
 | |
| 
 | |
|   // @bug 2990, MySQL timestamp/time/date/datetime type is different from IDB type
 | |
|   if (ti.dtype == CalpontSystemCatalog::DATE || ti.dtype == CalpontSystemCatalog::DATETIME ||
 | |
|       ti.dtype == CalpontSystemCatalog::TIME || ti.dtype == CalpontSystemCatalog::TIMESTAMP)
 | |
|   {
 | |
|     if (ti.dtype != ct.colDataType)
 | |
|     {
 | |
|       ct.colWidth = ti.width;
 | |
|       ct.colDataType = ti.dtype;
 | |
|       ct.scale = ti.scale;
 | |
|       ct.precision = ti.precision;
 | |
|       sc->resultType(ct);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   CalpontSystemCatalog::OID dictOid = joblist::isDictCol(ct);
 | |
| 
 | |
|   if (dictOid > 0)
 | |
|   {
 | |
|     uint32_t tupleKey = ti.key;
 | |
|     jobInfo.tokenOnly[tupleKey] = false;
 | |
|     jobInfo.keyInfo->dictOidToColOid[dictOid] = sc->oid();
 | |
|     ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias);
 | |
|     jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobInfo)
 | |
| {
 | |
|   // As of bug3695, make sure varbinary is not used in function expression.
 | |
|   if ((wc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
 | |
|        wc->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
 | |
|       !fVarBinOK)
 | |
|     throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
 | |
| 
 | |
|   // This is for window function in IN/EXISTS sub-query.
 | |
|   // In 4.0 implementation, the window function is cloned to where clause in a simple filter.
 | |
|   // This can be identified because SQL syntax does NOT allow window function in where clause.
 | |
|   // Workaround here until a better way found.
 | |
|   TupleInfo ti(setExpTupleInfo(wc->resultType(), wc->expressionId(), wc->alias(), jobInfo));
 | |
|   uint64_t wcKey = ti.key;
 | |
|   string alias("");
 | |
|   string view("");
 | |
|   string schema("");
 | |
|   execplan::Partitions part;
 | |
|   fTableOids.push_back(jobInfo.keyInfo->tupleKeyToTableOid[wcKey]);
 | |
|   fAliases.push_back(alias);
 | |
|   fViews.push_back(view);
 | |
|   fPartitionss.push_back(part);
 | |
|   fSchemas.push_back(schema);
 | |
|   fTableKeys.push_back(jobInfo.keyInfo->colKeyToTblKey[wcKey]);
 | |
|   fColumnKeys.push_back(wcKey);
 | |
|   fColumns.push_back(wc);
 | |
| }
 | |
| 
 | |
| void ExpressionStep::populateColumnInfo(AggregateColumn* ac, JobInfo& jobInfo)
 | |
| {
 | |
|   // As of bug3695, make sure varbinary is not used in function expression.
 | |
|   if ((ac->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
 | |
|        ac->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
 | |
|       !fVarBinOK)
 | |
|     throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
 | |
| 
 | |
|   // This is for aggregate function in IN/EXISTS sub-query.
 | |
|   TupleInfo ti(setExpTupleInfo(ac->resultType(), ac->expressionId(), ac->alias(), jobInfo));
 | |
|   uint64_t acKey = ti.key;
 | |
|   string alias("");
 | |
|   string view("");
 | |
|   string schema("");
 | |
|   fTableOids.push_back(jobInfo.keyInfo->tupleKeyToTableOid[acKey]);
 | |
|   fAliases.push_back(alias);
 | |
|   fViews.push_back(view);
 | |
|   fPartitionss.push_back(execplan::Partitions());
 | |
|   fSchemas.push_back(schema);
 | |
|   fTableKeys.push_back(jobInfo.keyInfo->colKeyToTblKey[acKey]);
 | |
|   fColumnKeys.push_back(acKey);
 | |
|   fColumns.push_back(ac);
 | |
| }
 | |
| 
 | |
| void ExpressionStep::updateInputIndex(map<uint32_t, uint32_t>& indexMap, const JobInfo& jobInfo)
 | |
| {
 | |
|   // expression is handled as function join already
 | |
|   if (fDoJoin)
 | |
|     return;
 | |
| 
 | |
|   if (jobInfo.trace)
 | |
|     cout << "Input indices of Expression:" << (int64_t)fExpressionId << endl;
 | |
| 
 | |
|   for (vector<ReturnedColumn*>::iterator it = fColumns.begin(); it != fColumns.end(); ++it)
 | |
|   {
 | |
|     SimpleColumn* sc = dynamic_cast<SimpleColumn*>(*it);
 | |
| 
 | |
|     if (sc != NULL)
 | |
|     {
 | |
|       CalpontSystemCatalog::OID oid = sc->oid();
 | |
|       CalpontSystemCatalog::OID dictOid = 0;
 | |
|       CalpontSystemCatalog::ColType ct;
 | |
|       uint32_t key = fColumnKeys[std::distance(fColumns.begin(), it)];
 | |
| 
 | |
|       if (sc->schemaName().empty())
 | |
|       {
 | |
|         ct = sc->resultType();
 | |
|       }
 | |
|       else if (sc->isColumnStore() == false)
 | |
|       {
 | |
|         ct = sc->colType();
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         ct = sc->colType();
 | |
| 
 | |
|         // XXX use this before connector sets colType in sc correctly.
 | |
|         //    type of pseudo column is set by connector
 | |
|         if (dynamic_cast<PseudoColumn*>(sc) == NULL)
 | |
|         {
 | |
|           ct = jobInfo.csc->colType(oid);
 | |
|           ct.charsetNumber = sc->colType().charsetNumber;
 | |
|         }
 | |
| 
 | |
|         // X
 | |
|         dictOid = joblist::isDictCol(ct);
 | |
| 
 | |
|         if (dictOid > 0)
 | |
|           key = jobInfo.keyInfo->dictKeyMap[key];
 | |
|       }
 | |
| 
 | |
|       sc->inputIndex(indexMap[key]);
 | |
| 
 | |
|       if (jobInfo.trace)
 | |
|         cout << "OID/key:" << (dictOid ? dictOid : oid) << "/" << key << "(" << sc->tableAlias() << "):";
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       (*it)->inputIndex(indexMap[getExpTupleKey(jobInfo, (*it)->expressionId())]);
 | |
| 
 | |
|       if (jobInfo.trace)
 | |
|         cout << "EID:" << (*it)->expressionId();
 | |
|     }
 | |
| 
 | |
|     if (jobInfo.trace)
 | |
|       cout << (*it)->inputIndex() << endl;
 | |
|   }
 | |
| 
 | |
|   // if substutes exist, update the original column
 | |
|   for (map<SimpleColumn*, ReturnedColumn*>::iterator k = fSubMap.begin(); k != fSubMap.end(); k++)
 | |
|     k->second->inputIndex(k->first->inputIndex());
 | |
| }
 | |
| 
 | |
| void ExpressionStep::updateOutputIndex(map<uint32_t, uint32_t>& indexMap, const JobInfo& jobInfo)
 | |
| {
 | |
|   fExpression->outputIndex(indexMap[getExpTupleKey(jobInfo, fExpressionId)]);
 | |
| 
 | |
|   if (jobInfo.trace)
 | |
|   {
 | |
|     cout << "output index of Expression:" << (int64_t)fExpressionId << ":" << fExpression->outputIndex()
 | |
|          << endl
 | |
|          << endl;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void ExpressionStep::updateColumnOidAlias(JobInfo& jobInfo)
 | |
| {
 | |
|   for (size_t i = 0; i < fColumns.size(); i++)
 | |
|   {
 | |
|     SimpleColumn* sc = dynamic_cast<SimpleColumn*>(fColumns[i]);
 | |
| 
 | |
|     // virtual table columns
 | |
|     if (sc != NULL && sc->schemaName().empty())
 | |
|     {
 | |
|       fTableOids[i] = joblist::tableOid(sc, jobInfo.csc);
 | |
|       fAliases[i] = extractTableAlias(sc);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void ExpressionStep::substitute(uint64_t i, const SSC& ssc)
 | |
| {
 | |
|   fVsc.insert(ssc);  // save a local copy in case the virtual table in subquery be out of scope.
 | |
|   fSubMap[ssc.get()] = fColumns[i];
 | |
|   fColumns[i] = ssc.get();
 | |
| }
 | |
| 
 | |
| void ExpressionStep::functionJoinCheck(SimpleFilter* sf, JobInfo& jobInfo)
 | |
| {
 | |
|   if ((sf->lhs()->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
 | |
|        sf->lhs()->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
 | |
|       !fVarBinOK)
 | |
|     throw runtime_error("VARBINARY/BLOB in join is not supported.");
 | |
| 
 | |
|   // only handle one & only one table at each side, and not the same table
 | |
|   fFunctionJoinInfo.reset(new FunctionJoinInfo);
 | |
| 
 | |
|   if ((parseFuncJoinColumn(sf->lhs(), jobInfo) == false) ||
 | |
|       (parseFuncJoinColumn(sf->rhs(), jobInfo) == false) ||
 | |
|       (fFunctionJoinInfo->fTableKey[0] == fFunctionJoinInfo->fTableKey[1]))
 | |
|   {
 | |
|     // not convertible
 | |
|     fFunctionJoinInfo.reset();
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   if ((!compatibleColumnTypes(sf->lhs()->resultType(), sf->rhs()->resultType(), true)) &&
 | |
|       (fFunctionJoinInfo->fTableKey.size() == 2))
 | |
|   {
 | |
|     uint32_t t1 = fFunctionJoinInfo->fTableKey[0];
 | |
|     uint32_t t2 = fFunctionJoinInfo->fTableKey[1];
 | |
|     jobInfo.incompatibleJoinMap[t1] = t2;
 | |
|     jobInfo.incompatibleJoinMap[t2] = t1;
 | |
| 
 | |
|     // not convertible
 | |
|     fFunctionJoinInfo.reset();
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   SJSTEP sjstep;
 | |
|   ReturnedColumn* sfLhs = sf->lhs();
 | |
|   ReturnedColumn* sfRhs = sf->rhs();
 | |
| 
 | |
|   if (dynamic_cast<SimpleColumn*>(sfLhs) == NULL)
 | |
|   {
 | |
|     SRCP lhs(sfLhs->clone());
 | |
|     ExpressionStep* esLhs = new ExpressionStep(jobInfo);
 | |
|     esLhs->expression(lhs, jobInfo);
 | |
|     sjstep.reset(esLhs);
 | |
|   }
 | |
| 
 | |
|   fFunctionJoinInfo->fStep.push_back(sjstep);
 | |
|   sjstep.reset();
 | |
| 
 | |
|   if (dynamic_cast<SimpleColumn*>(sfRhs) == NULL)
 | |
|   {
 | |
|     SRCP rhs(sfRhs->clone());
 | |
|     ExpressionStep* esRhs = new ExpressionStep(jobInfo);
 | |
|     esRhs->expression(rhs, jobInfo);
 | |
|     sjstep.reset(esRhs);
 | |
|   }
 | |
| 
 | |
|   fFunctionJoinInfo->fStep.push_back(sjstep);
 | |
| 
 | |
|   JoinType jt = INNER;
 | |
| 
 | |
|   if (sfLhs->returnAll())
 | |
|     jt = LEFTOUTER;
 | |
|   else if (sfRhs->returnAll())
 | |
|     jt = RIGHTOUTER;
 | |
| 
 | |
|   uint64_t joinInfo = sfLhs->joinInfo() | sfRhs->joinInfo();
 | |
|   int64_t correlatedSide = 0;
 | |
|   ExpressionStep* ve = NULL;
 | |
| 
 | |
|   if (joinInfo != 0)
 | |
|   {
 | |
|     if (joinInfo & JOIN_SEMI)
 | |
|       jt |= SEMI;
 | |
| 
 | |
|     if (joinInfo & JOIN_ANTI)
 | |
|       jt |= ANTI;
 | |
| 
 | |
|     if (joinInfo & JOIN_SCALAR)
 | |
|       jt |= SCALAR;
 | |
| 
 | |
|     if (joinInfo & JOIN_NULL_MATCH)
 | |
|       jt |= MATCHNULLS;
 | |
| 
 | |
|     if (joinInfo & JOIN_CORRELATED)
 | |
|       jt |= CORRELATED;
 | |
| 
 | |
|     if (joinInfo & JOIN_OUTER_SELECT)
 | |
|       jt |= LARGEOUTER;
 | |
| 
 | |
|     if (sfLhs->joinInfo() & JOIN_CORRELATED)
 | |
|     {
 | |
|       correlatedSide = 1;
 | |
|       ve = dynamic_cast<ExpressionStep*>(fFunctionJoinInfo->fStep[1].get());
 | |
|     }
 | |
|     else if (sfRhs->joinInfo() & JOIN_CORRELATED)
 | |
|     {
 | |
|       correlatedSide = 2;
 | |
|       ve = dynamic_cast<ExpressionStep*>(fFunctionJoinInfo->fStep[0].get());
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (ve != NULL)
 | |
|     ve->virtualStep();
 | |
| 
 | |
|   fFunctionJoinInfo->fJoinType = jt;
 | |
|   fFunctionJoinInfo->fCorrelatedSide = correlatedSide;
 | |
|   fFunctionJoinInfo->fJoinId = ++jobInfo.joinNum;
 | |
| 
 | |
|   jobInfo.functionJoins.push_back(this);
 | |
| }
 | |
| 
 | |
| bool ExpressionStep::parseFuncJoinColumn(ReturnedColumn* rc, JobInfo& jobInfo)
 | |
| {
 | |
|   set<uint32_t> tids;  // tables used in the expression
 | |
|   set<uint32_t> cids;  // columns used in the expression
 | |
|   uint32_t key = -1;   // join key
 | |
|   uint32_t tid = -1;   // table Id of the simple column
 | |
|   bool isSc = false;   // rc is a simple column
 | |
| 
 | |
|   ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(rc);
 | |
|   FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
 | |
|   SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
 | |
| 
 | |
|   if (sc != NULL)
 | |
|   {
 | |
|     isSc = true;
 | |
|     key = getTupleKey(jobInfo, sc);
 | |
|     tid = getTableKey(jobInfo, key);
 | |
| 
 | |
|     if (jobInfo.keyInfo->dictKeyMap.find(key) != jobInfo.keyInfo->dictKeyMap.end())
 | |
|       key = jobInfo.keyInfo->dictKeyMap[key];
 | |
| 
 | |
|     tids.insert(tid);
 | |
|     cids.insert(key);
 | |
|   }
 | |
|   else if (ac != NULL || fc != NULL)
 | |
|   {
 | |
|     TupleInfo ti(setExpTupleInfo(rc, jobInfo));
 | |
|     key = ti.key;
 | |
| 
 | |
|     for (uint32_t i = 0; i < rc->simpleColumnList().size(); i++)
 | |
|     {
 | |
|       sc = rc->simpleColumnList()[i];
 | |
|       uint32_t cid = getTupleKey(jobInfo, sc);
 | |
|       tid = getTableKey(jobInfo, cid);
 | |
|       tids.insert(tid);
 | |
|       cids.insert(cid);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   int32_t tableOid = -1;
 | |
|   int32_t oid = -1;
 | |
|   string alias;
 | |
|   string view;
 | |
|   string schema;
 | |
|   execplan::Partitions part;
 | |
| 
 | |
|   if (sc && tids.size() == 1)
 | |
|   {
 | |
|     tableOid = joblist::tableOid(sc, jobInfo.csc);
 | |
|     oid = sc->oid();
 | |
|     alias = extractTableAlias(sc);
 | |
|     view = sc->viewName();
 | |
|     schema = sc->schemaName();
 | |
|     part = sc->partitions();
 | |
|   }
 | |
|   else if (dynamic_cast<AggregateColumn*>(rc) || dynamic_cast<WindowFunctionColumn*>(rc) ||
 | |
|            dynamic_cast<ArithmeticColumn*>(rc) || dynamic_cast<FunctionColumn*>(rc))
 | |
|   {
 | |
|     tableOid = execplan::CNX_VTABLE_ID;
 | |
|     oid = rc->expressionId();
 | |
|     alias = jobInfo.subAlias;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     return false;
 | |
|   }
 | |
| 
 | |
|   if (isSc == false)
 | |
|     jobInfo.keyInfo->functionJoinKeys.insert(key);
 | |
| 
 | |
|   fFunctionJoinInfo->fExpression.push_back(rc);
 | |
|   fFunctionJoinInfo->fJoinKey.push_back(key);
 | |
|   fFunctionJoinInfo->fTableKey.push_back(tid);
 | |
|   fFunctionJoinInfo->fColumnKeys.push_back(cids);
 | |
|   fFunctionJoinInfo->fTableOid.push_back(tableOid);
 | |
|   fFunctionJoinInfo->fOid.push_back(oid);
 | |
|   fFunctionJoinInfo->fSequence.push_back(rc->sequence());
 | |
|   fFunctionJoinInfo->fAlias.push_back(alias);
 | |
|   fFunctionJoinInfo->fView.push_back(view);
 | |
|   fFunctionJoinInfo->fPartitionss.push_back(part);
 | |
|   fFunctionJoinInfo->fSchema.push_back(schema);
 | |
| 
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| const string ExpressionStep::toString() const
 | |
| {
 | |
|   ostringstream oss;
 | |
|   oss << "ExpressionStep  ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
 | |
| 
 | |
|   oss << " in:";
 | |
| 
 | |
|   for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
 | |
|     oss << fInputJobStepAssociation.outAt(i);
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| }  // namespace joblist
 |