When a UNION operation involving DECIMAL datatypes needs more digits (scale plus digits before the decimal point) than the currently supported maximum precision of 38, we throw an error to the user: "MCS-2060: Union operation exceeds maximum DECIMAL precision of 38". This holds until MCOL-5417 is implemented, at which point ColumnStore will have full parity with the MariaDB server's maximum supported DECIMAL precision and scale of 65 and 38 digits, respectively.
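As a rough worked example (the exact type-combination rule lives in the server's type aggregation code, so treat these numbers as illustrative): a UNION of a DECIMAL(38,0) column with a DECIMAL(10,2) column must keep max(38 - 0, 10 - 2) = 38 digits before the decimal point and max(0, 2) = 2 digits of scale, i.e. a precision of 40, which exceeds 38 and raises MCS-2060.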
/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2019-2021 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: jlf_tuplejoblist.cpp 9728 2013-07-26 22:08:20Z xlou $

// Cross engine needs to be at the top due to MySQL includes
#define PREFER_MY_CONFIG_H
#include "crossenginestep.h"
#include <iostream>
#include <stack>
#include <iterator>
#include <algorithm>
//#define NDEBUG
//#include <cassert>
#include <vector>
#include <set>
#include <map>
#include <limits>
using namespace std;

#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
using namespace boost;

#include "calpontsystemcatalog.h"
#include "logicoperator.h"
using namespace execplan;

#include "rowgroup.h"
#include "rowaggregation.h"
using namespace rowgroup;

#include "idberrorinfo.h"
#include "errorids.h"
#include "exceptclasses.h"
using namespace logging;

#include "dataconvert.h"
using namespace dataconvert;

#include "elementtype.h"
#include "jlf_common.h"
#include "limitedorderby.h"
#include "jobstep.h"
#include "primitivestep.h"
#include "expressionstep.h"
#include "subquerystep.h"
#include "tupleaggregatestep.h"
#include "tupleannexstep.h"
#include "tupleconstantstep.h"
#include "tuplehashjoin.h"
#include "tuplehavingstep.h"
#include "tupleunion.h"
#include "windowfunctionstep.h"
#include "configcpp.h"
#include "jlf_tuplejoblist.h"
using namespace joblist;

#include "statistics.h"

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wpotentially-evaluated-expression"
// for warnings on typeid: expression with side effects will be evaluated despite being used as an operand
// to 'typeid'
#endif

namespace
{
// construct a pcolstep from column key
void tupleKeyToProjectStep(uint32_t key, JobStepVector& jsv, JobInfo& jobInfo)
{
  // this JSA is for pcolstep construction; it takes no input/output
  // because the pcolstep is to be added into TBPS
  CalpontSystemCatalog::OID oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
  DictOidToColOidMap::iterator mit = jobInfo.keyInfo->dictOidToColOid.find(oid);

  // if the key is for a dictionary, start with its token key
  if (mit != jobInfo.keyInfo->dictOidToColOid.end())
  {
    oid = mit->second;

    for (map<uint32_t, uint32_t>::iterator i = jobInfo.keyInfo->dictKeyMap.begin();
         i != jobInfo.keyInfo->dictKeyMap.end(); i++)
    {
      if (key == i->second)
      {
        key = i->first;
        break;
      }
    }

    jobInfo.tokenOnly[key] = false;
  }

  CalpontSystemCatalog::OID tableOid = jobInfo.keyInfo->tupleKeyToTableOid[key];
  // JobStepAssociation dummyJsa;
  // AnyDataListSPtr adl(new AnyDataList());
  // RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
  // dl->OID(oid);
  // adl->rowGroupDL(dl);
  // dummyJsa.outAdd(adl);

  CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key];

  if (jobInfo.keyInfo->token2DictTypeMap.find(key) != jobInfo.keyInfo->token2DictTypeMap.end())
    ct = jobInfo.keyInfo->token2DictTypeMap[key];

  uint32_t pt = jobInfo.keyInfo->pseudoType[key];

  SJSTEP sjs;

  if (pt == 0)
    sjs.reset(new pColStep(oid, tableOid, ct, jobInfo));
  else
    sjs.reset(new PseudoColStep(oid, tableOid, pt, ct, jobInfo));

  sjs->alias(jobInfo.keyInfo->tupleKeyVec[key].fTable);
  sjs->view(jobInfo.keyInfo->tupleKeyVec[key].fView);
  sjs->schema(jobInfo.keyInfo->tupleKeyVec[key].fSchema);
  sjs->name(jobInfo.keyInfo->keyName[key]);
  sjs->tupleId(key);

  jsv.push_back(sjs);

  bool tokenOnly = false;
  map<uint32_t, bool>::iterator toIt = jobInfo.tokenOnly.find(key);

  if (toIt != jobInfo.tokenOnly.end())
    tokenOnly = toIt->second;

  if (sjs.get()->isDictCol() && !tokenOnly)
  {
    // Need a dictionary step
    uint32_t dictKey = jobInfo.keyInfo->dictKeyMap[key];
    CalpontSystemCatalog::OID dictOid = jobInfo.keyInfo->tupleKeyVec[dictKey].fId;
    sjs.reset(new pDictionaryStep(dictOid, tableOid, ct, jobInfo));
    sjs->alias(jobInfo.keyInfo->tupleKeyVec[dictKey].fTable);
    sjs->view(jobInfo.keyInfo->tupleKeyVec[dictKey].fView);
    sjs->schema(jobInfo.keyInfo->tupleKeyVec[dictKey].fSchema);
    sjs->name(jobInfo.keyInfo->keyName[dictKey]);
    sjs->tupleId(dictKey);

    jobInfo.keyInfo->dictOidToColOid[dictOid] = oid;

    jsv.push_back(sjs);
  }
}
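// Illustrative sketch (not part of the original source): for a dictionary-backed
// column (e.g. a wide VARCHAR), the function above pushes two steps -- a
// pColStep/PseudoColStep that reads tokens, followed by a pDictionaryStep that
// resolves tokens to string values:
//
//   JobStepVector jsv;
//   tupleKeyToProjectStep(key, jsv, jobInfo);
//   // jsv now holds: [pColStep (token column), pDictionaryStep (dictionary OID)]
//
// For non-dictionary columns, or when jobInfo.tokenOnly[key] is true, only the
// single column step is pushed.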

inline void addColumnToRG(uint32_t cid, vector<uint32_t>& pos, vector<uint32_t>& oids, vector<uint32_t>& keys,
                          vector<uint32_t>& scale, vector<uint32_t>& precision,
                          vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums,
                          JobInfo& jobInfo)
{
  TupleInfo ti(getTupleInfo(cid, jobInfo));
  pos.push_back(pos.back() + ti.width);
  oids.push_back(ti.oid);
  keys.push_back(ti.key);
  types.push_back(ti.dtype);
  csNums.push_back(ti.csNum);
  scale.push_back(ti.scale);
  precision.push_back(ti.precision);
}
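// Note (added for orientation): the RowGroup metadata is maintained as parallel
// vectors -- element i of oids/keys/types/csNums/scale/precision describes column
// i, while pos carries one extra leading entry so that pos[i + 1] - pos[i] gives
// column i's byte width within a row. Callers seed pos with the initial row
// offset (2) before the first addColumnToRG() call.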

inline void addColumnInExpToRG(uint32_t cid, vector<uint32_t>& pos, vector<uint32_t>& oids,
                               vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
                               vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums,
                               JobInfo& jobInfo)
{
  if (jobInfo.keyInfo->dictKeyMap.find(cid) != jobInfo.keyInfo->dictKeyMap.end())
    cid = jobInfo.keyInfo->dictKeyMap[cid];

  if (find(keys.begin(), keys.end(), cid) == keys.end())
    addColumnToRG(cid, pos, oids, keys, scale, precision, types, csNums, jobInfo);
}

inline void addColumnsToRG(uint32_t tid, vector<uint32_t>& pos, vector<uint32_t>& oids,
                           vector<uint32_t>& keys, vector<uint32_t>& scale, vector<uint32_t>& precision,
                           vector<CalpontSystemCatalog::ColDataType>& types, vector<uint32_t>& csNums,
                           TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
  // -- the selected columns
  vector<uint32_t>& pjCol = tableInfoMap[tid].fProjectCols;

  for (unsigned i = 0; i < pjCol.size(); i++)
  {
    addColumnToRG(pjCol[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
  }

  // -- any columns to be used in cross-table expressions
  vector<uint32_t>& exp2 = tableInfoMap[tid].fColsInExp2;

  for (unsigned i = 0; i < exp2.size(); i++)
  {
    addColumnInExpToRG(exp2[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
  }

  // -- any columns to be used in returned expressions
  vector<uint32_t>& expr = tableInfoMap[tid].fColsInRetExp;

  for (unsigned i = 0; i < expr.size(); i++)
  {
    addColumnInExpToRG(expr[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
  }

  // -- any columns to be used in the final outer join expression
  vector<uint32_t>& expo = tableInfoMap[tid].fColsInOuter;

  for (unsigned i = 0; i < expo.size(); i++)
  {
    addColumnInExpToRG(expo[i], pos, oids, keys, scale, precision, types, csNums, jobInfo);
  }
}

void constructJoinedRowGroup(RowGroup& rg, uint32_t large, uint32_t prev, bool root, set<uint32_t>& tableSet,
                             TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
  // Construct the output rowgroup for the join.
  vector<uint32_t> pos;
  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  pos.push_back(2);

  // -- start with the join keys
  // lead by joinkeys -- to have more control over joins
  // [loop through the key list to support compound join]
  if (root == false)  // not root
  {
    vector<uint32_t>& joinKeys = jobInfo.tableJoinMap[make_pair(large, prev)].fLeftKeys;

    for (vector<uint32_t>::iterator i = joinKeys.begin(); i != joinKeys.end(); i++)
      addColumnToRG(*i, pos, oids, keys, scale, precision, types, csNums, jobInfo);
  }

  // -- followed by the columns in select or expression
  for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
    addColumnsToRG(*i, pos, oids, keys, scale, precision, types, csNums, tableInfoMap, jobInfo);

  RowGroup tmpRg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
  rg = tmpRg;
}

void constructJoinedRowGroup(RowGroup& rg, set<uint32_t>& tableSet, TableInfoMap& tableInfoMap,
                             JobInfo& jobInfo)
{
  // Construct the output rowgroup for the join.
  vector<uint32_t> pos;
  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  pos.push_back(2);

  for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
  {
    // columns in select or expression
    addColumnsToRG(*i, pos, oids, keys, scale, precision, types, csNums, tableInfoMap, jobInfo);

    // keys to be joined if not already in the rowgroup
    vector<uint32_t>& adjList = tableInfoMap[*i].fAdjacentList;

    for (vector<uint32_t>::iterator j = adjList.begin(); j != adjList.end(); j++)
    {
      if (find(tableSet.begin(), tableSet.end(), *j) == tableSet.end())
      {
        // not joined
        vector<uint32_t>& joinKeys = jobInfo.tableJoinMap[make_pair(*i, *j)].fLeftKeys;

        for (vector<uint32_t>::iterator k = joinKeys.begin(); k != joinKeys.end(); k++)
        {
          if (find(keys.begin(), keys.end(), *k) == keys.end())
            addColumnToRG(*k, pos, oids, keys, scale, precision, types, csNums, jobInfo);
        }
      }
    }
  }

  RowGroup tmpRg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
  rg = tmpRg;
}

void updateExp2Cols(JobStepVector& expSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
  for (JobStepVector::iterator it = expSteps.begin(); it != expSteps.end(); it++)
  {
    ExpressionStep* exps = dynamic_cast<ExpressionStep*>(it->get());
    const vector<uint32_t>& tables = exps->tableKeys();
    const vector<uint32_t>& columns = exps->columnKeys();

    for (uint64_t i = 0; i < tables.size(); ++i)
    {
      vector<uint32_t>& exp2 = tableInfoMap[tables[i]].fColsInExp2;
      vector<uint32_t>::iterator cit = find(exp2.begin(), exp2.end(), columns[i]);

      if (cit != exp2.end())
        exp2.erase(cit);
    }
  }
}
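// Sketch of the delivery chain assembled by adjustLastStep() below (derived from
// the code; added for orientation only):
//
//   last query step (TupleBPS | TupleHashJoinStep | SubAdapterStep)
//     -> returned/group-by expressions (FE group 3), if any
//     -> TupleAggregateStep, if the query aggregates
//     -> TupleHavingStep, if there is a HAVING filter
//     -> WindowFunctionStep, if window columns exist
//     -> TupleAnnexStep (LIMIT/OFFSET, ORDER BY, DISTINCT, constant columns)
//     -> delivery under CNX_VTABLE_ID
//
// A constant-false predicate short-circuits the front of this chain with a
// TupleConstantBooleanStep.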

void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps, JobInfo& jobInfo)
{
  SJSTEP spjs = querySteps.back();
  BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(spjs.get());
  TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
  SubAdapterStep* sas = dynamic_cast<SubAdapterStep*>(spjs.get());

  if (!bps && !thjs && !sas)
    throw runtime_error("Bad last step");

  // original output rowgroup of the step
  TupleJobStep* tjs = dynamic_cast<TupleJobStep*>(spjs.get());
  const RowGroup* rg0 = &(tjs->getOutputRowGroup());

  if (jobInfo.trace)
    cout << "Output RowGroup 0: " << rg0->toString() << endl;

  // Construct a rowgroup that matches the select columns
  TupleInfoVector v = jobInfo.pjColList;
  vector<uint32_t> pos;
  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  pos.push_back(2);

  for (unsigned i = 0; i < v.size(); i++)
  {
    pos.push_back(pos.back() + v[i].width);
    oids.push_back(v[i].oid);
    keys.push_back(v[i].key);
    types.push_back(v[i].dtype);
    csNums.push_back(v[i].csNum);
    scale.push_back(v[i].scale);
    precision.push_back(v[i].precision);
  }

  RowGroup rg1(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);

  // evaluate the returned/groupby expressions if any
  JobStepVector& expSteps = jobInfo.returnedExpressions;

  if (expSteps.size() > 0)
  {
    // create a RG that has the keys not in rg0
    pos.clear();
    oids.clear();
    keys.clear();
    scale.clear();
    precision.clear();
    types.clear();
    csNums.clear();
    pos.push_back(2);

    const vector<uint32_t>& keys0 = rg0->getKeys();

    for (unsigned i = 0; i < v.size(); i++)
    {
      if (find(keys0.begin(), keys0.end(), v[i].key) == keys0.end())
      {
        pos.push_back(pos.back() + v[i].width);
        oids.push_back(v[i].oid);
        keys.push_back(v[i].key);
        types.push_back(v[i].dtype);
        csNums.push_back(v[i].csNum);
        scale.push_back(v[i].scale);
        precision.push_back(v[i].precision);
      }
    }

    // for v0.9.3.0, the output and input to the expression are in the same row
    // add the returned columns into rg0 as rg01
    RowGroup rg01 = *rg0 + RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
                                    jobInfo.stringTableThreshold);

    if (jobInfo.trace)
      cout << "Output RowGroup 01: " << rg01.toString() << endl;

    map<uint32_t, uint32_t> keyToIndexMap0;  // maps key to the index in the input RG

    for (uint64_t i = 0; i < rg01.getKeys().size(); ++i)
      keyToIndexMap0.insert(make_pair(rg01.getKeys()[i], i));

    vector<SRCP> exps;  // columns to be evaluated

    for (JobStepVector::iterator eit = expSteps.begin(); eit != expSteps.end(); ++eit)
    {
      ExpressionStep* es = dynamic_cast<ExpressionStep*>(eit->get());
      es->updateInputIndex(keyToIndexMap0, jobInfo);
      es->updateOutputIndex(keyToIndexMap0, jobInfo);  // same row as input
      exps.push_back(es->expression());
    }

    // last step can be tbps (no join) or thjs, either one can have a group 3 expression
    if (bps || thjs)
    {
      tjs->setOutputRowGroup(rg01);
      tjs->setFcnExpGroup3(exps);
      tjs->setFE23Output(rg1);
    }
    else if (sas)
    {
      sas->setFeRowGroup(rg01);
      sas->addExpression(exps);
      sas->setOutputRowGroup(rg1);
    }
  }
  else
  {
    if (thjs && thjs->hasFcnExpGroup2())
      thjs->setFE23Output(rg1);
    else
      tjs->setOutputRowGroup(rg1);
  }

  if (jobInfo.trace)
    cout << "Output RowGroup 1: " << rg1.toString() << endl;

  if (jobInfo.hasAggregation == false)
  {
    if (thjs != NULL)  // setup a few things for the final thjs step...
      thjs->outputAssociation(JobStepAssociation());

    deliverySteps[CNX_VTABLE_ID] = spjs;
  }
  else
  {
    TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());
    idbassert(tds != NULL);
    SJSTEP ads = TupleAggregateStep::prepAggregate(spjs, jobInfo);
    querySteps.push_back(ads);

    if (ads.get() != NULL)
      deliverySteps[CNX_VTABLE_ID] = ads;
    else
      throw std::logic_error("Failed to prepare Aggregation Delivery Step.");
  }

  if (jobInfo.havingStep)
  {
    TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());

    AnyDataListSPtr spdlIn(new AnyDataList());
    RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize);
    dlIn->OID(CNX_VTABLE_ID);
    spdlIn->rowGroupDL(dlIn);
    JobStepAssociation jsaIn;
    jsaIn.outAdd(spdlIn);
    dynamic_cast<JobStep*>(ds)->outputAssociation(jsaIn);
    jobInfo.havingStep->inputAssociation(jsaIn);

    AnyDataListSPtr spdlOut(new AnyDataList());
    RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
    dlOut->OID(CNX_VTABLE_ID);
    spdlOut->rowGroupDL(dlOut);
    JobStepAssociation jsaOut;
    jsaOut.outAdd(spdlOut);
    jobInfo.havingStep->outputAssociation(jsaOut);

    querySteps.push_back(jobInfo.havingStep);
    dynamic_cast<TupleHavingStep*>(jobInfo.havingStep.get())->initialize(ds->getDeliveredRowGroup(), jobInfo);
    deliverySteps[CNX_VTABLE_ID] = jobInfo.havingStep;
  }

  if (jobInfo.windowCols.size() > 0)
  {
    spjs = querySteps.back();
    SJSTEP ws = WindowFunctionStep::makeWindowFunctionStep(spjs, jobInfo);
    idbassert(ws.get());
    querySteps.push_back(ws);
    deliverySteps[CNX_VTABLE_ID] = ws;
  }

  // TODO MCOL-894 we don't need to run sorting|distinct
  // every time
  // if ((jobInfo.limitCount != (uint64_t) - 1) ||
  //     (jobInfo.constantCol == CONST_COL_EXIST) ||
  //     (jobInfo.hasDistinct))
  // {
  if (jobInfo.annexStep.get() == NULL)
    jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));

  TupleAnnexStep* tas = dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get());
  tas->setLimit(jobInfo.limitStart, jobInfo.limitCount);

  if (jobInfo.orderByColVec.size() > 0)
  {
    tas->addOrderBy(new LimitedOrderBy());
    if (jobInfo.orderByThreads > 1)
      tas->setParallelOp();
    tas->setMaxThreads(jobInfo.orderByThreads);
  }

  if (jobInfo.constantCol == CONST_COL_EXIST)
    tas->addConstant(new TupleConstantStep(jobInfo));

  if (jobInfo.hasDistinct)
    tas->setDistinct();

  // }

  if (jobInfo.annexStep)
  {
    TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
    RowGroup rg2 = ds->getDeliveredRowGroup();

    if (jobInfo.trace)
      cout << "Output RowGroup 2: " << rg2.toString() << endl;

    AnyDataListSPtr spdlIn(new AnyDataList());
    RowGroupDL* dlIn;
    if (jobInfo.orderByColVec.size() > 0)
      dlIn = new RowGroupDL(jobInfo.orderByThreads, jobInfo.fifoSize);
    else
      dlIn = new RowGroupDL(1, jobInfo.fifoSize);
    dlIn->OID(CNX_VTABLE_ID);
    spdlIn->rowGroupDL(dlIn);
    JobStepAssociation jsaIn;
    jsaIn.outAdd(spdlIn);
    dynamic_cast<JobStep*>(ds)->outputAssociation(jsaIn);
    jobInfo.annexStep->inputAssociation(jsaIn);

    AnyDataListSPtr spdlOut(new AnyDataList());
    RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
    dlOut->OID(CNX_VTABLE_ID);
    spdlOut->rowGroupDL(dlOut);
    JobStepAssociation jsaOut;
    jsaOut.outAdd(spdlOut);
    jobInfo.annexStep->outputAssociation(jsaOut);

    querySteps.push_back(jobInfo.annexStep);
    dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get())->initialize(rg2, jobInfo);
    deliverySteps[CNX_VTABLE_ID] = jobInfo.annexStep;
  }

  // Check if constant false
  if (jobInfo.constantFalse)
  {
    TupleConstantBooleanStep* tcs = new TupleConstantBooleanStep(jobInfo, false);
    tcs->outputAssociation(querySteps.back().get()->outputAssociation());
    TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
    tcs->initialize(tds->getDeliveredRowGroup(), jobInfo);

    JobStepVector::iterator it = querySteps.begin();

    while (it != querySteps.end())
    {
      if ((dynamic_cast<TupleAggregateStep*>(it->get()) != NULL) ||
          (dynamic_cast<TupleAnnexStep*>(it->get()) != NULL))
        break;

      it++;
    }

    SJSTEP bs(tcs);

    if (it != querySteps.end())
      tcs->outputAssociation((*it)->inputAssociation());
    else
      deliverySteps[CNX_VTABLE_ID] = bs;

    querySteps.erase(querySteps.begin(), it);
    querySteps.insert(querySteps.begin(), bs);
  }

  if (jobInfo.trace)
  {
    TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());

    if (ds)
      cout << "Delivered RowGroup: " << ds->getDeliveredRowGroup().toString() << endl;
  }
}

// add the project steps into the query TBPS and construct the output rowgroup
void addProjectStepsToBps(TableInfoMap::iterator& mit, BatchPrimitive* bps, JobInfo& jobInfo)
{
  // make sure we have a good tuple bps
  if (bps == NULL)
    throw runtime_error("BPS is null");

  // construct a pcolstep for each joinkey to be projected
  vector<uint32_t>& joinKeys = mit->second.fJoinKeys;
  JobStepVector keySteps;
  vector<uint32_t> fjKeys;

  for (vector<uint32_t>::iterator kit = joinKeys.begin(); kit != joinKeys.end(); kit++)
  {
    if (jobInfo.keyInfo.get()->tupleKeyToTableOid[*kit] != CNX_EXP_TABLE_ID)
      tupleKeyToProjectStep(*kit, keySteps, jobInfo);
    else
      fjKeys.push_back(*kit);
  }

  // construct a pcolstep for each column in expressions
  JobStepVector expSteps;
  vector<uint32_t>& exp1 = mit->second.fColsInExp1;

  for (vector<uint32_t>::iterator kit = exp1.begin(); kit != exp1.end(); kit++)
    tupleKeyToProjectStep(*kit, expSteps, jobInfo);

  vector<uint32_t>& exp2 = mit->second.fColsInExp2;

  for (vector<uint32_t>::iterator kit = exp2.begin(); kit != exp2.end(); kit++)
    tupleKeyToProjectStep(*kit, expSteps, jobInfo);

  vector<uint32_t>& expRet = mit->second.fColsInRetExp;

  for (vector<uint32_t>::iterator kit = expRet.begin(); kit != expRet.end(); kit++)
    tupleKeyToProjectStep(*kit, expSteps, jobInfo);

  vector<uint32_t>& expOut = mit->second.fColsInOuter;

  for (vector<uint32_t>::iterator kit = expOut.begin(); kit != expOut.end(); kit++)
    tupleKeyToProjectStep(*kit, expSteps, jobInfo);

  vector<uint32_t>& expFj = mit->second.fColsInFuncJoin;

  for (vector<uint32_t>::iterator kit = expFj.begin(); kit != expFj.end(); kit++)
    tupleKeyToProjectStep(*kit, expSteps, jobInfo);

  // for output rowgroup
  vector<uint32_t> pos;
  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  pos.push_back(2);

  // this psv is a copy of the project steps, the original vector in mit is not changed
  JobStepVector psv = mit->second.fProjectSteps;              // columns being selected
  psv.insert(psv.begin(), keySteps.begin(), keySteps.end());  // add joinkeys to project
  psv.insert(psv.end(), expSteps.begin(), expSteps.end());    // add expressions to project
  set<uint32_t> seenCols;                                     // columns already processed

  // for passthru conversion
  // passthru is disabled (default lastTupleId to -1) unless the TupleBPS::bop is BOP_AND.
  uint64_t lastTupleId = -1;
  TupleBPS* tbps = dynamic_cast<TupleBPS*>(bps);

  if (tbps != NULL && tbps->getBOP() == BOP_AND && exp1.size() == 0)
    lastTupleId = tbps->getLastTupleId();

  for (JobStepVector::iterator it = psv.begin(); it != psv.end(); it++)
  {
    JobStep* js = it->get();
    uint32_t tupleKey = js->tupleId();

    if (seenCols.find(tupleKey) != seenCols.end())
      continue;

    // update processed column set
    seenCols.insert(tupleKey);

    // if the projected column is the last accessed predicate
    pColStep* pcol = dynamic_cast<pColStep*>(js);

    if (pcol != NULL && js->tupleId() == lastTupleId)
    {
      PassThruStep* pts = new PassThruStep(*pcol);

      if (dynamic_cast<PseudoColStep*>(pcol))
        pts->pseudoType(dynamic_cast<PseudoColStep*>(pcol)->pseudoColumnId());

      pts->alias(pcol->alias());
      pts->view(pcol->view());
      pts->name(pcol->name());
      pts->tupleId(pcol->tupleId());
      it->reset(pts);
    }

    // add projected column to TBPS
    bool tokenOnly = false;
    map<uint32_t, bool>::iterator toIt = jobInfo.tokenOnly.find(js->tupleId());

    if (toIt != jobInfo.tokenOnly.end())
      tokenOnly = toIt->second;

    if (it->get()->isDictCol() && !tokenOnly)
    {
      // if (jobInfo.trace && bps->tableOid() >= 3000)
      //   cout << "1 setting project BPP for " << tbps->toString() << " with "
      //        << it->get()->toString() << " and " << (it+1)->get()->toString() << endl;
      bps->setProjectBPP(it->get(), (it + 1)->get());

      // this is a two-step project step, remove the token step from id vector
      vector<uint32_t>& pjv = mit->second.fProjectCols;
      uint32_t tokenKey = js->tupleId();

      for (vector<uint32_t>::iterator i = pjv.begin(); i != pjv.end(); ++i)
      {
        if (*i == tokenKey)
        {
          pjv.erase(i);
          break;
        }
      }

      // move to the dictionary step
      js = (++it)->get();
      tupleKey = js->tupleId();
      seenCols.insert(tupleKey);
    }
    else
    {
      // if (jobInfo.trace && bps->tableOid() >= 3000)
      //   cout << "2 setting project BPP for " << tbps->toString() << " with "
      //        << it->get()->toString() << " and " << "NULL" << endl;
      bps->setProjectBPP(it->get(), NULL);
    }

    // add the tuple info of the column into the RowGroup
    TupleInfo ti(getTupleInfo(tupleKey, jobInfo));
    pos.push_back(pos.back() + ti.width);
    oids.push_back(ti.oid);
    keys.push_back(ti.key);
    types.push_back(ti.dtype);
    csNums.push_back(ti.csNum);
    scale.push_back(ti.scale);
    precision.push_back(ti.precision);
  }

  // add function join columns
  for (vector<uint32_t>::iterator i = fjKeys.begin(); i != fjKeys.end(); i++)
  {
    TupleInfo ti(getTupleInfo(*i, jobInfo));
    pos.push_back(pos.back() + ti.width);
    oids.push_back(ti.oid);
    keys.push_back(ti.key);
    types.push_back(ti.dtype);
    csNums.push_back(ti.csNum);
    scale.push_back(ti.scale);
    precision.push_back(ti.precision);
  }

  // construct RowGroup
  RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);

  // fix the output association
  AnyDataListSPtr spdl(new AnyDataList());
  RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
  spdl->rowGroupDL(dl);
  dl->OID(mit->first);
  JobStepAssociation jsa;
  jsa.outAdd(spdl);
  bps->outputAssociation(jsa);
  bps->setOutputRowGroup(rg);
}
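// Worked example of the passthru conversion above (illustration only): with a
// TupleBPS whose BOP is BOP_AND and no FE1 expressions, suppose the last filter
// in the BPP touched column C, so lastTupleId equals C's tuple id. When C is
// also projected, its pColStep is replaced by a PassThruStep, which forwards the
// value already read by the filter instead of re-reading the column blocks:
//
//   filters:  pColScanStep(A) -> pColStep(C)      // C is the last predicate
//   project:  pColStep(C)  ==>  PassThruStep(C)   // reuses C's filter read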

// add one-table expression steps into the query TBPS
void addExpresssionStepsToBps(TableInfoMap::iterator& mit, SJSTEP& sjsp, JobInfo& jobInfo)
{
  BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(sjsp.get());
  CalpontSystemCatalog::OID tableOid = mit->second.fTableOid;
  JobStepVector& exps = mit->second.fOneTableExpSteps;
  JobStepVector& fjs = mit->second.fFuncJoinExps;
  ExpressionStep* exp0 = NULL;

  if (exps.size() > 0)
    exp0 = dynamic_cast<ExpressionStep*>(exps[0].get());
  else
    exp0 = dynamic_cast<ExpressionStep*>(fjs[0].get());

  if (bps == NULL)
  {
    if (tableOid > 0)
    {
      uint32_t key0 = exp0->columnKey();
      CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key0];
      map<uint32_t, CalpontSystemCatalog::ColType>::iterator dkMit;

      if (jobInfo.keyInfo->token2DictTypeMap.find(key0) != jobInfo.keyInfo->token2DictTypeMap.end())
        ct = jobInfo.keyInfo->token2DictTypeMap[key0];

      scoped_ptr<pColScanStep> pcss(new pColScanStep(exp0->oid(), tableOid, ct, jobInfo));

      sjsp.reset(new TupleBPS(*pcss, jobInfo));
      TupleBPS* tbps = dynamic_cast<TupleBPS*>(sjsp.get());
      tbps->setJobInfo(&jobInfo);
      tbps->setFirstStepType(SCAN);

      // add the first column to BPP's filterSteps
      tbps->setBPP(pcss.get());

      bps = tbps;
    }
    else
    {
      sjsp.reset(new CrossEngineStep(mit->second.fSchema, mit->second.fName, mit->second.fAlias, jobInfo));

      bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
    }
  }

  // rowgroup for evaluating the one table expression
  vector<uint32_t> pos;
  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  pos.push_back(2);

  vector<uint32_t> cols;
  JobStepVector& fjExp = mit->second.fFuncJoinExps;

  for (JobStepVector::iterator it = fjExp.begin(); it != fjExp.end(); it++)
  {
    ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
    cols.push_back(getExpTupleKey(jobInfo, e->expressionId()));
  }

  cols.insert(cols.end(), mit->second.fColsInExp1.begin(), mit->second.fColsInExp1.end());
  cols.insert(cols.end(), mit->second.fColsInFuncJoin.begin(), mit->second.fColsInFuncJoin.end());
  uint32_t index = 0;                     // index in the rowgroup
  map<uint32_t, uint32_t> keyToIndexMap;  // maps key to the index in the RG

  for (vector<uint32_t>::iterator kit = cols.begin(); kit != cols.end(); kit++)
  {
    uint32_t key = *kit;

    if (jobInfo.keyInfo->dictKeyMap.find(key) != jobInfo.keyInfo->dictKeyMap.end())
      key = jobInfo.keyInfo->dictKeyMap[key];

    // check if this key is already in
    if (keyToIndexMap.find(key) != keyToIndexMap.end())
      continue;

    // update processed column set
    keyToIndexMap.insert(make_pair(key, index++));

    // add the tuple info of the column into the RowGroup
    TupleInfo ti(getTupleInfo(key, jobInfo));
    pos.push_back(pos.back() + ti.width);
    oids.push_back(ti.oid);
    keys.push_back(ti.key);
    types.push_back(ti.dtype);
    csNums.push_back(ti.csNum);
    scale.push_back(ti.scale);
    precision.push_back(ti.precision);
  }

  // construct RowGroup and add to TBPS
  RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
  bps->setFE1Input(rg);

  if (jobInfo.trace)
    cout << "FE1 input RowGroup: " << rg.toString() << endl << endl;

  // add the expression steps into TBPS, the input-indices are set in SCs.
  for (JobStepVector::iterator it = exps.begin(); it != exps.end(); it++)
  {
    ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());

    if (e->functionJoin())
      continue;

    e->updateInputIndex(keyToIndexMap, jobInfo);
    boost::shared_ptr<ParseTree> sppt(new ParseTree);
    sppt->copyTree(*(e->expressionFilter()));
    bps->addFcnExpGroup1(sppt);
  }

  // add the function join expression steps into TBPS, too
  if (fjs.size() > 0)
  {
    vector<SRCP> fjCols;

    for (JobStepVector::iterator it = fjs.begin(); it != fjs.end(); it++)
    {
      ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());

      if (e->virtualStep())
        continue;

      e->updateInputIndex(keyToIndexMap, jobInfo);
      e->updateOutputIndex(keyToIndexMap, jobInfo);
      fjCols.push_back(e->expression());
    }

    bps->addFcnJoinExp(fjCols);
  }
}
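// Note (added for orientation): filters installed via addFcnExpGroup1() above are
// the "FE1" expressions -- they travel with the batch primitive and are evaluated
// against the FE1 input RowGroup registered through setFE1Input(), rather than as
// a separate ExpressionStep in the job step list.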

bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
{
  TableInfo& tableInfo = mit->second;
  JobStepVector& qsv = tableInfo.fQuerySteps;
  JobStepVector newSteps;  // store combined steps
  RowGroup rgOut;          // rowgroup of combined steps
  CalpontSystemCatalog::OID tableOid = tableInfo.fTableOid;

  if (tableOid != CNX_VTABLE_ID)
  {
    // real table
    if (qsv.size() == 0)
    {
      // find a column in FE1, FE2, or FE3
      uint32_t key = -1;

      if (tableInfo.fColsInExp1.size() > 0)
        key = tableInfo.fColsInExp1[0];
      else if (tableInfo.fColsInExp2.size() > 0)
        key = tableInfo.fColsInExp2[0];
      else if (tableInfo.fColsInRetExp.size() > 0)
        key = tableInfo.fColsInRetExp[0];
      else if (tableInfo.fColsInOuter.size() > 0)
        key = tableInfo.fColsInOuter[0];
      else if (tableInfo.fColsInColMap.size() > 0)
        key = tableInfo.fColsInColMap[0];
      else
        throw runtime_error("No query step");

      // construct a pcolscanstep to initialize the tbps
      CalpontSystemCatalog::OID oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
      CalpontSystemCatalog::ColType ct = jobInfo.keyInfo->colType[key];
      map<uint32_t, CalpontSystemCatalog::ColType>::iterator dkMit;

      if (jobInfo.keyInfo->token2DictTypeMap.find(key) != jobInfo.keyInfo->token2DictTypeMap.end())
        ct = jobInfo.keyInfo->token2DictTypeMap[key];

      SJSTEP sjs(new pColScanStep(oid, tableOid, ct, jobInfo));
      sjs->alias(jobInfo.keyInfo->tupleKeyVec[key].fTable);
      sjs->view(jobInfo.keyInfo->tupleKeyVec[key].fView);
      sjs->schema(jobInfo.keyInfo->tupleKeyVec[key].fSchema);
      sjs->name(jobInfo.keyInfo->keyName[key]);
      sjs->tupleId(key);
      qsv.push_back(sjs);
    }

    SJSTEP sjsp;                        // shared_ptr for the new BatchPrimitive
    BatchPrimitive* bps = NULL;         // pscan/pcol/filter/etc combined to
    vector<DictionaryScanInfo> pdsVec;  // pds for string filters
    JobStepVector::iterator begin = qsv.begin();
    JobStepVector::iterator end = qsv.end();
    JobStepVector::iterator it = begin;

    // make sure there is a pcolscan if there is a pcolstep
    while (it != end)
    {
      if (typeid(*(it->get())) == typeid(pColScanStep))
        break;

      if (typeid(*(it->get())) == typeid(pColStep))
      {
        pColStep* pcs = dynamic_cast<pColStep*>(it->get());
        (*it).reset(new pColScanStep(*pcs));
        break;
      }

      it++;
    }

    // ---- predicates ----
    // setup TBPS and dictionaryscan
    it = begin;

    while (it != end)
    {
      if (typeid(*(it->get())) == typeid(pColScanStep))
      {
        if (bps == NULL)
        {
          if (tableOid > 0)
          {
            sjsp.reset(new TupleBPS(*(dynamic_cast<pColScanStep*>(it->get())), jobInfo));
            TupleBPS* tbps = dynamic_cast<TupleBPS*>(sjsp.get());
            tbps->setJobInfo(&jobInfo);
            tbps->setFirstStepType(SCAN);
            bps = tbps;
          }
          else
          {
            sjsp.reset(
                new CrossEngineStep(mit->second.fSchema, mit->second.fName, mit->second.fAlias, jobInfo));
            bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
          }
        }
        else
        {
          pColScanStep* pcss = dynamic_cast<pColScanStep*>(it->get());
          (*it).reset(new pColStep(*pcss));
        }
      }

      unsigned itInc = 1;               // iterator increment
      unsigned numOfStepsAddToBps = 0;  // # steps to be added into TBPS

      if ((std::distance(it, end) > 2 && dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
           (dynamic_cast<pColScanStep*>((it + 1)->get()) != NULL ||
            dynamic_cast<pColStep*>((it + 1)->get()) != NULL) &&
           dynamic_cast<TupleHashJoinStep*>((it + 2)->get()) != NULL) ||
          (std::distance(it, end) > 1 && dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
           dynamic_cast<TupleHashJoinStep*>((it + 1)->get()) != NULL))
      {
        // string access predicate
        // setup pDictionaryScan
        pDictionaryScan* pds = dynamic_cast<pDictionaryScan*>(it->get());
        vector<uint32_t> pos;
        vector<uint32_t> oids;
        vector<uint32_t> keys;
        vector<uint32_t> scale;
        vector<uint32_t> precision;
        vector<CalpontSystemCatalog::ColDataType> types;
        vector<uint32_t> csNums;
        pos.push_back(2);

        pos.push_back(2 + 8);
        CalpontSystemCatalog::OID coid = jobInfo.keyInfo->dictOidToColOid[pds->oid()];
        oids.push_back(coid);
        uint32_t keyId = pds->tupleId();
        keys.push_back(keyId);
        types.push_back(CalpontSystemCatalog::BIGINT);
        csNums.push_back(pds->colType().charsetNumber);
        scale.push_back(0);
        precision.push_back(0);

        RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision,
                    jobInfo.stringTableThreshold);

        if (jobInfo.trace)
          cout << "RowGroup pds(and): " << rg.toString() << endl;

        pds->setOutputRowGroup(rg);
        newSteps.push_back(*it);

        DictionaryScanInfo pdsInfo;
        pdsInfo.fTokenId = keyId;
        pdsInfo.fDl = pds->outputAssociation().outAt(0);
        pdsInfo.fRowGroup = rg;
        pdsVec.push_back(pdsInfo);

        // save the token join to the last
        itInc = 1;
        numOfStepsAddToBps = 0;
      }
      else if (std::distance(begin, it) > 1 &&
               (dynamic_cast<pDictionaryScan*>((it - 1)->get()) != NULL ||
                dynamic_cast<pDictionaryScan*>((it - 2)->get()) != NULL) &&
               dynamic_cast<TupleHashJoinStep*>(it->get()) != NULL)
      {
        // save the token join to the last, by pdsInfo
        itInc = 1;
        numOfStepsAddToBps = 0;
      }
      else if (std::distance(it, end) > 2 && dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
               dynamic_cast<FilterStep*>((it + 2)->get()) != NULL)
      {
        itInc = 3;
        numOfStepsAddToBps = 3;
      }
      else if (std::distance(it, end) > 3 && dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
               dynamic_cast<pDictionaryStep*>((it + 2)->get()) != NULL &&
               dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
      {
        itInc = 4;
        numOfStepsAddToBps = 4;
      }
      else if (std::distance(it, end) > 3 && dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
               dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
               dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
      {
        itInc = 4;
        numOfStepsAddToBps = 4;
      }
      else if (std::distance(it, end) > 4 && dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
               dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
               dynamic_cast<pDictionaryStep*>((it + 3)->get()) != NULL &&
               dynamic_cast<FilterStep*>((it + 4)->get()) != NULL)
      {
        itInc = 5;
        numOfStepsAddToBps = 5;
      }
      else if (std::distance(it, end) > 1 &&
               (dynamic_cast<pColStep*>(it->get()) != NULL ||
                dynamic_cast<pColScanStep*>(it->get()) != NULL) &&
               dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL)
      {
        itInc = 2;
        numOfStepsAddToBps = 2;
      }
      else if (dynamic_cast<pColStep*>(it->get()) != NULL)
      {
        pColStep* pcol = dynamic_cast<pColStep*>(it->get());

        if (pcol->getFilters().size() == 0)
        {
          // not an access predicate, pcol for token will be added later if necessary
          numOfStepsAddToBps = 0;
        }
        else
        {
          numOfStepsAddToBps = 1;
        }

        itInc = 1;
      }
      else if (dynamic_cast<pColScanStep*>(it->get()) != NULL)
      {
        numOfStepsAddToBps = 1;
        itInc = 1;
      }
      else
      {
        // Not a combinable step, or step pattern not recognized.
        cerr << boldStart << "Try to combine " << typeid(*(it->get())).name() << ": " << it->get()->oid()
             << " into TBPS" << boldStop << endl;
        return false;
      }

      // now add the steps into the TBPS
      if (numOfStepsAddToBps > 0 && bps == NULL)
        throw runtime_error("BPS not created 1");

      for (unsigned i = 0; i < numOfStepsAddToBps; i++)
      {
        auto pp = (it + i)->get();
        bps->setBPP(pp);
        bps->setStepCount();
        bps->setLastTupleId(pp->tupleId());
      }

      it += itInc;
    }

    // add one-table expression steps to TBPS
    if (tableInfo.fOneTableExpSteps.size() > 0 || tableInfo.fFuncJoinExps.size() > 0)
      addExpresssionStepsToBps(mit, sjsp, jobInfo);

    // add TBPS to the step vector
    newSteps.push_back(sjsp);

    // ---- projects ----
    // now, add the joinkeys to the project step vector
    addProjectStepsToBps(mit, bps, jobInfo);

    // rowgroup has the joinkeys and selected columns
    // this is the expected output of this table
    rgOut = bps->getOutputRowGroup();

    // add token joins
    if (pdsVec.size() > 0)
    {
      // ---- token joins ----
      // construct a TupleHashJoinStep
      TupleBPS* tbps = dynamic_cast<TupleBPS*>(bps);
      TupleHashJoinStep* thjs = new TupleHashJoinStep(jobInfo);
      thjs->tableOid1(0);
      thjs->tableOid2(tableInfo.fTableOid);
      thjs->alias1(tableInfo.fAlias);
      thjs->alias2(tableInfo.fAlias);
      thjs->view1(tableInfo.fView);
      thjs->view2(tableInfo.fView);
      thjs->schema1(tableInfo.fSchema);
      thjs->schema2(tableInfo.fSchema);
      thjs->setLargeSideBPS(tbps);
      thjs->joinId(-1);  // token join is a filter; force it to run before other joins
      thjs->setJoinType(INNER);
      thjs->tokenJoin(mit->first);
      tbps->incWaitToRunStepCnt();
      SJSTEP spthjs(thjs);

      // rowgroup of the TBPS side
      // start with the expected output of the table, tokens will be appended
      RowGroup rgTbps = rgOut;

      // input jobstepassociation
      // 1. small sides -- pdictionaryscan steps
      vector<RowGroup> rgPdsVec;
      map<uint32_t, uint32_t> addedCol;
      vector<JoinType> jointypes;
      vector<bool> typeless;
      vector<vector<uint32_t>> smallKeyIndices;
      vector<vector<uint32_t>> largeKeyIndices;
      vector<string> tableNames;
      JobStepAssociation inJsa;

      for (vector<DictionaryScanInfo>::iterator i = pdsVec.begin(); i != pdsVec.end(); i++)
      {
        // add the token steps to the TBPS
        uint32_t tupleKey = i->fTokenId;
        map<uint32_t, uint32_t>::iterator k = addedCol.find(tupleKey);
        unsigned largeSideIndex = rgTbps.getColumnCount();

        if (k == addedCol.end())
        {
          SJSTEP sjs(new pColStep(jobInfo.keyInfo->tupleKeyVec[tupleKey].fId, tableInfo.fTableOid,
                                  jobInfo.keyInfo->token2DictTypeMap[tupleKey], jobInfo));
          sjs->alias(tableInfo.fAlias);
          sjs->view(tableInfo.fView);
          sjs->schema(tableInfo.fSchema);
          sjs->name(jobInfo.keyInfo->keyName[tupleKey]);
          sjs->tupleId(tupleKey);
          bps->setProjectBPP(sjs.get(), NULL);

          // Update info, which will be used to config the hashjoin later.
          rgTbps += i->fRowGroup;
          addedCol[tupleKey] = largeSideIndex;
        }
        else
        {
          largeSideIndex = k->second;
        }

        inJsa.outAdd(i->fDl);
        tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[tupleKey].fTable);
        rgPdsVec.push_back(i->fRowGroup);
        jointypes.push_back(INNER);
        typeless.push_back(false);
        smallKeyIndices.push_back(vector<uint32_t>(1, 0));
        largeKeyIndices.push_back(vector<uint32_t>(1, largeSideIndex));
      }

      // 2. large side
      if (jobInfo.trace)
        cout << "RowGroup bps(and): " << rgTbps.toString() << endl;

      bps->setOutputRowGroup(rgTbps);
      inJsa.outAdd(bps->outputAssociation().outAt(0));

      // set input jobstepassociation
      thjs->inputAssociation(inJsa);
      thjs->setLargeSideDLIndex(inJsa.outSize() - 1);

      // output jobstepassociation
      AnyDataListSPtr spdl(new AnyDataList());
      RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
      spdl->rowGroupDL(dl);
      dl->OID(mit->first);
      JobStepAssociation jsaOut;
      jsaOut.outAdd(spdl);
      thjs->outputAssociation(jsaOut);

      // config the tuplehashjoin
      thjs->configSmallSideRG(rgPdsVec, tableNames);
      thjs->configLargeSideRG(rgTbps);
      thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
      thjs->setOutputRowGroup(rgOut);
      newSteps.push_back(spthjs);
    }
  }
  else
  {
    // table derived from subquery
    SubQueryStep* subStep = NULL;
    SubAdapterStep* adaStep = NULL;

    for (JobStepVector::iterator it = qsv.begin(); it != qsv.end(); it++)
    {
      if (((subStep = dynamic_cast<SubQueryStep*>(it->get())) != NULL) ||
          ((adaStep = dynamic_cast<SubAdapterStep*>(it->get())) != NULL))
        newSteps.push_back(*it);
    }

    if (subStep == NULL && adaStep == NULL)
      throw runtime_error("No step for subquery.");

    if (subStep)
    {
      rgOut = subStep->getOutputRowGroup();
    }
    else
    {
      // add one-table expression steps to the adapter
      if (tableInfo.fOneTableExpSteps.size() > 0)
        adaStep->addExpression(tableInfo.fOneTableExpSteps, jobInfo);

      // add function join steps
      if (tableInfo.fFuncJoinExps.size() > 0)
      {
        // fe rowgroup info
        RowGroup feRg = adaStep->getFeRowGroup();

        if (feRg.getColumnCount() == 0)
          feRg = adaStep->getOutputRowGroup();

        const vector<uint32_t>& feKeys = feRg.getKeys();
        map<uint32_t, uint32_t> keyToIndexMapFe;

        for (uint64_t i = 0; i < feKeys.size(); ++i)
          keyToIndexMapFe.insert(make_pair(feKeys[i], i));

        // output rowgroup info
        const RowGroup& outRg = adaStep->getOutputRowGroup();
        const vector<uint32_t>& outKeys = outRg.getKeys();
        map<uint32_t, uint32_t> keyToIndexMapOut;

        for (uint64_t i = 0; i < outKeys.size(); ++i)
          keyToIndexMapOut.insert(make_pair(outKeys[i], i));

        // make sure the function join columns are present in the rgs
        vector<uint32_t> fjKeys;
        vector<SRCP> fjCols;
        TupleInfoVector tis;
        uint64_t lastFeIdx = feKeys.size();
        JobStepVector& fjs = tableInfo.fFuncJoinExps;

        for (JobStepVector::iterator it = fjs.begin(); it != fjs.end(); it++)
        {
          ExpressionStep* e = dynamic_cast<ExpressionStep*>(it->get());
          TupleInfo ti = setExpTupleInfo(e->expression().get(), jobInfo);

          if (find(feKeys.begin(), feKeys.end(), ti.key) == feKeys.end())
          {
            tis.push_back(ti);
            keyToIndexMapFe.insert(make_pair(ti.key, lastFeIdx++));
          }

          e->updateInputIndex(keyToIndexMapFe, jobInfo);
          e->updateOutputIndex(keyToIndexMapFe, jobInfo);
          fjCols.push_back(e->expression());
        }

        // additional fields in the rowgroup
        vector<uint32_t> pos;
        vector<uint32_t> oids;
        vector<uint32_t> keys;
        vector<uint32_t> scale;
        vector<uint32_t> precision;
        vector<CalpontSystemCatalog::ColDataType> types;
        vector<uint32_t> csNums;
        pos.push_back(2);

        for (unsigned i = 0; i < tis.size(); i++)
        {
          pos.push_back(pos.back() + tis[i].width);
          oids.push_back(tis[i].oid);
          keys.push_back(tis[i].key);
          types.push_back(tis[i].dtype);
          csNums.push_back(tis[i].csNum);
          scale.push_back(tis[i].scale);
          precision.push_back(tis[i].precision);
        }

        RowGroup addRg(oids.size(), pos, oids, keys, types, csNums, scale, precision,
                       jobInfo.stringTableThreshold);

        RowGroup feRg1 = feRg;
        RowGroup outRg1 = outRg;

        if (addRg.getColumnCount() > 0)
        {
          feRg1 += addRg;
          outRg1 += addRg;
        }

        adaStep->addFcnJoinExp(fjCols);
        adaStep->setFeRowGroup(feRg1);
        adaStep->setOutputRowGroup(outRg1);
      }

      rgOut = adaStep->getOutputRowGroup();
    }
  }

  tableInfo.fDl = newSteps.back()->outputAssociation().outAt(0);
  tableInfo.fRowGroup = rgOut;

  if (jobInfo.trace)
    cout << "RowGroup for " << mit->first << " : " << mit->second.fRowGroup.toString() << endl;

  qsv.swap(newSteps);

  return true;
}
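// Summary of the filter-step patterns the combiner above recognizes (derived from
// its dynamic_cast chains; added for orientation only):
//
//   pDictionaryScan [+ pCol/pColScan] + TupleHashJoinStep         -> token join, deferred
//   <col step> + pColStep + FilterStep                            -> 3 steps into the BPS
//   <col step> + pColStep + pDictionaryStep + FilterStep          -> 4 steps
//   <col step> + pDictionaryStep + pColStep [+ pDictionaryStep] + FilterStep -> 4-5 steps
//   pCol/pColScan + pDictionaryStep                               -> 2 steps
//   lone pColStep (with filters) or pColScanStep                  -> 1 step
//
// Anything else aborts the combine and returns false.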

bool addFunctionJoin(vector<uint32_t>& joinedTables, JobStepVector& joinSteps, set<uint32_t>& nodeSet,
                     set<pair<uint32_t, uint32_t>>& pathSet, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
  // @bug3683, adding function joins for not-joined tables, one pair at a time.
  // design review comment:
  // all convertible expressions between a pair of tables should be converted, all or none.
  vector<JobStep*>::iterator i = jobInfo.functionJoins.begin();  // candidates
  set<pair<uint32_t, uint32_t>> functionJoinPairs;               // pairs
  bool added = false;                                            // new node added

  // for function join tables' scope checking, not to try semi join inside subquery.
  set<uint32_t> tables;  // tables to join
  tables.insert(jobInfo.tableList.begin(), jobInfo.tableList.end());

  // table pairs to be joined by function joins
  TableJoinMap::iterator m1 = jobInfo.tableJoinMap.end();
  TableJoinMap::iterator m2 = jobInfo.tableJoinMap.end();

  for (; (i != jobInfo.functionJoins.end()); i++)
  {
    ExpressionStep* es = dynamic_cast<ExpressionStep*>((*i));
    idbassert(es);

    if (es->functionJoin())
      continue;  // already converted to a join

    boost::shared_ptr<FunctionJoinInfo> fji = es->functionJoinInfo();
    uint32_t key1 = fji->fJoinKey[0];
    uint32_t key2 = fji->fJoinKey[1];
    uint32_t tid1 = fji->fTableKey[0];
    uint32_t tid2 = fji->fTableKey[1];

    if (nodeSet.find(tid1) != nodeSet.end() && nodeSet.find(tid2) != nodeSet.end())
      continue;  // both connected, will be a cycle if added.

    if (nodeSet.find(tid1) == nodeSet.end() && nodeSet.find(tid2) == nodeSet.end())
      continue;  // both isolated, wait until one is connected.

    if (tables.find(tid1) == tables.end() || tables.find(tid2) == tables.end())
      continue;  // sub-query case

    // one & only one is already connected
    pair<uint32_t, uint32_t> p(tid1, tid2);

    if (functionJoinPairs.empty())
    {
      functionJoinPairs.insert(make_pair(tid1, tid2));
      functionJoinPairs.insert(make_pair(tid2, tid1));
      tableInfoMap[tid1].fAdjacentList.push_back(tid2);
      tableInfoMap[tid2].fAdjacentList.push_back(tid1);

      if (find(joinedTables.begin(), joinedTables.end(), tid1) == joinedTables.end())
      {
        joinedTables.push_back(tid1);
        nodeSet.insert(tid1);
        pathSet.insert(make_pair(tid2, tid1));
      }
      else
      {
        joinedTables.push_back(tid2);
        nodeSet.insert(tid2);
        pathSet.insert(make_pair(tid1, tid2));
      }

      added = true;

      m1 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid1, tid2), JoinData()));
      m2 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid2, tid1), JoinData()));

      if (m1 == jobInfo.tableJoinMap.end() || m2 == jobInfo.tableJoinMap.end())
        throw runtime_error("Bad table map.");

      TupleInfo ti1 = getTupleInfo(key1, jobInfo);
      TupleInfo ti2 = getTupleInfo(key2, jobInfo);

      // Enable Typeless JOIN for char and wide decimal types.
      if (datatypes::isCharType(ti1.dtype) || (datatypes::isWideDecimalType(ti1.dtype, ti1.width) ||
                                               datatypes::isWideDecimalType(ti2.dtype, ti2.width)))
        m1->second.fTypeless = m2->second.fTypeless = true;  // ti2 is compatible
      else
        m1->second.fTypeless = m2->second.fTypeless = false;
    }
    else if (functionJoinPairs.find(p) == functionJoinPairs.end())
    {
      continue;  // different path
    }
    else
    {
      // path already added, do compound join
      m1->second.fTypeless = m2->second.fTypeless = true;
    }

    // simple or compound function join
    es->functionJoin(true);
    updateTableKey(key1, tid1, jobInfo);
    updateTableKey(key2, tid2, jobInfo);

    tableInfoMap[tid1].fJoinKeys.push_back(key1);
    tableInfoMap[tid2].fJoinKeys.push_back(key2);

    if (fji->fStep[0])
      tableInfoMap[tid1].fFuncJoinExps.push_back(fji->fStep[0]);

    if (fji->fStep[1])
      tableInfoMap[tid2].fFuncJoinExps.push_back(fji->fStep[1]);

    vector<uint32_t>& cols1 = tableInfoMap[tid1].fColsInFuncJoin;
    cols1.insert(cols1.end(), fji->fColumnKeys[0].begin(), fji->fColumnKeys[0].end());
    vector<uint32_t>& cols2 = tableInfoMap[tid2].fColsInFuncJoin;
    cols2.insert(cols2.end(), fji->fColumnKeys[1].begin(), fji->fColumnKeys[1].end());

    // construct a join step
    TupleHashJoinStep* thjs = new TupleHashJoinStep(jobInfo);
    thjs->tableOid1(fji->fTableOid[0]);
    thjs->tableOid2(fji->fTableOid[1]);
    thjs->oid1(fji->fOid[0]);
    thjs->oid2(fji->fOid[1]);
    thjs->alias1(fji->fAlias[0]);
    thjs->alias2(fji->fAlias[1]);
    thjs->view1(fji->fView[0]);
    thjs->view2(fji->fView[1]);
    thjs->schema1(fji->fSchema[0]);
    thjs->schema2(fji->fSchema[1]);
    thjs->column1(fji->fExpression[0]);
    thjs->column2(fji->fExpression[1]);
    thjs->sequence1(fji->fSequence[0]);
    thjs->sequence2(fji->fSequence[1]);
    thjs->joinId(fji->fJoinId);
    thjs->setJoinType(fji->fJoinType);
    thjs->funcJoinInfo(fji);
    thjs->tupleId1(key1);
    thjs->tupleId2(key2);
    SJSTEP spjs(thjs);

    // check correlated info
    JoinType joinType = fji->fJoinType;

    if (!(joinType & CORRELATED))
    {
      joinSteps.push_back(spjs);

      // Keep a map of the join (table, key) pairs
      m1->second.fLeftKeys.push_back(key1);
      m1->second.fRightKeys.push_back(key2);

      m2->second.fLeftKeys.push_back(key2);
      m2->second.fRightKeys.push_back(key1);

      // Keep a map of the join type between the keys.
      // OUTER join and SEMI/ANTI join are mutually exclusive.
      if (joinType == LEFTOUTER)
      {
        m1->second.fTypes.push_back(SMALLOUTER);
        m2->second.fTypes.push_back(LARGEOUTER);
        jobInfo.outerOnTable.insert(tid2);
      }
      else if (joinType == RIGHTOUTER)
      {
        m1->second.fTypes.push_back(LARGEOUTER);
        m2->second.fTypes.push_back(SMALLOUTER);
        jobInfo.outerOnTable.insert(tid1);
      }
      else if ((joinType & SEMI) &&
               ((joinType & LEFTOUTER) == LEFTOUTER || (joinType & RIGHTOUTER) == RIGHTOUTER))
      {
        // @bug3998, DML UPDATE borrows "SEMI" flag,
        // allowing SEMI and LARGEOUTER combination to support update with outer join.
        if ((joinType & LEFTOUTER) == LEFTOUTER)
        {
          joinType ^= LEFTOUTER;
          m1->second.fTypes.push_back(joinType);
          m2->second.fTypes.push_back(joinType | LARGEOUTER);
          jobInfo.outerOnTable.insert(tid2);
        }
        else
        {
          joinType ^= RIGHTOUTER;
          m1->second.fTypes.push_back(joinType | LARGEOUTER);
          m2->second.fTypes.push_back(joinType);
          jobInfo.outerOnTable.insert(tid1);
        }
      }
      else
      {
        m1->second.fTypes.push_back(joinType);
        m2->second.fTypes.push_back(joinType);

        if (joinType == INNER)
        {
          jobInfo.innerOnTable.insert(tid1);
          jobInfo.innerOnTable.insert(tid2);
        }
      }

      // need id to keep the join order
      m1->second.fJoinId = m2->second.fJoinId = thjs->joinId();
    }
    else
    {
      // one of the tables is in outer query
      jobInfo.correlateSteps.push_back(spjs);
    }
  }

  return added;
}

// This class represents a circular inner join graph transformer.
// It collects the cycles in the given join graph as disjoint sets of join edges; for each collected cycle
// it removes one join edge (if column statistics are available it tries to remove the join edge that could
// produce the largest intermediate join result, otherwise just a random edge in the cycle) and adds that
// join edge to the JobInfo class to be restored as a `post join` equality filter in the join ordering part.
class CircularJoinGraphTransformer
{
 public:
  // Ctor.
  CircularJoinGraphTransformer(TableInfoMap& infoMap, JobInfo& jobInfo, JobStepVector& joinSteps)
   : infoMap(infoMap), jobInfo(jobInfo), joinSteps(joinSteps)
  {
  }
  // Delete all other ctors/dtors.
  CircularJoinGraphTransformer() = delete;
  CircularJoinGraphTransformer(const CircularJoinGraphTransformer&) = delete;
  CircularJoinGraphTransformer(CircularJoinGraphTransformer&&) = delete;
  CircularJoinGraphTransformer& operator=(const CircularJoinGraphTransformer&) = delete;
  CircularJoinGraphTransformer& operator=(CircularJoinGraphTransformer&&) = delete;
  virtual ~CircularJoinGraphTransformer() = default;

  // Transform the join graph.
  void transformJoinGraph();

 protected:
  // Analyzes the `join graph` based on the DFS algorithm.
  virtual void analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable);
  // For each cycle, breaks it and collects join edges.
  void breakCyclesAndCollectJoinEdges();
  // Removes the given `join edge` from the `join graph`.
  void breakCycleAndCollectJoinEdge(const std::pair<JoinEdge, int64_t>& edgeForward);
  // Initializes the `join graph` based on the table connections.
  virtual void initializeJoinGraph();
  // Checks if the given join edge has FK - FK relations.
  bool isForeignKeyForeignKeyLink(const JoinEdge& edge, statistics::StatisticsManager* statisticsManager);
  // Based on column statistics, tries to find the `join edge` with the maximum join cardinality.
  virtual void chooseEdgeToTransform(Cycle& cycle, std::pair<JoinEdge, int64_t>& resultEdge);
  // Removes the given `tableId` from the adjacency list.
  void removeFromAdjacentList(uint32_t tableId, std::vector<uint32_t>& adjList);
  // Removes the join step associated with the given `joinEdge` from the job steps.
  void removeAssociatedHashJoinStepFromJoinSteps(const JoinEdge& joinEdge);

  // Join information.
  TableInfoMap& infoMap;
  JobInfo& jobInfo;
  JobStepVector& joinSteps;

  // Represents a collection of cycles.
  Cycles cycles;
  // Represents the internal `join graph`.
  JoinGraph joinGraph;
  // Represents a set of join edges to erase for each cycle in the `join graph`.
  JoinEdges edgesToTransform;
  // Represents the table to start analysis from.
  uint32_t headTable{0};
};

// Circular inner joins methods.
void CircularJoinGraphTransformer::analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable)
{
  // Mark as `GREY` to specify the table node being processed.
  joinGraph[currentTable].fTableColor = JoinTableColor::GREY;
  joinGraph[currentTable].fParent = prevTable;

  // For each adjacent node.
  for (const auto adjNode : joinGraph[currentTable].fAdjacentList)
  {
    if (prevTable != adjNode)
    {
      if (joinGraph[adjNode].fTableColor == JoinTableColor::GREY)
      {
        Cycle cycle;
        const auto edgeForward = make_pair(currentTable, adjNode);
        const auto edgeBackward = make_pair(adjNode, currentTable);

        if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
        {
          edgesToTransform.insert(edgeForward);
          cycle.push_back(edgeForward);
        }

        auto nodeIt = currentTable;
        auto nextNode = joinGraph[nodeIt].fParent;
        // Walk back until we find the node `adjNode` we identified before.
        while (nextNode != std::numeric_limits<uint32_t>::max() && nextNode != adjNode)
        {
          const auto edgeForward = make_pair(nextNode, nodeIt);
          const auto edgeBackward = make_pair(nodeIt, nextNode);

          if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
          {
            edgesToTransform.insert(edgeForward);
            cycle.push_back(edgeForward);
          }

          nodeIt = nextNode;
          nextNode = joinGraph[nodeIt].fParent;
        }

        // Add the last edge.
        if (nextNode != std::numeric_limits<uint32_t>::max())
        {
          const auto edgeForward = make_pair(nextNode, nodeIt);
          const auto edgeBackward = make_pair(nodeIt, nextNode);

          if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
          {
            edgesToTransform.insert(edgeForward);
            cycle.push_back(edgeForward);
          }
        }

        if (jobInfo.trace && cycle.size())
        {
          std::cout << "Cycle found.\n";
          std::cout << "Collected cycle \n";
          for (const auto& edge : cycle)
            std::cout << "Join edge: " << edge.first << " <-> " << edge.second << '\n';
        }

        // Collect the cycle.
        if (cycle.size())
          cycles.push_back(std::move(cycle));
      }
      // If not visited - go there.
      else if (joinGraph[adjNode].fTableColor == JoinTableColor::WHITE)
      {
        analyzeJoinGraph(adjNode, currentTable);
      }
    }
  }

  // Mark `BLACK` to specify this node is finished.
  joinGraph[currentTable].fTableColor = JoinTableColor::BLACK;
}
|
|
|
|
void CircularJoinGraphTransformer::removeFromAdjacentList(uint32_t tableId, std::vector<uint32_t>& adjList)
{
  auto tableIdIt = std::find(adjList.begin(), adjList.end(), tableId);
  if (tableIdIt != adjList.end())
    adjList.erase(tableIdIt);
}

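// An FK <-> FK edge joins two foreign-key columns (typically fact-to-fact); such a join tends to
// produce the largest intermediate result in the cycle, which is why chooseEdgeToTransform() below
// prefers it as the edge to cut and restore later as a post join filter.
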
bool CircularJoinGraphTransformer::isForeignKeyForeignKeyLink(
    const JoinEdge& edge, statistics::StatisticsManager* statisticsManager)
{
  const auto end = jobInfo.tableJoinMap.end();
  auto it = jobInfo.tableJoinMap.find(edge);
  if (it == end)
  {
    it = jobInfo.tableJoinMap.find(make_pair(edge.second, edge.first));
    if (it == end)
      return false;
  }

  std::vector<statistics::KeyType> leftKeys, rightKeys;
  std::vector<uint32_t> lOid, rOid;

  for (auto key : it->second.fLeftKeys)
  {
    auto oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
    if (!statisticsManager->hasKey(oid))
      return false;

    auto keyType = statisticsManager->getKeyType(oid);
    leftKeys.push_back(keyType);
    lOid.push_back(oid);

    if (jobInfo.trace)
      std::cout << "OID " << oid << " with key type " << (uint32_t)keyType << std::endl;
  }

  for (auto key : it->second.fRightKeys)
  {
    auto oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
    if (!statisticsManager->hasKey(oid))
      return false;

    auto keyType = statisticsManager->getKeyType(oid);
    rightKeys.push_back(keyType);
    rOid.push_back(oid);

    if (jobInfo.trace)
      std::cout << "OID " << oid << " with key type " << (uint32_t)keyType << std::endl;
  }

  if (rightKeys.size() == 0 || leftKeys.size() == 0)
    return false;

  statistics::KeyType leftType = statistics::KeyType::PK;
  for (auto keyType : leftKeys)
  {
    if (keyType == statistics::KeyType::FK)
    {
      leftType = keyType;
      break;
    }
  }

  statistics::KeyType rightType = statistics::KeyType::PK;
  for (auto keyType : rightKeys)
  {
    if (keyType == statistics::KeyType::FK)
    {
      rightType = keyType;
      break;
    }
  }

  if (rightType == statistics::KeyType::FK && leftType == statistics::KeyType::FK)
  {
    if (jobInfo.trace)
    {
      std::cout << "Found FK <-> FK connection " << lOid.front() << " <-> " << rOid.front() << std::endl;
    }
    return true;
  }

  return false;
}

void CircularJoinGraphTransformer::chooseEdgeToTransform(Cycle& cycle,
                                                         std::pair<JoinEdge, int64_t>& resultEdge)
{
  // Use statistics if possible.
  auto* statisticsManager = statistics::StatisticsManager::instance();
  for (auto& edgeForward : cycle)
  {
    // Check that the `join edge` is aligned with our needs.
    if (isForeignKeyForeignKeyLink(edgeForward, statisticsManager))
    {
      const auto edgeBackward = std::make_pair(edgeForward.second, edgeForward.first);
      if (!jobInfo.joinEdgesToRestore.count(edgeForward) && !jobInfo.joinEdgesToRestore.count(edgeBackward))
      {
        resultEdge = std::make_pair(edgeForward, 0 /*Dummy weight*/);
        return;
      }
    }
  }

  if (jobInfo.trace)
    std::cout << "FK FK key not found, removing the first inner join edge" << std::endl;

  // Just take the first one.
  resultEdge = std::make_pair(cycle.front(), 0 /*Dummy weight*/);
}

void CircularJoinGraphTransformer::removeAssociatedHashJoinStepFromJoinSteps(const JoinEdge& joinEdge)
{
  if (jobInfo.trace)
  {
    std::cout << "Join steps before transformation: " << std::endl;
    for (auto joinStepIt = joinSteps.begin(); joinStepIt < joinSteps.end(); joinStepIt++)
    {
      auto* tupleHashJoinStep = dynamic_cast<TupleHashJoinStep*>(joinStepIt->get());
      if (tupleHashJoinStep)
      {
        std::cout << "Tables for hash join: " << getTableKey(jobInfo, tupleHashJoinStep->tupleId1())
                  << " <-> " << getTableKey(jobInfo, tupleHashJoinStep->tupleId2()) << std::endl;
      }
    }
  }

  // Match the given `join edge` in the `join steps` vector.
  auto end = joinSteps.end();
  auto joinStepIt = joinSteps.begin();
  // We have to remove all `TupleHashJoinSteps` with the given table keys from the join steps.
  while (joinStepIt != end)
  {
    auto* tupleHashJoinStep = dynamic_cast<TupleHashJoinStep*>(joinStepIt->get());
    if (tupleHashJoinStep)
    {
      const auto tableKey1 = getTableKey(jobInfo, tupleHashJoinStep->tupleId1());
      const auto tableKey2 = getTableKey(jobInfo, tupleHashJoinStep->tupleId2());

      if ((tableKey1 == joinEdge.first && tableKey2 == joinEdge.second) ||
          (tableKey1 == joinEdge.second && tableKey2 == joinEdge.first))
      {
        if (jobInfo.trace)
          std::cout << "Erase matched join step with keys: " << tableKey1 << " <-> " << tableKey2
                    << std::endl;

        joinStepIt = joinSteps.erase(joinStepIt);
        end = joinSteps.end();
      }
      else
      {
        ++joinStepIt;
      }
    }
    else
    {
      // Not a hash join step - skip it, otherwise the iterator would never advance.
      ++joinStepIt;
    }
  }

  if (jobInfo.trace)
  {
    std::cout << "Join steps after transformation: " << std::endl;
    for (auto joinStepIt = joinSteps.begin(); joinStepIt < joinSteps.end(); joinStepIt++)
    {
      auto* tupleHashJoinStep = dynamic_cast<TupleHashJoinStep*>(joinStepIt->get());
      if (tupleHashJoinStep)
      {
        std::cout << "Tables for hash join: " << getTableKey(jobInfo, tupleHashJoinStep->tupleId1())
                  << " <-> " << getTableKey(jobInfo, tupleHashJoinStep->tupleId2()) << std::endl;
      }
    }
  }
}

void CircularJoinGraphTransformer::breakCycleAndCollectJoinEdge(
    const std::pair<JoinEdge, int64_t>& joinEdgeWithWeight)
{
  // Add the edge to be restored.
  jobInfo.joinEdgesToRestore.insert({joinEdgeWithWeight.first, joinEdgeWithWeight.second});
  const auto edgeForward = joinEdgeWithWeight.first;

  // Keep the key columns in the result rowgroups, to avoid column elimination at the intermediate joins.
  auto tableInfoIt = jobInfo.tableJoinMap.find(edgeForward);
  auto& firstExp2 = infoMap[edgeForward.first].fColsInExp2;
  firstExp2.insert(firstExp2.end(), tableInfoIt->second.fLeftKeys.begin(),
                   tableInfoIt->second.fLeftKeys.end());
  auto& secondExp2 = infoMap[edgeForward.second].fColsInExp2;
  secondExp2.insert(secondExp2.end(), tableInfoIt->second.fRightKeys.begin(),
                    tableInfoIt->second.fRightKeys.end());

  // The edge was chosen in the previous step; we have to remove it from the `adjacent list`.
  removeFromAdjacentList(edgeForward.first, infoMap[edgeForward.second].fAdjacentList);
  removeFromAdjacentList(edgeForward.second, infoMap[edgeForward.first].fAdjacentList);

  if (jobInfo.trace)
    std::cout << "Remove from cycle join edge: " << edgeForward.first << " <-> " << edgeForward.second
              << std::endl;

  // Remove all associated `TupleHashJoinSteps` from the join steps.
  removeAssociatedHashJoinStepFromJoinSteps(edgeForward);
}

void CircularJoinGraphTransformer::breakCyclesAndCollectJoinEdges()
{
  if (jobInfo.trace)
    std::cout << "Collected cycles size: " << cycles.size() << std::endl;

  // For each cycle.
  for (auto& cycle : cycles)
  {
    std::pair<JoinEdge, int64_t> joinEdgeWithWeight;
    chooseEdgeToTransform(cycle, joinEdgeWithWeight);
    breakCycleAndCollectJoinEdge(joinEdgeWithWeight);
  }
}

void CircularJoinGraphTransformer::initializeJoinGraph()
{
  for (const auto& infoPair : infoMap)
  {
    JoinTableNode joinTableNode;
    // Copy the adjacent list.
    joinTableNode.fAdjacentList = infoPair.second.fAdjacentList;
    joinGraph[infoPair.first] = joinTableNode;
  }

  // For an inner join we can choose any table to be the head.
  headTable = joinGraph.begin()->first;
}

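// The transformation pipeline: build the graph from the table connections, run a DFS that records
// every back edge (an edge to a GREY node) as a cycle, then break each cycle by removing one join
// edge and remembering it in jobInfo.joinEdgesToRestore for later restoration.
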
void CircularJoinGraphTransformer::transformJoinGraph()
{
  initializeJoinGraph();
  analyzeJoinGraph(/*currentTable=*/headTable, /*prevTable=*/std::numeric_limits<uint32_t>::max());
  edgesToTransform.clear();
  breakCyclesAndCollectJoinEdges();
}

// This class represents a circular outer join graph transformer.
// It derives a weight for each edge in the join graph from the priority of the joins as defined by
// the user.
// In general, assume we have a cycle t1 -> t2 -> ... ti ... tn -> t1 (where n is a natural number >= 3);
// the weighted graph will have 2 edges with maximum weights among the others - (tn -> t1) and
// (tn-1 -> tn) - those are the candidates for transformation.
// It also adds the table associated with the chosen `join edge` to be on the large side.
class CircularOuterJoinGraphTransformer : public CircularJoinGraphTransformer
{
 public:
  // Ctor.
  CircularOuterJoinGraphTransformer(TableInfoMap& infoMap, JobInfo& jobInfo, JobStepVector& joinSteps)
   : CircularJoinGraphTransformer(infoMap, jobInfo, joinSteps)
  {
  }
  // Delete the default ctor and all copy/move operations.
  CircularOuterJoinGraphTransformer() = delete;
  CircularOuterJoinGraphTransformer(const CircularOuterJoinGraphTransformer&) = delete;
  CircularOuterJoinGraphTransformer(CircularOuterJoinGraphTransformer&&) = delete;
  CircularOuterJoinGraphTransformer& operator=(const CircularOuterJoinGraphTransformer&) = delete;
  CircularOuterJoinGraphTransformer& operator=(CircularOuterJoinGraphTransformer&&) = delete;
  ~CircularOuterJoinGraphTransformer() override = default;

 private:
  // Analyzes the given `join graph`.
  void analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable) override;
  // Chooses a join edge to transform from the given cycle based on the join edge weight;
  // the join edge chosen for transformation has the maximum weight in the cycle.
  void chooseEdgeToTransform(Cycle& cycle, std::pair<JoinEdge, int64_t>& resultEdge) override;
  // Returns the min weight among all join weights related to the given `headTable`.
  int64_t getSublingsMinWeight(uint32_t headTable, uint32_t associatedTable);
  // Returns the max weight which is less than the given `upperBoundWeight` among all join weights
  // related to the given `headTable`.
  int64_t getSublingsMaxWeightLessThan(uint32_t headTable, uint32_t associatedTable,
                                       int64_t upperBoundWeight);
  // Initializes the `join graph` from the table connections.
  void initializeJoinGraph() override;

  // The map which represents a weight for each join edge in the join graph.
  std::map<JoinEdge, int64_t> joinEdgesToWeights;
};

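// Edge weights are the join ids assigned to the hash join steps, so a higher weight corresponds to
// a join specified later by the user. E.g. for a cycle t1 - t2 - t3 - t1 whose joins carry ids 1,
// 2 and 3, the edge with weight 3 is the removal candidate; the sibling helpers below compare the
// remaining edges incident to each of its endpoints to decide which endpoint becomes the
// head/large-side table.
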
int64_t CircularOuterJoinGraphTransformer::getSublingsMinWeight(uint32_t headTable, uint32_t associatedTable)
{
  int64_t minWeight = std::numeric_limits<int64_t>::max();
  for (const auto adjNode : joinGraph[headTable].fAdjacentList)
  {
    if (adjNode != associatedTable)
    {
      JoinEdge joinEdge(adjNode, headTable);
      minWeight = std::min(joinEdgesToWeights[joinEdge], minWeight);
    }
  }
  return minWeight;
}

int64_t CircularOuterJoinGraphTransformer::getSublingsMaxWeightLessThan(uint32_t headTable,
                                                                        uint32_t associatedTable,
                                                                        int64_t upperBoundWeight)
{
  int64_t maxWeight = std::numeric_limits<int64_t>::min();
  for (const auto adjNode : joinGraph[headTable].fAdjacentList)
  {
    if (adjNode != associatedTable)
    {
      JoinEdge joinEdge(adjNode, headTable);
      const auto currentWeight = joinEdgesToWeights[joinEdge];
      if (currentWeight < upperBoundWeight)
        maxWeight = std::max(currentWeight, maxWeight);
    }
  }
  return maxWeight;
}

void CircularOuterJoinGraphTransformer::initializeJoinGraph()
{
  // Initialize the join graph first.
  CircularJoinGraphTransformer::initializeJoinGraph();

  // Associate join weights.
  if (jobInfo.trace)
    std::cout << "Join edges with weights.\n";

  int64_t minWeightFullGraph = std::numeric_limits<int64_t>::max();
  JoinEdge joinEdgeWithMinWeight(0, 0);

  // For each join step we associate a `join id` with a `join edge`.
  for (auto joinStepIt = joinSteps.begin(); joinStepIt < joinSteps.end(); joinStepIt++)
  {
    auto* tupleHashJoinStep = dynamic_cast<TupleHashJoinStep*>(joinStepIt->get());
    if (tupleHashJoinStep)
    {
      const int64_t weight = tupleHashJoinStep->joinId();
      const auto tableKey1 = getTableKey(jobInfo, tupleHashJoinStep->tupleId1());
      const auto tableKey2 = getTableKey(jobInfo, tupleHashJoinStep->tupleId2());

      // Edge forward.
      JoinEdge edgeForward{tableKey1, tableKey2};
      auto joinEdgeWeightIt = joinEdgesToWeights.find(edgeForward);
      if (joinEdgeWeightIt == joinEdgesToWeights.end())
        joinEdgesToWeights.insert({edgeForward, weight});
      else
        joinEdgeWeightIt->second = std::max(weight, joinEdgeWeightIt->second);

      // Edge backward.
      JoinEdge edgeBackward{tableKey2, tableKey1};
      joinEdgeWeightIt = joinEdgesToWeights.find(edgeBackward);
      if (joinEdgeWeightIt == joinEdgesToWeights.end())
        joinEdgesToWeights.insert({edgeBackward, weight});
      else
        joinEdgeWeightIt->second = std::max(weight, joinEdgeWeightIt->second);

      if (minWeightFullGraph > weight)
      {
        minWeightFullGraph = weight;
        joinEdgeWithMinWeight = edgeForward;
      }

      if (jobInfo.trace)
        std::cout << edgeForward.first << " <-> " << edgeForward.second << " : " << weight << std::endl;
    }
  }

  if (jobInfo.trace)
    std::cout << "Minimum weight edge is: " << joinEdgeWithMinWeight.first << " <-> "
              << joinEdgeWithMinWeight.second << std::endl;

  // Search for the `head table` by the given join edge; we have 2 candidates.
  // The head table is opposite to the table which has a join edge with minimum weight among all the
  // edges related to that table.
  if (getSublingsMinWeight(joinEdgeWithMinWeight.first, joinEdgeWithMinWeight.second) >
      getSublingsMinWeight(joinEdgeWithMinWeight.second, joinEdgeWithMinWeight.first))
    headTable = joinEdgeWithMinWeight.first;
  else
    headTable = joinEdgeWithMinWeight.second;

  if (jobInfo.trace)
    std::cout << "Head table is: " << headTable << std::endl;
}

void CircularOuterJoinGraphTransformer::analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable)
{
  joinGraph[currentTable].fTableColor = JoinTableColor::GREY;
  joinGraph[currentTable].fParent = prevTable;

  std::vector<std::pair<uint32_t, int64_t>> adjacentListWeighted;
  // For each adjacent node.
  for (const auto adjNode : joinGraph[currentTable].fAdjacentList)
  {
    if (prevTable != adjNode)
    {
      const JoinEdge joinEdge{currentTable, adjNode};
      const auto weight = joinEdgesToWeights[joinEdge];
      adjacentListWeighted.push_back({adjNode, weight});
    }
  }

  // Sort the vertices by weights.
  std::sort(adjacentListWeighted.begin(), adjacentListWeighted.end(),
            [](const std::pair<uint32_t, int64_t>& a, const std::pair<uint32_t, int64_t>& b) {
              return a.second < b.second;
            });

  // For each weighted adjacent node.
  for (const auto& adjNodeWeighted : adjacentListWeighted)
  {
    const auto adjNode = adjNodeWeighted.first;
    // If the adjacent node is already being processed (GREY), this back edge closes a cycle.
    if (joinGraph[adjNode].fTableColor == JoinTableColor::GREY)
    {
      Cycle cycle;
      const auto edgeForward = make_pair(currentTable, adjNode);
      const auto edgeBackward = make_pair(adjNode, currentTable);

      if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
      {
        edgesToTransform.insert(edgeForward);
        cycle.push_back(edgeForward);
      }

      auto nodeIt = currentTable;
      auto nextNode = joinGraph[nodeIt].fParent;
      // Walk back until we find the node `adjNode` we identified before.
      while (nextNode != std::numeric_limits<uint32_t>::max() && nextNode != adjNode)
      {
        const auto edgeForward = make_pair(nextNode, nodeIt);
        const auto edgeBackward = make_pair(nodeIt, nextNode);

        if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
        {
          edgesToTransform.insert(edgeForward);
          cycle.push_back(edgeForward);
        }

        nodeIt = nextNode;
        nextNode = joinGraph[nodeIt].fParent;
      }

      // Add the last edge.
      if (nextNode != std::numeric_limits<uint32_t>::max())
      {
        const auto edgeForward = make_pair(nextNode, nodeIt);
        const auto edgeBackward = make_pair(nodeIt, nextNode);

        if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
        {
          edgesToTransform.insert(edgeForward);
          cycle.push_back(edgeForward);
        }
      }

      // Collect the cycle.
      if (cycle.size())
        cycles.push_back(std::move(cycle));
    }
    else if (joinGraph[adjNode].fTableColor == JoinTableColor::WHITE)
    {
      analyzeJoinGraph(adjNode, currentTable);
    }
  }

  joinGraph[currentTable].fTableColor = JoinTableColor::BLACK;
}

void CircularOuterJoinGraphTransformer::chooseEdgeToTransform(Cycle& cycle,
                                                              std::pair<JoinEdge, int64_t>& resultEdge)
{
  int64_t maxWeightInCycle = std::numeric_limits<int64_t>::min();
  JoinEdge joinEdgeWithMaxWeight;

  if (jobInfo.trace)
    std::cout << "Collected cycle:\n";

  // Search for the join edge with the max weight in the given cycle.
  for (const auto& edgeForward : cycle)
  {
    if (jobInfo.trace)
      std::cout << "Join edge: " << edgeForward.first << " <-> " << edgeForward.second
                << " with weight: " << joinEdgesToWeights[edgeForward] << "\n";

    if (joinEdgesToWeights[edgeForward] > maxWeightInCycle)
    {
      maxWeightInCycle = joinEdgesToWeights[edgeForward];
      joinEdgeWithMaxWeight = edgeForward;
    }
  }

  if (jobInfo.trace)
    std::cout << "Join edge with max weight in a cycle: " << joinEdgeWithMaxWeight.first << " <-> "
              << joinEdgeWithMaxWeight.second << " weight: " << maxWeightInCycle << "\n";

  // Search for the `large side table`. The `large side table` is the table related to the maximum join
  // edge in the cycle; it has the maximum weight among all join edges related to that table which is
  // still less than the maximum join edge in the cycle.
  uint32_t largeSideTable = joinEdgeWithMaxWeight.first;
  if (getSublingsMaxWeightLessThan(joinEdgeWithMaxWeight.second, joinEdgeWithMaxWeight.first,
                                   maxWeightInCycle) >
      getSublingsMaxWeightLessThan(joinEdgeWithMaxWeight.first, joinEdgeWithMaxWeight.second,
                                   maxWeightInCycle))
    largeSideTable = joinEdgeWithMaxWeight.second;

  // Map a negative join id into the positive range, keeping the relative order.
  if (maxWeightInCycle < 0)
    maxWeightInCycle = std::numeric_limits<int64_t>::max() + maxWeightInCycle + 1;
  idbassert(maxWeightInCycle > 0);

  // Add the large table to the map for the `join ordering` part.
  if (!jobInfo.tablesForLargeSide.count(largeSideTable))
    jobInfo.tablesForLargeSide.insert({largeSideTable, maxWeightInCycle});

  if (jobInfo.trace)
    std::cout << "Large side table: " << largeSideTable << std::endl;

  // Assign the result edge.
  resultEdge = std::make_pair(joinEdgeWithMaxWeight, maxWeightInCycle);
}

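// spanningTreeCheck() verifies that the join graph covers every table: it walks the table
// connections, pulls in function joins for tables not reachable through simple column joins,
// reports incompatible or missing joins, and hands any remaining cycles to the transformers above.
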
void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector& joinSteps, JobInfo& jobInfo)
{
  bool spanningTree = true;
  unsigned errcode = 0;
  Message::Args args;

  if (jobInfo.trace)
  {
    cout << "Table Connection:" << endl;

    for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
    {
      cout << i->first << " :";
      vector<uint32_t>::iterator j = i->second.fAdjacentList.begin();

      while (j != i->second.fAdjacentList.end())
        cout << " " << *j++;

      cout << endl;
    }

    cout << endl;
  }

  if (tableInfoMap.size() < 1)
  {
    spanningTree = false;
    cerr << boldStart << "No table information." << boldStop << endl;
    throw logic_error("No table information.");
  }
  else if (tableInfoMap.size() > 1)
  {
    // 1a. make sure all tables are joined if not a single table query.
    set<uint32_t> nodeSet;
    set<pair<uint32_t, uint32_t>> pathSet;
    vector<uint32_t> joinedTables;
    joinedTables.push_back((tableInfoMap.begin())->first);

    for (size_t i = 0; i < joinedTables.size(); i++)
    {
      vector<uint32_t>& v = tableInfoMap[joinedTables[i]].fAdjacentList;
      nodeSet.insert(joinedTables[i]);

      for (vector<uint32_t>::iterator j = v.begin(); j != v.end(); j++)
      {
        if (nodeSet.find(*j) == nodeSet.end())
          joinedTables.push_back(*j);

        nodeSet.insert(*j);
        pathSet.insert(make_pair(joinedTables[i], *j));
      }
    }

    // 1b. convert expressions to function joins if not connected with simple column joins.
    bool fjAdded = false;

    while (joinedTables.size() < tableInfoMap.size() &&
           addFunctionJoin(joinedTables, joinSteps, nodeSet, pathSet, tableInfoMap, jobInfo))
    {
      fjAdded = true;

      for (size_t i = joinedTables.size() - 1; i < joinedTables.size(); i++)
      {
        vector<uint32_t>& v = tableInfoMap[joinedTables[i]].fAdjacentList;

        for (vector<uint32_t>::iterator j = v.begin(); j != v.end(); j++)
        {
          if (find(joinedTables.begin(), joinedTables.end(), *j) == joinedTables.end())
            joinedTables.push_back(*j);

          nodeSet.insert(*j);
          pathSet.insert(make_pair(joinedTables[i], *j));
        }
      }
    }

    if (jobInfo.trace && fjAdded)
    {
      cout << "Table Connection after adding function join:" << endl;

      for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
      {
        cout << i->first << " :";
        vector<uint32_t>::iterator j = i->second.fAdjacentList.begin();

        while (j != i->second.fAdjacentList.end())
          cout << " " << *j++;

        cout << endl;
      }

      cout << endl;
    }

    // Check that the join is compatible.
    set<string> views1;
    set<string> tables1;
    string errStr;

    vector<uint32_t>::iterator k = joinedTables.begin();

    for (; k != joinedTables.end(); k++)
    {
      if (jobInfo.keyInfo->tupleKeyVec[*k].fView.empty())
        tables1.insert(jobInfo.keyInfo->tupleKeyToName[*k]);
      else
        views1.insert(jobInfo.keyInfo->tupleKeyVec[*k].fView);

      if (jobInfo.incompatibleJoinMap.find(*k) != jobInfo.incompatibleJoinMap.end())
      {
        errcode = ERR_INCOMPATIBLE_JOIN;

        uint32_t key2 = jobInfo.incompatibleJoinMap[*k];

        if (jobInfo.keyInfo->tupleKeyVec[*k].fView.length() > 0)
        {
          string view2 = jobInfo.keyInfo->tupleKeyVec[key2].fView;

          if (jobInfo.keyInfo->tupleKeyVec[*k].fView == view2)
          {
            // same view
            errStr += "Tables in '" + view2 + "' have";
          }
          else if (view2.empty())
          {
            // view and real table
            errStr += "'" + jobInfo.keyInfo->tupleKeyVec[*k].fView + "' and '" +
                      jobInfo.keyInfo->tupleKeyToName[key2] + "' have";
          }
          else
          {
            // two views
            errStr += "'" + jobInfo.keyInfo->tupleKeyVec[*k].fView + "' and '" + view2 + "' have";
          }
        }
        else
        {
          string view2 = jobInfo.keyInfo->tupleKeyVec[key2].fView;

          if (view2.empty())
          {
            // two real tables
            errStr += "'" + jobInfo.keyInfo->tupleKeyToName[*k] + "' and '" +
                      jobInfo.keyInfo->tupleKeyToName[key2] + "' have";
          }
          else
          {
            // real table and view
            errStr += "'" + jobInfo.keyInfo->tupleKeyToName[*k] + "' and '" + view2 + "' have";
          }
        }

        args.add(errStr);
        spanningTree = false;
        break;
      }
    }

    // 1c. check again if all tables are joined after pulling in function joins.
    if (joinedTables.size() < tableInfoMap.size())
    {
      vector<uint32_t> notJoinedTables;

      for (TableInfoMap::iterator i = tableInfoMap.begin(); i != tableInfoMap.end(); i++)
      {
        if (find(joinedTables.begin(), joinedTables.end(), i->first) == joinedTables.end())
          notJoinedTables.push_back(i->first);
      }

      set<string> views2;
      set<string> tables2;
      k = notJoinedTables.begin();

      for (; k != notJoinedTables.end(); k++)
      {
        if (jobInfo.keyInfo->tupleKeyVec[*k].fView.empty())
          tables2.insert(jobInfo.keyInfo->tupleKeyToName[*k]);
        else
          views2.insert(jobInfo.keyInfo->tupleKeyVec[*k].fView);
      }

      if (errStr.empty())
      {
        errcode = ERR_MISS_JOIN;

        // 1. check if all tables in a view are joined
        for (set<string>::iterator s = views1.begin(); s != views1.end(); s++)
        {
          if (views2.find(*s) != views2.end())
          {
            errStr = "Tables in '" + (*s) + "' are";
          }
        }

        // 2. tables and views are joined
        if (errStr.empty())
        {
          string set1;

          for (set<string>::iterator s = views1.begin(); s != views1.end(); s++)
          {
            if (set1.empty())
              set1 = "'";
            else
              set1 += ", ";

            set1 += (*s);
          }

          for (set<string>::iterator s = tables1.begin(); s != tables1.end(); s++)
          {
            if (set1.empty())
              set1 = "'";
            else
              set1 += ", ";

            set1 += (*s);
          }

          string set2;

          for (set<string>::iterator s = views2.begin(); s != views2.end(); s++)
          {
            if (set2.empty())
              set2 = "'";
            else
              set2 += ", ";

            set2 += (*s);
          }

          for (set<string>::iterator s = tables2.begin(); s != tables2.end(); s++)
          {
            if (set2.empty())
              set2 = "'";
            else
              set2 += ", ";

            set2 += (*s);
          }

          errStr = set1 + "' and " + set2 + "' are";
          args.add(errStr);
          spanningTree = false;
        }
      }
    }

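    // A connected graph on V nodes is a tree iff it has exactly V - 1 edges; `pathSet` stores each
    // undirected edge in both directions, so nodeSet.size() - pathSet.size() / 2 != 1 means the
    // graph still contains at least one cycle.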
    // 2. Cycles.
    if (spanningTree && (nodeSet.size() - pathSet.size() / 2 != 1))
    {
      std::unique_ptr<CircularJoinGraphTransformer> joinGraphTransformer;
      if (jobInfo.outerOnTable.size() == 0)
        joinGraphTransformer.reset(new CircularJoinGraphTransformer(tableInfoMap, jobInfo, joinSteps));
      else
        joinGraphTransformer.reset(new CircularOuterJoinGraphTransformer(tableInfoMap, jobInfo, joinSteps));

      joinGraphTransformer->transformJoinGraph();
    }
  }

  if (!spanningTree)
  {
    cerr << boldStart << IDBErrorInfo::instance()->errorMsg(errcode, args) << boldStop << endl;
    throw IDBExcept(IDBErrorInfo::instance()->errorMsg(errcode, args), errcode);
  }
}

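// outjoinPredicateAdjust() re-applies the WHERE-clause filters of outer-joined tables (and of
// inner tables carrying an IS NULL filter) as expression steps evaluated after the join, so that
// the NULL-extended rows produced by the outer join are filtered correctly.
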
void outjoinPredicateAdjust(TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
  std::set<uint32_t> tables = jobInfo.outerOnTable;
  if (!tables.size())
    return;

  // Mixed outer/inner joins and a table with a `null filter`.
  for (const auto tableId : jobInfo.innerOnTable)
  {
    if (jobInfo.tableHasIsNull.find(tableId) != jobInfo.tableHasIsNull.end())
      tables.insert(tableId);
  }

  for (const auto tableId : tables)
  {
    // resetTableFilters(tableInfoMap[tableId], jobInfo)
    TableInfo& tblInfo = tableInfoMap[tableId];

    if (tblInfo.fTableOid != CNX_VTABLE_ID)
    {
      JobStepVector::iterator k = tblInfo.fQuerySteps.begin();
      JobStepVector onClauseFilterSteps;  //@bug5887, 5311

      for (; k != tblInfo.fQuerySteps.end(); k++)
      {
        if ((*k)->onClauseFilter())
        {
          onClauseFilterSteps.push_back(*k);
          continue;
        }

        uint32_t colKey = -1;
        pColStep* pcs = dynamic_cast<pColStep*>(k->get());
        pColScanStep* pcss = dynamic_cast<pColScanStep*>(k->get());
        pDictionaryScan* pdss = dynamic_cast<pDictionaryScan*>(k->get());
        pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(k->get());
        vector<const execplan::Filter*>* filters = NULL;
        int8_t bop = -1;

        if (pcs != NULL)
        {
          filters = &(pcs->getFilters());
          bop = pcs->BOP();
          colKey = pcs->tupleId();
        }
        else if (pcss != NULL)
        {
          filters = &(pcss->getFilters());
          bop = pcss->BOP();
          colKey = pcss->tupleId();
        }
        else if (pdss != NULL)
        {
          filters = &(pdss->getFilters());
          bop = pdss->BOP();
          colKey = pdss->tupleId();
        }
        else if (pdsp != NULL)
        {
          filters = &(pdsp->getFilters());
          bop = pdsp->BOP();
          colKey = pdsp->tupleId();
        }

        if (filters != NULL && filters->size() > 0)
        {
          ParseTree* pt = new ParseTree((*filters)[0]->clone());

          for (size_t i = 1; i < filters->size(); i++)
          {
            ParseTree* left = pt;
            ParseTree* right = new ParseTree((*filters)[i]->clone());
            ParseTree* op = (BOP_OR == bop) ? new ParseTree(new LogicOperator("or"))
                                            : new ParseTree(new LogicOperator("and"));
            op->left(left);
            op->right(right);

            pt = op;
          }

          ExpressionStep* es = new ExpressionStep(jobInfo);

          if (es == NULL)
            throw runtime_error("Failed to new ExpressionStep 2");

          es->expressionFilter(pt, jobInfo);
          SJSTEP sjstep(es);
          jobInfo.outerJoinExpressions.push_back(sjstep);
          tblInfo.fColsInOuter.push_back(colKey);

          delete pt;
        }
      }

      // Do not apply the primitive filters if there is an "IS NULL" in the where clause.
      if (jobInfo.tableHasIsNull.find(tableId) != jobInfo.tableHasIsNull.end())
        tblInfo.fQuerySteps = onClauseFilterSteps;
    }

    jobInfo.outerJoinExpressions.insert(jobInfo.outerJoinExpressions.end(), tblInfo.fOneTableExpSteps.begin(),
                                        tblInfo.fOneTableExpSteps.end());
    tblInfo.fOneTableExpSteps.clear();

    tblInfo.fColsInOuter.insert(tblInfo.fColsInOuter.end(), tblInfo.fColsInExp1.begin(),
                                tblInfo.fColsInExp1.end());
  }
}

uint32_t getLargestTable(JobInfo& jobInfo, TableInfoMap& tableInfoMap, bool overrideLargeSideEstimate)
{
  // Subquery in FROM clause assumptions:
  // the hint will be ignored if the 1st table in the FROM clause is a derived table.
  if (jobInfo.keyInfo->tupleKeyVec[jobInfo.tableList[0]].fId < 3000)
    overrideLargeSideEstimate = false;

  // Bug 2123. Added logic to dynamically determine the large side table unless the SQL statement
  // contained a hint saying to skip the estimation and use the FIRST table in the from clause.
  // Prior to this, we were using the LAST table in the from clause. We switched it as there
  // were some outer join sqls that couldn't be written with the large table last.
  // Default to the first table in the from clause if:
  //   the user set the hint; or
  //   there is only one table in the query.
  uint32_t ret = jobInfo.tableList.front();

  if (jobInfo.tableList.size() <= 1)
  {
    return ret;
  }

  // Algorithm to dynamically determine the largest table.
  uint64_t largestCardinality = 0;
  uint64_t estimatedRowCount = 0;

  // Loop through the tables and find the one with the largest estimated cardinality.
  for (uint32_t i = 0; i < jobInfo.tableList.size(); i++)
  {
    jobInfo.tableSize[jobInfo.tableList[i]] = 0;
    TableInfoMap::iterator it = tableInfoMap.find(jobInfo.tableList[i]);

    if (it != tableInfoMap.end())
    {
      // @Bug 3771. Loop through the query steps until the tupleBPS is found instead of
      // just looking at the first one. Tables in the query that included a filter on a
      // dictionary column were not getting their row count estimated.
      for (JobStepVector::iterator jsIt = it->second.fQuerySteps.begin();
           jsIt != it->second.fQuerySteps.end(); jsIt++)
      {
        TupleBPS* tupleBPS = dynamic_cast<TupleBPS*>((*jsIt).get());

        if (tupleBPS != NULL)
        {
          estimatedRowCount = tupleBPS->getEstimatedRowCount();
          jobInfo.tableSize[jobInfo.tableList[i]] = estimatedRowCount;

          if (estimatedRowCount > largestCardinality)
          {
            ret = jobInfo.tableList[i];
            largestCardinality = estimatedRowCount;
          }

          break;
        }
      }
    }
  }

  // select /*! INFINIDB_ORDERED */
  if (overrideLargeSideEstimate)
  {
    ret = jobInfo.tableList.front();
    jobInfo.tableSize[ret] = numeric_limits<uint64_t>::max();
  }

  return ret;
}

uint32_t getPrevLarge(uint32_t n, TableInfoMap& tableInfoMap)
{
  // root node : no previous node;
  // other node: only one immediate previous node;
  int prev = -1;
  vector<uint32_t>& adjList = tableInfoMap[n].fAdjacentList;

  for (vector<uint32_t>::iterator i = adjList.begin(); i != adjList.end() && prev < 0; i++)
  {
    if (tableInfoMap[*i].fVisited == true)
      prev = *i;
  }

  return prev;
}

uint32_t getKeyIndex(uint32_t key, const RowGroup& rg)
{
  vector<uint32_t>::const_iterator i = rg.getKeys().begin();

  for (; i != rg.getKeys().end(); ++i)
    if (key == *i)
      break;

  if (i == rg.getKeys().end())
    throw runtime_error("No key found.");

  return std::distance(rg.getKeys().begin(), i);
}

bool joinInfoCompare(const SP_JoinInfo& a, const SP_JoinInfo& b)
{
  return (a->fJoinData.fJoinId < b->fJoinData.fJoinId);
}

string joinTypeToString(const JoinType& joinType)
{
  string ret;

  if (joinType & INNER)
    ret = "inner";
  else if (joinType & LARGEOUTER)
    ret = "largeOuter";
  else if (joinType & SMALLOUTER)
    ret = "smallOuter";

  if (joinType & SEMI)
    ret += "+semi";

  if (joinType & ANTI)
    ret += "+anti";

  if (joinType & SCALAR)
    ret += "+scalar";

  if (joinType & MATCHNULLS)
    ret += "+matchnulls";

  if (joinType & WITHFCNEXP)
    ret += "+exp";

  if (joinType & CORRELATED)
    ret += "+correlated";

  return ret;
}

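// Returns true when every key in `keysToMatch` is also present in `keysToSearch`, i.e.
// keysToMatch is a subset of keysToSearch; the hash set makes this O(n + m) instead of O(n * m).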
bool matchKeys(const vector<uint32_t>& keysToSearch, const vector<uint32_t>& keysToMatch)
{
  std::unordered_set<uint32_t> keysMap;
  for (const auto key : keysToSearch)
    keysMap.insert(key);

  for (const auto key : keysToMatch)
  {
    if (!keysMap.count(key))
      return false;
  }

  return true;
}

void tryToRestoreJoinEdges(JobInfo& jobInfo, JoinInfo* joinInfo, const RowGroup& largeSideRG,
                           std::vector<uint32_t>& smallKeyIndices, std::vector<uint32_t>& largeKeyIndices,
                           std::vector<std::string>& traces, std::map<int64_t, uint32_t>& joinIndexIdMap,
                           uint32_t smallSideIndex)
{
  if (!jobInfo.joinEdgesToRestore.size())
    return;

  const RowGroup& smallSideRG = joinInfo->fRowGroup;
  ostringstream oss;
  if (jobInfo.trace)
    oss << "\n\nTry to match edges for the small and large sides rowgroups\n";

  std::vector<uint32_t> smallKeyIndicesToRestore;
  std::vector<uint32_t> largeKeyIndicesToRestore;
  std::vector<pair<JoinEdge, int64_t>> takenEdgesWithJoinIDs;
  auto& joinEdgesToRestore = jobInfo.joinEdgesToRestore;

  // We could have multiple join edges to restore from the same vertex, e.g.:
  // t1 -> t2 -> t3
  //  ^    ^      |
  //   \   |      V
  //    t5 <---- t4
  // Edges to restore: {t5, t2}, {t5, t1}
  // Large side row group: {t5}
  // Small side row group: {t1, t2, t3, t4}
  // Large side join keys: {t5, t5}
  // Small side join keys: {t2, t1}
  for (const auto& [edge, joinId] : joinEdgesToRestore)
  {
    auto it = jobInfo.tableJoinMap.find(edge);
    // Edge keys.
    const auto& leftKeys = it->second.fLeftKeys;
    const auto& rightKeys = it->second.fRightKeys;
    // Keys for the given rowgroups.
    // Large side and small side.
    const auto& smallSideKeys = smallSideRG.getKeys();
    const auto& largeSideKeys = largeSideRG.getKeys();

    // Check if left in large and right in small.
    if (matchKeys(largeSideKeys, leftKeys) && matchKeys(smallSideKeys, rightKeys))
    {
      for (uint32_t i = 0, e = leftKeys.size(); i < e; ++i)
        largeKeyIndicesToRestore.push_back(getKeyIndex(leftKeys[i], largeSideRG));

      for (uint32_t i = 0, e = rightKeys.size(); i < e; ++i)
        smallKeyIndicesToRestore.push_back(getKeyIndex(rightKeys[i], smallSideRG));

      if (jobInfo.trace)
      {
        oss << "Keys matched.\n";
        oss << "Left keys:\n";
        for (const auto key : leftKeys)
          oss << key << " ";
        oss << "\nRight keys:\n";
        for (const auto key : rightKeys)
          oss << key << " ";
        oss << '\n';
      }

      takenEdgesWithJoinIDs.push_back({edge, joinId});
      continue;
    }

    // Otherwise check right in large and left in small.
    if (matchKeys(largeSideKeys, rightKeys) && matchKeys(smallSideKeys, leftKeys))
    {
      for (uint32_t i = 0, e = rightKeys.size(); i < e; ++i)
        largeKeyIndicesToRestore.push_back(getKeyIndex(rightKeys[i], largeSideRG));

      for (uint32_t i = 0, e = leftKeys.size(); i < e; ++i)
        smallKeyIndicesToRestore.push_back(getKeyIndex(leftKeys[i], smallSideRG));

      if (jobInfo.trace)
      {
        oss << "Keys matched.\n";
        oss << "Left keys:\n";
        for (const auto key : leftKeys)
          oss << key << " ";
        oss << "\nRight keys:\n";
        for (const auto key : rightKeys)
          oss << key << " ";
        oss << '\n';
      }

      takenEdgesWithJoinIDs.push_back({edge, joinId});
    }
  }

  // Check if the keys were not matched.
  if (!smallKeyIndicesToRestore.size())
  {
    if (jobInfo.trace)
      oss << "Keys not matched.\n\n";

    traces.push_back(oss.str());
    return;
  }

  // Add the keys.
  smallKeyIndices.insert(smallKeyIndices.end(), smallKeyIndicesToRestore.begin(),
                         smallKeyIndicesToRestore.end());
  largeKeyIndices.insert(largeKeyIndices.end(), largeKeyIndicesToRestore.begin(),
                         largeKeyIndicesToRestore.end());
  // Mark as typeless for a multiple-key join.
  joinInfo->fJoinData.fTypeless = true;

  // Associate a join id and small side index for the on clause filters and remove the taken edges.
  for (const auto& [edge, joinId] : takenEdgesWithJoinIDs)
  {
    joinIndexIdMap[joinId] = smallSideIndex;
    auto it = joinEdgesToRestore.find(edge);
    joinEdgesToRestore.erase(it);
  }

  if (jobInfo.trace)
  {
    oss << "Keys restored.\n";
    oss << "Small side keys:\n";
    for (const auto key : smallKeyIndices)
      oss << key << " ";
    oss << "\nLarge side keys:\n";
    for (const auto key : largeKeyIndices)
      oss << key << " ";
    oss << "\n\n";
  }

  traces.push_back(oss.str());
}

void matchEdgesInResultRowGroup(const JobInfo& jobInfo, const RowGroup& rg,
                                std::map<JoinEdge, int64_t>& edgesToRestore,
                                PostJoinFilterKeys& postJoinFilterKeys)
{
  if (jobInfo.trace)
  {
    cout << "\nTrying to match the RowGroup to apply a post join filter\n";
  }

  std::vector<JoinEdge> takenEdges;
  for (const auto& [edge, joinId] : edgesToRestore)
  {
    std::vector<uint32_t> currentKeys;
    auto it = jobInfo.tableJoinMap.find(edge);

    // Combine the keys.
    currentKeys = it->second.fLeftKeys;
    currentKeys.insert(currentKeys.end(), it->second.fRightKeys.begin(), it->second.fRightKeys.end());

    // Rowgroup keys.
    const auto& rgKeys = rg.getKeys();
    uint32_t keyIndex = 0;
    uint32_t keySize = currentKeys.size();

    // Search for the keys in the result rowgroup.
    while (keyIndex < keySize)
    {
      auto keyIt = std::find(rgKeys.begin(), rgKeys.end(), currentKeys[keyIndex]);
      // We have to match all the keys.
      if (keyIt == rgKeys.end())
        break;

      ++keyIndex;
    }

    if (jobInfo.trace)
    {
      if (keyIndex == keySize)
        cout << "\nRowGroup matched\n";
      else
        cout << "\nRowGroup not matched\n";

      cout << rg.toString() << endl;
      cout << "For the following keys:\n";
      for (auto key : currentKeys)
        cout << key << " ";
      cout << endl;
    }

    // All the keys matched in the current rowgroup.
    if (keyIndex == keySize)
    {
      // Add the matched keys.
      postJoinFilterKeys.push_back(make_pair(edge, currentKeys));
      takenEdges.push_back(edge);
    }
  }

  // Erase the taken edges.
  for (const auto& edge : takenEdges)
  {
    auto it = edgesToRestore.find(edge);
    edgesToRestore.erase(it);
  }
}

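// createPostJoinFilters() turns each restored edge into `left = right` SimpleFilters: the first
// half of the key vector holds the left-side keys and the second half the matching right-side
// keys, so key i pairs with key i + keys.size() / 2. E.g. a removed edge t3.c = t1.c becomes a
// SimpleFilter comparing the two columns by their indices in the joined rowgroup.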
void createPostJoinFilters(const JobInfo& jobInfo, TableInfoMap& tableInfoMap,
                           const PostJoinFilterKeys& postJoinFilterKeys,
                           const std::map<uint32_t, uint32_t>& keyToIndexMap,
                           std::vector<SimpleFilter*>& postJoinFilters)
{
  for (const auto& p : postJoinFilterKeys)
  {
    const auto& edge = p.first;
    const auto& keys = p.second;

    if (jobInfo.trace)
      cout << "\nRestore a cycle as a post join filter\n";

    uint32_t leftKeyIndex = 0;
    uint32_t rightKeyIndex = keys.size() / 2;
    // The left end is where the right starts.
    const uint32_t leftSize = rightKeyIndex;

    while (leftKeyIndex < leftSize)
    {
      // Keys.
      auto leftKey = keys[leftKeyIndex];
      auto rightKey = keys[rightKeyIndex];

      // Column oids.
      auto leftOid = jobInfo.keyInfo->tupleKeyVec[leftKey].fId;
      auto rightOid = jobInfo.keyInfo->tupleKeyVec[rightKey].fId;

      // Column types.
      auto leftType = jobInfo.keyInfo->colType[keys[leftKeyIndex]];
      auto rightType = jobInfo.keyInfo->colType[keys[rightKeyIndex]];

      CalpontSystemCatalog::TableColName leftTableColName;
      CalpontSystemCatalog::TableColName rightTableColName;

      // Check for dictionary columns.
      if (joblist::isDictCol(leftType) && joblist::isDictCol(rightType))
      {
        leftTableColName = jobInfo.csc->dictColName(leftOid);
        rightTableColName = jobInfo.csc->dictColName(rightOid);
      }
      else
      {
        leftTableColName = jobInfo.csc->colName(leftOid);
        rightTableColName = jobInfo.csc->colName(rightOid);
      }

      // Create the columns.
      auto* leftColumn =
          new SimpleColumn(leftTableColName.schema, leftTableColName.table, leftTableColName.column);
      auto* rightColumn =
          new SimpleColumn(rightTableColName.schema, rightTableColName.table, rightTableColName.column);

      // Set the column indices in the result rowgroup.
      auto leftIndexIt = keyToIndexMap.find(leftKey);
      if (leftIndexIt != keyToIndexMap.end())
      {
        leftColumn->inputIndex(leftIndexIt->second);
      }
      else
      {
        std::cerr << "Cannot find key: " << leftKey << " in the IndexMap " << std::endl;
        throw logic_error("Post join filter: Cannot find key in the index map");
      }

      auto rightIndexIt = keyToIndexMap.find(rightKey);
      if (rightIndexIt != keyToIndexMap.end())
      {
        rightColumn->inputIndex(rightIndexIt->second);
      }
      else
      {
        std::cerr << "Cannot find key: " << rightKey << " in the IndexMap " << std::endl;
        throw logic_error("Post join filter: Cannot find key in the index map");
      }

      // Create an eq operator.
      SOP eqPredicateOperator(new PredicateOperator("="));
      // Set the type.
      eqPredicateOperator->setOpType(leftColumn->resultType(), rightColumn->resultType());
      // Create a post join filter.
      SimpleFilter* joinFilter = new SimpleFilter(eqPredicateOperator, leftColumn, rightColumn);
      postJoinFilters.push_back(joinFilter);

      // Erase the keys from fColsInExp2.
      auto& firstExp2 = tableInfoMap[edge.first].fColsInExp2;
      auto keyItInExp2 = std::find(firstExp2.begin(), firstExp2.end(), leftKey);
      if (keyItInExp2 != firstExp2.end())
        firstExp2.erase(keyItInExp2);

      auto& secondExp2 = tableInfoMap[edge.second].fColsInExp2;
      keyItInExp2 = std::find(secondExp2.begin(), secondExp2.end(), rightKey);
      if (keyItInExp2 != secondExp2.end())
        secondExp2.erase(keyItInExp2);

      ++leftKeyIndex;
      ++rightKeyIndex;
    }
  }

  if (jobInfo.trace)
  {
    if (postJoinFilters.size())
    {
      cout << "Post join filters created." << endl;
      for (auto* filter : postJoinFilters)
        cout << filter->toString() << endl;
    }
    else
    {
      std::cout << "Post join filters were not created." << std::endl;
    }
  }
}

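// joinToLargeTable() joins the spanning tree bottom-up: every not-yet-visited adjacent table
// becomes a small side of `large`, each small side is itself joined recursively first, and the
// function returns the JoinInfo this subtree presents to its own large side.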
SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo& jobInfo,
|
|
vector<uint32_t>& joinOrder, std::map<JoinEdge, int64_t>& joinEdgesToRestore)
|
|
{
|
|
vector<SP_JoinInfo> smallSides;
|
|
tableInfoMap[large].fVisited = true;
|
|
tableInfoMap[large].fJoinedTables.insert(large);
|
|
set<uint32_t>& tableSet = tableInfoMap[large].fJoinedTables;
|
|
vector<uint32_t>& adjList = tableInfoMap[large].fAdjacentList;
|
|
uint32_t prevLarge = (uint32_t)getPrevLarge(large, tableInfoMap);
|
|
bool root = (prevLarge == (uint32_t)-1) ? true : false;
|
|
uint32_t link = large;
|
|
uint32_t cId = -1;
|
|
|
|
// Get small sides ready.
|
|
for (vector<uint32_t>::iterator i = adjList.begin(); i != adjList.end(); i++)
|
|
{
|
|
if (tableInfoMap[*i].fVisited == false)
|
|
{
|
|
cId = *i;
|
|
smallSides.push_back(joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder, joinEdgesToRestore));
|
|
|
|
tableSet.insert(tableInfoMap[*i].fJoinedTables.begin(), tableInfoMap[*i].fJoinedTables.end());
|
|
}
|
|
}
|
|
|
|
// Join with its small sides, if not a leaf node.
|
|
if (smallSides.size() > 0)
|
|
{
|
|
// non-leaf node, need a join
|
|
SJSTEP spjs = tableInfoMap[large].fQuerySteps.back();
|
|
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(spjs.get());
|
|
SubAdapterStep* tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
|
|
TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
|
|
|
|
// @bug6158, try to put BPS on large side if possible
|
|
if (tsas && smallSides.size() == 1)
|
|
{
|
|
SJSTEP sspjs = tableInfoMap[cId].fQuerySteps.back();
|
|
BatchPrimitive* sbps = dynamic_cast<BatchPrimitive*>(sspjs.get());
|
|
TupleHashJoinStep* sthjs = dynamic_cast<TupleHashJoinStep*>(sspjs.get());
|
|
|
|
if (sbps || (sthjs && sthjs->tokenJoin() == cId))
|
|
{
|
|
SP_JoinInfo largeJoinInfo(new JoinInfo);
|
|
largeJoinInfo->fTableOid = tableInfoMap[large].fTableOid;
|
|
largeJoinInfo->fAlias = tableInfoMap[large].fAlias;
|
|
largeJoinInfo->fView = tableInfoMap[large].fView;
|
|
largeJoinInfo->fSchema = tableInfoMap[large].fSchema;
|
|
|
|
largeJoinInfo->fDl = tableInfoMap[large].fDl;
|
|
largeJoinInfo->fRowGroup = tableInfoMap[large].fRowGroup;
|
|
|
|
TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(large, cId));
|
|
|
|
if (mit == jobInfo.tableJoinMap.end())
|
|
throw runtime_error("Join step not found.");
|
|
|
|
largeJoinInfo->fJoinData = mit->second;
|
|
|
|
// switch large and small sides
|
|
joinOrder.back() = large;
|
|
large = cId;
|
|
smallSides[0] = largeJoinInfo;
|
|
tableInfoMap[large].fJoinedTables = tableSet;
|
|
|
|
bps = sbps;
|
|
thjs = sthjs;
|
|
tsas = NULL;
|
|
}
|
|
}
|
|
|
|
if (!bps && !thjs && !tsas)
|
|
{
|
|
if (dynamic_cast<SubQueryStep*>(spjs.get()))
|
|
throw IDBExcept(ERR_NON_SUPPORT_SUB_QUERY_TYPE);
|
|
|
|
throw runtime_error("Not supported join.");
|
|
}
|
|
|
|
size_t dcf = 0; // for dictionary column filters, 0 if thjs is null.
|
|
RowGroup largeSideRG = tableInfoMap[large].fRowGroup;
|
|
|
|
if (thjs && thjs->tokenJoin() == large)
|
|
{
|
|
dcf = thjs->getLargeKeys().size();
|
|
largeSideRG = thjs->getLargeRowGroup();
|
|
}
|
|
|
|
// info for debug trace
|
|
vector<string> tableNames;
|
|
vector<string> traces;
|
|
|
|
// sort the smallsides base on the joinId
|
|
sort(smallSides.begin(), smallSides.end(), joinInfoCompare);
|
|
int64_t lastJoinId = smallSides.back()->fJoinData.fJoinId;
|
|
|
|
// get info to config the TupleHashjoin
|
|
DataListVec smallSideDLs;
|
|
vector<RowGroup> smallSideRGs;
|
|
vector<JoinType> jointypes;
|
|
vector<bool> typeless;
|
|
vector<vector<uint32_t>> smallKeyIndices;
|
|
vector<vector<uint32_t>> largeKeyIndices;
|
|
|
|
for (vector<SP_JoinInfo>::iterator i = smallSides.begin(); i != smallSides.end(); i++)
|
|
{
|
|
JoinInfo* info = i->get();
|
|
smallSideDLs.push_back(info->fDl);
|
|
smallSideRGs.push_back(info->fRowGroup);
|
|
jointypes.push_back(info->fJoinData.fTypes[0]);
|
|
typeless.push_back(info->fJoinData.fTypeless);
|
|
|
|
vector<uint32_t> smallIndices;
|
|
vector<uint32_t> largeIndices;
|
|
const vector<uint32_t>& keys1 = info->fJoinData.fLeftKeys;
|
|
const vector<uint32_t>& keys2 = info->fJoinData.fRightKeys;
|
|
vector<uint32_t>::const_iterator k1 = keys1.begin();
|
|
vector<uint32_t>::const_iterator k2 = keys2.begin();
|
|
uint32_t stid = getTableKey(jobInfo, *k1);
|
|
tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[stid].fTable);
|
|
|
|
for (; k1 != keys1.end(); ++k1, ++k2)
|
|
{
|
|
smallIndices.push_back(getKeyIndex(*k1, info->fRowGroup));
|
|
largeIndices.push_back(getKeyIndex(*k2, largeSideRG));
|
|
}
|
|
|
|
smallKeyIndices.push_back(smallIndices);
|
|
largeKeyIndices.push_back(largeIndices);
|
|
|
|
if (jobInfo.trace)
|
|
{
|
|
// small side column
|
|
ostringstream smallKey, smallIndex;
|
|
string alias1 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys1.front())];
|
|
smallKey << alias1 << "-";
|
|
|
|
for (k1 = keys1.begin(); k1 != keys1.end(); ++k1)
|
|
{
|
|
CalpontSystemCatalog::OID oid1 = jobInfo.keyInfo->tupleKeyVec[*k1].fId;
|
|
CalpontSystemCatalog::TableColName tcn1 = jobInfo.csc->colName(oid1);
|
|
smallKey << "(" << tcn1.column << ":" << oid1 << ":" << *k1 << ")";
|
|
smallIndex << " " << getKeyIndex(*k1, info->fRowGroup);
|
|
}
|
|
|
|
// large side column
|
|
ostringstream largeKey, largeIndex;
|
|
string alias2 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys2.front())];
|
|
largeKey << alias2 << "-";
|
|
|
|
for (k2 = keys2.begin(); k2 != keys2.end(); ++k2)
|
|
{
|
|
CalpontSystemCatalog::OID oid2 = jobInfo.keyInfo->tupleKeyVec[*k2].fId;
|
|
CalpontSystemCatalog::TableColName tcn2 = jobInfo.csc->colName(oid2);
|
|
largeKey << "(" << tcn2.column << ":" << oid2 << ":" << *k2 << ")";
|
|
largeIndex << " " << getKeyIndex(*k2, largeSideRG);
|
|
}
|
|
|
|
ostringstream oss;
|
|
oss << smallKey.str() << " join on " << largeKey.str()
|
|
<< " joinType: " << info->fJoinData.fTypes.front() << "("
|
|
<< joinTypeToString(info->fJoinData.fTypes.front()) << ")"
|
|
<< (info->fJoinData.fTypeless ? " " : " !") << "typeless" << endl;
|
|
oss << "smallSideIndex-largeSideIndex :" << smallIndex.str() << " --" << largeIndex.str() << endl;
|
|
oss << "small side RG" << endl << info->fRowGroup.toString() << endl;
|
|
traces.push_back(oss.str());
|
|
}
|
|
}
|
|
|
|
if (jobInfo.trace)
|
|
{
|
|
ostringstream oss;
|
|
oss << "large side RG" << endl << largeSideRG.toString() << endl;
|
|
traces.push_back(oss.str());
|
|
}
|
|
|
|
if (bps || tsas)
|
|
{
|
|
thjs = new TupleHashJoinStep(jobInfo);
|
|
thjs->tableOid1(smallSides[0]->fTableOid);
|
|
thjs->tableOid2(tableInfoMap[large].fTableOid);
|
|
thjs->alias1(smallSides[0]->fAlias);
|
|
thjs->alias2(tableInfoMap[large].fAlias);
|
|
thjs->view1(smallSides[0]->fView);
|
|
thjs->view2(tableInfoMap[large].fView);
|
|
thjs->schema1(smallSides[0]->fSchema);
|
|
thjs->schema2(tableInfoMap[large].fSchema);
|
|
thjs->setLargeSideBPS(bps);
|
|
thjs->joinId(lastJoinId);
|
|
|
|
if (dynamic_cast<TupleBPS*>(bps) != NULL)
|
|
bps->incWaitToRunStepCnt();
|
|
|
|
SJSTEP spjs(thjs);
|
|
|
|
JobStepAssociation inJsa;
|
|
inJsa.outAdd(smallSideDLs, 0);
|
|
inJsa.outAdd(tableInfoMap[large].fDl);
|
|
thjs->inputAssociation(inJsa);
|
|
thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
|
|
|
|
JobStepAssociation outJsa;
|
|
AnyDataListSPtr spdl(new AnyDataList());
|
|
RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
|
|
spdl->rowGroupDL(dl);
|
|
dl->OID(large);
|
|
outJsa.outAdd(spdl);
|
|
thjs->outputAssociation(outJsa);
|
|
thjs->configSmallSideRG(smallSideRGs, tableNames);
|
|
thjs->configLargeSideRG(tableInfoMap[large].fRowGroup);
|
|
thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
|
|
|
|
tableInfoMap[large].fQuerySteps.push_back(spjs);
|
|
tableInfoMap[large].fDl = spdl;
|
|
}
|
|
else
|
|
{
|
|
JobStepAssociation inJsa = thjs->inputAssociation();
|
|
|
|
if (inJsa.outSize() < 2)
|
|
throw runtime_error("Not enough input to a hashjoin.");
|
|
|
|
size_t last = inJsa.outSize() - 1;
|
|
inJsa.outAdd(smallSideDLs, last);
|
|
thjs->inputAssociation(inJsa);
|
|
thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
|
|
|
|
thjs->addSmallSideRG(smallSideRGs, tableNames);
|
|
thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
|
|
}
|
|
|
|
RowGroup rg;
|
|
constructJoinedRowGroup(rg, link, prevLarge, root, tableSet, tableInfoMap, jobInfo);
|
|
thjs->setOutputRowGroup(rg);
|
|
tableInfoMap[large].fRowGroup = rg;
|
|
|
|
if (jobInfo.trace)
|
|
{
|
|
cout << boldStart << "\n====== join info ======\n" << boldStop;
|
|
|
|
for (vector<string>::iterator t = traces.begin(); t != traces.end(); ++t)
|
|
cout << *t;
|
|
|
|
cout << "RowGroup join result: " << endl << rg.toString() << endl << endl;
|
|
}
|
|
|
|
// check if any cross-table expressions can be evaluated after the join
|
|
JobStepVector readyExpSteps;
|
|
JobStepVector& expSteps = jobInfo.crossTableExpressions;
|
|
JobStepVector::iterator eit = expSteps.begin();
|
|
|
|
while (eit != expSteps.end())
|
|
{
|
|
ExpressionStep* exp = dynamic_cast<ExpressionStep*>(eit->get());
|
|
|
|
if (exp == NULL)
|
|
throw runtime_error("Not an expression.");
|
|
|
|
if (exp->functionJoin())
|
|
{
|
|
eit++;
|
|
continue; // done as join
|
|
}
|
|
|
|
const vector<uint32_t>& tables = exp->tableKeys();
|
|
uint64_t i = 0;
|
|
|
|
for (; i < tables.size(); i++)
|
|
{
|
|
if (tableSet.find(tables[i]) == tableSet.end())
|
|
break;
|
|
}
|
|
|
|
// all tables for this expression are joined
|
|
if (tables.size() == i)
|
|
{
|
|
readyExpSteps.push_back(*eit);
|
|
eit = expSteps.erase(eit);
|
|
}
|
|
else
|
|
{
|
|
eit++;
|
|
}
|
|
}
|
|
|
|
// if root, handle the delayed outer join filters
|
|
if (root && jobInfo.outerJoinExpressions.size() > 0)
|
|
readyExpSteps.insert(readyExpSteps.end(), jobInfo.outerJoinExpressions.begin(),
|
|
jobInfo.outerJoinExpressions.end());
|
|
|
|
// Check if we have a `join edges` to restore as post join filter for result rowgroup.
|
|
PostJoinFilterKeys postJoinFilterKeys;
|
|
if (joinEdgesToRestore.size())
|
|
matchEdgesInResultRowGroup(jobInfo, rg, joinEdgesToRestore, postJoinFilterKeys);
|
|
|
|

    // check additional compares for semi-join.
    if (readyExpSteps.size() || postJoinFilterKeys.size())
    {
      // tables have additional comparisons
      map<uint32_t, int> correlateTables;          // index in thjs
      map<uint32_t, ParseTree*> correlateCompare;  // expression
      // map keys to the indices in the RG
      map<uint32_t, uint32_t> keyToIndexMap;

      const auto& rowGroupKeys = rg.getKeys();
      for (uint64_t i = 0, e = rowGroupKeys.size(); i < e; ++i)
        keyToIndexMap.insert(make_pair(rowGroupKeys[i], i));

      if (readyExpSteps.size() > 0)
      {
        for (size_t i = 0; i != smallSides.size(); i++)
        {
          if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
          {
            uint32_t tid = getTableKey(jobInfo, smallSides[i]->fTableOid, smallSides[i]->fAlias,
                                       smallSides[i]->fSchema, smallSides[i]->fView);
            correlateTables[tid] = i;
            correlateCompare[tid] = NULL;
          }
        }
      }

      if (readyExpSteps.size() && correlateTables.size())
      {
        // separate additional compare for each table pair
        JobStepVector::iterator eit = readyExpSteps.begin();

        while (eit != readyExpSteps.end())
        {
          ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());

          if (e->selectFilter())
          {
            // @bug3780, leave select filter to normal expression
            eit++;
            continue;
          }

          const vector<uint32_t>& tables = e->tableKeys();
          map<uint32_t, int>::iterator j = correlateTables.end();

          for (size_t i = 0; i < tables.size(); i++)
          {
            j = correlateTables.find(tables[i]);

            if (j != correlateTables.end())
              break;
          }

          if (j == correlateTables.end())
          {
            eit++;
            continue;
          }

          // map the input column index
          e->updateInputIndex(keyToIndexMap, jobInfo);
          ParseTree* pt = correlateCompare[j->first];

          if (pt == NULL)
          {
            // first expression
            pt = new ParseTree;
            pt->copyTree(*(e->expressionFilter()));
          }
          else
          {
            // combine the expressions
            ParseTree* left = pt;
            ParseTree* right = new ParseTree;
            right->copyTree(*(e->expressionFilter()));
            pt = new ParseTree(new LogicOperator("and"));
            pt->left(left);
            pt->right(right);
          }

          correlateCompare[j->first] = pt;
          eit = readyExpSteps.erase(eit);
        }

        map<uint32_t, int>::iterator k = correlateTables.begin();

        while (k != correlateTables.end())
        {
          ParseTree* pt = correlateCompare[k->first];

          if (pt != NULL)
          {
            boost::shared_ptr<ParseTree> sppt(pt);
            thjs->addJoinFilter(sppt, dcf + k->second);
          }

          k++;
        }

        thjs->setJoinFilterInputRG(rg);
      }

      // Normal expression or post join filters.
      if (readyExpSteps.size() || postJoinFilterKeys.size())
      {
        std::vector<SimpleFilter*> postJoinFilters;
        if (postJoinFilterKeys.size())
          createPostJoinFilters(jobInfo, tableInfoMap, postJoinFilterKeys, keyToIndexMap, postJoinFilters);

        // Add the expression steps in the WHERE clause that can be resolved by this join to the bps.
        ParseTree* pt = NULL;
        for (auto* joinFilter : postJoinFilters)
        {
          if (pt == nullptr)
          {
            pt = new ParseTree(joinFilter);
          }
          else
          {
            ParseTree* left = pt;
            ParseTree* right = new ParseTree(joinFilter);
            pt = new ParseTree(new LogicOperator("and"));
            pt->left(left);
            pt->right(right);
          }
        }

        JobStepVector::iterator eit = readyExpSteps.begin();
        for (; eit != readyExpSteps.end(); eit++)
        {
          // map the input column index
          ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
          e->updateInputIndex(keyToIndexMap, jobInfo);

          if (pt == NULL)
          {
            // first expression
            pt = new ParseTree;
            pt->copyTree(*(e->expressionFilter()));
          }
          else
          {
            // combine the expressions
            ParseTree* left = pt;
            ParseTree* right = new ParseTree;
            right->copyTree(*(e->expressionFilter()));
            pt = new ParseTree(new LogicOperator("and"));
            pt->left(left);
            pt->right(right);
          }
        }

        if (pt)
        {
          boost::shared_ptr<ParseTree> sppt(pt);
          thjs->addFcnExpGroup2(sppt);
        }
      }

      // update the fColsInExp2 and construct the output RG
      updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
      constructJoinedRowGroup(rg, link, prevLarge, root, tableSet, tableInfoMap, jobInfo);

      if (thjs->hasFcnExpGroup2())
        thjs->setFE23Output(rg);
      else
        thjs->setOutputRowGroup(rg);

      tableInfoMap[large].fRowGroup = rg;

      if (jobInfo.trace)
      {
        cout << "RowGroup of " << tableInfoMap[large].fAlias << " after EXP G2: " << endl
             << rg.toString() << endl
             << endl;
      }
    }
  }

  // Prepare the current table info to join with its large side.
  SP_JoinInfo joinInfo(new JoinInfo);
  joinInfo->fTableOid = tableInfoMap[large].fTableOid;
  joinInfo->fAlias = tableInfoMap[large].fAlias;
  joinInfo->fView = tableInfoMap[large].fView;
  joinInfo->fSchema = tableInfoMap[large].fSchema;

  joinInfo->fDl = tableInfoMap[large].fDl;
  joinInfo->fRowGroup = tableInfoMap[large].fRowGroup;

  if (root == false) // not root
  {
    TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(link, prevLarge));

    if (mit == jobInfo.tableJoinMap.end())
      throw runtime_error("Join step not found.");

    joinInfo->fJoinData = mit->second;
  }

  joinOrder.push_back(large);

  return joinInfo;
}

bool joinStepCompare(const SJSTEP& a, const SJSTEP& b)
{
  return (dynamic_cast<TupleHashJoinStep*>(a.get())->joinId() <
          dynamic_cast<TupleHashJoinStep*>(b.get())->joinId());
}

struct JoinOrderData
{
  uint32_t fTid1;
  uint32_t fTid2;
  int64_t fJoinId;
};
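
// Collect the unique join pairs from joinSteps into `joins`, ordered by join ID.
// A table pair seen more than once keeps a single entry whose fJoinId is updated
// to the latest join ID seen. For example (illustrative): joins t1-t2 (id 1),
// t2-t3 (id 2) and t1-t2 (id 3) yield {t1, t2, 3} and {t2, t3, 2}.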
void getJoinOrder(vector<JoinOrderData>& joins, JobStepVector& joinSteps, JobInfo& jobInfo)
{
  sort(joinSteps.begin(), joinSteps.end(), joinStepCompare);

  for (JobStepVector::iterator i = joinSteps.begin(); i < joinSteps.end(); i++)
  {
    TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
    JoinOrderData jo;
    jo.fTid1 = getTableKey(jobInfo, thjs->tupleId1());
    jo.fTid2 = getTableKey(jobInfo, thjs->tupleId2());
    jo.fJoinId = thjs->joinId();

    // not fastest, but good for a small list
    vector<JoinOrderData>::iterator j;

    for (j = joins.begin(); j < joins.end(); j++)
    {
      if ((j->fTid1 == jo.fTid1 && j->fTid2 == jo.fTid2) || (j->fTid1 == jo.fTid2 && j->fTid2 == jo.fTid1))
      {
        j->fJoinId = jo.fJoinId;
        break;
      }
    }

    // insert unique join pair
    if (j == joins.end())
      joins.push_back(jo);
  }
}
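
// Attach the join data for the (small, large) pair to the small side, add it to
// the small-side list, and fold the set of tables already joined into `small`
// into the large side's joined-table set.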
inline void updateJoinSides(uint32_t small, uint32_t large, map<uint32_t, SP_JoinInfo>& joinInfoMap,
                            vector<SP_JoinInfo>& smallSides, TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
  TableJoinMap::iterator mit = jobInfo.tableJoinMap.find(make_pair(small, large));

  if (mit == jobInfo.tableJoinMap.end())
    throw runtime_error("Join step not found.");

  joinInfoMap[small]->fJoinData = mit->second;
  tableInfoMap[small].fJoinedTables.insert(small);
  smallSides.push_back(joinInfoMap[small]);

  tableInfoMap[large].fJoinedTables.insert(tableInfoMap[small].fJoinedTables.begin(),
                                           tableInfoMap[small].fJoinedTables.end());
  tableInfoMap[large].fJoinedTables.insert(large);
}
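
// True when the join at joins[tableId] carries a filter expression (FE) that
// references a table already used as a small side in the current step; such a
// join cannot be merged into the TupleHashJoinStep being built.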
inline bool needsFeForSmallSides(const JobInfo& jobInfo, const std::vector<JoinOrderData>& joins,
                                 const std::set<uint32_t>& smallSideTid, uint32_t tableId)
{
  const auto it = jobInfo.joinFeTableMap.find(joins[tableId].fJoinId);
  if (it != jobInfo.joinFeTableMap.end())
  {
    const set<uint32_t>& tids = it->second;
    for (const auto si : smallSideTid)
    {
      if (tids.count(si))
        return true;
    }
  }
  return false;
}

// For OUTER JOIN bugs @2422/2633/3437/3759, join the tables based on the join order.
// The largest table will always be the streaming table; all other tables are on the small side.
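// A rough sketch of the scheduling loop below (illustrative only):
//   for each join, in join-ID order:
//     pick the large/small sides by priority, then by estimated table size;
//     while the next join shares the same large side and no join FE blocks it,
//       merge its small side into this step;
//     reuse the large side's TupleHashJoinStep or create a new one, then attach
//     the cross-table expressions that become ready after this join.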
void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& tableInfoMap,
                       JobInfo& jobInfo, vector<uint32_t>& joinOrder)
{
  // populate the tableInfo for join
  std::map<uint32_t, SP_JoinInfo> joinInfoMap;  // <table, JoinInfo>

  // <table, <last step involved, large priority> >
  // large priority:
  //   -1 - must be on small side, like derived tables for semi join;
  //    0 - prefer to be on small side, like FROM subquery;
  //    1 - can be on either large or small side;
  //    2 - must be on large side.
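  // For example (illustrative): in a query where t3 is the largest table,
  // t3 gets priority 2, plain scan steps on t1/t2 get 1, a FROM subquery
  // would get 0, and a semi-join derived table -1.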
  map<uint32_t, pair<SJSTEP, int64_t>> joinStepMap;
  BatchPrimitive* bps = NULL;
  SubAdapterStep* tsas = NULL;
  TupleHashJoinStep* thjs = NULL;

  for (vector<uint32_t>::iterator i = jobInfo.tableList.begin(); i < jobInfo.tableList.end(); i++)
  {
    SP_JoinInfo joinInfo(new JoinInfo);
    joinInfo->fTableOid = tableInfoMap[*i].fTableOid;
    joinInfo->fAlias = tableInfoMap[*i].fAlias;
    joinInfo->fView = tableInfoMap[*i].fView;
    joinInfo->fSchema = tableInfoMap[*i].fSchema;

    joinInfo->fDl = tableInfoMap[*i].fDl;
    joinInfo->fRowGroup = tableInfoMap[*i].fRowGroup;

    joinInfoMap[*i] = joinInfo;

    SJSTEP spjs = tableInfoMap[*i].fQuerySteps.back();
    bps = dynamic_cast<BatchPrimitive*>(spjs.get());
    tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
    thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());
    TupleBPS* tbps = dynamic_cast<TupleBPS*>(spjs.get());

    /* A `TupleHashJoinStep` associated with a key to be restored has to be forced onto the
       large side, to avoid a situation with multiple small sides when CS tries to merge
       join steps.

       1. Consider the following join graph:
             t2
            /  \
          t1 -- t3

          {t3, t1} has the max weight, so we choose that edge to remove from the join graph.

       2. The join graph after the {t3, t1} edge was removed:
             t2
            /  \
          t1    t3

       a. It is legal for CS to schedule these joins as a one-step join as follows:
          join(small sides {t1, t3}, large side {t2})

       b. Instead of generating a two-step join as follows:
          t1_t2 = join(small sides {t1}, large side {t2})
          join(small sides {t1_t2}, large side {t3})

       In case `a` we are unable to implement a join with multiple keys.
     */

    if (jobInfo.tablesForLargeSide.count(*i))
    {
      const auto tableWeight = jobInfo.tablesForLargeSide[*i];
      joinStepMap[*i] = make_pair(spjs, tableWeight);
    }
    else if (*i == largest)
    {
      joinStepMap[*i] = make_pair(spjs, 2);
    }
    else if (tbps || thjs)
    {
      joinStepMap[*i] = make_pair(spjs, 1);
    }
    else if (tsas)
    {
      joinStepMap[*i] = make_pair(spjs, 0);
    }
    else
    {
      joinStepMap[*i] = make_pair(spjs, -1);
    }
  }

  // sort the join steps based on join ID.
  std::vector<JoinOrderData> joins;
  getJoinOrder(joins, joinSteps, jobInfo);

  // join the steps
  int64_t lastJoinId = -1;
  uint32_t large = (uint32_t)-1;
  uint32_t small = (uint32_t)-1;
  uint32_t prevLarge = (uint32_t)-1;
  bool umstream = false;
  vector<uint32_t> joinedTable;
  uint32_t lastJoin = joins.size() - 1;
  bool isSemijoin = true;

  for (uint64_t js = 0; js < joins.size(); js++)
  {
    std::set<uint32_t> smallSideTid;

    if (joins[js].fJoinId != 0)
      isSemijoin = false;

    std::vector<SP_JoinInfo> smallSides;
    uint32_t tid1 = joins[js].fTid1;
    uint32_t tid2 = joins[js].fTid2;
    lastJoinId = joins[js].fJoinId;

    // largest has already joined, and this join cannot be merged.
    if (prevLarge == largest && tid1 != largest && tid2 != largest)
      umstream = true;

    if (joinStepMap[tid1].second > joinStepMap[tid2].second) // high priority
    {
      large = tid1;
      small = tid2;
    }
    else if (joinStepMap[tid1].second == joinStepMap[tid2].second &&
             jobInfo.tableSize[tid1] >= jobInfo.tableSize[tid2]) // favor t1 for hint
    {
      large = tid1;
      small = tid2;
    }
    else
    {
      large = tid2;
      small = tid1;
    }

    updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo);

    // This is a table for multiple join edges, always a stream table.
    if (joinStepMap[large].second > 2)
      umstream = true;

    if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end())
      joinedTable.push_back(small);

    smallSideTid.insert(small);

    // merge in the next step if the large side is the same
    for (uint64_t ns = js + 1; ns < joins.size(); js++, ns++)
    {
      // Check if the FE needs a table from the previous small sides.
      if (needsFeForSmallSides(jobInfo, joins, smallSideTid, ns))
      {
        // Mark as `umstream` to prevent a second type of merge optimization, where CS merges
        // a small side into the current `TupleHashJoinStep`.
        umstream = true;
        break;
      }

      uint32_t tid1 = joins[ns].fTid1;
      uint32_t tid2 = joins[ns].fTid2;
      uint32_t small = (uint32_t)-1;

      if ((tid1 == large) && ((joinStepMap[tid1].second > joinStepMap[tid2].second) ||
                              (joinStepMap[tid1].second == joinStepMap[tid2].second &&
                               jobInfo.tableSize[tid1] >= jobInfo.tableSize[tid2])))
      {
        small = tid2;
      }
      else if ((tid2 == large) && ((joinStepMap[tid2].second > joinStepMap[tid1].second) ||
                                   (joinStepMap[tid2].second == joinStepMap[tid1].second &&
                                    jobInfo.tableSize[tid2] >= jobInfo.tableSize[tid1])))
      {
        small = tid1;
      }
      else
      {
        break;
      }

      updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo);
      lastJoinId = joins[ns].fJoinId;

      if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end())
        joinedTable.push_back(small);

      smallSideTid.insert(small);
    }

    joinedTable.push_back(large);

    SJSTEP spjs = joinStepMap[large].first;
    bps = dynamic_cast<BatchPrimitive*>(spjs.get());
    tsas = dynamic_cast<SubAdapterStep*>(spjs.get());
    thjs = dynamic_cast<TupleHashJoinStep*>(spjs.get());

    if (!bps && !thjs && !tsas)
    {
      if (dynamic_cast<SubQueryStep*>(spjs.get()))
        throw IDBExcept(ERR_NON_SUPPORT_SUB_QUERY_TYPE);

      throw runtime_error("Not supported join.");
    }

    size_t startPos = 0; // start point to add new smallsides
    RowGroup largeSideRG = joinInfoMap[large]->fRowGroup;

    if (thjs && thjs->tokenJoin() == large)
      largeSideRG = thjs->getLargeRowGroup();

    // get info to config the TupleHashjoin
    vector<string> traces;
    vector<string> tableNames;
    DataListVec smallSideDLs;
    vector<RowGroup> smallSideRGs;
    vector<JoinType> jointypes;
    vector<bool> typeless;
    vector<vector<uint32_t>> smallKeyIndices;
    vector<vector<uint32_t>> largeKeyIndices;

    // bug5764, make sure semi joins acting as filters are applied after the outer join.
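    // E.g. (illustrative) small sides ordered [semi(t2), smallouter(t3)] are
    // rearranged to [smallouter(t3), semi(t2)] so the semi-join filter is
    // evaluated after the outer join.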
    {
      // the inner table filters have to be performed after the outer join
      vector<uint64_t> semijoins;
      vector<uint64_t> smallouts;

      for (size_t i = 0; i < smallSides.size(); i++)
      {
        // find the small-outer and semi-join joins
        JoinType jt = smallSides[i]->fJoinData.fTypes[0];

        if (jt & SMALLOUTER)
          smallouts.push_back(i);
        else if (jt & (SEMI | ANTI | SCALAR | CORRELATED))
          semijoins.push_back(i);
      }

      // check the join order, re-arrange if necessary
      if (smallouts.size() > 0 && semijoins.size() > 0)
      {
        uint64_t lastSmallOut = smallouts.back();
        uint64_t lastSemijoin = semijoins.back();

        if (lastSmallOut > lastSemijoin)
        {
          vector<SP_JoinInfo> temp1;
          vector<SP_JoinInfo> temp2;
          size_t j = 0;

          for (size_t i = 0; i < smallSides.size(); i++)
          {
            if (j < semijoins.size() && i == semijoins[j])
            {
              temp1.push_back(smallSides[i]);
              j++;
            }
            else
            {
              temp2.push_back(smallSides[i]);
            }

            if (i == lastSmallOut)
              temp2.insert(temp2.end(), temp1.begin(), temp1.end());
          }

          smallSides = temp2;
        }
      }
    }

    uint32_t smallSideIndex = 0;
    // Join id to table id.
    std::map<int64_t, uint32_t> joinIdIndexMap;
    for (vector<SP_JoinInfo>::iterator i = smallSides.begin(); i != smallSides.end(); i++, smallSideIndex++)
    {
      JoinInfo* info = i->get();
      smallSideDLs.push_back(info->fDl);
      smallSideRGs.push_back(info->fRowGroup);
      jointypes.push_back(info->fJoinData.fTypes[0]);

      vector<uint32_t> smallIndices;
      vector<uint32_t> largeIndices;
      const vector<uint32_t>& keys1 = info->fJoinData.fLeftKeys;
      const vector<uint32_t>& keys2 = info->fJoinData.fRightKeys;
      vector<uint32_t>::const_iterator k1 = keys1.begin();
      vector<uint32_t>::const_iterator k2 = keys2.begin();
      uint32_t stid = getTableKey(jobInfo, *k1);
      tableNames.push_back(jobInfo.keyInfo->tupleKeyVec[stid].fTable);

      for (; k1 != keys1.end(); ++k1, ++k2)
      {
        smallIndices.push_back(getKeyIndex(*k1, info->fRowGroup));
        largeIndices.push_back(getKeyIndex(*k2, largeSideRG));
      }

      // Try to restore a `circular join edge` if possible.
      tryToRestoreJoinEdges(jobInfo, info, largeSideRG, smallIndices, largeIndices, traces, joinIdIndexMap,
                            smallSideIndex);
      typeless.push_back(info->fJoinData.fTypeless);
      smallKeyIndices.push_back(smallIndices);
      largeKeyIndices.push_back(largeIndices);

      if (jobInfo.trace)
      {
        // small side column
        ostringstream smallKey, smallIndex;
        string alias1 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys1.front())];
        smallKey << alias1 << "-";

        for (k1 = keys1.begin(); k1 != keys1.end(); ++k1)
        {
          CalpontSystemCatalog::OID oid1 = jobInfo.keyInfo->tupleKeyVec[*k1].fId;
          CalpontSystemCatalog::TableColName tcn1 = jobInfo.csc->colName(oid1);
          smallKey << "(" << tcn1.column << ":" << oid1 << ":" << *k1 << ")";
          smallIndex << " " << getKeyIndex(*k1, info->fRowGroup);
        }

        // large side column
        ostringstream largeKey, largeIndex;
        string alias2 = jobInfo.keyInfo->keyName[getTableKey(jobInfo, keys2.front())];
        largeKey << alias2 << "-";

        for (k2 = keys2.begin(); k2 != keys2.end(); ++k2)
        {
          CalpontSystemCatalog::OID oid2 = jobInfo.keyInfo->tupleKeyVec[*k2].fId;
          CalpontSystemCatalog::TableColName tcn2 = jobInfo.csc->colName(oid2);
          largeKey << "(" << tcn2.column << ":" << oid2 << ":" << *k2 << ")";
          largeIndex << " " << getKeyIndex(*k2, largeSideRG);
        }

        ostringstream oss;
        oss << smallKey.str() << " join on " << largeKey.str()
            << " joinType: " << info->fJoinData.fTypes.front() << "("
            << joinTypeToString(info->fJoinData.fTypes.front()) << ")"
            << (info->fJoinData.fTypeless ? " " : " !") << "typeless" << endl;
        oss << "smallSideIndex-largeSideIndex :" << smallIndex.str() << " --" << largeIndex.str() << endl;
        oss << "small side RG" << endl << info->fRowGroup.toString() << endl;
        traces.push_back(oss.str());
      }
    }

    if (jobInfo.trace)
    {
      ostringstream oss;
      oss << "large side RG" << endl << largeSideRG.toString() << endl;
      traces.push_back(oss.str());
    }

    if (bps || tsas || umstream || (thjs && joinStepMap[large].second < 1))
    {
      thjs = new TupleHashJoinStep(jobInfo);
      thjs->tableOid1(smallSides[0]->fTableOid);
      thjs->tableOid2(tableInfoMap[large].fTableOid);
      thjs->alias1(smallSides[0]->fAlias);
      thjs->alias2(tableInfoMap[large].fAlias);
      thjs->view1(smallSides[0]->fView);
      thjs->view2(tableInfoMap[large].fView);
      thjs->schema1(smallSides[0]->fSchema);
      thjs->schema2(tableInfoMap[large].fSchema);
      thjs->setLargeSideBPS(bps);
      thjs->joinId(lastJoinId);

      if (dynamic_cast<TupleBPS*>(bps) != NULL)
        bps->incWaitToRunStepCnt();

      spjs.reset(thjs);

      JobStepAssociation inJsa;
      inJsa.outAdd(smallSideDLs, 0);
      inJsa.outAdd(joinInfoMap[large]->fDl);
      thjs->inputAssociation(inJsa);
      thjs->setLargeSideDLIndex(inJsa.outSize() - 1);

      JobStepAssociation outJsa;
      AnyDataListSPtr spdl(new AnyDataList());
      RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
      spdl->rowGroupDL(dl);
      dl->OID(large);
      outJsa.outAdd(spdl);
      thjs->outputAssociation(outJsa);
      thjs->configSmallSideRG(smallSideRGs, tableNames);
      thjs->configLargeSideRG(joinInfoMap[large]->fRowGroup);
      thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);

      tableInfoMap[large].fQuerySteps.push_back(spjs);
      tableInfoMap[large].fDl = spdl;
    }
    else // thjs && joinStepMap[large].second >= 1
    {
      JobStepAssociation inJsa = thjs->inputAssociation();

      if (inJsa.outSize() < 2)
        throw runtime_error("Not enough input to a hashjoin.");

      startPos = inJsa.outSize() - 1;
      inJsa.outAdd(smallSideDLs, startPos);
      thjs->inputAssociation(inJsa);
      thjs->setLargeSideDLIndex(inJsa.outSize() - 1);
      thjs->addSmallSideRG(smallSideRGs, tableNames);
      thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices);
    }

    RowGroup rg;
    set<uint32_t>& tableSet = tableInfoMap[large].fJoinedTables;
    constructJoinedRowGroup(rg, tableSet, tableInfoMap, jobInfo);
    thjs->setOutputRowGroup(rg);
    tableInfoMap[large].fRowGroup = rg;
    tableSet.insert(large);

    if (jobInfo.trace)
    {
      cout << boldStart << "\n====== join info ======\n" << boldStop;

      for (vector<string>::iterator t = traces.begin(); t != traces.end(); ++t)
        cout << *t;

      cout << "RowGroup join result: " << endl << rg.toString() << endl << endl;
    }

    // The map for the in-clause filter.
    for (size_t i = 0; i < smallSides.size(); i++)
    {
      if (smallSides[i]->fJoinData.fJoinId != 0)
        joinIdIndexMap[smallSides[i]->fJoinData.fJoinId] = i;
    }

    // check if any cross-table expressions can be evaluated after the join
    JobStepVector readyExpSteps;
    JobStepVector& expSteps = jobInfo.crossTableExpressions;
    JobStepVector::iterator eit = expSteps.begin();

    while (eit != expSteps.end())
    {
      ExpressionStep* exp = dynamic_cast<ExpressionStep*>(eit->get());

      if (exp == NULL)
        throw runtime_error("Not an expression.");

      if (exp->functionJoin())
      {
        eit++;
        continue; // done as join
      }

      const vector<uint32_t>& tables = exp->tableKeys();
      uint64_t i = 0;

      for (; i < tables.size(); i++)
      {
        if (tableInfoMap[large].fJoinedTables.find(tables[i]) == tableInfoMap[large].fJoinedTables.end())
          break;
      }

      // all tables for this expression are joined
      bool ready = (tables.size() == i);

      // for an ON clause condition, the join ID also needs to be checked
      if (ready && exp->associatedJoinId() != 0)
      {
        auto x = joinIdIndexMap.find(exp->associatedJoinId());
        ready = (x != joinIdIndexMap.end());
      }

      if (ready)
      {
        readyExpSteps.push_back(*eit);
        eit = expSteps.erase(eit);
      }
      else
      {
        eit++;
      }
    }

    // if last join step, handle the delayed outer join filters
    if (js == lastJoin && jobInfo.outerJoinExpressions.size() > 0)
      readyExpSteps.insert(readyExpSteps.end(), jobInfo.outerJoinExpressions.begin(),
                           jobInfo.outerJoinExpressions.end());

    // check additional compares for semi-join
    if (readyExpSteps.size() > 0)
    {
      map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG

      const auto& rowGroupKeys = rg.getKeys();
      for (uint64_t i = 0, e = rowGroupKeys.size(); i < e; ++i)
        keyToIndexMap.insert(make_pair(rowGroupKeys[i], i));

      // tables have additional comparisons
      map<uint32_t, uint32_t> correlateTables;     // index in thjs
      map<uint32_t, ParseTree*> correlateCompare;  // expression

      for (uint32_t i = 0; i != smallSides.size(); i++)
      {
        if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
        {
          uint32_t tid = getTableKey(jobInfo, smallSides[i]->fTableOid, smallSides[i]->fAlias,
                                     smallSides[i]->fSchema, smallSides[i]->fView);
          correlateTables[tid] = i;
          correlateCompare[tid] = NULL;
        }
      }

      if (correlateTables.size() > 0)
      {
        // separate additional compare for each table pair
        JobStepVector::iterator eit = readyExpSteps.begin();

        while (eit != readyExpSteps.end())
        {
          ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());

          if (e->selectFilter())
          {
            // @bug3780, leave select filter to normal expression
            eit++;
            continue;
          }

          const vector<uint32_t>& tables = e->tableKeys();
          auto j = correlateTables.end();

          for (uint32_t i = 0; i < tables.size(); i++)
          {
            j = correlateTables.find(tables[i]);

            if (j != correlateTables.end())
              break;
          }

          if (j == correlateTables.end())
          {
            eit++;
            continue;
          }

          // map the input column index
          e->updateInputIndex(keyToIndexMap, jobInfo);
          ParseTree* pt = correlateCompare[j->first];

          if (pt == NULL)
          {
            // first expression
            pt = new ParseTree;
            pt->copyTree(*(e->expressionFilter()));
          }
          else
          {
            // combine the expressions
            ParseTree* left = pt;
            ParseTree* right = new ParseTree;
            right->copyTree(*(e->expressionFilter()));
            pt = new ParseTree(new LogicOperator("and"));
            pt->left(left);
            pt->right(right);
          }

          correlateCompare[j->first] = pt;
          eit = readyExpSteps.erase(eit);
        }

        auto k = correlateTables.begin();

        while (k != correlateTables.end())
        {
          ParseTree* pt = correlateCompare[k->first];

          if (pt != NULL)
          {
            boost::shared_ptr<ParseTree> sppt(pt);
            thjs->addJoinFilter(sppt, startPos + k->second);
          }

          k++;
        }

        thjs->setJoinFilterInputRG(rg);
      }

      // normal expression if any
      if (readyExpSteps.size() > 0)
      {
        // add the expression steps in the WHERE clause that can be resolved by this join to the bps
        ParseTree* pt = NULL;
        JobStepVector::iterator eit = readyExpSteps.begin();

        for (; eit != readyExpSteps.end(); eit++)
        {
          // map the input column index
          ExpressionStep* e = dynamic_cast<ExpressionStep*>(eit->get());
          e->updateInputIndex(keyToIndexMap, jobInfo);

          // short-circuit ON clause expressions
          auto x = joinIdIndexMap.find(e->associatedJoinId());

          if (x != joinIdIndexMap.end())
          {
            ParseTree* joinFilter = new ParseTree;
            joinFilter->copyTree(*(e->expressionFilter()));
            boost::shared_ptr<ParseTree> sppt(joinFilter);
            thjs->addJoinFilter(sppt, startPos + x->second);
            thjs->setJoinFilterInputRG(rg);
            continue;
          }

          if (pt == NULL)
          {
            // first expression
            pt = new ParseTree;
            pt->copyTree(*(e->expressionFilter()));
          }
          else
          {
            // combine the expressions
            ParseTree* left = pt;
            ParseTree* right = new ParseTree;
            right->copyTree(*(e->expressionFilter()));
            pt = new ParseTree(new LogicOperator("and"));
            pt->left(left);
            pt->right(right);
          }
        }

        if (pt != NULL)
        {
          boost::shared_ptr<ParseTree> sppt(pt);
          thjs->addFcnExpGroup2(sppt);
        }
      }

      // update the fColsInExp2 and construct the output RG
      updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
      constructJoinedRowGroup(rg, tableSet, tableInfoMap, jobInfo);

      if (thjs->hasFcnExpGroup2())
        thjs->setFE23Output(rg);
      else
        thjs->setOutputRowGroup(rg);

      tableInfoMap[large].fRowGroup = rg;

      if (jobInfo.trace)
      {
        cout << "RowGroup of " << tableInfoMap[large].fAlias << " after EXP G2: " << endl
             << rg.toString() << endl
             << endl;
      }
    }

    // update the info maps
    int l = (joinStepMap[large].second == 2) ? 2 : 0;

    if (isSemijoin)
      joinStepMap[large] = make_pair(spjs, joinStepMap[large].second);
    else
      joinStepMap[large] = make_pair(spjs, l);

    for (set<uint32_t>::iterator i = tableSet.begin(); i != tableSet.end(); i++)
    {
      joinInfoMap[*i]->fDl = tableInfoMap[large].fDl;
      joinInfoMap[*i]->fRowGroup = tableInfoMap[large].fRowGroup;

      if (*i != large)
      {
        //@bug6117, token join should be done for small side tables.
        SJSTEP smallJs = joinStepMap[*i].first;
        TupleHashJoinStep* smallThjs = dynamic_cast<TupleHashJoinStep*>(smallJs.get());

        if (smallThjs && smallThjs->tokenJoin())
          smallThjs->tokenJoin(-1);

        // Set join priority for smallsides.
        joinStepMap[*i] = make_pair(spjs, l);

        // Mark joined tables, smalls and large, as a group.
        tableInfoMap[*i].fJoinedTables = tableInfoMap[large].fJoinedTables;
      }
    }

    prevLarge = large;
  }

  // Keep join order by the table last used for picking the right delivery step.
  {
    for (vector<uint32_t>::reverse_iterator i = joinedTable.rbegin(); i < joinedTable.rend(); i++)
    {
      if (find(joinOrder.begin(), joinOrder.end(), *i) == joinOrder.end())
        joinOrder.push_back(*i);
    }

    const uint64_t n = joinOrder.size();
    const uint64_t h = n / 2;
    const uint64_t e = n - 1;
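
    // Reverse joinOrder in place (equivalent to std::reverse) so the order
    // starts from the table joined last.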
    for (uint64_t i = 0; i < h; i++)
      std::swap(joinOrder[i], joinOrder[e - i]);
  }
}
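
// Dispatch between the two join-ordering strategies: with no outer joins the
// plan is built around the largest table (spanning tree); otherwise the tables
// are joined following the explicit join order.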
inline void joinTables(JobStepVector& joinSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo,
                       vector<uint32_t>& joinOrder, const bool overrideLargeSideEstimate)
{
  uint32_t largestTable = getLargestTable(jobInfo, tableInfoMap, overrideLargeSideEstimate);

  if (jobInfo.outerOnTable.size() == 0)
    joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder, jobInfo.joinEdgesToRestore);
  else
    joinTablesInOrder(largestTable, joinSteps, tableInfoMap, jobInfo, joinOrder);
}

void makeNoTableJobStep(JobStepVector& querySteps, JobStepVector& projectSteps,
                        DeliveredTableMap& deliverySteps, JobInfo& jobInfo)
{
  querySteps.clear();
  projectSteps.clear();
  deliverySteps.clear();
  querySteps.push_back(TupleConstantStep::addConstantStep(jobInfo));
  deliverySteps[CNX_VTABLE_ID] = querySteps.back();
}
} // namespace

namespace joblist
{
void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSteps,
                            DeliveredTableMap& deliverySteps, JobInfo& jobInfo,
                            const bool overrideLargeSideEstimate)
{
  if (jobInfo.trace)
  {
    const boost::shared_ptr<TupleKeyInfo>& keyInfo = jobInfo.keyInfo;
    cout << "query steps:" << endl;

    for (JobStepVector::iterator i = querySteps.begin(); i != querySteps.end(); ++i)
    {
      TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(i->get());

      if (thjs == NULL)
      {
        int64_t id = ((*i)->tupleId() != (uint64_t)-1) ? (*i)->tupleId() : -1;
        cout << typeid(*(i->get())).name() << ": " << (*i)->oid() << " " << id << " "
             << (int)((id != -1) ? getTableKey(jobInfo, id) : -1) << endl;
      }
      else
      {
        int64_t id1 = (thjs->tupleId1() != (uint64_t)-1) ? thjs->tupleId1() : -1;
        int64_t id2 = (thjs->tupleId2() != (uint64_t)-1) ? thjs->tupleId2() : -1;
        cout << typeid(*thjs).name() << ": " << thjs->oid1() << " " << id1 << " "
             << (int)((id1 != -1) ? getTableKey(jobInfo, id1) : -1) << " - " << thjs->getJoinType() << " - "
             << thjs->oid2() << " " << id2 << " " << (int)((id2 != -1) ? getTableKey(jobInfo, id2) : -1)
             << endl;
      }
    }

    cout << "project steps:" << endl;

    for (JobStepVector::iterator i = projectSteps.begin(); i != projectSteps.end(); ++i)
    {
      cout << typeid(*(i->get())).name() << ": " << (*i)->oid() << " " << (*i)->tupleId() << " "
           << getTableKey(jobInfo, (*i)->tupleId()) << endl;
    }

    cout << "delivery steps:" << endl;

    for (DeliveredTableMap::iterator i = deliverySteps.begin(); i != deliverySteps.end(); ++i)
      cout << typeid(*(i->second.get())).name() << endl;

    cout << "\nTable Info: (key oid name alias view sub)" << endl;

    for (uint32_t i = 0; i < keyInfo->tupleKeyVec.size(); ++i)
    {
      int64_t tid = jobInfo.keyInfo->colKeyToTblKey[i];

      if (tid == i)
      {
        CalpontSystemCatalog::OID oid = keyInfo->tupleKeyVec[i].fId;
        string alias = keyInfo->tupleKeyVec[i].fTable;

        if (alias.length() < 1)
          alias = "N/A";

        string name = keyInfo->keyName[i];

        if (name.empty())
          name = "unknown";

        string view = keyInfo->tupleKeyVec[i].fView;

        if (view.length() < 1)
          view = "N/A";

        int sid = keyInfo->tupleKeyVec[i].fSubId;
        cout << i << "\t" << oid << "\t" << name << "\t" << alias << "\t" << view << "\t" << hex << sid << dec
             << endl;
      }
    }

    cout << "\nTupleKey vector: (tupleKey oid tableKey name alias view sub)" << endl;

    for (uint32_t i = 0; i < keyInfo->tupleKeyVec.size(); ++i)
    {
      CalpontSystemCatalog::OID oid = keyInfo->tupleKeyVec[i].fId;
      int64_t tid = jobInfo.keyInfo->colKeyToTblKey[i];
      string alias = keyInfo->tupleKeyVec[i].fTable;

      if (alias.length() < 1)
        alias = "N/A";

      // Expression IDs are borrowed from systemcatalog IDs, which are not used in tuple.
      string name = keyInfo->keyName[i];

      if (keyInfo->dictOidToColOid.find(oid) != keyInfo->dictOidToColOid.end())
      {
        name += "[d]"; // indicate this is a dictionary column
      }

      if (jobInfo.keyInfo->pseudoType[i] > 0)
      {
        name += "[p]"; // indicate this is a pseudo column
      }

      if (name.empty())
      {
        name = "unknown";
      }

      string view = keyInfo->tupleKeyVec[i].fView;

      if (view.length() < 1)
        view = "N/A";

      int sid = keyInfo->tupleKeyVec[i].fSubId;
      cout << i << "\t" << oid << "\t" << tid << "\t" << name << "\t" << alias << "\t" << view << "\t" << hex
           << sid << dec << endl;
    }

    cout << endl;
  }

  // @bug 2771, handle no-table select query
  if (jobInfo.tableList.size() < 1)
  {
    makeNoTableJobStep(querySteps, projectSteps, deliverySteps, jobInfo);
    return;
  }

  // Create a step vector for each table in the from clause.
  TableInfoMap tableInfoMap;

  for (uint64_t i = 0; i < jobInfo.tableList.size(); i++)
  {
    uint32_t tableUid = jobInfo.tableList[i];
    tableInfoMap[tableUid] = TableInfo();
    tableInfoMap[tableUid].fTableOid = jobInfo.keyInfo->tupleKeyVec[tableUid].fId;
    tableInfoMap[tableUid].fName = jobInfo.keyInfo->keyName[tableUid];
    tableInfoMap[tableUid].fAlias = jobInfo.keyInfo->tupleKeyVec[tableUid].fTable;
    tableInfoMap[tableUid].fView = jobInfo.keyInfo->tupleKeyVec[tableUid].fView;
    tableInfoMap[tableUid].fSchema = jobInfo.keyInfo->tupleKeyVec[tableUid].fSchema;
    tableInfoMap[tableUid].fSubId = jobInfo.keyInfo->tupleKeyVec[tableUid].fSubId;
    tableInfoMap[tableUid].fColsInColMap = jobInfo.columnMap[tableUid];
  }

  // Set of the columns being projected.
  for (TupleInfoVector::iterator i = jobInfo.pjColList.begin(); i != jobInfo.pjColList.end(); i++)
    jobInfo.returnColSet.insert(i->key);

  // Strip constant boolean query steps.
  for (uint64_t i = 0; i < querySteps.size();)
  {
    TupleConstantBooleanStep* bs = dynamic_cast<TupleConstantBooleanStep*>(querySteps[i].get());
    ExpressionStep* es = dynamic_cast<ExpressionStep*>(querySteps[i].get());

    if (bs != NULL)
    {
      // constant step
      if (bs->boolValue() == false)
        jobInfo.constantFalse = true;

      querySteps.erase(querySteps.begin() + i);
    }
    else if (es != NULL && es->tableKeys().size() == 0)
    {
      // constant expression
      ParseTree* p = es->expressionFilter(); // filter

      if (p != NULL)
      {
        Row r; // dummy row

        if (funcexp::FuncExp::instance()->evaluate(r, p) == false)
          jobInfo.constantFalse = true;

        querySteps.erase(querySteps.begin() + i);
      }
    }
    else
    {
      i++;
    }
  }

  // double check if the function join candidates are still there.
  JobStepVector steps = querySteps;

  for (int64_t i = jobInfo.functionJoins.size() - 1; i >= 0; i--)
  {
    bool exist = false;

    for (JobStepVector::iterator j = steps.begin(); j != steps.end() && !exist; ++j)
    {
      if (jobInfo.functionJoins[i] == j->get())
        exist = true;
    }

    if (!exist)
      jobInfo.functionJoins.erase(jobInfo.functionJoins.begin() + i);
  }

  // Concatenate query and project steps
  steps.insert(steps.end(), projectSteps.begin(), projectSteps.end());

  // Make sure each query step has an output DL.
  // This is necessary for the toString() method on most steps.
  for (JobStepVector::iterator it = steps.begin(); it != steps.end(); ++it)
  {
    // if (dynamic_cast<OrDelimiter*>(it->get()))
    //   continue;

    if (it->get()->outputAssociation().outSize() == 0)
    {
      JobStepAssociation jsa;
      AnyDataListSPtr adl(new AnyDataList());
      RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
      dl->OID(it->get()->oid());
      adl->rowGroupDL(dl);
      jsa.outAdd(adl);
      it->get()->outputAssociation(jsa);
    }
  }

  // Populate the TableInfo map with the job steps keyed by table ID.
  JobStepVector joinSteps;
  JobStepVector& expSteps = jobInfo.crossTableExpressions;
  JobStepVector::iterator it = querySteps.begin();
  JobStepVector::iterator end = querySteps.end();

  while (it != end)
  {
    // Separate table joins from other predicates.
    TupleHashJoinStep* thjs = dynamic_cast<TupleHashJoinStep*>(it->get());
    ExpressionStep* exps = dynamic_cast<ExpressionStep*>(it->get());
    SubAdapterStep* subs = dynamic_cast<SubAdapterStep*>(it->get());

    if (thjs != NULL && thjs->tupleId1() != thjs->tupleId2())
    {
      // simple column and constant column semi join
      if (thjs->tableOid2() == 0 && thjs->schema2().empty())
      {
        jobInfo.correlateSteps.push_back(*it++);
        continue;
      }

      // check correlated join step
      JoinType joinType = thjs->getJoinType();

      if (joinType & CORRELATED)
      {
        // one of the tables is in the outer query
        jobInfo.correlateSteps.push_back(*it++);
        continue;
      }

      // Save the join topology.
      uint32_t key1 = thjs->tupleId1();
      uint32_t key2 = thjs->tupleId2();
      uint32_t tid1 = getTableKey(jobInfo, key1);
      uint32_t tid2 = getTableKey(jobInfo, key2);

      if (thjs->dictOid1() > 0)
        key1 = jobInfo.keyInfo->dictKeyMap[key1];

      if (thjs->dictOid2() > 0)
        key2 = jobInfo.keyInfo->dictKeyMap[key2];

      // not correlated
      joinSteps.push_back(*it);
      tableInfoMap[tid1].fJoinKeys.push_back(key1);
      tableInfoMap[tid2].fJoinKeys.push_back(key2);

      // save the function join expressions
      boost::shared_ptr<FunctionJoinInfo> fji = thjs->funcJoinInfo();

      if (fji)
      {
        if (fji->fStep[0])
        {
          tableInfoMap[tid1].fFuncJoinExps.push_back(fji->fStep[0]);
          vector<uint32_t>& cols = tableInfoMap[tid1].fColsInFuncJoin;
          cols.insert(cols.end(), fji->fColumnKeys[0].begin(), fji->fColumnKeys[0].end());
        }

        if (fji->fStep[1])
        {
          tableInfoMap[tid2].fFuncJoinExps.push_back(fji->fStep[1]);
          vector<uint32_t>& cols = tableInfoMap[tid2].fColsInFuncJoin;
          cols.insert(cols.end(), fji->fColumnKeys[1].begin(), fji->fColumnKeys[1].end());
        }
      }

      // keep a join map
      pair<uint32_t, uint32_t> tablePair(tid1, tid2);
      TableJoinMap::iterator m1 = jobInfo.tableJoinMap.find(tablePair);
      TableJoinMap::iterator m2 = jobInfo.tableJoinMap.end();

      if (m1 == jobInfo.tableJoinMap.end())
      {
        tableInfoMap[tid1].fAdjacentList.push_back(tid2);
        tableInfoMap[tid2].fAdjacentList.push_back(tid1);

        m1 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid1, tid2), JoinData()));
        m2 = jobInfo.tableJoinMap.insert(m1, make_pair(make_pair(tid2, tid1), JoinData()));

        TupleInfo ti1(getTupleInfo(key1, jobInfo));
        TupleInfo ti2(getTupleInfo(key2, jobInfo));
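
        // Joins on keys wider than 8 bytes are done typeless (compared as
        // binary), except long double, which cannot be compared that way;
        // narrower keys are typeless only when a character type is involved.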
        if (ti1.width > 8 || ti2.width > 8)
        {
          if (ti1.dtype == execplan::CalpontSystemCatalog::LONGDOUBLE ||
              ti2.dtype == execplan::CalpontSystemCatalog::LONGDOUBLE)
          {
            m1->second.fTypeless = m2->second.fTypeless = false;
          }
          else
          {
            m1->second.fTypeless = m2->second.fTypeless = true;
          }
        }
        else
        {
          m1->second.fTypeless = m2->second.fTypeless = isCharType(ti1.dtype) || isCharType(ti2.dtype);
        }
      }
      else
      {
        m2 = jobInfo.tableJoinMap.find(make_pair(tid2, tid1));
        m1->second.fTypeless = m2->second.fTypeless = true;
      }

      if (m1 == jobInfo.tableJoinMap.end() || m2 == jobInfo.tableJoinMap.end())
        throw runtime_error("Bad table map.");

      // Keep a map of the join (table, key) pairs
      m1->second.fLeftKeys.push_back(key1);
      m1->second.fRightKeys.push_back(key2);

      m2->second.fLeftKeys.push_back(key2);
      m2->second.fRightKeys.push_back(key1);

      // Keep a map of the join type between the keys.
      // OUTER join and SEMI/ANTI join are mutually exclusive.
      if (joinType == LEFTOUTER)
      {
        m1->second.fTypes.push_back(SMALLOUTER);
        m2->second.fTypes.push_back(LARGEOUTER);
        jobInfo.outerOnTable.insert(tid2);
      }
      else if (joinType == RIGHTOUTER)
      {
        m1->second.fTypes.push_back(LARGEOUTER);
        m2->second.fTypes.push_back(SMALLOUTER);
        jobInfo.outerOnTable.insert(tid1);
      }
      else if ((joinType & SEMI) &&
               ((joinType & LEFTOUTER) == LEFTOUTER || (joinType & RIGHTOUTER) == RIGHTOUTER))
      {
        // @bug3998, DML UPDATE borrows the "SEMI" flag,
        // allowing the SEMI and LARGEOUTER combination to support update with outer join.
        if ((joinType & LEFTOUTER) == LEFTOUTER)
        {
          joinType ^= LEFTOUTER;
          m1->second.fTypes.push_back(joinType);
          m2->second.fTypes.push_back(joinType | LARGEOUTER);
          jobInfo.outerOnTable.insert(tid2);
        }
        else
        {
          joinType ^= RIGHTOUTER;
          m1->second.fTypes.push_back(joinType | LARGEOUTER);
          m2->second.fTypes.push_back(joinType);
          jobInfo.outerOnTable.insert(tid1);
        }
      }
      else
      {
        m1->second.fTypes.push_back(joinType);
        m2->second.fTypes.push_back(joinType);

        if (joinType == INNER)
        {
          jobInfo.innerOnTable.insert(tid1);
          jobInfo.innerOnTable.insert(tid2);
        }
      }

      // need the id to keep the join order
      m1->second.fJoinId = m2->second.fJoinId = thjs->joinId();
    }
    // Separate the expressions
    else if (exps != NULL && subs == NULL)
    {
      const vector<uint32_t>& tables = exps->tableKeys();
      const vector<uint32_t>& columns = exps->columnKeys();
      bool tableInOuterQuery = false;
      set<uint32_t> tableSet; // involved unique tables

      for (uint64_t i = 0; i < tables.size(); ++i)
      {
        if (find(jobInfo.tableList.begin(), jobInfo.tableList.end(), tables[i]) != jobInfo.tableList.end())
          tableSet.insert(tables[i]);
        else
          tableInOuterQuery = true;
      }

      if (tableInOuterQuery)
      {
        // all columns in subquery scope to be projected
        for (uint64_t i = 0; i < tables.size(); ++i)
        {
          // outer-query columns
          if (tableSet.find(tables[i]) == tableSet.end())
            continue;

          // subquery columns
          uint32_t c = columns[i];

          if (jobInfo.returnColSet.find(c) == jobInfo.returnColSet.end())
          {
            tableInfoMap[tables[i]].fProjectCols.push_back(c);
            jobInfo.pjColList.push_back(getTupleInfo(c, jobInfo));
            jobInfo.returnColSet.insert(c);
            const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(exps->columns()[i]);

            if (sc != NULL)
              jobInfo.deliveredCols.push_back(SRCP(sc->clone()));
          }
        }

        jobInfo.correlateSteps.push_back(*it++);
        continue;
      }

      // does the expression cross tables?
      if (tableSet.size() == 1 && exps->associatedJoinId() == 0)
      {
        // single table and not in a join on clause
        uint32_t tid = tables[0];

        for (uint64_t i = 0; i < columns.size(); ++i)
          tableInfoMap[tid].fColsInExp1.push_back(columns[i]);

        tableInfoMap[tid].fOneTableExpSteps.push_back(*it);
      }
      else
      {
        // WORKAROUND for the limitation on join with filter
        if (exps->associatedJoinId() != 0)
        {
          for (uint64_t i = 0; i < exps->columns().size(); ++i)
          {
            jobInfo.joinFeTableMap[exps->associatedJoinId()].insert(tables[i]);
          }
        }

        // resolve after join: cross table or on clause conditions
        for (uint64_t i = 0; i < columns.size(); ++i)
        {
          uint32_t cid = columns[i];
          uint32_t tid = getTableKey(jobInfo, cid);
          tableInfoMap[tid].fColsInExp2.push_back(cid);
        }

        expSteps.push_back(*it);
      }
    }
    // Separate the other steps by unique ID.
    else
    {
      uint32_t tid = -1;
      uint64_t cid = (*it)->tupleId();

      if (cid != (uint64_t)-1)
        tid = getTableKey(jobInfo, (*it)->tupleId());
      else
        tid = getTableKey(jobInfo, it->get());

      if (find(jobInfo.tableList.begin(), jobInfo.tableList.end(), tid) != jobInfo.tableList.end())
      {
        tableInfoMap[tid].fQuerySteps.push_back(*it);
      }
      else
      {
        jobInfo.correlateSteps.push_back(*it);
      }
    }

    it++;
  }

  // @bug2634, delay isNull filter on outerjoin key
  // @bug5374, delay predicates for outerjoin
  outjoinPredicateAdjust(tableInfoMap, jobInfo);

  // @bug4021, make sure there is a real column to scan
  for (TableInfoMap::iterator it = tableInfoMap.begin(); it != tableInfoMap.end(); it++)
  {
    uint32_t tableUid = it->first;

    if (jobInfo.pseudoColTable.find(tableUid) == jobInfo.pseudoColTable.end())
      continue;

    JobStepVector& steps = tableInfoMap[tableUid].fQuerySteps;
    JobStepVector::iterator s = steps.begin();
    JobStepVector::iterator p = steps.end();

    for (; s != steps.end(); s++)
    {
      if (typeid(*(s->get())) == typeid(pColScanStep) || typeid(*(s->get())) == typeid(pColStep))
        break;

      // @bug5893, iterator to the first pseudocolumn
      if (typeid(*(s->get())) == typeid(PseudoColStep) && p == steps.end())
        p = s;
    }

    if (s == steps.end())
    {
      map<uint64_t, SRCP>::iterator t = jobInfo.tableColMap.find(tableUid);

      if (t == jobInfo.tableColMap.end())
      {
        string msg = jobInfo.keyInfo->tupleKeyToName[tableUid];
        msg += " has no column in column map.";
        throw runtime_error(msg);
      }

      SimpleColumn* sc = dynamic_cast<SimpleColumn*>(t->second.get());
      CalpontSystemCatalog::OID oid = sc->oid();
      CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
      CalpontSystemCatalog::ColType ct = sc->colType();
      string alias(extractTableAlias(sc));
      SJSTEP sjs(new pColScanStep(oid, tblOid, ct, jobInfo));
      sjs->alias(alias);
      sjs->view(sc->viewName());
      sjs->schema(sc->schemaName());
      sjs->name(sc->columnName());
      TupleInfo ti(setTupleInfo(ct, oid, jobInfo, tblOid, sc, alias));
      sjs->tupleId(ti.key);
      steps.insert(steps.begin(), sjs);

      if (isDictCol(ct) && jobInfo.tokenOnly.find(ti.key) == jobInfo.tokenOnly.end())
        jobInfo.tokenOnly[ti.key] = true;
    }
    else if (s > p)
    {
      // @bug5893, make sure a pcol is in front of any pseudo step.
      SJSTEP t = *s;
      *s = *p;
      *p = t;
    }
  }

  // @bug3767, error out scalar subquery with aggregation and correlated additional comparison.
  if (jobInfo.hasAggregation && (jobInfo.correlateSteps.size() > 0))
  {
    // expression filter
    ExpressionStep* exp = NULL;

    for (it = jobInfo.correlateSteps.begin(); it != jobInfo.correlateSteps.end(); it++)
    {
      if (((exp = dynamic_cast<ExpressionStep*>(it->get())) != NULL) && (!exp->functionJoin()))
        break;

      exp = NULL;
    }

    // correlated join step
    TupleHashJoinStep* thjs = NULL;

    for (it = jobInfo.correlateSteps.begin(); it != jobInfo.correlateSteps.end(); it++)
    {
      if ((thjs = dynamic_cast<TupleHashJoinStep*>(it->get())) != NULL)
        break;
    }

    // @bug5202, error out not-equal correlation and aggregation in subquery.
    if ((exp != NULL) && (thjs != NULL) && (thjs->getJoinType() & CORRELATED))
      throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_NEQ_AGG_SUB),
                      ERR_NON_SUPPORT_NEQ_AGG_SUB);
  }

  it = projectSteps.begin();
  end = projectSteps.end();

  while (it != end)
  {
    uint32_t tid = getTableKey(jobInfo, (*it)->tupleId());
    tableInfoMap[tid].fProjectSteps.push_back(*it);
    tableInfoMap[tid].fProjectCols.push_back((*it)->tupleId());
    it++;
  }

  for (TupleInfoVector::iterator j = jobInfo.pjColList.begin(); j != jobInfo.pjColList.end(); j++)
  {
    if (jobInfo.keyInfo->tupleKeyVec[j->tkey].fId == CNX_EXP_TABLE_ID)
      continue;

    vector<uint32_t>& projectCols = tableInfoMap[j->tkey].fProjectCols;

    if (find(projectCols.begin(), projectCols.end(), j->key) == projectCols.end())
      projectCols.push_back(j->key);
  }

  JobStepVector& retExp = jobInfo.returnedExpressions;

  for (it = retExp.begin(); it != retExp.end(); ++it)
  {
    ExpressionStep* exp = dynamic_cast<ExpressionStep*>(it->get());

    if (exp == NULL)
      throw runtime_error("Not an expression.");

    for (uint64_t i = 0; i < exp->columnKeys().size(); ++i)
    {
      tableInfoMap[exp->tableKeys()[i]].fColsInRetExp.push_back(exp->columnKeys()[i]);
    }
  }

  // reset all step vectors
  querySteps.clear();
  projectSteps.clear();
  deliverySteps.clear();

  // Check if the tables and joins can be used to construct a spanning tree.
  spanningTreeCheck(tableInfoMap, joinSteps, jobInfo);

  // 1. combine job steps for each table
  TableInfoMap::iterator mit;

  for (mit = tableInfoMap.begin(); mit != tableInfoMap.end(); mit++)
    if (combineJobStepsByTable(mit, jobInfo) == false)
      throw runtime_error("combineJobStepsByTable failed.");

  // 2. join the combined steps together to form the spanning tree
  vector<uint32_t> joinOrder;
  joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, overrideLargeSideEstimate);

  // 3. put the steps together
  for (vector<uint32_t>::iterator i = joinOrder.begin(); i != joinOrder.end(); ++i)
    querySteps.insert(querySteps.end(), tableInfoMap[*i].fQuerySteps.begin(),
                      tableInfoMap[*i].fQuerySteps.end());

  adjustLastStep(querySteps, deliverySteps, jobInfo); // to match the select clause
}
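
// Build a TupleUnion step over the delivered row groups of the individual
// queries: each query's output DL is wired into the union's input association,
// the unioned column types are computed via convertUnionColType(), and the
// first distinctUnionNum inputs are de-duplicated (0 means UNION ALL throughout).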
SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo)
{
  vector<RowGroup> inputRGs;
  vector<bool> distinct;
  uint64_t colCount = jobInfo.deliveredCols.size();

  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<uint32_t> width;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  JobStepAssociation jsaToUnion;

  // bug4388, share code with the connector for column type conversion
  vector<vector<CalpontSystemCatalog::ColType>> queryColTypes;

  for (uint64_t j = 0; j < colCount; ++j)
    queryColTypes.push_back(vector<CalpontSystemCatalog::ColType>(queries.size()));

  for (uint64_t i = 0; i < queries.size(); i++)
  {
    SJSTEP& spjs = queries[i];
    TupleDeliveryStep* tds = dynamic_cast<TupleDeliveryStep*>(spjs.get());

    if (tds == NULL)
    {
      throw runtime_error("Not a deliverable step.");
    }

    const RowGroup& rg = tds->getDeliveredRowGroup();
    inputRGs.push_back(rg);

    const vector<uint32_t>& scaleIn = rg.getScale();
    const vector<uint32_t>& precisionIn = rg.getPrecision();
    const vector<CalpontSystemCatalog::ColDataType>& typesIn = rg.getColTypes();
    const vector<uint32_t>& csNumsIn = rg.getCharsetNumbers();

    for (uint64_t j = 0; j < colCount; ++j)
    {
      queryColTypes[j][i].colDataType = typesIn[j];
      queryColTypes[j][i].charsetNumber = csNumsIn[j];
      queryColTypes[j][i].scale = scaleIn[j];
      queryColTypes[j][i].precision = precisionIn[j];
      queryColTypes[j][i].colWidth = rg.getColumnWidth(j);
    }

    if (i == 0)
    {
      const vector<uint32_t>& oidsIn = rg.getOIDs();
      const vector<uint32_t>& keysIn = rg.getKeys();
      oids.insert(oids.end(), oidsIn.begin(), oidsIn.begin() + colCount);
      keys.insert(keys.end(), keysIn.begin(), keysIn.begin() + colCount);
    }

    // if all union types are UNION_ALL, distinctUnionNum is 0.
    distinct.push_back(distinctUnionNum > i);

    AnyDataListSPtr spdl(new AnyDataList());
    RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
    spdl->rowGroupDL(dl);
    dl->OID(CNX_VTABLE_ID);
    JobStepAssociation jsa;
    jsa.outAdd(spdl);
    spjs->outputAssociation(jsa);
    jsaToUnion.outAdd(spdl);
  }

  AnyDataListSPtr spdl(new AnyDataList());
  RowGroupDL* dl = new RowGroupDL(1, jobInfo.fifoSize);
  spdl->rowGroupDL(dl);
  dl->OID(CNX_VTABLE_ID);
  JobStepAssociation jsa;
  jsa.outAdd(spdl);
  TupleUnion* unionStep = new TupleUnion(CNX_VTABLE_ID, jobInfo);
  unionStep->inputAssociation(jsaToUnion);
  unionStep->outputAssociation(jsa);

  // The return code in the call to convertUnionColType() below would
  // always be 0. This is because convertUnionColType() is also called
  // in the connector code in getSelectPlan()/getGroupPlan(), which handles
  // the non-zero return code scenarios from this function call and errors
  // out, in which case the execution does not even get to ExeMgr.
  unsigned int dummyUnionedTypeRc = 0;

  // get unioned column types
  for (uint64_t j = 0; j < colCount; ++j)
  {
    CalpontSystemCatalog::ColType colType =
        CalpontSystemCatalog::ColType::convertUnionColType(queryColTypes[j], dummyUnionedTypeRc);
    types.push_back(colType.colDataType);
    csNums.push_back(colType.charsetNumber);
    scale.push_back(colType.scale);
    precision.push_back(colType.precision);
    width.push_back(colType.colWidth);
  }
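
  // RowGroup layout note: `pos` holds the byte offset of each column in a row.
  // Entry 0 is 2 (the row header); each following entry adds the previous
  // column's width. For example (illustrative values only), widths {4, 8}
  // yield pos {2, 6, 14}.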
  vector<uint32_t> pos;
  pos.push_back(2);

  for (uint64_t i = 0; i < oids.size(); ++i)
    pos.push_back(pos[i] + width[i]);

  unionStep->setInputRowGroups(inputRGs);
  unionStep->setDistinctFlags(distinct);
  unionStep->setOutputRowGroup(
      RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold));

  // The fix for bug 4388 adjusts the result type at the connector side, so this workaround is obsolete.
  // bug 3067, update the returned column types.
  // This is a workaround as the connector always uses the first query's returned columns.
  //   ct.colDataType = types[i];
  //   ct.scale = scale[i];
  //   ct.colWidth = width[i];

  for (size_t i = 0; i < jobInfo.deliveredCols.size(); i++)
  {
    CalpontSystemCatalog::ColType ct = jobInfo.deliveredCols[i]->resultType();
    // XXX remove after connector change
    ct.colDataType = types[i];
    ct.scale = scale[i];
    ct.colWidth = width[i];

    // varchar/varbinary column width has been fudged, see fudgeWidth in jlf_common.cpp.
    if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
      ct.colWidth--;
    else if (ct.colDataType == CalpontSystemCatalog::VARBINARY)
      ct.colWidth -= 2;

    jobInfo.deliveredCols[i]->resultType(ct);
  }

  if (jobInfo.trace)
  {
    cout << boldStart << "\ninput RGs: (distinct=" << distinctUnionNum << ")\n" << boldStop;

    for (vector<RowGroup>::iterator i = inputRGs.begin(); i != inputRGs.end(); i++)
      cout << i->toString() << endl << endl;

    cout << boldStart << "output RG:\n" << boldStop << unionStep->getDeliveredRowGroup().toString() << endl;
  }

  return SJSTEP(unionStep);
}

} // namespace joblist

#ifdef __clang__
#pragma clang diagnostic pop
#endif