1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4617 Move in-to-exists predicate creation and injection into the engine.

We earlier leveraged the server functionality provided by

Item_in_subselect::create_in_to_exists_cond and
Item_in_subselect::inject_in_to_exists_cond

to create and inject the in-to-exists predicate into an IN
subquery's JOIN struct. With this patch, we leave the IN subquery's
JOIN unaltered and instead directly perform this predicate creation
and injection into ColumnStore's select execution plan.
This commit is contained in:
Gagan Goel
2021-03-26 15:41:01 +00:00
parent 374103220c
commit f167a6e505
6 changed files with 1373 additions and 186 deletions

View File

@ -1677,6 +1677,180 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It
return true;
}
void checkOuterTableColumn(gp_walk_info* gwip,
const CalpontSystemCatalog::TableAliasName& tan,
execplan::ReturnedColumn* col)
{
set<CalpontSystemCatalog::TableAliasName>::const_iterator it;
bool notInner = true;
for (it = gwip->innerTables.begin(); it != gwip->innerTables.end(); ++it)
{
if (tan.alias == it->alias && tan.view == it->view)
notInner = false;
}
if (notInner)
{
col->returnAll(true);
IDEBUG( cerr << "setting returnAll on " << tan << endl);
}
}
bool buildEqualityPredicate(execplan::ReturnedColumn* lhs,
execplan::ReturnedColumn* rhs,
gp_walk_info* gwip,
boost::shared_ptr<Operator>& sop,
const Item_func::Functype& funcType,
const vector<Item*>& itemList,
bool isInSubs)
{
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
// push the column that is associated with the correlated column to the returned
// column list, so the materialized view have the complete projection list.
// e.g. tout.c1 in (select tin.c1 from tin where tin.c2=tout.c2);
// the projetion list of subquery will have tin.c1, tin.c2.
ReturnedColumn* correlatedCol = nullptr;
ReturnedColumn* localCol = nullptr;
if (rhs->joinInfo() & JOIN_CORRELATED)
{
correlatedCol = rhs;
localCol = lhs;
}
else if (lhs->joinInfo() & JOIN_CORRELATED)
{
correlatedCol = lhs;
localCol = rhs;
}
if (correlatedCol && localCol)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(localCol);
if ((!cc || (cc && funcType == Item_func::EQ_FUNC)) &&
!(localCol->joinInfo() & JOIN_CORRELATED))
{
if (isInSubs)
localCol->sequence(0);
else
localCol->sequence(gwip->returnedCols.size());
localCol->expressionId(ci->expressionId++);
ReturnedColumn* rc = localCol->clone();
rc->colSource(rc->colSource() | CORRELATED_JOIN);
gwip->additionalRetCols.push_back(SRCP(rc));
gwip->localCols.push_back(localCol);
if (rc->hasWindowFunc() && !isInSubs)
gwip->windowFuncList.push_back(rc);
}
// push the correlated join partner to the group by list only when there's aggregate
// and we don't push aggregate column to the group by
// @bug4756. mysql does not always give correct information about whether there is
// aggregate on the SELECT list. Need to figure that by ourselves and then decide
// to add the group by or not.
if (gwip->subQuery)
{
if (!localCol->hasAggregate() && !localCol->hasWindowFunc())
gwip->subGroupByCols.push_back(SRCP(localCol->clone()));
}
if (sop->op() == OP_EQ)
{
if (gwip->subSelectType == CalpontSelectExecutionPlan::IN_SUBS ||
gwip->subSelectType == CalpontSelectExecutionPlan::EXISTS_SUBS)
correlatedCol->joinInfo(correlatedCol->joinInfo() | JOIN_SEMI);
else if (gwip->subSelectType == CalpontSelectExecutionPlan::NOT_IN_SUBS ||
gwip->subSelectType == CalpontSelectExecutionPlan::NOT_EXISTS_SUBS)
correlatedCol->joinInfo(correlatedCol->joinInfo() | JOIN_ANTI);
}
}
SimpleFilter* sf = new SimpleFilter();
sf->timeZone(gwip->thd->variables.time_zone->get_name()->ptr());
//@bug 2101 for when there are only constants in a delete or update where clause (eg "where 5 < 6").
//There will be no field column and it will get here only if the comparison is true.
if (gwip->columnMap.empty() &&
((current_thd->lex->sql_command == SQLCOM_UPDATE) ||
(current_thd->lex->sql_command == SQLCOM_UPDATE_MULTI) ||
(current_thd->lex->sql_command == SQLCOM_DELETE) ||
(current_thd->lex->sql_command == SQLCOM_DELETE_MULTI)))
{
IDEBUG( cerr << "deleted func with 2 const columns" << endl );
delete rhs;
delete lhs;
return false;
}
// handle noop (only for table mode)
if (rhs->data() == "noop" || lhs->data() == "noop")
{
sop.reset(new Operator("noop"));
}
else
{
for (uint32_t i = 0; i < itemList.size(); i++)
{
if (isPredicateFunction(itemList[i], gwip))
{
gwip->fatalParseError = true;
gwip->parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_SUB_EXPRESSION);
}
}
}
sf->op(sop);
sf->lhs(lhs);
sf->rhs(rhs);
sop->setOpType(lhs->resultType(), rhs->resultType());
sop->resultType(sop->operationType());
if (sop->op() == OP_EQ)
{
CalpontSystemCatalog::TableAliasName tan_lhs;
CalpontSystemCatalog::TableAliasName tan_rhs;
bool outerjoin = (rhs->singleTable(tan_rhs) && lhs->singleTable(tan_lhs));
// @bug 1632. Alias should be taken account to the identity of tables for selfjoin to work
if (outerjoin && tan_lhs != tan_rhs) // join
{
if (!gwip->condPush) // vtable
{
if (!gwip->innerTables.empty())
{
checkOuterTableColumn(gwip, tan_lhs, lhs);
checkOuterTableColumn(gwip, tan_rhs, rhs);
}
if (funcType == Item_func::EQ_FUNC)
{
gwip->equiCondSFList.push_back(sf);
}
ParseTree* ptp = new ParseTree(sf);
gwip->ptWorkStack.push(ptp);
}
}
else
{
ParseTree* ptp = new ParseTree(sf);
gwip->ptWorkStack.push(ptp);
}
}
else
{
ParseTree* ptp = new ParseTree(sf);
gwip->ptWorkStack.push(ptp);
}
return true;
}
bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip)
{
boost::shared_ptr<Operator> sop(new PredicateOperator(ifp->func_name()));
@ -1694,9 +1868,6 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip)
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (ifp->functype() == Item_func::BETWEEN)
{
idbassert (gwip->rcWorkStack.size() >= 3);
@ -2126,174 +2297,14 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip)
RowColumn* rlhs = dynamic_cast<RowColumn*>(lhs);
if (rrhs && rlhs)
{
return buildRowColumnFilter(gwip, rrhs, rlhs, ifp);
}
// push the column that is associated with the correlated column to the returned
// column list, so the materialized view have the complete projection list.
// e.g. tout.c1 in (select tin.c1 from tin where tin.c2=tout.c2);
// the projetion list of subquery will have tin.c1, tin.c2.
ReturnedColumn* correlatedCol = NULL;
ReturnedColumn* localCol = NULL;
vector<Item*> itemList;
if (rhs->joinInfo() & JOIN_CORRELATED)
{
correlatedCol = rhs;
localCol = lhs;
}
else if (lhs->joinInfo() & JOIN_CORRELATED)
{
correlatedCol = lhs;
localCol = rhs;
}
for (uint32_t i = 0; i < ifp->argument_count(); i++)
itemList.push_back(ifp->arguments()[i]);
if (correlatedCol && localCol)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(localCol);
if ((!cc || (cc && ifp->functype() == Item_func::EQ_FUNC)) &&
!(localCol->joinInfo() & JOIN_CORRELATED))
{
localCol->sequence(gwip->returnedCols.size());
localCol->expressionId(ci->expressionId++);
ReturnedColumn* rc = localCol->clone();
rc->colSource(rc->colSource() | CORRELATED_JOIN);
gwip->additionalRetCols.push_back(SRCP(rc));
gwip->localCols.push_back(localCol);
if (rc->hasWindowFunc())
gwip->windowFuncList.push_back(rc);
}
// push the correlated join partner to the group by list only when there's aggregate
// and we don't push aggregate column to the group by
// @bug4756. mysql does not always give correct information about whether there is
// aggregate on the SELECT list. Need to figure that by ourselves and then decide
// to add the group by or not.
if (gwip->subQuery)
{
if (!localCol->hasAggregate() && !localCol->hasWindowFunc())
gwip->subGroupByCols.push_back(SRCP(localCol->clone()));
}
if (sop->op() == OP_EQ)
{
if (gwip->subSelectType == CalpontSelectExecutionPlan::IN_SUBS ||
gwip->subSelectType == CalpontSelectExecutionPlan::EXISTS_SUBS)
correlatedCol->joinInfo(correlatedCol->joinInfo() | JOIN_SEMI);
else if (gwip->subSelectType == CalpontSelectExecutionPlan::NOT_IN_SUBS ||
gwip->subSelectType == CalpontSelectExecutionPlan::NOT_EXISTS_SUBS)
correlatedCol->joinInfo(correlatedCol->joinInfo() | JOIN_ANTI);
}
}
SimpleFilter* sf = new SimpleFilter();
sf->timeZone(gwip->thd->variables.time_zone->get_name()->ptr());
//@bug 2101 for when there are only constants in a delete or update where clause (eg "where 5 < 6").
//There will be no field column and it will get here only if the comparison is true.
if (gwip->columnMap.empty() &&
((current_thd->lex->sql_command == SQLCOM_UPDATE) ||
(current_thd->lex->sql_command == SQLCOM_UPDATE_MULTI) ||
(current_thd->lex->sql_command == SQLCOM_DELETE) ||
(current_thd->lex->sql_command == SQLCOM_DELETE_MULTI)))
{
IDEBUG( cerr << "deleted func with 2 const columns" << endl );
delete rhs;
delete lhs;
return false;
}
// handle noop (only for table mode)
if (rhs->data() == "noop" || lhs->data() == "noop")
{
sop.reset(new Operator("noop"));
}
else
{
for (uint32_t i = 0; i < ifp->argument_count(); i++)
{
if (isPredicateFunction(ifp->arguments()[i], gwip))
{
gwip->fatalParseError = true;
gwip->parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_SUB_EXPRESSION);
}
}
}
sf->op(sop);
sf->lhs(lhs);
sf->rhs(rhs);
sop->setOpType(lhs->resultType(), rhs->resultType());
sop->resultType(sop->operationType());
if (sop->op() == OP_EQ)
{
CalpontSystemCatalog::TableAliasName tan_lhs;
CalpontSystemCatalog::TableAliasName tan_rhs;
set<CalpontSystemCatalog::TableAliasName>::const_iterator it;
bool outerjoin = (rhs->singleTable(tan_rhs) && lhs->singleTable(tan_lhs));
// @bug 1632. Alias should be taken account to the identity of tables for selfjoin to work
if (outerjoin && tan_lhs != tan_rhs) // join
{
if (!gwip->condPush) // vtable
{
if (!gwip->innerTables.empty())
{
bool notInner = true;
for (it = gwip->innerTables.begin(); it != gwip->innerTables.end(); ++it)
{
if (tan_lhs.alias == it->alias && tan_lhs.view == it->view)
notInner = false;
}
if (notInner)
{
lhs->returnAll(true);
IDEBUG( cerr << "setting returnAll on " << tan_lhs << endl);
}
}
if (!gwip->innerTables.empty())
{
bool notInner = true;
for (it = gwip->innerTables.begin(); it != gwip->innerTables.end(); ++it)
{
if (tan_rhs.alias == it->alias && tan_rhs.view == it->view)
notInner = false;
}
if (notInner)
{
rhs->returnAll(true);
IDEBUG( cerr << "setting returnAll on " << tan_rhs << endl );
}
}
if (ifp->functype() == Item_func::EQ_FUNC)
{
gwip->equiCondSFList.push_back(sf);
}
ParseTree* ptp = new ParseTree(sf);
gwip->ptWorkStack.push(ptp);
}
}
else
{
ParseTree* ptp = new ParseTree(sf);
gwip->ptWorkStack.push(ptp);
}
}
else
{
ParseTree* ptp = new ParseTree(sf);
gwip->ptWorkStack.push(ptp);
}
return buildEqualityPredicate(lhs, rhs, gwip, sop, ifp->functype(), itemList);
}
return true;
@ -6881,6 +6892,125 @@ int processLimitAndOffset(
return 0;
}
/*@brief Create in-to-exists predicate for an IN subquery */
/***********************************************************
* DESCRIPTION:
* This function processes the lhs and rhs of an IN predicate
* for a query such as:
* select col1 from t1 where col2 in (select col2' from t2);
* here, lhs is col2 and rhs is the in subquery "select col2' from t2".
* It creates a new predicate of the form "col2=col2'" which then later
* gets injected into the execution plan of the subquery.
* If lhs is of type Item::ROW_ITEM instead, such as:
* select col1 from t1 where (col2,col3) in (select col2',col3' from t2);
* the function builds an "and" filter of the form "col2=col2' and col3=col3'".
* RETURNS
* none
***********************************************************/
void buildInToExistsFilter(gp_walk_info& gwi, SELECT_LEX& select_lex)
{
RowColumn* rlhs = dynamic_cast<RowColumn*>(gwi.inSubQueryLHS);
size_t additionalRetColsBefore = gwi.additionalRetCols.size();
if (rlhs)
{
idbassert(gwi.inSubQueryLHSItem->type() == Item::ROW_ITEM);
Item_row* row = (Item_row*)gwi.inSubQueryLHSItem;
idbassert(!rlhs->columnVec().empty() &&
(rlhs->columnVec().size() == gwi.returnedCols.size()) &&
row->cols() && (row->cols() == select_lex.item_list.elements) &&
(row->cols() == gwi.returnedCols.size()));
List_iterator_fast<Item> it(select_lex.item_list);
Item* item;
int i = 0;
ParseTree* rowFilter = nullptr;
while ((item = it++))
{
boost::shared_ptr<Operator> sop(new PredicateOperator("="));
vector<Item*> itemList = {row->element_index(i), item};
ReturnedColumn* rhs = gwi.returnedCols[i]->clone();
buildEqualityPredicate(rlhs->columnVec()[i]->clone(), rhs, &gwi, sop,
Item_func::EQ_FUNC, itemList, true);
if (gwi.fatalParseError)
{
delete rlhs;
return;
}
ParseTree* tmpFilter = nullptr;
if (!gwi.ptWorkStack.empty())
{
tmpFilter = gwi.ptWorkStack.top();
gwi.ptWorkStack.pop();
}
if (i == 0 && tmpFilter)
{
rowFilter = tmpFilter;
}
else if (i != 0 && tmpFilter && rowFilter)
{
ParseTree* ptp = new ParseTree(new LogicOperator("and"));
ptp->left(rowFilter);
ptp->right(tmpFilter);
rowFilter = ptp;
}
i++;
}
delete rlhs;
if (rowFilter)
gwi.ptWorkStack.push(rowFilter);
}
else
{
idbassert((gwi.returnedCols.size() == 1) &&
(select_lex.item_list.elements == 1));
boost::shared_ptr<Operator> sop(new PredicateOperator("="));
vector<Item*> itemList = {gwi.inSubQueryLHSItem,
select_lex.item_list.head()};
ReturnedColumn* rhs = gwi.returnedCols[0]->clone();
buildEqualityPredicate(gwi.inSubQueryLHS, rhs, &gwi, sop,
Item_func::EQ_FUNC, itemList, true);
if (gwi.fatalParseError)
return;
}
size_t additionalRetColsAdded = gwi.additionalRetCols.size() -
additionalRetColsBefore;
if (gwi.returnedCols.size() &&
(gwi.returnedCols.size() == additionalRetColsAdded))
{
for (size_t i = 0; i < gwi.returnedCols.size(); i++)
{
gwi.returnedCols[i]->expressionId(
gwi.additionalRetCols[additionalRetColsBefore + i]->expressionId());
gwi.returnedCols[i]->colSource(
gwi.additionalRetCols[additionalRetColsBefore + i]->colSource());
}
// Delete the duplicate copy of the returned cols
auto iter = gwi.additionalRetCols.begin();
std::advance(iter, additionalRetColsBefore);
gwi.additionalRetCols.erase(iter, gwi.additionalRetCols.end());
}
}
/*@brief Translates SELECT_LEX into CSEP */
/***********************************************************
* DESCRIPTION:
@ -7476,6 +7606,51 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
}
}
// MCOL-4617 If this is an IN subquery, then create the in-to-exists
// predicate and inject it into the csep
if (gwi.subQuery &&
gwi.subSelectType == CalpontSelectExecutionPlan::IN_SUBS &&
gwi.inSubQueryLHS && gwi.inSubQueryLHSItem)
{
// create the predicate
buildInToExistsFilter(gwi, select_lex);
if (gwi.fatalParseError)
{
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
return ER_INTERNAL_ERROR;
}
// now inject the created predicate
if (!gwi.ptWorkStack.empty())
{
ParseTree* inToExistsFilter = gwi.ptWorkStack.top();
gwi.ptWorkStack.pop();
if (havingFilter)
{
ParseTree* ptp = new ParseTree(new LogicOperator("and"));
ptp->left(havingFilter);
ptp->right(inToExistsFilter);
havingFilter = ptp;
}
else
{
if (csep->filters())
{
ParseTree* ptp = new ParseTree(new LogicOperator("and"));
ptp->left(csep->filters());
ptp->right(inToExistsFilter);
csep->filters(ptp);
}
else
{
csep->filters(inToExistsFilter);
}
}
}
}
// for post process expressions on the select list
// error out post process for union and sub select unit
if (isUnion || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT)