1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

0.1: SQL-level System Versioning

This commit is contained in:
Daniel Fiala
2016-06-19 07:38:28 +01:00
committed by Aleksey Midenkov
parent 14bdfa8541
commit be6f2d302c
115 changed files with 4055 additions and 1101 deletions

View File

@ -44,6 +44,9 @@
// mysql_derived_filling
#include "sql_insert.h" // For vers_insert_history_row() that may be
// needed for System Versioning.
/**
True if the table's input and output record buffers are comparable using
compare_record(TABLE*).
@ -280,6 +283,10 @@ int mysql_update(THD *thd,
Explain_update *explain;
query_plan.index= MAX_KEY;
query_plan.using_filesort= FALSE;
// For System Versioning (may need to insert new fields to a table).
ha_rows updated_sys_ver= 0;
DBUG_ENTER("mysql_update");
create_explain_query(thd->lex, thd->mem_root);
@ -354,6 +361,9 @@ int mysql_update(THD *thd,
DBUG_RETURN(1);
}
if (table->default_field)
table->mark_default_fields_for_write(false);
#ifndef NO_EMBEDDED_ACCESS_CHECKS
/* Check values */
table_list->grant.want_privilege= table->grant.want_privilege=
@ -734,6 +744,11 @@ int mysql_update(THD *thd,
while (!(error=info.read_record(&info)) && !thd->killed)
{
if (table->versioned() && !table->vers_end_field()->is_max_timestamp())
{
continue;
}
explain->tracker.on_record_read();
thd->inc_examined_row_count(1);
if (!select || select->skip_record(thd) > 0)
@ -743,10 +758,17 @@ int mysql_update(THD *thd,
explain->tracker.on_record_after_where();
store_record(table,record[1]);
if (fill_record_n_invoke_before_triggers(thd, table, fields, values, 0,
TRG_EVENT_UPDATE))
break; /* purecov: inspected */
if (table->versioned() && table->vers_update_fields())
{
error= 1;
break;
}
found++;
if (!can_compare_record || compare_record(table))
@ -805,19 +827,29 @@ int mysql_update(THD *thd,
else
{
/* Non-batched update */
error= table->file->ha_update_row(table->record[1],
error= table->file->ha_update_row(table->record[1],
table->record[0]);
}
if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
{
if (error != HA_ERR_RECORD_IS_THE_SAME)
updated++;
else
error= 0;
}
else if (!ignore ||
if (error == HA_ERR_RECORD_IS_THE_SAME)
{
error= 0;
}
else if (!error)
{
updated++;
if (table->versioned())
{
store_record(table, record[2]);
if ((error = vers_insert_history_row(table, &updated_sys_ver)))
break;
restore_record(table, record[2]);
}
}
else if (!ignore ||
table->file->is_fatal_error(error, HA_CHECK_ALL))
{
{
/*
If (ignore && error is ignorable) we don't have to
do anything; otherwise...
@ -1007,9 +1039,15 @@ int mysql_update(THD *thd,
if (error < 0 && !thd->lex->analyze_stmt)
{
char buff[MYSQL_ERRMSG_SIZE];
my_snprintf(buff, sizeof(buff), ER_THD(thd, ER_UPDATE_INFO), (ulong) found,
(ulong) updated,
(ulong) thd->get_stmt_da()->current_statement_warn_count());
if (!table->versioned())
my_snprintf(buff, sizeof(buff), ER_THD(thd, ER_UPDATE_INFO), (ulong) found,
(ulong) updated,
(ulong) thd->get_stmt_da()->current_statement_warn_count());
else
my_snprintf(buff, sizeof(buff),
ER_THD(thd, ER_UPDATE_INFO_WITH_SYSTEM_VERSIONING),
(ulong) found, (ulong) updated, (ulong) updated_sys_ver,
(ulong) thd->get_stmt_da()->current_statement_warn_count());
my_ok(thd, (thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated,
id, buff);
DBUG_PRINT("info",("%ld records updated", (long) updated));
@ -1627,8 +1665,10 @@ multi_update::multi_update(THD *thd_arg, TABLE_LIST *table_list,
tmp_tables(0), updated(0), found(0), fields(field_list),
values(value_list), table_count(0), copy_field(0),
handle_duplicates(handle_duplicates_arg), do_update(1), trans_safe(1),
transactional_tables(0), ignore(ignore_arg), error_handled(0), prepared(0)
{}
transactional_tables(0), ignore(ignore_arg), error_handled(0), prepared(0),
updated_sys_ver(0)
{
}
/*
@ -1877,7 +1917,7 @@ static bool safe_update_on_fly(THD *thd, JOIN_TAB *join_tab,
return !is_key_used(table, table->s->primary_key, table->write_set);
return TRUE;
default:
break; // Avoid compler warning
break; // Avoid compiler warning
}
return FALSE;
@ -2097,6 +2137,11 @@ int multi_update::send_data(List<Item> &not_used_values)
if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
continue;
if (table->versioned() && !table->vers_end_field()->is_max_timestamp())
{
continue;
}
if (table == table_to_update)
{
/*
@ -2109,6 +2154,7 @@ int multi_update::send_data(List<Item> &not_used_values)
table->status|= STATUS_UPDATED;
store_record(table,record[1]);
if (fill_record_n_invoke_before_triggers(thd, table,
*fields_for_table[offset],
*values_for_table[offset], 0,
@ -2127,6 +2173,13 @@ int multi_update::send_data(List<Item> &not_used_values)
if (table->default_field && table->update_default_fields(1, ignore))
DBUG_RETURN(1);
if (table->versioned() &&
table->vers_update_fields())
{
error= 1;
break;
}
if ((error= cur_table->view_check_option(thd, ignore)) !=
VIEW_CHECK_OK)
{
@ -2174,6 +2227,23 @@ int multi_update::send_data(List<Item> &not_used_values)
error= 0;
updated--;
}
else if (table->versioned())
{
restore_record(table,record[1]);
// Set end time to now()
if (table->vers_end_field()->set_time())
{
error= 1;
break;
}
if ( (error= vers_insert_history_row(table, &updated_sys_ver)) )
{
error= 1;
break;
}
}
/* non-transactional or transactional table got modified */
/* either multi_update class' flag is raised in its branch */
if (table->file->has_transactions())
@ -2200,6 +2270,7 @@ int multi_update::send_data(List<Item> &not_used_values)
*/
uint field_num= 0;
List_iterator_fast<TABLE> tbl_it(unupdated_check_opt_tables);
/* Set first tbl = table and then tbl to tables from tbl_it */
TABLE *tbl= table;
do
{
@ -2262,10 +2333,6 @@ void multi_update::abort_result_set()
if (do_update && table_count > 1)
{
/* Add warning here */
/*
todo/fixme: do_update() is never called with the arg 1.
should it change the signature to become argless?
*/
(void) do_updates();
}
}
@ -2447,19 +2514,44 @@ int multi_update::do_updates()
goto err2;
}
}
if ((local_error=table->file->ha_update_row(table->record[1],
table->record[0])) &&
if (table->versioned() &&
table->vers_update_fields())
{
goto err2;
}
if ((local_error=table->file->ha_update_row(table->record[1],
table->record[0])) &&
local_error != HA_ERR_RECORD_IS_THE_SAME)
{
if (!ignore ||
table->file->is_fatal_error(local_error, HA_CHECK_ALL))
{
err_table= table;
goto err;
goto err;
}
}
}
if (local_error != HA_ERR_RECORD_IS_THE_SAME)
{
updated++;
if (table->versioned())
{
restore_record(table,record[1]);
// Set end time to now()
if (table->vers_end_field()->set_time())
{
goto err2;
}
if ( (local_error= vers_insert_history_row(table, &updated_sys_ver)) )
{
err_table = table;
goto err;
}
}
}
else
local_error= 0;
}