1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

Fixed the problem of wrong identification of WITH tables defined in WITH clauses without RECURSIVE.

Added test cases to check the fix.
Fixed the problem of wrong types of recursive tables when the type of anchor part does not coincide with the
type of recursive part.
Prevented usage of marerialization and subquery cache for subqueries with recursive references.
Introduced system variables 'max_recursion_level'.
Added a test case to test usage of this variable.
This commit is contained in:
Galina Shalygina
2016-05-24 21:29:52 +03:00
parent 0f7fe2a743
commit b4f1f42062
16 changed files with 493 additions and 143 deletions

View File

@ -118,83 +118,78 @@ bool With_clause::check_dependencies(THD *thd)
if (with_elem->derived_dep_map & with_elem->get_elem_map())
with_elem->is_recursive= true;
}
for (With_element *with_elem= first_elem;
with_elem != NULL;
with_elem= with_elem->next_elem)
{
if (with_elem->is_recursive)
{
#if 0
my_error(ER_RECURSIVE_QUERY_IN_WITH_CLAUSE, MYF(0),
with_elem->query_name->str);
return true;
#endif
}
}
if (!with_recursive)
{
/*
For each with table T defined in this with clause check whether
it is used in any definition that follows the definition of T.
*/
for (With_element *with_elem= first_elem;
with_elem != NULL;
with_elem= with_elem->next_elem)
{
With_element *checked_elem= with_elem->next_elem;
for (uint i = with_elem->number+1;
i < elements;
i++, checked_elem= checked_elem->next_elem)
{
if (with_elem->check_dependency_on(checked_elem))
{
my_error(ER_WRONG_ORDER_IN_WITH_CLAUSE, MYF(0),
with_elem->query_name->str, checked_elem->query_name->str);
return true;
}
}
}
}
dependencies_are_checked= true;
return false;
}
struct st_unit_ctxt_elem
{
st_unit_ctxt_elem *prev;
st_select_lex_unit *unit;
};
bool With_element::check_dependencies_in_spec(THD *thd)
{
for (st_select_lex *sl= spec->first_select(); sl; sl= sl->next_select())
{
check_dependencies_in_select(sl, sl->with_dep);
st_unit_ctxt_elem ctxt0= {NULL, owner->owner};
st_unit_ctxt_elem ctxt1= {&ctxt0, spec};
check_dependencies_in_select(sl, &ctxt1, false, &sl->with_dep);
base_dep_map|= sl->with_dep;
}
return false;
}
void With_element::check_dependencies_in_select(st_select_lex *sl,
table_map &dep_map)
With_element *find_table_def_in_with_clauses(TABLE_LIST *tbl,
st_unit_ctxt_elem *ctxt)
{
bool is_sq_select= sl->master_unit()->item != NULL;
With_element *barrier= NULL;
for (st_unit_ctxt_elem *unit_ctxt_elem= ctxt;
unit_ctxt_elem;
unit_ctxt_elem= unit_ctxt_elem->prev)
{
st_select_lex_unit *unit= unit_ctxt_elem->unit;
With_clause *with_clause= unit->with_clause;
if (with_clause &&
(tbl->with= with_clause->find_table_def(tbl, barrier)))
return tbl->with;
barrier= NULL;
if (unit->with_element && !unit->with_element->get_owner()->with_recursive)
barrier= unit->with_element;
}
return NULL;
}
void With_element::check_dependencies_in_select(st_select_lex *sl,
st_unit_ctxt_elem *ctxt,
bool in_subq,
table_map *dep_map)
{
With_clause *with_clause= sl->get_with_clause();
for (TABLE_LIST *tbl= sl->table_list.first; tbl; tbl= tbl->next_local)
{
if (tbl->derived || tbl->nested_join)
continue;
tbl->with_internal_reference_map= 0;
if (with_clause && !tbl->with)
tbl->with= with_clause->find_table_def(tbl, NULL);
if (!tbl->with)
tbl->with= owner->find_table_def(tbl);
if (!tbl->with && tbl->select_lex)
tbl->with= tbl->select_lex->find_table_def_in_with_clauses(tbl);
tbl->with= find_table_def_in_with_clauses(tbl, ctxt);
if (tbl->with && tbl->with->owner== this->owner)
{
dep_map|= tbl->with->get_elem_map();
*dep_map|= tbl->with->get_elem_map();
tbl->with_internal_reference_map= get_elem_map();
if (is_sq_select)
if (in_subq)
sq_dep_map|= tbl->with->get_elem_map();
}
}
st_select_lex_unit *inner_unit= sl->first_inner_unit();
for (; inner_unit; inner_unit= inner_unit->next_unit())
check_dependencies_in_unit(inner_unit, dep_map);
check_dependencies_in_unit(inner_unit, ctxt, in_subq, dep_map);
}
@ -213,12 +208,32 @@ void With_element::check_dependencies_in_select(st_select_lex *sl,
*/
void With_element::check_dependencies_in_unit(st_select_lex_unit *unit,
table_map &dep_map)
st_unit_ctxt_elem *ctxt,
bool in_subq,
table_map *dep_map)
{
if (unit->with_clause)
check_dependencies_in_with_clause(unit->with_clause, ctxt, in_subq, dep_map);
in_subq |= unit->item != NULL;
st_unit_ctxt_elem unit_ctxt_elem= {ctxt, unit};
st_select_lex *sl= unit->first_select();
for (; sl; sl= sl->next_select())
{
check_dependencies_in_select(sl, dep_map);
check_dependencies_in_select(sl, &unit_ctxt_elem, in_subq, dep_map);
}
}
void
With_element::check_dependencies_in_with_clause(With_clause *with_clause,
st_unit_ctxt_elem *ctxt,
bool in_subq,
table_map *dep_map)
{
for (With_element *with_elem= with_clause->first_elem;
with_elem != NULL;
with_elem= with_elem->next_elem)
{
check_dependencies_in_unit(with_elem->spec, ctxt, in_subq, dep_map);
}
}
@ -328,10 +343,11 @@ bool With_clause::check_anchors()
NULL - otherwise
*/
With_element *With_clause::find_table_def(TABLE_LIST *table)
With_element *With_clause::find_table_def(TABLE_LIST *table,
With_element *barrier)
{
for (With_element *with_elem= first_elem;
with_elem != NULL;
with_elem != barrier;
with_elem= with_elem->next_elem)
{
if (my_strcasecmp(system_charset_info, with_elem->query_name->str,
@ -672,17 +688,27 @@ bool With_element::is_anchor(st_select_lex *sel)
With_element *st_select_lex::find_table_def_in_with_clauses(TABLE_LIST *table)
{
st_select_lex_unit *master_unit= NULL;
With_element *found= NULL;
for (st_select_lex *sl= this;
sl;
sl= sl->master_unit()->outer_select())
sl= master_unit->outer_select())
{
With_clause *with_clause=sl->get_with_clause();
if (with_clause && (found= with_clause->find_table_def(table)))
return found;
/* Do not look for the table's definition beyond the scope of the view */
if (sl->master_unit()->is_view)
With_element *with_elem= sl->get_with_element();
/*
If sl->master_unit() is the spec of a with element then the search for
a definition was already done by With_element::check_dependencies_in_spec
and it was unsuccesful.
*/
if (with_elem)
break;
With_clause *with_clause=sl->get_with_clause();
if (with_clause && (found= with_clause->find_table_def(table,NULL)))
break;
master_unit= sl->master_unit();
/* Do not look for the table's definition beyond the scope of the view */
if (master_unit->is_view)
break;
}
return found;
}
@ -729,7 +755,7 @@ bool TABLE_LIST::is_recursive_with_table()
bool TABLE_LIST::is_with_table_recursive_reference()
{
return (with_internal_reference_map &&
(with->mutually_recursive & with_internal_reference_map));
(with->get_mutually_recursive() & with_internal_reference_map));
}
@ -745,10 +771,11 @@ bool st_select_lex::check_unrestricted_recursive(bool only_standards_compliant)
unrestricted,
encountered))
return true;
with_elem->owner->unrestricted|= unrestricted;
with_elem->get_owner()->add_unrestricted(unrestricted);
if (with_sum_func ||
(with_elem->sq_dep_map & with_elem->mutually_recursive))
with_elem->owner->unrestricted|= with_elem->mutually_recursive;
(with_elem->contains_sq_with_recursive_reference()))
with_elem->get_owner()->add_unrestricted(
with_elem->get_mutually_recursive());
if (only_standards_compliant && with_elem->is_unrestricted())
{
my_error(ER_NOT_STANDARDS_COMPLIANT_RECURSIVE,
@ -776,7 +803,7 @@ bool With_element::check_unrestricted_recursive(st_select_lex *sel,
if (tbl->is_materialized_derived())
{
table_map dep_map;
check_dependencies_in_unit(unit, dep_map);
check_dependencies_in_unit(unit, NULL, false, &dep_map);
if (dep_map & get_elem_map())
{
my_error(ER_REF_TO_RECURSIVE_WITH_TABLE_IN_DERIVED,
@ -797,7 +824,7 @@ bool With_element::check_unrestricted_recursive(st_select_lex *sel,
else
encountered|= with_elem->get_elem_map();
}
}
}
for (With_element *with_elem= sel->get_with_element()->owner->first_elem;
with_elem != NULL;
with_elem= with_elem->next_elem)
@ -841,6 +868,27 @@ bool With_element::check_unrestricted_recursive(st_select_lex *sel,
}
void st_select_lex::check_subqueries_with_recursive_references()
{
st_select_lex_unit *sl_master= master_unit();
List_iterator<TABLE_LIST> ti(leaf_tables);
TABLE_LIST *tbl;
while ((tbl= ti++))
{
if (!(tbl->is_with_table_recursive_reference() && sl_master->item))
continue;
for (st_select_lex *sl= this; sl; sl= sl_master->outer_select())
{
sl_master= sl->master_unit();
if (!sl_master->item)
continue;
Item_subselect *subq= (Item_subselect *) sl_master->item;
subq->with_recursive_reference= true;
}
}
}
/**
@brief
Print this with clause