mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
This patch introduces an internal aggregate operator SELECT_SOME that is automatically added to columns that are not in GROUP BY. It "computes" some plausible value of the column (actually, last one passed). Along the way it fixes incorrect handling of HAVING being transferred into WHERE, window function handling and a bit of other inconsistencies.
794 lines
22 KiB
C++
794 lines
22 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: expressionstep.cpp 9681 2013-07-11 22:58:05Z xlou $
|
|
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
#include <sstream>
|
|
#include <iomanip>
|
|
using namespace std;
|
|
|
|
#include <boost/shared_ptr.hpp>
|
|
|
|
using namespace boost;
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "loggingid.h"
|
|
#include "errorcodes.h"
|
|
using namespace logging;
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
#include "aggregatecolumn.h"
|
|
#include "arithmeticcolumn.h"
|
|
#include "constantcolumn.h"
|
|
#include "functioncolumn.h"
|
|
#include "pseudocolumn.h"
|
|
#include "simplecolumn.h"
|
|
#include "windowfunctioncolumn.h"
|
|
#include "constantfilter.h"
|
|
#include "simplefilter.h"
|
|
using namespace execplan;
|
|
|
|
#include "jlf_common.h"
|
|
#include "jlf_tuplejoblist.h"
|
|
#include "rowgroup.h"
|
|
using namespace rowgroup;
|
|
|
|
#include "expressionstep.h"
|
|
|
|
|
|
namespace joblist
|
|
{
|
|
ExpressionStep::ExpressionStep()
|
|
: fExpressionFilter(NULL)
|
|
, fExpressionId(-1)
|
|
, fVarBinOK(false)
|
|
, fSelectFilter(false)
|
|
, fAssociatedJoinId(0)
|
|
, fDoJoin(false)
|
|
, fVirtual(false)
|
|
{
|
|
}
|
|
|
|
ExpressionStep::ExpressionStep(const JobInfo& jobInfo)
|
|
: JobStep(jobInfo)
|
|
, fExpressionFilter(NULL)
|
|
, fExpressionId(-1)
|
|
, fVarBinOK(false)
|
|
, fSelectFilter(false)
|
|
, fAssociatedJoinId(0)
|
|
, fDoJoin(false)
|
|
, fVirtual(false)
|
|
{
|
|
}
|
|
|
|
ExpressionStep::ExpressionStep(const ExpressionStep& rhs)
|
|
: JobStep(rhs)
|
|
, fExpression(rhs.expression())
|
|
, fExpressionFilter(NULL)
|
|
, fExpressionId(rhs.expressionId())
|
|
, fAliases(rhs.aliases())
|
|
, fViews(rhs.views())
|
|
, fSchemas(rhs.schemas())
|
|
, fTableKeys(rhs.tableKeys())
|
|
, fColumnKeys(rhs.columnKeys())
|
|
, fVarBinOK(rhs.fVarBinOK)
|
|
, fSelectFilter(rhs.fSelectFilter)
|
|
, fAssociatedJoinId(rhs.fAssociatedJoinId)
|
|
, fDoJoin(rhs.fDoJoin)
|
|
, fVirtual(rhs.fVirtual)
|
|
{
|
|
if (rhs.expressionFilter() != NULL)
|
|
fExpressionFilter = new ParseTree(*(rhs.expressionFilter()));
|
|
}
|
|
|
|
ExpressionStep::~ExpressionStep()
|
|
{
|
|
if (fExpressionFilter != NULL)
|
|
delete fExpressionFilter;
|
|
}
|
|
|
|
void ExpressionStep::run()
|
|
{
|
|
}
|
|
|
|
void ExpressionStep::join()
|
|
{
|
|
}
|
|
|
|
void ExpressionStep::expression(const SRCP exp, JobInfo& jobInfo)
|
|
{
|
|
fExpression = exp;
|
|
|
|
// set expression Id
|
|
ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(fExpression.get());
|
|
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(fExpression.get());
|
|
fExpressionId = exp.get()->expressionId();
|
|
|
|
if (ac != NULL || fc != NULL)
|
|
addColumn(exp.get(), jobInfo);
|
|
}
|
|
|
|
void ExpressionStep::expressionFilter(const Filter* filter, JobInfo& jobInfo)
|
|
{
|
|
Filter* f = filter->clone();
|
|
fExpressionFilter = new ParseTree(f);
|
|
idbassert(fExpressionFilter != NULL);
|
|
|
|
if (fExpressionFilter == NULL)
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "ExpressionStep: Failed to create a new ParseTree";
|
|
cerr << boldStart << errmsg.str() << boldStop << endl;
|
|
throw runtime_error(errmsg.str());
|
|
}
|
|
|
|
addFilter(fExpressionFilter, jobInfo);
|
|
|
|
// populate the oid vectors
|
|
SimpleFilter* sf = dynamic_cast<SimpleFilter*>(f);
|
|
|
|
if (sf != NULL && sf->op()->data() == "=")
|
|
functionJoinCheck(sf, jobInfo);
|
|
}
|
|
|
|
void ExpressionStep::expressionFilter(const ParseTree* filter, JobInfo& jobInfo)
|
|
{
|
|
fExpressionFilter = new ParseTree();
|
|
idbassert(fExpressionFilter != NULL);
|
|
|
|
if (fExpressionFilter == NULL)
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "ExpressionStep: Failed to create a new ParseTree";
|
|
cerr << boldStart << errmsg.str() << boldStop << endl;
|
|
throw runtime_error(errmsg.str());
|
|
}
|
|
|
|
fExpressionFilter->copyTree(*filter);
|
|
|
|
addFilter(fExpressionFilter, jobInfo);
|
|
}
|
|
|
|
void ExpressionStep::addSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
|
{
|
|
addColumn(sf->lhs(), jobInfo);
|
|
addColumn(sf->rhs(), jobInfo);
|
|
}
|
|
|
|
void ExpressionStep::addFilter(ParseTree* filter, JobInfo& jobInfo)
|
|
{
|
|
stack<ParseTree*> filterStack;
|
|
|
|
while (filter || !filterStack.empty())
|
|
{
|
|
if (filter != NULL)
|
|
{
|
|
filterStack.push(filter);
|
|
filter = filter->left();
|
|
}
|
|
else if (!filterStack.empty())
|
|
{
|
|
filter = filterStack.top();
|
|
filterStack.pop();
|
|
|
|
TreeNode* tn = filter->data();
|
|
filter = filter->right();
|
|
|
|
ReturnedColumn* rc = dynamic_cast<ReturnedColumn*>(tn);
|
|
SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);
|
|
ConstantFilter* cf = dynamic_cast<ConstantFilter*>(tn);
|
|
Operator* op = dynamic_cast<Operator*>(tn);
|
|
|
|
if (rc != NULL)
|
|
{
|
|
addColumn(rc, jobInfo);
|
|
}
|
|
else if (sf != NULL)
|
|
{
|
|
addSimpleFilter(sf, jobInfo);
|
|
}
|
|
else if (cf != NULL)
|
|
{
|
|
const ConstantFilter::FilterList& fs = cf->filterList();
|
|
|
|
for (ConstantFilter::FilterList::const_iterator i = fs.begin(); i != fs.end(); i++)
|
|
{
|
|
SimpleFilter* f = dynamic_cast<SimpleFilter*>(i->get());
|
|
|
|
if (f != NULL)
|
|
addSimpleFilter(f, jobInfo);
|
|
else
|
|
throw logic_error("unknow filter type in constant filter.");
|
|
}
|
|
}
|
|
else if (op == NULL)
|
|
{
|
|
throw logic_error("tree node not handled in Expression step.");
|
|
}
|
|
}
|
|
}
|
|
|
|
#if 0 // have to workaround correlation on exp, the following approach does not work
|
|
// extract simple columns from parse tree
|
|
vector<SimpleColumn*> scv;
|
|
filter->walk(getSimpleCols, &scv);
|
|
|
|
// populate the oid vectors
|
|
for (vector<SimpleColumn*>::iterator it = scv.begin(); it != scv.end(); it++)
|
|
addColumn(*it, jobInfo);
|
|
|
|
// aggregate columns
|
|
vector<AggregateColumn*> acv;
|
|
filter->walk(getAggCols, &acv);
|
|
|
|
for (vector<AggregateColumn*>::iterator it = acv.begin(); it != acv.end(); it++)
|
|
addColumn(*it, jobInfo);
|
|
|
|
// window function columns
|
|
vector<WindowFunctionColumn*> wcv;
|
|
filter->walk(getWindowFunctionCols, &wcv);
|
|
|
|
for (vector<WindowFunctionColumn*>::iterator it = wcv.begin(); it != wcv.end(); it++)
|
|
addColumn(*it, jobInfo);
|
|
|
|
#endif
|
|
|
|
return;
|
|
}
|
|
|
|
void ExpressionStep::addColumn(ReturnedColumn* rc, JobInfo& jobInfo)
|
|
{
|
|
const vector<SimpleColumn*>* scs = NULL;
|
|
const vector<WindowFunctionColumn*>* wcs = NULL;
|
|
ArithmeticColumn* ac = NULL;
|
|
FunctionColumn* fc = NULL;
|
|
SimpleColumn* sc = NULL;
|
|
WindowFunctionColumn* wc = NULL;
|
|
|
|
// workaround for exp(sc) in (sub) where correlation is set on exp, but not sc.
|
|
// populate it to sc for correct scope resolution
|
|
uint64_t correlated = rc->joinInfo();
|
|
|
|
if (NULL != (ac = dynamic_cast<ArithmeticColumn*>(rc)))
|
|
{
|
|
scs = &(ac->simpleColumnList());
|
|
wcs = &(ac->windowfunctionColumnList());
|
|
}
|
|
else if (NULL != (fc = dynamic_cast<FunctionColumn*>(rc)))
|
|
{
|
|
scs = &(fc->simpleColumnList());
|
|
wcs = &(fc->windowfunctionColumnList());
|
|
fVarBinOK = ((strcmp(fc->functionName().c_str(), "hex") == 0) ||
|
|
(strcmp(fc->functionName().c_str(), "octet_length") == 0) ||
|
|
(strcmp(fc->functionName().c_str(), "length") == 0));
|
|
}
|
|
|
|
if (scs != NULL || wcs != NULL)
|
|
{
|
|
if (scs != NULL)
|
|
{
|
|
vector<SimpleColumn*>::const_iterator cit = scs->begin();
|
|
vector<SimpleColumn*>::const_iterator end = scs->end();
|
|
|
|
while (cit != end)
|
|
{
|
|
SimpleColumn* sc = *cit;
|
|
sc->joinInfo(sc->joinInfo() | correlated);
|
|
populateColumnInfo(*cit, jobInfo);
|
|
++cit;
|
|
}
|
|
}
|
|
|
|
if (wcs != NULL)
|
|
{
|
|
vector<WindowFunctionColumn*>::const_iterator cit = wcs->begin();
|
|
vector<WindowFunctionColumn*>::const_iterator end = wcs->end();
|
|
|
|
while (cit != end)
|
|
{
|
|
populateColumnInfo(*cit, jobInfo);
|
|
++cit;
|
|
}
|
|
}
|
|
}
|
|
else if (NULL != (sc = dynamic_cast<SimpleColumn*>(rc)))
|
|
{
|
|
populateColumnInfo(sc, jobInfo);
|
|
}
|
|
else if (NULL != (wc = dynamic_cast<WindowFunctionColumn*>(rc)))
|
|
{
|
|
populateColumnInfo(rc, jobInfo);
|
|
}
|
|
else if (NULL != (dynamic_cast<AggregateColumn*>(rc)))
|
|
{
|
|
populateColumnInfo(rc, jobInfo);
|
|
}
|
|
else
|
|
{
|
|
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(rc);
|
|
|
|
// idbassert(cc != NULL)
|
|
if (cc == NULL)
|
|
{
|
|
std::ostringstream errmsg;
|
|
errmsg << "ExpressionStep: " << typeid(*rc).name() << " in expression.";
|
|
cerr << boldStart << errmsg.str() << boldStop << endl;
|
|
throw logic_error(errmsg.str());
|
|
}
|
|
}
|
|
}
|
|
|
|
void ExpressionStep::populateColumnInfo(ReturnedColumn* rc, JobInfo& jobInfo)
|
|
{
|
|
// As of bug3695, make sure varbinary is not used in function expression.
|
|
if ((rc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
|
|
rc->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
|
|
!fVarBinOK)
|
|
throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
|
|
|
|
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
|
|
WindowFunctionColumn* wc = NULL;
|
|
AggregateColumn* ac = NULL;
|
|
|
|
if (NULL != sc)
|
|
return populateColumnInfo(sc, jobInfo);
|
|
else if (NULL != (wc = dynamic_cast<WindowFunctionColumn*>(rc)))
|
|
return populateColumnInfo(wc, jobInfo);
|
|
else if (NULL != (ac = dynamic_cast<AggregateColumn*>(rc)))
|
|
return populateColumnInfo(ac, jobInfo);
|
|
else // for now only allow simple and windowfunction column, more work to do.
|
|
throw runtime_error("Error in parsing expression.");
|
|
}
|
|
|
|
void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo)
|
|
{
|
|
// As of bug3695, make sure varbinary is not used in function expression.
|
|
if ((sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
|
|
sc->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
|
|
!fVarBinOK)
|
|
throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
|
|
|
|
CalpontSystemCatalog::OID tblOid = joblist::tableOid(sc, jobInfo.csc);
|
|
string alias = extractTableAlias(sc);
|
|
string view = sc->viewName();
|
|
string schema = sc->schemaName();
|
|
fTableOids.push_back(tblOid);
|
|
CalpontSystemCatalog::ColType ct;
|
|
|
|
if (schema.empty())
|
|
{
|
|
sc->oid(tblOid + 1 + sc->colPosition());
|
|
ct = sc->resultType();
|
|
}
|
|
else if (sc->isColumnStore() == false)
|
|
{
|
|
ct = sc->colType();
|
|
}
|
|
else
|
|
{
|
|
ct = sc->colType();
|
|
|
|
// XXX use this before connector sets colType in sc correctly.
|
|
// type of pseudo column is set by connector
|
|
if (dynamic_cast<PseudoColumn*>(sc) == NULL)
|
|
{
|
|
ct = jobInfo.csc->colType(sc->oid());
|
|
ct.charsetNumber = sc->colType().charsetNumber;
|
|
}
|
|
// X
|
|
if (ct.scale == 0) // keep passed original ct for decimal type
|
|
sc->resultType(ct); // update from mysql type to calpont type
|
|
}
|
|
|
|
fAliases.push_back(alias);
|
|
fViews.push_back(view);
|
|
fSchemas.push_back(schema);
|
|
fTableKeys.push_back(makeTableKey(jobInfo, sc));
|
|
fColumns.push_back(sc);
|
|
|
|
TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc, alias));
|
|
fColumnKeys.push_back(ti.key);
|
|
|
|
// @bug 2990, MySQL timestamp/time/date/datetime type is different from IDB type
|
|
if (ti.dtype == CalpontSystemCatalog::DATE || ti.dtype == CalpontSystemCatalog::DATETIME ||
|
|
ti.dtype == CalpontSystemCatalog::TIME || ti.dtype == CalpontSystemCatalog::TIMESTAMP)
|
|
{
|
|
if (ti.dtype != ct.colDataType)
|
|
{
|
|
ct.colWidth = ti.width;
|
|
ct.colDataType = ti.dtype;
|
|
ct.scale = ti.scale;
|
|
ct.precision = ti.precision;
|
|
sc->resultType(ct);
|
|
}
|
|
}
|
|
|
|
CalpontSystemCatalog::OID dictOid = joblist::isDictCol(ct);
|
|
|
|
if (dictOid > 0)
|
|
{
|
|
uint32_t tupleKey = ti.key;
|
|
jobInfo.tokenOnly[tupleKey] = false;
|
|
jobInfo.keyInfo->dictOidToColOid[dictOid] = sc->oid();
|
|
ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias);
|
|
jobInfo.keyInfo->dictKeyMap[tupleKey] = ti.key;
|
|
}
|
|
}
|
|
|
|
void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobInfo)
|
|
{
|
|
// As of bug3695, make sure varbinary is not used in function expression.
|
|
if ((wc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
|
|
wc->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
|
|
!fVarBinOK)
|
|
throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
|
|
|
|
// This is for window function in IN/EXISTS sub-query.
|
|
// In 4.0 implementation, the window function is cloned to where clause in a simple filter.
|
|
// This can be identified because SQL syntax does NOT allow window function in where clause.
|
|
// Workaround here until a better way found.
|
|
TupleInfo ti(setExpTupleInfo(wc->resultType(), wc->expressionId(), wc->alias(), jobInfo));
|
|
uint64_t wcKey = ti.key;
|
|
string alias("");
|
|
string view("");
|
|
string schema("");
|
|
fTableOids.push_back(jobInfo.keyInfo->tupleKeyToTableOid[wcKey]);
|
|
fAliases.push_back(alias);
|
|
fViews.push_back(view);
|
|
fSchemas.push_back(schema);
|
|
fTableKeys.push_back(jobInfo.keyInfo->colKeyToTblKey[wcKey]);
|
|
fColumnKeys.push_back(wcKey);
|
|
fColumns.push_back(wc);
|
|
}
|
|
|
|
void ExpressionStep::populateColumnInfo(AggregateColumn* ac, JobInfo& jobInfo)
|
|
{
|
|
// As of bug3695, make sure varbinary is not used in function expression.
|
|
if ((ac->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
|
|
ac->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
|
|
!fVarBinOK)
|
|
throw runtime_error("VARBINARY/BLOB in filter or function is not supported.");
|
|
|
|
// This is for aggregate function in IN/EXISTS sub-query.
|
|
TupleInfo ti(setExpTupleInfo(ac->resultType(), ac->expressionId(), ac->alias(), jobInfo));
|
|
uint64_t acKey = ti.key;
|
|
string alias("");
|
|
string view("");
|
|
string schema("");
|
|
fTableOids.push_back(jobInfo.keyInfo->tupleKeyToTableOid[acKey]);
|
|
fAliases.push_back(alias);
|
|
fViews.push_back(view);
|
|
fSchemas.push_back(schema);
|
|
fTableKeys.push_back(jobInfo.keyInfo->colKeyToTblKey[acKey]);
|
|
fColumnKeys.push_back(acKey);
|
|
fColumns.push_back(ac);
|
|
}
|
|
|
|
void ExpressionStep::updateInputIndex(map<uint32_t, uint32_t>& indexMap, const JobInfo& jobInfo)
|
|
{
|
|
// expression is handled as function join already
|
|
if (fDoJoin)
|
|
return;
|
|
|
|
if (jobInfo.trace)
|
|
cout << "Input indices of Expression:" << (int64_t)fExpressionId << endl;
|
|
|
|
for (vector<ReturnedColumn*>::iterator it = fColumns.begin(); it != fColumns.end(); ++it)
|
|
{
|
|
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(*it);
|
|
|
|
if (sc != NULL)
|
|
{
|
|
CalpontSystemCatalog::OID oid = sc->oid();
|
|
CalpontSystemCatalog::OID dictOid = 0;
|
|
CalpontSystemCatalog::ColType ct;
|
|
uint32_t key = fColumnKeys[std::distance(fColumns.begin(), it)];
|
|
|
|
if (sc->schemaName().empty())
|
|
{
|
|
ct = sc->resultType();
|
|
}
|
|
else if (sc->isColumnStore() == false)
|
|
{
|
|
ct = sc->colType();
|
|
}
|
|
else
|
|
{
|
|
ct = sc->colType();
|
|
|
|
// XXX use this before connector sets colType in sc correctly.
|
|
// type of pseudo column is set by connector
|
|
if (dynamic_cast<PseudoColumn*>(sc) == NULL)
|
|
{
|
|
ct = jobInfo.csc->colType(oid);
|
|
ct.charsetNumber = sc->colType().charsetNumber;
|
|
}
|
|
|
|
// X
|
|
dictOid = joblist::isDictCol(ct);
|
|
|
|
if (dictOid > 0)
|
|
key = jobInfo.keyInfo->dictKeyMap[key];
|
|
}
|
|
|
|
sc->inputIndex(indexMap[key]);
|
|
|
|
if (jobInfo.trace)
|
|
cout << "OID/key:" << (dictOid ? dictOid : oid) << "/" << key << "(" << sc->tableAlias() << "):";
|
|
}
|
|
else
|
|
{
|
|
(*it)->inputIndex(indexMap[getExpTupleKey(jobInfo, (*it)->expressionId())]);
|
|
|
|
if (jobInfo.trace)
|
|
cout << "EID:" << (*it)->expressionId();
|
|
}
|
|
|
|
if (jobInfo.trace)
|
|
cout << (*it)->inputIndex() << endl;
|
|
}
|
|
|
|
// if substutes exist, update the original column
|
|
for (map<SimpleColumn*, ReturnedColumn*>::iterator k = fSubMap.begin(); k != fSubMap.end(); k++)
|
|
k->second->inputIndex(k->first->inputIndex());
|
|
}
|
|
|
|
void ExpressionStep::updateOutputIndex(map<uint32_t, uint32_t>& indexMap, const JobInfo& jobInfo)
|
|
{
|
|
fExpression->outputIndex(indexMap[getExpTupleKey(jobInfo, fExpressionId)]);
|
|
|
|
if (jobInfo.trace)
|
|
{
|
|
cout << "output index of Expression:" << (int64_t)fExpressionId << ":" << fExpression->outputIndex()
|
|
<< endl
|
|
<< endl;
|
|
}
|
|
}
|
|
|
|
void ExpressionStep::updateColumnOidAlias(JobInfo& jobInfo)
|
|
{
|
|
for (size_t i = 0; i < fColumns.size(); i++)
|
|
{
|
|
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(fColumns[i]);
|
|
|
|
// virtual table columns
|
|
if (sc != NULL && sc->schemaName().empty())
|
|
{
|
|
fTableOids[i] = joblist::tableOid(sc, jobInfo.csc);
|
|
fAliases[i] = extractTableAlias(sc);
|
|
}
|
|
}
|
|
}
|
|
|
|
void ExpressionStep::substitute(uint64_t i, const SSC& ssc)
|
|
{
|
|
fVsc.insert(ssc); // save a local copy in case the virtual table in subquery be out of scope.
|
|
fSubMap[ssc.get()] = fColumns[i];
|
|
fColumns[i] = ssc.get();
|
|
}
|
|
|
|
void ExpressionStep::functionJoinCheck(SimpleFilter* sf, JobInfo& jobInfo)
|
|
{
|
|
if ((sf->lhs()->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
|
|
sf->lhs()->resultType().colDataType == CalpontSystemCatalog::BLOB) &&
|
|
!fVarBinOK)
|
|
throw runtime_error("VARBINARY/BLOB in join is not supported.");
|
|
|
|
// only handle one & only one table at each side, and not the same table
|
|
fFunctionJoinInfo.reset(new FunctionJoinInfo);
|
|
|
|
if ((parseFuncJoinColumn(sf->lhs(), jobInfo) == false) ||
|
|
(parseFuncJoinColumn(sf->rhs(), jobInfo) == false) ||
|
|
(fFunctionJoinInfo->fTableKey[0] == fFunctionJoinInfo->fTableKey[1]))
|
|
{
|
|
// not convertible
|
|
fFunctionJoinInfo.reset();
|
|
return;
|
|
}
|
|
|
|
if ((!compatibleColumnTypes(sf->lhs()->resultType(), sf->rhs()->resultType(), true)) &&
|
|
(fFunctionJoinInfo->fTableKey.size() == 2))
|
|
{
|
|
uint32_t t1 = fFunctionJoinInfo->fTableKey[0];
|
|
uint32_t t2 = fFunctionJoinInfo->fTableKey[1];
|
|
jobInfo.incompatibleJoinMap[t1] = t2;
|
|
jobInfo.incompatibleJoinMap[t2] = t1;
|
|
|
|
// not convertible
|
|
fFunctionJoinInfo.reset();
|
|
return;
|
|
}
|
|
|
|
SJSTEP sjstep;
|
|
ReturnedColumn* sfLhs = sf->lhs();
|
|
ReturnedColumn* sfRhs = sf->rhs();
|
|
|
|
if (dynamic_cast<SimpleColumn*>(sfLhs) == NULL)
|
|
{
|
|
SRCP lhs(sfLhs->clone());
|
|
ExpressionStep* esLhs = new ExpressionStep(jobInfo);
|
|
esLhs->expression(lhs, jobInfo);
|
|
sjstep.reset(esLhs);
|
|
}
|
|
|
|
fFunctionJoinInfo->fStep.push_back(sjstep);
|
|
sjstep.reset();
|
|
|
|
if (dynamic_cast<SimpleColumn*>(sfRhs) == NULL)
|
|
{
|
|
SRCP rhs(sfRhs->clone());
|
|
ExpressionStep* esRhs = new ExpressionStep(jobInfo);
|
|
esRhs->expression(rhs, jobInfo);
|
|
sjstep.reset(esRhs);
|
|
}
|
|
|
|
fFunctionJoinInfo->fStep.push_back(sjstep);
|
|
|
|
JoinType jt = INNER;
|
|
|
|
if (sfLhs->returnAll())
|
|
jt = LEFTOUTER;
|
|
else if (sfRhs->returnAll())
|
|
jt = RIGHTOUTER;
|
|
|
|
uint64_t joinInfo = sfLhs->joinInfo() | sfRhs->joinInfo();
|
|
int64_t correlatedSide = 0;
|
|
ExpressionStep* ve = NULL;
|
|
|
|
if (joinInfo != 0)
|
|
{
|
|
if (joinInfo & JOIN_SEMI)
|
|
jt |= SEMI;
|
|
|
|
if (joinInfo & JOIN_ANTI)
|
|
jt |= ANTI;
|
|
|
|
if (joinInfo & JOIN_SCALAR)
|
|
jt |= SCALAR;
|
|
|
|
if (joinInfo & JOIN_NULL_MATCH)
|
|
jt |= MATCHNULLS;
|
|
|
|
if (joinInfo & JOIN_CORRELATED)
|
|
jt |= CORRELATED;
|
|
|
|
if (joinInfo & JOIN_OUTER_SELECT)
|
|
jt |= LARGEOUTER;
|
|
|
|
if (sfLhs->joinInfo() & JOIN_CORRELATED)
|
|
{
|
|
correlatedSide = 1;
|
|
ve = dynamic_cast<ExpressionStep*>(fFunctionJoinInfo->fStep[1].get());
|
|
}
|
|
else if (sfRhs->joinInfo() & JOIN_CORRELATED)
|
|
{
|
|
correlatedSide = 2;
|
|
ve = dynamic_cast<ExpressionStep*>(fFunctionJoinInfo->fStep[0].get());
|
|
}
|
|
}
|
|
|
|
if (ve != NULL)
|
|
ve->virtualStep();
|
|
|
|
fFunctionJoinInfo->fJoinType = jt;
|
|
fFunctionJoinInfo->fCorrelatedSide = correlatedSide;
|
|
fFunctionJoinInfo->fJoinId = ++jobInfo.joinNum;
|
|
|
|
jobInfo.functionJoins.push_back(this);
|
|
}
|
|
|
|
bool ExpressionStep::parseFuncJoinColumn(ReturnedColumn* rc, JobInfo& jobInfo)
|
|
{
|
|
set<uint32_t> tids; // tables used in the expression
|
|
set<uint32_t> cids; // columns used in the expression
|
|
uint32_t key = -1; // join key
|
|
uint32_t tid = -1; // table Id of the simple column
|
|
bool isSc = false; // rc is a simple column
|
|
|
|
ArithmeticColumn* ac = dynamic_cast<ArithmeticColumn*>(rc);
|
|
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
|
|
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
|
|
|
|
if (sc != NULL)
|
|
{
|
|
isSc = true;
|
|
key = getTupleKey(jobInfo, sc);
|
|
tid = getTableKey(jobInfo, key);
|
|
|
|
if (jobInfo.keyInfo->dictKeyMap.find(key) != jobInfo.keyInfo->dictKeyMap.end())
|
|
key = jobInfo.keyInfo->dictKeyMap[key];
|
|
|
|
tids.insert(tid);
|
|
cids.insert(key);
|
|
}
|
|
else if (ac != NULL || fc != NULL)
|
|
{
|
|
TupleInfo ti(setExpTupleInfo(rc, jobInfo));
|
|
key = ti.key;
|
|
|
|
for (uint32_t i = 0; i < rc->simpleColumnList().size(); i++)
|
|
{
|
|
sc = rc->simpleColumnList()[i];
|
|
uint32_t cid = getTupleKey(jobInfo, sc);
|
|
tid = getTableKey(jobInfo, cid);
|
|
tids.insert(tid);
|
|
cids.insert(cid);
|
|
}
|
|
}
|
|
|
|
int32_t tableOid = -1;
|
|
int32_t oid = -1;
|
|
string alias;
|
|
string view;
|
|
string schema;
|
|
|
|
if (sc && tids.size() == 1)
|
|
{
|
|
tableOid = joblist::tableOid(sc, jobInfo.csc);
|
|
oid = sc->oid();
|
|
alias = extractTableAlias(sc);
|
|
view = sc->viewName();
|
|
schema = sc->schemaName();
|
|
}
|
|
else if (dynamic_cast<AggregateColumn*>(rc) || dynamic_cast<WindowFunctionColumn*>(rc) ||
|
|
dynamic_cast<ArithmeticColumn*>(rc) || dynamic_cast<FunctionColumn*>(rc))
|
|
{
|
|
tableOid = execplan::CNX_VTABLE_ID;
|
|
oid = rc->expressionId();
|
|
alias = jobInfo.subAlias;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
|
|
if (isSc == false)
|
|
jobInfo.keyInfo->functionJoinKeys.insert(key);
|
|
|
|
fFunctionJoinInfo->fExpression.push_back(rc);
|
|
fFunctionJoinInfo->fJoinKey.push_back(key);
|
|
fFunctionJoinInfo->fTableKey.push_back(tid);
|
|
fFunctionJoinInfo->fColumnKeys.push_back(cids);
|
|
fFunctionJoinInfo->fTableOid.push_back(tableOid);
|
|
fFunctionJoinInfo->fOid.push_back(oid);
|
|
fFunctionJoinInfo->fSequence.push_back(rc->sequence());
|
|
fFunctionJoinInfo->fAlias.push_back(alias);
|
|
fFunctionJoinInfo->fView.push_back(view);
|
|
fFunctionJoinInfo->fSchema.push_back(schema);
|
|
|
|
return true;
|
|
}
|
|
|
|
const string ExpressionStep::toString() const
|
|
{
|
|
ostringstream oss;
|
|
oss << "ExpressionStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;
|
|
|
|
oss << " in:";
|
|
|
|
for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
|
|
oss << fInputJobStepAssociation.outAt(i);
|
|
|
|
return oss.str();
|
|
}
|
|
|
|
} // namespace joblist
|