1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

System Versioning pre0.12

Merge remote-tracking branch 'origin/archive/2017-10-17' into 10.3
This commit is contained in:
Aleksey Midenkov
2017-11-07 00:37:49 +03:00
354 changed files with 20615 additions and 1638 deletions

View File

@ -54,12 +54,14 @@
#include "sql_statistics.h"
#include "sql_cte.h"
#include "sql_window.h"
#include "tztime.h"
#include "debug_sync.h" // DEBUG_SYNC
#include <m_ctype.h>
#include <my_bit.h>
#include <hash.h>
#include <ft_global.h>
#include "sys_vars_shared.h"
/*
A key part number that means we're using a fulltext scan.
@ -669,6 +671,362 @@ setup_without_group(THD *thd, Ref_ptr_array ref_pointer_array,
DBUG_RETURN(res);
}
/**
  Fill these versioning conditions from the session variable
  vers_current_time.

  The condition type is copied from the variable and the start unit is set
  to UNIT_TIMESTAMP. For any type other than FOR_SYSTEM_TIME_UNSPECIFIED and
  FOR_SYSTEM_TIME_ALL (asserted to be FOR_SYSTEM_TIME_AS_OF) a datetime
  literal built from the variable's time value is allocated on
  thd->mem_root and stored in 'start'; otherwise 'start' is NULL.
  'end' is reset to NULL on success.

  @param thd  current thread; supplies the session variable and the mem_root

  @retval false  success
  @retval true   the datetime literal could not be allocated
*/
bool vers_select_conds_t::init_from_sysvar(THD *thd)
{
  st_vers_current_time &sysvar= thd->variables.vers_current_time;

  type= sysvar.type;
  unit_start= UNIT_TIMESTAMP;

  /* No point in time unless the variable requests one. */
  start= NULL;
  if (type != FOR_SYSTEM_TIME_UNSPECIFIED && type != FOR_SYSTEM_TIME_ALL)
  {
    DBUG_ASSERT(type == FOR_SYSTEM_TIME_AS_OF);
    start= new (thd->mem_root)
      Item_datetime_literal(thd, &sysvar.ltime, TIME_SECOND_PART_DIGITS);
    if (!start)
      return true;
  }

  end= NULL;
  return false;
}
/**
  System Versioning: turn FOR SYSTEM_TIME conditions into ordinary predicates.

  For every versioned table in 'tables' this builds predicates on the table's
  row start / row end fields according to the table's vers_conditions (taken
  from the table itself, from an enclosing select, or from the session
  variable) and ANDs them into the table's ON expression, the ON expression
  of the embedding join, or the WHERE condition.

  @param thd         current thread (arenas, mem_root, session variables)
  @param tables      table list, walked through next_local
  @param where_expr  in/out: pointer to the WHERE condition of the select
  @param slex        the select being prepared; provides the name resolution
                     context, lock type, saved_where and the
                     vers_export_outer / vers_import_outer settings

  @retval 0   success, or nothing to do
  @retval -1  error (reported through my_error() in the branches visible
              here, or init_from_sysvar() failed)
*/
int vers_setup_select(THD *thd, TABLE_LIST *tables, COND **where_expr,
SELECT_LEX *slex)
{
DBUG_ENTER("vers_setup_select");
// Shorthand for allocating items on the current mem_root of thd;
// undefined again at the end of this function.
#define newx new (thd->mem_root)
TABLE_LIST *table;
int versioned_tables= 0;
// Only proceed for a conventional arena, statement prepare or SP execution.
if (!thd->stmt_arena->is_conventional() &&
!thd->stmt_arena->is_stmt_prepare() && !thd->stmt_arena->is_sp_execute())
{
// statement is already prepared
DBUG_RETURN(0);
}
// Count versioned tables. Any other table list element that still carries
// FOR SYSTEM_TIME conditions is an error.
for (table= tables; table; table= table->next_local)
{
if (table->table && table->table->versioned())
versioned_tables++;
else if (table->vers_conditions)
{
my_error(ER_VERSIONING_REQUIRED, MYF(0), table->alias);
DBUG_RETURN(-1);
}
}
if (versioned_tables == 0)
DBUG_RETURN(0);
/* For prepared statements we create items on statement arena,
because they must outlive execution phase for multiple executions. */
Query_arena_stmt on_stmt_arena(thd);
// WHERE handling for stored procedures: the original WHERE is kept in
// slex->saved_where and a fresh copy is used on every execution.
if (slex->saved_where)
{
DBUG_ASSERT(thd->stmt_arena->is_sp_execute());
/* 2. this copy_andor_structure() is also required by the same reason */
*where_expr= slex->saved_where->copy_andor_structure(thd);
}
else if (thd->stmt_arena->is_sp_execute())
{
if (thd->stmt_arena->is_stmt_execute()) // SP executed second time (STMT_EXECUTED)
*where_expr= 0;
else if (*where_expr) // SP executed first time (STMT_INITIALIZED_FOR_SP)
/* 1. copy_andor_structure() is required since this andor tree
is modified later (and on shorter arena) */
slex->saved_where= (*where_expr)->copy_andor_structure(thd);
}
/* We have to save also non-versioned on_expr since we may have
conjunction of versioned + non-versioned */
if (thd->stmt_arena->is_sp_execute())
{
for (table= tables; table; table= table->next_local)
{
if (!table->table)
continue;
if (table->saved_on_expr) // same logic as saved_where
{
if (table->on_expr)
table->on_expr= table->saved_on_expr->copy_andor_structure(thd);
else
// on_expr was moved to WHERE (see below: Add ON expression to the WHERE)
*where_expr= and_items(thd,
*where_expr,
table->saved_on_expr->copy_andor_structure(thd));
}
else if (table->on_expr &&
thd->stmt_arena->state == Query_arena::STMT_INITIALIZED_FOR_SP)
{
// First SP execution: remember the original ON expression.
table->saved_on_expr= table->on_expr->copy_andor_structure(thd);
}
}
}
// NOTE(review): "outer" here is whatever next_select_in_list() returns;
// confirm this really is the enclosing select in all cases.
SELECT_LEX *outer_slex= slex->next_select_in_list();
// propagate derived conditions to outer SELECT_LEX
if (outer_slex && slex->vers_export_outer)
{
for (table= outer_slex->table_list.first; table; table= table->next_local)
{
// Tables that already have their own conditions are left untouched.
if (!table->vers_conditions)
{
table->vers_conditions= slex->vers_export_outer;
// Mark as received from an inner select; the import loop below skips
// such conditions.
table->vers_conditions.from_inner= true;
}
}
}
// Main loop: build and attach the versioning predicates per versioned table.
for (table= tables; table; table= table->next_local)
{
if (table->table && table->table->versioned())
{
vers_select_conds_t &vers_conditions= table->vers_conditions;
// propagate system_time from nearest outer SELECT_LEX
if (!vers_conditions && outer_slex && slex->vers_import_outer)
{
// NOTE(review): 'derived' is dereferenced in the loop condition before
// the DBUG_ASSERT(derived) below, so a NULL master_unit()->derived would
// crash here rather than hit the assertion - confirm it cannot be NULL.
// NOTE(review): outer_slex is function-scoped and advanced by this loop,
// so later tables of the enclosing for loop start from the advanced
// position (or skip the import entirely once it became NULL). Also, when
// the last step lands on a derived table with conditions while
// outer_slex becomes NULL, the 'if (outer_slex)' below does not take
// them. Confirm both are intended.
TABLE_LIST* derived= slex->master_unit()->derived;
while (outer_slex && (!derived->vers_conditions || derived->vers_conditions.from_inner))
{
derived= outer_slex->master_unit()->derived;
outer_slex= outer_slex->next_select_in_list();
}
if (outer_slex)
{
DBUG_ASSERT(derived);
DBUG_ASSERT(derived->vers_conditions);
vers_conditions= derived->vers_conditions;
}
}
// propagate system_time from sysvar
if (!vers_conditions)
{
if (vers_conditions.init_from_sysvar(thd))
DBUG_RETURN(-1);
}
if (vers_conditions)
{
// Explicit versioning conditions are rejected for the write lock types
// listed below.
switch (slex->lock_type)
{
case TL_WRITE_ALLOW_WRITE:
case TL_WRITE_CONCURRENT_INSERT:
case TL_WRITE_DELAYED:
case TL_WRITE_DEFAULT:
case TL_WRITE_LOW_PRIORITY:
case TL_WRITE:
case TL_WRITE_ONLY:
my_error(ER_VERS_HISTORY_LOCK, MYF(0));
DBUG_RETURN(-1);
default:
break;
}
// FOR SYSTEM_TIME ALL: no predicate is added for this table.
if (vers_conditions == FOR_SYSTEM_TIME_ALL)
continue;
} // if (vers_conditions)
// Choose where the predicate goes: WHERE by default, the table's own ON
// expression if present, and the ON expression of the embedding table list
// element if that one is present (it takes precedence).
COND** dst_cond= where_expr;
if (table->on_expr)
{
dst_cond= &table->on_expr;
}
if (TABLE_LIST *t= table->embedding)
{
if (t->on_expr)
dst_cond= &t->on_expr;
}
// Field references to the row start / row end columns of this table.
const LEX_CSTRING *fstart= &table->table->vers_start_field()->field_name;
const LEX_CSTRING *fend= &table->table->vers_end_field()->field_name;
Item *row_start=
newx Item_field(thd, &slex->context, table->db, table->alias, fstart);
Item *row_end=
newx Item_field(thd, &slex->context, table->db, table->alias, fend);
// row_end may get wrapped into Item_func_vtq_ts below; row_end2 keeps the
// plain field reference.
Item *row_end2= row_end;
// Temporary table whose row start field has LONGLONG type.
bool tmp_from_ib=
table->table->s->table_category == TABLE_CATEGORY_TEMPORARY &&
table->table->vers_start_field()->type() == MYSQL_TYPE_LONGLONG;
bool timestamps_only= table->table->versioned_by_sql() && !tmp_from_ib;
if (vers_conditions)
{
vers_conditions.resolve_units(timestamps_only);
if (timestamps_only)
{
// Transaction id units are not accepted for timestamp-only tables.
if (vers_conditions.unit_start == UNIT_TRX_ID || vers_conditions.unit_end == UNIT_TRX_ID)
{
my_error(ER_VERS_ENGINE_UNSUPPORTED, MYF(0), table->table_name);
DBUG_RETURN(-1);
}
}
else if (thd->variables.vers_innodb_algorithm_simple)
{
// "Simple" algorithm: when a bound is given as a timestamp, the row
// start / row end fields are wrapped into Item_func_vtq_ts(VTQ_COMMIT_TS)
// so that the comparisons further below use the wrapped items.
DBUG_ASSERT(table->table->s && table->table->s->db_plugin);
handlerton *hton= plugin_hton(table->table->s->db_plugin);
DBUG_ASSERT(hton);
bool convert_start= false;
bool convert_end= false;
switch (vers_conditions.type)
{
case FOR_SYSTEM_TIME_AS_OF:
if (vers_conditions.unit_start == UNIT_TIMESTAMP)
convert_start= convert_end= true;
break;
case FOR_SYSTEM_TIME_BEFORE:
if (vers_conditions.unit_start == UNIT_TIMESTAMP)
convert_end= true;
break;
case FOR_SYSTEM_TIME_FROM_TO:
case FOR_SYSTEM_TIME_BETWEEN:
// row_end is compared with 'start' and row_start with 'end' below, hence
// the crossed unit checks.
if (vers_conditions.unit_start == UNIT_TIMESTAMP)
convert_end= true;
if (vers_conditions.unit_end == UNIT_TIMESTAMP)
convert_start= true;
// NOTE(review): no break here; falls through to 'default', which only
// breaks, so behavior is unaffected.
default:
break;
}
if (convert_start)
row_start= newx Item_func_vtq_ts(
thd,
hton,
row_start,
VTQ_COMMIT_TS);
if (convert_end)
row_end= newx Item_func_vtq_ts(
thd,
hton,
row_end,
VTQ_COMMIT_TS);
}
}
// cond1 / cond2 are the predicates to add; cond2 stays 0 for the
// UNSPECIFIED and BEFORE cases. 'curr' holds the literal compared with the
// row end in the UNSPECIFIED case.
Item *cond1= 0, *cond2= 0, *curr= 0;
// Temporary tables can be created from InnoDB tables and thus will
// have uint64 type of sys_trx_(start|end) field.
// They need special handling.
TABLE *t= table->table;
if (tmp_from_ib || t->versioned_by_sql() ||
thd->variables.vers_innodb_algorithm_simple)
{
// Plain comparisons on the (possibly wrapped) row start / row end items.
switch (vers_conditions.type)
{
case FOR_SYSTEM_TIME_UNSPECIFIED:
// No FOR SYSTEM_TIME given: row_end must equal the maximum value, i.e.
// TIMESTAMP_MAX_VALUE with the maximum second part in the session time
// zone, or ULONGLONG_MAX when the field's real type is LONGLONG.
if (t->vers_start_field()->real_type() != MYSQL_TYPE_LONGLONG)
{
MYSQL_TIME max_time;
thd->variables.time_zone->gmt_sec_to_TIME(&max_time, TIMESTAMP_MAX_VALUE);
max_time.second_part= TIME_MAX_SECOND_PART;
curr= newx Item_datetime_literal(thd, &max_time,
TIME_SECOND_PART_DIGITS);
cond1= newx Item_func_eq(thd, row_end, curr);
}
else
{
curr= newx Item_int(thd, ULONGLONG_MAX);
cond1= newx Item_func_eq(thd, row_end2, curr);
}
break;
case FOR_SYSTEM_TIME_AS_OF:
// row_start <= start AND row_end > start
cond1= newx Item_func_le(thd, row_start,
vers_conditions.start);
cond2= newx Item_func_gt(thd, row_end,
vers_conditions.start);
break;
case FOR_SYSTEM_TIME_FROM_TO:
// row_start < end AND row_end >= start
cond1= newx Item_func_lt(thd, row_start,
vers_conditions.end);
cond2= newx Item_func_ge(thd, row_end,
vers_conditions.start);
break;
case FOR_SYSTEM_TIME_BETWEEN:
// row_start <= end AND row_end >= start
cond1= newx Item_func_le(thd, row_start,
vers_conditions.end);
cond2= newx Item_func_ge(thd, row_end,
vers_conditions.start);
break;
case FOR_SYSTEM_TIME_BEFORE:
// row_end < start
cond1= newx Item_func_lt(thd, row_end,
vers_conditions.start);
break;
default:
DBUG_ASSERT(0);
}
}
else
{
// Otherwise the conditions are expressed through the storage engine's
// Item_func_vtq_* functions, which take the table's handlerton.
DBUG_ASSERT(table->table->s && table->table->s->db_plugin);
handlerton *hton= plugin_hton(table->table->s->db_plugin);
DBUG_ASSERT(hton);
Item *trx_id0, *trx_id1;
switch (vers_conditions.type)
{
case FOR_SYSTEM_TIME_UNSPECIFIED:
curr= newx Item_int(thd, ULONGLONG_MAX);
cond1= newx Item_func_eq(thd, row_end2, curr);
break;
case FOR_SYSTEM_TIME_AS_OF:
// A timestamp bound is wrapped into Item_func_vtq_id(VTQ_TRX_ID);
// a bound with another unit is used as is.
trx_id0= vers_conditions.unit_start == UNIT_TIMESTAMP ?
newx Item_func_vtq_id(thd, hton, vers_conditions.start, VTQ_TRX_ID) :
vers_conditions.start;
cond1= newx Item_func_vtq_trx_sees_eq(thd, hton, trx_id0, row_start);
cond2= newx Item_func_vtq_trx_sees(thd, hton, row_end, trx_id0);
break;
case FOR_SYSTEM_TIME_FROM_TO:
case FOR_SYSTEM_TIME_BETWEEN:
// The extra true/false argument of Item_func_vtq_id differs between the
// start and the end bound; its meaning is not visible here.
trx_id0= vers_conditions.unit_start == UNIT_TIMESTAMP ?
newx Item_func_vtq_id(thd, hton, vers_conditions.start, VTQ_TRX_ID, true) :
vers_conditions.start;
trx_id1= vers_conditions.unit_end == UNIT_TIMESTAMP ?
newx Item_func_vtq_id(thd, hton, vers_conditions.end, VTQ_TRX_ID, false) :
vers_conditions.end;
// FROM..TO uses the strict "sees" for the end bound, BETWEEN the
// "sees or equal" variant.
cond1= vers_conditions.type == FOR_SYSTEM_TIME_FROM_TO ?
newx Item_func_vtq_trx_sees(thd, hton, trx_id1, row_start) :
newx Item_func_vtq_trx_sees_eq(thd, hton, trx_id1, row_start);
cond2= newx Item_func_vtq_trx_sees_eq(thd, hton, row_end, trx_id0);
break;
case FOR_SYSTEM_TIME_BEFORE:
trx_id0= vers_conditions.unit_start == UNIT_TIMESTAMP ?
newx Item_func_vtq_id(thd, hton, vers_conditions.start, VTQ_TRX_ID) :
vers_conditions.start;
cond1= newx Item_func_lt(thd, row_end, trx_id0);
break;
default:
DBUG_ASSERT(0);
}
}
if (cond1)
{
// Combine as: *dst_cond AND (cond2 AND cond1).
// NOTE(review): cond2 is 0 for UNSPECIFIED and BEFORE, so and_items() is
// presumably tolerant of a NULL operand - confirm.
cond1= and_items(thd,
*dst_cond,
and_items(thd,
cond2,
cond1));
// When the statement arena was activated above, the pointer is assigned
// directly; otherwise the change is registered with
// thd->change_item_tree().
if (on_stmt_arena.arena_replaced())
*dst_cond= cond1;
else
thd->change_item_tree(dst_cond, cond1);
}
} // if (... table->table->versioned())
} // for (table= tables; ...)
DBUG_RETURN(0);
#undef newx
}
/*****************************************************************************
Check fields, find best join, do the select and output fields.
mysql_select assumes that all tables are already opened
@ -744,7 +1102,11 @@ JOIN::prepare(TABLE_LIST *tables_init,
{
remove_redundant_subquery_clauses(select_lex);
}
/* System Versioning: handle FOR SYSTEM_TIME clause. */
if (vers_setup_select(thd, tables_list, &conds, select_lex) < 0)
DBUG_RETURN(-1);
/*
TRUE if the SELECT list mixes elements with and without grouping,
and there is no GROUP BY clause. Mixing non-aggregated fields with
@ -1026,6 +1388,46 @@ JOIN::prepare(TABLE_LIST *tables_init,
if (!procedure && result && result->prepare(fields_list, unit_arg))
goto err; /* purecov: inspected */
if (!thd->stmt_arena->is_stmt_prepare())
{
bool have_versioned_tables= false;
for (TABLE_LIST *table= tables_list; table; table= table->next_local)
{
if (table->table && table->table->versioned())
{
have_versioned_tables= true;
break;
}
}
if (have_versioned_tables)
{
Item_transformer transformer= &Item::vers_optimized_fields_transformer;
if (conds)
{
conds= conds->transform(thd, transformer, NULL);
}
for (ORDER *ord= order; ord; ord= ord->next)
{
ord->item_ptr= (*ord->item)->transform(thd, transformer, NULL);
ord->item= &ord->item_ptr;
}
for (ORDER *ord= group_list; ord; ord= ord->next)
{
ord->item_ptr= (*ord->item)->transform(thd, transformer, NULL);
ord->item= &ord->item_ptr;
}
if (having)
{
having= having->transform(thd, transformer, NULL);
}
}
}
unit= unit_arg;
if (prepare_stage2())
goto err;
@ -3545,6 +3947,16 @@ void JOIN::exec_inner()
result->send_result_set_metadata(
procedure ? procedure_fields_list : *fields,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
{
List_iterator<Item> it(*columns_list);
while (Item *item= it++)
{
Item_transformer transformer= &Item::vers_optimized_fields_transformer;
it.replace(item->transform(thd, transformer, NULL));
}
}
error= do_select(this, procedure);
/* Accumulate the counts from all join iterations of all join parts. */
thd->inc_examined_row_count(join_examined_rows);
@ -16289,7 +16701,12 @@ Field *create_tmp_field_from_field(THD *thd, Field *org_field,
item->result_field= new_field;
else
new_field->field_name= *name;
new_field->flags|= (org_field->flags & NO_DEFAULT_VALUE_FLAG);
new_field->flags|= (org_field->flags & (
NO_DEFAULT_VALUE_FLAG |
HIDDEN_FLAG |
VERS_SYS_START_FLAG |
VERS_SYS_END_FLAG |
VERS_OPTIMIZED_UPDATE_FLAG));
if (org_field->maybe_null() || (item && item->maybe_null))
new_field->flags&= ~NOT_NULL_FLAG; // Because of outer join
if (org_field->type() == MYSQL_TYPE_VAR_STRING ||
@ -16884,6 +17301,8 @@ create_tmp_table(THD *thd, TMP_TABLE_PARAM *param, List<Item> &fields,
List_iterator_fast<Item> li(fields);
Item *item;
Field **tmp_from_field=from_field;
Field *sys_trx_start= NULL;
Field *sys_trx_end= NULL;
while ((item=li++))
{
Item::Type type= item->type();
@ -17006,6 +17425,31 @@ create_tmp_table(THD *thd, TMP_TABLE_PARAM *param, List<Item> &fields,
continue; // Some kind of const item
}
DBUG_ASSERT(!new_field->field_name.str || strlen(new_field->field_name.str) == new_field->field_name.length);
if (type == Item::FIELD_ITEM || type == Item::REF_ITEM)
{
if (item->real_item()->type() == Item::FIELD_ITEM)
{
Item_field *item_field= (Item_field *)item->real_item();
Field *field= item_field->field;
TABLE_SHARE *s= field->table->s;
if (s->versioned)
{
if (field->flags & VERS_SYS_START_FLAG)
sys_trx_start= new_field;
else if (field->flags & VERS_SYS_END_FLAG)
sys_trx_end= new_field;
}
}
}
if (type == Item::TYPE_HOLDER)
{
Item_type_holder *ith= (Item_type_holder*)item;
if (ith->flags & VERS_SYS_START_FLAG)
sys_trx_start= new_field;
else if (ith->flags & VERS_SYS_END_FLAG)
sys_trx_end= new_field;
}
if (type == Item::SUM_FUNC_ITEM)
{
Item_sum *agg_item= (Item_sum *) item;
@ -17086,6 +17530,21 @@ create_tmp_table(THD *thd, TMP_TABLE_PARAM *param, List<Item> &fields,
total_uneven_bit_length= 0;
}
}
if (sys_trx_start && sys_trx_end)
{
sys_trx_start->flags|= VERS_SYS_START_FLAG | HIDDEN_FLAG;
sys_trx_end->flags|= VERS_SYS_END_FLAG | HIDDEN_FLAG;
share->versioned= true;
share->field= table->field;
share->row_start_field= sys_trx_start->field_index;
share->row_end_field= sys_trx_end->field_index;
}
else
{
DBUG_ASSERT(!sys_trx_start && !sys_trx_end);
}
DBUG_ASSERT(fieldnr == (uint) (reg_field - table->field));
DBUG_ASSERT(field_count >= (uint) (reg_field - table->field));
field_count= fieldnr;
@ -25575,6 +26034,11 @@ void TABLE_LIST::print(THD *thd, table_map eliminated_tables, String *str,
}
#endif /* WITH_PARTITION_STORAGE_ENGINE */
}
if (table && table->versioned())
{
// versioning conditions are already unwrapped to WHERE clause
str->append(" FOR SYSTEM_TIME ALL");
}
if (my_strcasecmp(table_alias_charset, cmp_name, alias))
{
char t_alias_buff[MAX_ALIAS_NAME];