1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

SQL: versioning DDL part I [closes #172]

This commit is contained in:
kevg
2017-04-24 14:47:44 +03:00
committed by Aleksey Midenkov
parent f94fd4b730
commit 0185872449
8 changed files with 457 additions and 17 deletions

View File

@@ -5133,6 +5133,20 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL ENUM_VALUE_LIST NULL
READ_ONLY NO READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME VERSIONING_DDL_SURVIVAL
SESSION_VALUE OFF
GLOBAL_VALUE OFF
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE OFF
VARIABLE_SCOPE SESSION
VARIABLE_TYPE BOOLEAN
VARIABLE_COMMENT Use system versioning DDL survival feature
NUMERIC_MIN_VALUE NULL
NUMERIC_MAX_VALUE NULL
NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST OFF,ON
READ_ONLY NO
COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME VERSIONING_FORCE VARIABLE_NAME VERSIONING_FORCE
SESSION_VALUE OFF SESSION_VALUE OFF
GLOBAL_VALUE OFF GLOBAL_VALUE OFF

View File

@@ -0,0 +1,176 @@
set @@session.time_zone='+00:00';
select ifnull(max(trx_id), 0) into @start_trx_id from information_schema.innodb_vtq;
create procedure if not exists verify_vtq()
begin
set @i= 0;
select
@i:= @i + 1 as No,
trx_id > 0 as A,
commit_id > trx_id as B,
begin_ts > '1-1-1 0:0:0' as C,
commit_ts >= begin_ts as D
from information_schema.innodb_vtq
where trx_id > @start_trx_id;
select ifnull(max(trx_id), 0)
into @start_trx_id
from information_schema.innodb_vtq;
end~~
create function if not exists default_engine()
returns varchar(255)
deterministic
begin
declare e varchar(255);
select lower(engine) from information_schema.engines where support='DEFAULT' into e;
return e;
end~~
create function if not exists sys_datatype()
returns varchar(255)
deterministic
begin
if default_engine() = 'innodb' then
return 'bigint unsigned';
elseif default_engine() = 'myisam' then
return 'timestamp(6)';
end if;
return NULL;
end~~
create function if not exists sys_commit_ts(sys_field varchar(255))
returns varchar(255)
deterministic
begin
if default_engine() = 'innodb' then
return concat('vtq_commit_ts(', sys_field, ')');
elseif default_engine() = 'myisam' then
return sys_field;
end if;
return NULL;
end~~
create procedure if not exists innodb_verify_vtq(recs int)
begin
declare i int default 1;
if default_engine() = 'innodb' then
call verify_vtq;
elseif default_engine() = 'myisam' then
create temporary table tmp (No int, A bool, B bool, C bool, D bool);
while i <= recs do
insert into tmp values (i, 1, 1, 1, 1);
set i= i + 1;
end while;
select * from tmp;
drop table tmp;
end if;
end~~
create procedure concat_exec2(a varchar(255), b varchar(255))
begin
prepare stmt from concat(a, b);
execute stmt;
deallocate prepare stmt;
end~~
create procedure concat_exec3(a varchar(255), b varchar(255), c varchar(255))
begin
prepare stmt from concat(a, b, c);
execute stmt;
deallocate prepare stmt;
end~~
create function get_historical_table_name(table_name_arg varchar(255))
returns varchar(255)
begin
return (select table_name from information_schema.tables
where table_schema='test' and table_name like concat(table_name_arg, '_%') limit 1);
end~~
create procedure drop_last_historical(table_name_arg varchar(255))
begin
call concat_exec2('drop table ', get_historical_table_name(table_name_arg));
end~~
set versioning_ddl_survival = 1;
create or replace table t (a int) with system versioning;
insert into t values (1);
update t set a=2 where a=1;
select sys_trx_start from t where a=2 into @tm;
alter table t add column b int;
select * from t;
a b
2 NULL
call concat_exec3('select * from ', get_historical_table_name('t'), ' for system_time all');
a
2
1
call concat_exec3('select @tm=sys_trx_start from ', get_historical_table_name('t'), ' for system_time all where a=2');
@tm=sys_trx_start
1
select @tm<sys_trx_start from t where a=2;
@tm<sys_trx_start
1
select sys_trx_start from t where a=2 into @tm;
call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t'), ' for system_time all where a=2');
@tm=sys_trx_end
1
call drop_last_historical('t');
create or replace table t (a int) with system versioning;
insert into t values (1);
update t set a=2 where a=1;
select sys_trx_start from t where a=2 into @tm;
alter table t add column b int;
select * from t;
a b
2 NULL
call concat_exec3('select * from ', get_historical_table_name('t'), ' for system_time all');
a
2
1
call concat_exec3('select @tm=sys_trx_start from ', get_historical_table_name('t'), ' for system_time all where a=2');
@tm=sys_trx_start
1
select @tm<sys_trx_start from t where a=2;
@tm<sys_trx_start
1
select sys_trx_start from t where a=2 into @tm;
call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t'), ' for system_time all where a=2');
@tm=sys_trx_end
1
call drop_last_historical('t');
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
update t set a=2 where a=1;
select sys_trx_start from t where a=2 into @tm;
alter table t add column b int;
select * from t;
a b
2 NULL
call concat_exec3('select * from ', get_historical_table_name('t'), ' for system_time all');
a
2
1
call concat_exec3('select @tm=sys_trx_start from ', get_historical_table_name('t'), ' for system_time all where a=2');
@tm=sys_trx_start
1
select @tm<sys_trx_start from t where a=2;
@tm<sys_trx_start
1
select sys_trx_start from t where a=2 into @tm;
call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t'), ' for system_time all where a=2');
@tm=sys_trx_end
1
call drop_last_historical('t');
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
update t set a=2 where a=1;
alter table t add column b int, algorithm=inplace;
set versioning_ddl_survival = 0;
drop procedure concat_exec2;
drop procedure concat_exec3;
drop function get_historical_table_name;
drop procedure drop_last_historical;
call verify_vtq;
No A B C D
1 1 1 1 1
2 1 1 1 1
3 1 1 1 1
4 1 1 1 1
5 1 1 1 1
drop table t;
drop procedure verify_vtq;
drop procedure innodb_verify_vtq;
drop function default_engine;
drop function sys_commit_ts;
drop function sys_datatype;

View File

@@ -0,0 +1,104 @@
-- source suite/versioning/common.inc
delimiter ~~;
create procedure concat_exec2(a varchar(255), b varchar(255))
begin
prepare stmt from concat(a, b);
execute stmt;
deallocate prepare stmt;
end~~
create procedure concat_exec3(a varchar(255), b varchar(255), c varchar(255))
begin
prepare stmt from concat(a, b, c);
execute stmt;
deallocate prepare stmt;
end~~
create function get_historical_table_name(table_name_arg varchar(255))
returns varchar(255)
begin
return (select table_name from information_schema.tables
where table_schema='test' and table_name like concat(table_name_arg, '_%') limit 1);
end~~
create procedure drop_last_historical(table_name_arg varchar(255))
begin
call concat_exec2('drop table ', get_historical_table_name(table_name_arg));
end~~
delimiter ;~~
set versioning_ddl_survival = 1;
create or replace table t (a int) with system versioning;
insert into t values (1);
update t set a=2 where a=1;
select sys_trx_start from t where a=2 into @tm;
alter table t add column b int;
select * from t;
call concat_exec3('select * from ', get_historical_table_name('t'), ' for system_time all');
call concat_exec3('select @tm=sys_trx_start from ', get_historical_table_name('t'), ' for system_time all where a=2');
select @tm<sys_trx_start from t where a=2;
select sys_trx_start from t where a=2 into @tm;
call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t'), ' for system_time all where a=2');
call drop_last_historical('t');
# same for INNODB ALGORITHM=COPY
create or replace table t (a int) with system versioning;
insert into t values (1);
update t set a=2 where a=1;
select sys_trx_start from t where a=2 into @tm;
alter table t add column b int;
select * from t;
call concat_exec3('select * from ', get_historical_table_name('t'), ' for system_time all');
call concat_exec3('select @tm=sys_trx_start from ', get_historical_table_name('t'), ' for system_time all where a=2');
select @tm<sys_trx_start from t where a=2;
select sys_trx_start from t where a=2 into @tm;
call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t'), ' for system_time all where a=2');
call drop_last_historical('t');
# same for INNODB default ALGORITHM
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
update t set a=2 where a=1;
select sys_trx_start from t where a=2 into @tm;
alter table t add column b int;
select * from t;
call concat_exec3('select * from ', get_historical_table_name('t'), ' for system_time all');
call concat_exec3('select @tm=sys_trx_start from ', get_historical_table_name('t'), ' for system_time all where a=2');
select @tm<sys_trx_start from t where a=2;
select sys_trx_start from t where a=2 into @tm;
call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t'), ' for system_time all where a=2');
call drop_last_historical('t');
# no DDL for INNODB explicit ALGORITHM=INPLACE
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
update t set a=2 where a=1;
alter table t add column b int, algorithm=inplace;
set versioning_ddl_survival = 0;
drop procedure concat_exec2;
drop procedure concat_exec3;
drop function get_historical_table_name;
drop procedure drop_last_historical;
call verify_vtq;
drop table t;
-- source suite/versioning/common_finish.inc

View File

@@ -711,6 +711,7 @@ typedef struct system_variables
my_bool vers_force; my_bool vers_force;
ulong vers_hide; ulong vers_hide;
my_bool vers_innodb_algorithm_simple; my_bool vers_innodb_algorithm_simple;
my_bool vers_ddl_survival;
} SV; } SV;
/** /**

View File

@@ -5155,6 +5155,85 @@ static void make_unique_constraint_name(THD *thd, LEX_STRING *name,
** Alter a table definition ** Alter a table definition
****************************************************************************/ ****************************************************************************/
// Works as NOW(6)
static MYSQL_TIME vers_thd_get_now(THD *thd)
{
MYSQL_TIME now;
thd->variables.time_zone->gmt_sec_to_TIME(&now, thd->query_start());
now.second_part= thd->query_start_sec_part();
thd->time_zone_used= 1;
return now;
}
static void vers_table_name_date(THD *thd, const char *table_name,
char *new_name, size_t new_name_size)
{
const MYSQL_TIME now= vers_thd_get_now(thd);
my_snprintf(new_name, new_name_size, "%s_%04d%02d%02d_%02d%02d%02d_%06d",
table_name, now.year, now.month, now.day, now.hour, now.minute,
now.second, now.second_part);
}
bool operator!=(const MYSQL_TIME &lhs, const MYSQL_TIME &rhs)
{
return lhs.year != rhs.year || lhs.month != rhs.month || lhs.day != rhs.day ||
lhs.hour != rhs.hour || lhs.minute != rhs.minute ||
lhs.second_part != rhs.second_part || lhs.neg != rhs.neg ||
lhs.time_type != rhs.time_type;
}
// Sets sys_trx_end=MAX for rows with sys_trx_end=now(6)
static bool vers_reset_alter_copy(THD *thd, TABLE *table)
{
const MYSQL_TIME now= vers_thd_get_now(thd);
READ_RECORD info;
int error= 0;
bool will_batch= false;
uint dup_key_found= 0;
if (init_read_record(&info, thd, table, NULL, NULL, 0, 1, true))
goto err;
will_batch= !table->file->start_bulk_update();
while (!(error= info.read_record(&info)))
{
MYSQL_TIME current;
if (table->vers_end_field()->get_date(&current, 0))
goto err_read_record;
if (current != now)
{
continue;
}
store_record(table, record[1]);
table->vers_end_field()->set_max();
if (will_batch)
error= table->file->ha_bulk_update_row(table->record[1], table->record[0],
&dup_key_found);
else
error= table->file->ha_update_row(table->record[1], table->record[0]);
if (error && table->file->is_fatal_error(error, HA_CHECK_ALL))
{
table->file->print_error(error, MYF(ME_FATALERROR));
goto err_read_record;
}
}
if (will_batch && (error= table->file->exec_bulk_update(&dup_key_found)))
table->file->print_error(error, MYF(ME_FATALERROR));
if (will_batch)
table->file->end_bulk_update();
err_read_record:
end_read_record(&info);
err:
if (table->file->ha_external_lock(thd, F_UNLCK))
return true;
return error ? true : false;
}
/** /**
Rename a table. Rename a table.
@@ -8602,6 +8681,26 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
bool error= open_tables(thd, &table_list, &tables_opened, 0, bool error= open_tables(thd, &table_list, &tables_opened, 0,
&alter_prelocking_strategy); &alter_prelocking_strategy);
thd->open_options&= ~HA_OPEN_FOR_ALTER; thd->open_options&= ~HA_OPEN_FOR_ALTER;
bool versioned= table_list->table && table_list->table->versioned();
if (versioned && thd->variables.vers_ddl_survival)
{
table_list->set_lock_type(thd, TL_WRITE);
if (thd->mdl_context.upgrade_shared_lock(table_list->table->mdl_ticket,
MDL_EXCLUSIVE,
thd->variables.lock_wait_timeout))
{
DBUG_RETURN(true);
}
if (table_list->table->versioned_by_engine() &&
alter_info->requested_algorithm ==
Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT &&
!table_list->table->s->partition_info_str)
{
// Changle default ALGORITHM to COPY for INNODB
alter_info->requested_algorithm= Alter_info::ALTER_TABLE_ALGORITHM_COPY;
}
}
DEBUG_SYNC(thd, "alter_opened_table"); DEBUG_SYNC(thd, "alter_opened_table");
@@ -8914,9 +9013,11 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
Upgrade from MDL_SHARED_UPGRADABLE to MDL_SHARED_NO_WRITE. Upgrade from MDL_SHARED_UPGRADABLE to MDL_SHARED_NO_WRITE.
Afterwards it's safe to take the table level lock. Afterwards it's safe to take the table level lock.
*/ */
if (thd->mdl_context.upgrade_shared_lock(mdl_ticket, MDL_SHARED_NO_WRITE, if ((!(versioned && thd->variables.vers_ddl_survival) &&
thd->variables.lock_wait_timeout) thd->mdl_context.upgrade_shared_lock(
|| lock_tables(thd, table_list, alter_ctx.tables_opened, 0)) mdl_ticket, MDL_SHARED_NO_WRITE,
thd->variables.lock_wait_timeout)) ||
lock_tables(thd, table_list, alter_ctx.tables_opened, 0))
{ {
DBUG_RETURN(true); DBUG_RETURN(true);
} }
@@ -8981,6 +9082,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
handlerton *new_db_type= create_info->db_type; handlerton *new_db_type= create_info->db_type;
handlerton *old_db_type= table->s->db_type(); handlerton *old_db_type= table->s->db_type();
TABLE *new_table= NULL; TABLE *new_table= NULL;
bool new_versioned= false;
ha_rows copied=0,deleted=0; ha_rows copied=0,deleted=0;
/* /*
@@ -9315,6 +9417,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
} }
if (!new_table) if (!new_table)
goto err_new_table_cleanup; goto err_new_table_cleanup;
new_versioned= new_table->versioned();
/* /*
Note: In case of MERGE table, we do not attach children. We do not Note: In case of MERGE table, we do not attach children. We do not
copy data for MERGE tables. Only the children have data. copy data for MERGE tables. Only the children have data.
@@ -9341,7 +9444,18 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
order_num, order, &copied, &deleted, order_num, order, &copied, &deleted,
alter_info->keys_onoff, alter_info->keys_onoff,
&alter_ctx)) &alter_ctx))
{
if (versioned && new_versioned && thd->variables.vers_ddl_survival)
{
if (table->versioned_by_sql())
{
// Failure of this function may result in corruption of
// an original table.
vers_reset_alter_copy(thd, table);
}
}
goto err_new_table_cleanup; goto err_new_table_cleanup;
}
} }
else else
{ {
@@ -9436,9 +9550,14 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
Rename the old table to temporary name to have a backup in case Rename the old table to temporary name to have a backup in case
anything goes wrong while renaming the new table. anything goes wrong while renaming the new table.
*/ */
char backup_name[32]; char backup_name[FN_LEN];
my_snprintf(backup_name, sizeof(backup_name), "%s2-%lx-%lx", tmp_file_prefix, if (versioned && thd->variables.vers_ddl_survival)
current_pid, thd->thread_id); vers_table_name_date(thd, alter_ctx.table_name, backup_name,
sizeof(backup_name));
else
my_snprintf(backup_name, sizeof(backup_name), "%s2-%lx-%lx",
tmp_file_prefix, current_pid, thd->thread_id);
if (lower_case_table_names) if (lower_case_table_names)
my_casedn_str(files_charset_info, backup_name); my_casedn_str(files_charset_info, backup_name);
if (mysql_rename_table(old_db_type, alter_ctx.db, alter_ctx.table_name, if (mysql_rename_table(old_db_type, alter_ctx.db, alter_ctx.table_name,
@@ -9490,7 +9609,8 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
} }
// ALTER TABLE succeeded, delete the backup of the old table. // ALTER TABLE succeeded, delete the backup of the old table.
if (quick_rm_table(thd, old_db_type, alter_ctx.db, backup_name, FN_IS_TMP)) if (!(versioned && new_versioned && thd->variables.vers_ddl_survival) &&
quick_rm_table(thd, old_db_type, alter_ctx.db, backup_name, FN_IS_TMP))
{ {
/* /*
The fact that deletion of the backup failed is not critical The fact that deletion of the backup failed is not critical
@@ -9674,7 +9794,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
Field **dfield_ptr= to->default_field; Field **dfield_ptr= to->default_field;
bool make_versioned= !from->versioned() && to->versioned(); bool make_versioned= !from->versioned() && to->versioned();
bool make_unversioned= from->versioned() && !to->versioned(); bool make_unversioned= from->versioned() && !to->versioned();
Field *to_sys_trx_start= NULL, *from_sys_trx_end= NULL, *to_sys_trx_end= NULL; bool keep_versioned= from->versioned() && to->versioned();
Field *to_sys_trx_start= NULL, *to_sys_trx_end= NULL, *from_sys_trx_end= NULL;
MYSQL_TIME now; MYSQL_TIME now;
DBUG_ENTER("copy_data_between_tables"); DBUG_ENTER("copy_data_between_tables");
@@ -9777,19 +9898,26 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
if (make_versioned) if (make_versioned)
{ {
thd->variables.time_zone->gmt_sec_to_TIME(&now, thd->query_start()); now= vers_thd_get_now(thd);
now.second_part= thd->query_start_sec_part(); to_sys_trx_start= to->vers_start_field();
thd->time_zone_used= 1; to_sys_trx_end= to->vers_end_field();
to_sys_trx_start= to->field[to->s->row_start_field];
to_sys_trx_end= to->field[to->s->row_end_field];
} }
else if (make_unversioned) else if (make_unversioned)
{ {
from_sys_trx_end= from->field[from->s->row_end_field]; from_sys_trx_end= from->vers_end_field();
} }
else if (from->versioned() && to->versioned()) else if (keep_versioned)
{ {
to->file->vers_auto_decrement= 0xffffffffffffffff; to->file->vers_auto_decrement= 0xffffffffffffffff;
if (thd->variables.vers_ddl_survival)
{
thd->variables.time_zone->gmt_sec_to_TIME(&now, thd->query_start());
now.second_part= thd->query_start_sec_part();
thd->time_zone_used= 1;
from_sys_trx_end= from->vers_end_field();
to_sys_trx_start= to->vers_start_field();
}
} }
THD_STAGE_INFO(thd, stage_copy_to_tmp_table); THD_STAGE_INFO(thd, stage_copy_to_tmp_table);
@@ -9863,6 +9991,17 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
if (!from_sys_trx_end->is_max()) if (!from_sys_trx_end->is_max())
continue; continue;
} }
else if (keep_versioned && thd->variables.vers_ddl_survival)
{
// Do not copy history rows.
if (!from_sys_trx_end->is_max())
continue;
store_record(from, record[1]);
from->vers_end_field()->store_time(&now);
from->file->ha_update_row(from->record[1], from->record[0]);
to_sys_trx_start->store_time(&now);
}
prev_insert_id= to->file->next_insert_id; prev_insert_id= to->file->next_insert_id;
if (to->default_field) if (to->default_field)

View File

@@ -406,6 +406,10 @@ static Sys_var_mybool Sys_vers_innodb_algorithm_simple(
SESSION_VAR(vers_innodb_algorithm_simple), CMD_LINE(OPT_ARG), SESSION_VAR(vers_innodb_algorithm_simple), CMD_LINE(OPT_ARG),
DEFAULT(TRUE)); DEFAULT(TRUE));
static Sys_var_mybool Sys_vers_ddl_survival(
"versioning_ddl_survival", "Use system versioning DDL survival feature",
SESSION_VAR(vers_ddl_survival), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
static Sys_var_ulonglong Sys_binlog_cache_size( static Sys_var_ulonglong Sys_binlog_cache_size(
"binlog_cache_size", "The size of the transactional cache for " "binlog_cache_size", "The size of the transactional cache for "
"updates to transactional engines for the binary log. " "updates to transactional engines for the binary log. "

View File

@@ -9530,7 +9530,8 @@ ha_innobase::update_row(
error = row_update_for_mysql((byte*) old_row, m_prebuilt, vers_set_fields); error = row_update_for_mysql((byte*) old_row, m_prebuilt, vers_set_fields);
if (error == DB_SUCCESS && vers_set_fields) { if (error == DB_SUCCESS && vers_set_fields &&
thd_sql_command(m_user_thd) != SQLCOM_ALTER_TABLE) {
if (trx->id != static_cast<trx_id_t>(table->vers_start_field()->val_int())) if (trx->id != static_cast<trx_id_t>(table->vers_start_field()->val_int()))
error = row_insert_for_mysql((byte*) old_row, m_prebuilt, ROW_INS_HISTORICAL); error = row_insert_for_mysql((byte*) old_row, m_prebuilt, ROW_INS_HISTORICAL);
} }

View File

@@ -2004,7 +2004,8 @@ run_again:
upd_field_t* ufield; upd_field_t* ufield;
dict_col_t* col; dict_col_t* col;
unsigned col_idx; unsigned col_idx;
if (node->is_delete) { if (node->is_delete ||
thd_sql_command(trx->mysql_thd) == SQLCOM_ALTER_TABLE) {
ufield = &uvect->fields[0]; ufield = &uvect->fields[0];
uvect->n_fields = 0; uvect->n_fields = 0;
node->is_delete = false; node->is_delete = false;