diff --git a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result new file mode 100644 index 00000000000..8aea6ccb1e8 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result @@ -0,0 +1,86 @@ +include/rpl_init.inc [topology=1->2] +*** MDEV-6321: close_temporary_tables() in format description event not serialised correctly *** +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=5; +CHANGE MASTER TO master_use_gtid= current_pos; +include/start_slave.inc +CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(100) CHARACTER SET utf8); +include/stop_slave.inc +SET gtid_domain_id= 1; +INSERT INTO t1 VALUES (1, 0); +CREATE TEMPORARY TABLE t2 (a int); +SET gtid_domain_id= 2; +CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY); +CREATE TEMPORARY TABLE t4 (a int); +INSERT INTO t3 VALUES (100); +INSERT INTO t4 SELECT a+1 FROM t3; +INSERT INTO t2 VALUES (2), (4), (6), (8), (10), (12), (14), (16), (18), (20); +INSERT INTO t2 VALUES (3), (6), (9), (12), (15), (18); +INSERT INTO t2 VALUES (4), (8), (12), (16), (20); +INSERT INTO t3 SELECT a+2 FROM t4; +INSERT INTO t4 SELECT a+4 FROM t3; +INSERT INTO t2 VALUES (5), (10), (15), (20); +INSERT INTO t2 VALUES (6), (12), (18); +INSERT INTO t2 VALUES (7), (14); +INSERT INTO t2 VALUES (8), (16); +INSERT INTO t2 VALUES (9), (18); +INSERT INTO t2 VALUES (10), (20); +INSERT INTO t3 SELECT a+8 FROM t4; +INSERT INTO t4 SELECT a+16 FROM t3; +INSERT INTO t2 VALUES (11); +INSERT INTO t2 VALUES (12); +INSERT INTO t2 VALUES (13); +INSERT INTO t3 SELECT a+32 FROM t4; +INSERT INTO t2 VALUES (14); +INSERT INTO t2 VALUES (15); +INSERT INTO t2 VALUES (16); +INSERT INTO t4 SELECT a+64 FROM t3; +INSERT INTO t2 VALUES (17); +INSERT INTO t2 VALUES (18); +INSERT INTO t2 VALUES (19); +INSERT INTO t3 SELECT a+128 FROM t4; +INSERT INTO t2 VALUES (20); +INSERT INTO t1 SELECT a, a MOD 7 FROM t3; +INSERT INTO t1 SELECT a, a MOD 7 FROM t4; +INSERT INTO t1 SELECT a, COUNT(*) FROM t2 GROUP BY a; +FLUSH TABLES; +SET SESSION debug_dbug="+d,crash_dispatch_command_before"; +SELECT 1; +Got one of the listed errors +INSERT INTO t1 VALUES (0, 1); +include/start_slave.inc +SELECT * FROM t1 WHERE a <= 20 ORDER BY a; +a b +0 1 +1 0 +2 1 +3 1 +4 2 +5 1 +6 3 +7 1 +8 3 +9 2 +10 3 +11 1 +12 5 +13 1 +14 3 +15 3 +16 4 +17 1 +18 5 +19 1 +20 5 +SELECT COUNT(*) FROM t1 WHERE a BETWEEN 100+0 AND 100+256; +COUNT(*) +55 +SHOW STATUS LIKE 'Slave_open_temp_tables'; +Variable_name Value +Slave_open_temp_tables 0 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_temptable-master.opt b/mysql-test/suite/rpl/t/rpl_parallel_temptable-master.opt new file mode 100644 index 00000000000..425fda95086 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_temptable-master.opt @@ -0,0 +1 @@ +--skip-stack-trace --skip-core-file diff --git a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test new file mode 100644 index 00000000000..c50bbd3f215 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test @@ -0,0 +1,143 @@ +--source include/have_binlog_format_statement.inc +--let $rpl_topology=1->2 +--source include/rpl_init.inc + +--echo *** MDEV-6321: close_temporary_tables() in format description event not serialised correctly *** + +--connection server_2 +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=5; +CHANGE MASTER TO master_use_gtid= current_pos; +--source include/start_slave.inc + +--connection server_1 +CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(100) CHARACTER SET utf8); +--save_master_pos + +--connection server_2 +--sync_with_master +--source include/stop_slave.inc + + +--connection server_1 +SET gtid_domain_id= 1; +INSERT INTO t1 VALUES (1, 0); + +CREATE TEMPORARY TABLE t2 (a int); + +--connection default +SET gtid_domain_id= 2; +CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY); +CREATE TEMPORARY TABLE t4 (a int); +INSERT INTO t3 VALUES (100); +INSERT INTO t4 SELECT a+1 FROM t3; + +--connection server_1 +INSERT INTO t2 VALUES (2), (4), (6), (8), (10), (12), (14), (16), (18), (20); +INSERT INTO t2 VALUES (3), (6), (9), (12), (15), (18); +INSERT INTO t2 VALUES (4), (8), (12), (16), (20); + +--connection default +INSERT INTO t3 SELECT a+2 FROM t4; +INSERT INTO t4 SELECT a+4 FROM t3; + +--connection server_1 +INSERT INTO t2 VALUES (5), (10), (15), (20); +INSERT INTO t2 VALUES (6), (12), (18); +INSERT INTO t2 VALUES (7), (14); +INSERT INTO t2 VALUES (8), (16); +INSERT INTO t2 VALUES (9), (18); +INSERT INTO t2 VALUES (10), (20); + +--connection default +INSERT INTO t3 SELECT a+8 FROM t4; +INSERT INTO t4 SELECT a+16 FROM t3; + +--connection server_1 +INSERT INTO t2 VALUES (11); +INSERT INTO t2 VALUES (12); +INSERT INTO t2 VALUES (13); + +--connection default +INSERT INTO t3 SELECT a+32 FROM t4; + +--connection server_1 +INSERT INTO t2 VALUES (14); +INSERT INTO t2 VALUES (15); +INSERT INTO t2 VALUES (16); + +--connection default +INSERT INTO t4 SELECT a+64 FROM t3; + +--connection server_1 +INSERT INTO t2 VALUES (17); +INSERT INTO t2 VALUES (18); +INSERT INTO t2 VALUES (19); + +--connection default +INSERT INTO t3 SELECT a+128 FROM t4; + +--connection server_1 +INSERT INTO t2 VALUES (20); + +--connection default +INSERT INTO t1 SELECT a, a MOD 7 FROM t3; +INSERT INTO t1 SELECT a, a MOD 7 FROM t4; + +--connection server_1 +INSERT INTO t1 SELECT a, COUNT(*) FROM t2 GROUP BY a; + +# Crash the master server, so that temporary tables are implicitly dropped. +--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +wait +EOF + +FLUSH TABLES; +SET SESSION debug_dbug="+d,crash_dispatch_command_before"; +--error 2006,2013 +SELECT 1; + +--source include/wait_until_disconnected.inc +--connection default +--source include/wait_until_disconnected.inc + +--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +restart +EOF + +--connection default +--enable_reconnect +--source include/wait_until_connected_again.inc + +--connection server_1 +--enable_reconnect +--source include/wait_until_connected_again.inc + +INSERT INTO t1 VALUES (0, 1); +--save_master_pos + +--connection server_2 +# Start the slave replicating the events. +# The bug was that the format description event written after the crash could +# be fetched ahead of the execution of the temporary table events and executed +# out-of-band. This would cause drop of all temporary tables and thus failure +# for execution of remaining events. + +--source include/start_slave.inc +--sync_with_master + +SELECT * FROM t1 WHERE a <= 20 ORDER BY a; +SELECT COUNT(*) FROM t1 WHERE a BETWEEN 100+0 AND 100+256; +SHOW STATUS LIKE 'Slave_open_temp_tables'; + + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc + +--connection server_1 +DROP TABLE t1; + +--source include/rpl_end.inc diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index eeb66821809..9ba155bebb4 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1617,6 +1617,36 @@ rpl_parallel::workers_idle() } +void +rpl_parallel::wait_for_workers_idle(THD *thd) +{ + uint32 i, max_i; + + max_i= domain_hash.records; + for (i= 0; i < max_i; ++i) + { + bool active; + wait_for_commit my_orderer; + struct rpl_parallel_entry *e; + + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + if ((active= (e->current_sub_id > e->last_committed_sub_id))) + { + wait_for_commit *waitee= &e->current_group_info->commit_orderer; + my_orderer.register_wait_for_prior_commit(waitee); + thd->wait_for_commit_ptr= &my_orderer; + } + mysql_mutex_unlock(&e->LOCK_parallel_entry); + if (active) + { + my_orderer.wait_for_prior_commit(thd); + thd->wait_for_commit_ptr= NULL; + } + } +} + + /* This is used when we get an error during processing in do_event(); We will not queue any event to the thread, but we still need to wake it up @@ -1684,6 +1714,24 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); + if (typ == FORMAT_DESCRIPTION_EVENT) + { + Format_description_log_event *fdev= + static_cast(ev); + if (fdev->created) + { + /* + This format description event marks a new binlog after a master server + restart. We are going to close all temporary tables to clean up any + possible left-overs after a prior master crash. + + Thus we need to wait for all prior events to execute to completion, + in case they need access to any of the temporary tables. + */ + wait_for_workers_idle(rli->sql_driver_thd); + } + } + /* Stop queueing additional event groups once the SQL thread is requested to stop. diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index c7e15528e97..d59205cc72a 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -239,6 +239,7 @@ struct rpl_parallel { void wait_for_done(THD *thd, Relay_log_info *rli); void stop_during_until(); bool workers_idle(); + void wait_for_workers_idle(THD *thd); int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); };