1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

SQL: replication from unversioned to versioned [fixes #94]

This commit is contained in:
Aleksey Midenkov
2016-12-18 17:06:43 +00:00
parent ef10ef98ab
commit e45b85eb3e
5 changed files with 232 additions and 8 deletions

View File

@ -37,5 +37,77 @@ x
3 3
2 2
connection master; connection master;
create or replace table t1 (x int primary key) engine = innodb;
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
x
2
select * from t1 for system_time all;
x
1
2
connection master;
delete from t1;
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
1
2
connection master;
create or replace table t1 (x int) engine = innodb;
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
x
2
select * from t1 for system_time all;
x
2
1
connection master;
delete from t1;
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
2
1
connection master;
create or replace table t1 (x int primary key) with system versioning engine = innodb;
connection slave;
alter table t1 without system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
select * from t1 for system_time all;
x
1
2
connection slave;
select * from t1;
x
2
connection master;
delete from t1;
select * from t1 for system_time all;
x
1
2
connection slave;
select * from t1;
x
connection master;
drop table t1; drop table t1;
include/rpl_end.inc include/rpl_end.inc

View File

@ -37,5 +37,77 @@ x
3 3
2 2
connection master; connection master;
create or replace table t1 (x int primary key) engine = innodb;
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
x
2
select * from t1 for system_time all;
x
1
2
connection master;
delete from t1;
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
1
2
connection master;
create or replace table t1 (x int) engine = innodb;
connection slave;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
connection slave;
select * from t1;
x
2
select * from t1 for system_time all;
x
2
1
connection master;
delete from t1;
connection slave;
select * from t1;
x
select * from t1 for system_time all;
x
2
1
connection master;
create or replace table t1 (x int primary key) with system versioning engine = innodb;
connection slave;
alter table t1 without system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
select * from t1 for system_time all;
x
1
2
connection slave;
select * from t1;
x
2
connection master;
delete from t1;
select * from t1 for system_time all;
x
1
2
connection slave;
select * from t1;
x
connection master;
drop table t1; drop table t1;
include/rpl_end.inc include/rpl_end.inc

View File

@ -29,5 +29,62 @@ sync_slave_with_master;
select * from t1; select * from t1;
select * from t1 for system_time all; select * from t1 for system_time all;
# check unversioned -> versioned replication
connection master;
create or replace table t1 (x int primary key) engine = innodb;
sync_slave_with_master;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
sync_slave_with_master;
select * from t1;
select * from t1 for system_time all;
connection master;
delete from t1;
sync_slave_with_master;
select * from t1;
select * from t1 for system_time all;
# same thing (UPDATE, DELETE), but without PK
connection master;
create or replace table t1 (x int) engine = innodb;
sync_slave_with_master;
alter table t1 with system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
sync_slave_with_master;
select * from t1;
select * from t1 for system_time all;
connection master;
delete from t1;
sync_slave_with_master;
select * from t1;
select * from t1 for system_time all;
# same thing, but reverse: versioned -> unversioned
connection master;
create or replace table t1 (x int primary key) with system versioning engine = innodb;
sync_slave_with_master;
alter table t1 without system versioning;
connection master;
insert into t1 values (1);
update t1 set x= 2 where x = 1;
select * from t1 for system_time all;
sync_slave_with_master;
select * from t1;
connection master;
delete from t1;
select * from t1 for system_time all;
sync_slave_with_master;
select * from t1;
connection master; connection master;
drop table t1; drop table t1;

View File

@ -6003,8 +6003,8 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data)
DBUG_ASSERT(new_data == table->record[0]); DBUG_ASSERT(new_data == table->record[0]);
DBUG_ASSERT(old_data == table->record[1]); DBUG_ASSERT(old_data == table->record[1]);
// InnoDB changes sys_trx_end to curr_trx_id and we need to restore MAX_TRX // it is important to keep 'old_data' intact for versioning to work correctly on slave side
if (table->file->check_table_binlog_row_based(1)) if (table->file->check_table_binlog_row_based(1) && table->versioned())
memcpy(table->record[2], table->record[1], table->s->reclength); memcpy(table->record[2], table->record[1], table->s->reclength);
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write(); mark_trx_read_write();
@ -6017,8 +6017,10 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data)
if (likely(!error) && !row_already_logged) if (likely(!error) && !row_already_logged)
{ {
rows_changed++; rows_changed++;
if (table->file->check_table_binlog_row_based(1)) { if (table->file->check_table_binlog_row_based(1))
memcpy(table->record[1], table->record[2], table->s->reclength); {
if (table->versioned())
memcpy(table->record[1], table->record[2], table->s->reclength);
error= binlog_log_row(table, old_data, new_data, log_func); error= binlog_log_row(table, old_data, new_data, log_func);
} }
} }

View File

@ -12766,7 +12766,7 @@ uint8 Write_rows_log_event::get_trg_event_map()
Returns TRUE if different. Returns TRUE if different.
*/ */
static bool record_compare(TABLE *table) static bool record_compare(TABLE *table, bool skip_sys_start)
{ {
bool result= FALSE; bool result= FALSE;
/** /**
@ -12799,7 +12799,10 @@ static bool record_compare(TABLE *table)
/* Compare fields */ /* Compare fields */
for (Field **ptr=table->field ; *ptr ; ptr++) for (Field **ptr=table->field ; *ptr ; ptr++)
{ {
if (skip_sys_start && *ptr == table->vers_start_field())
{
continue;
}
/** /**
We only compare field contents that are not null. We only compare field contents that are not null.
NULL fields (i.e., their null bits) were compared NULL fields (i.e., their null bits) were compared
@ -12994,6 +12997,24 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
prepare_record(table, m_width, FALSE); prepare_record(table, m_width, FALSE);
error= unpack_current_row(rgi); error= unpack_current_row(rgi);
bool skip_sys_start= false;
if (table->versioned())
{
Field *sys_trx_end= table->vers_end_field();
DBUG_ASSERT(table->read_set);
bitmap_set_bit(table->read_set, sys_trx_end->field_index);
// master table is unversioned
if (sys_trx_end->val_int() == 0)
{
DBUG_ASSERT(table->write_set);
bitmap_set_bit(table->write_set, sys_trx_end->field_index);
sys_trx_end->set_max();
table->vers_start_field()->set_notnull();
skip_sys_start= true;
}
}
DBUG_PRINT("info",("looking for the following record")); DBUG_PRINT("info",("looking for the following record"));
DBUG_DUMP("record[0]", table->record[0], table->s->reclength); DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
@ -13169,7 +13190,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
/* We use this to test that the correct key is used in test cases. */ /* We use this to test that the correct key is used in test cases. */
DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort();); DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort(););
while (record_compare(table)) while (record_compare(table, skip_sys_start))
{ {
while ((error= table->file->ha_index_next(table->record[0]))) while ((error= table->file->ha_index_next(table->record[0])))
{ {
@ -13233,7 +13254,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
goto end; goto end;
} }
} }
while (record_compare(table)); while (record_compare(table, skip_sys_start));
/* /*
Note: above record_compare will take into accout all record fields Note: above record_compare will take into accout all record fields