diff --git a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result index 8aea6ccb1e8..61eba2cab2f 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result +++ b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result @@ -79,6 +79,43 @@ COUNT(*) SHOW STATUS LIKE 'Slave_open_temp_tables'; Variable_name Value Slave_open_temp_tables 0 +*** Test that if master logged partial event group before crash, we finish that group correctly before executing format description event *** +include/stop_slave.inc +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table, and writes to any of them"); +SET gtid_domain_id= 1; +DELETE FROM t1; +ALTER TABLE t1 ENGINE=InnoDB; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY); +INSERT INTO t2 VALUES (1); +INSERT INTO t2 VALUES (2); +SET gtid_domain_id= 2; +CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY); +INSERT INTO t3 VALUES (10); +INSERT INTO t3 VALUES (20); +INSERT INTO t1 SELECT a, 'server_1' FROM t2; +INSERT INTO t1 SELECT a, 'default' FROM t3; +INSERT INTO t1 SELECT a+2, '+server_1' FROM t2; +FLUSH TABLES; +SET SESSION debug_dbug="+d,crash_before_writing_xid"; +INSERT INTO t1 SELECT a+4, '++server_1' FROM t2; +Got one of the listed errors +INSERT INTO t1 VALUES (0, 1); +include/save_master_gtid.inc +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +0 1 +1 server_1 +2 server_1 +3 +server_1 +4 +server_1 +10 default +20 default +SHOW STATUS LIKE 'Slave_open_temp_tables'; +Variable_name Value +Slave_open_temp_tables 0 +FLUSH LOGS; include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test index c50bbd3f215..1504dad224e 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test +++ b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test @@ -1,3 +1,4 @@ +--source include/have_innodb.inc --source include/have_binlog_format_statement.inc --let $rpl_topology=1->2 --source include/rpl_init.inc @@ -132,6 +133,83 @@ SELECT COUNT(*) FROM t1 WHERE a BETWEEN 100+0 AND 100+256; SHOW STATUS LIKE 'Slave_open_temp_tables'; +--echo *** Test that if master logged partial event group before crash, we finish that group correctly before executing format description event *** + +--source include/stop_slave.inc + +--connection server_1 +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table, and writes to any of them"); +SET gtid_domain_id= 1; +DELETE FROM t1; +ALTER TABLE t1 ENGINE=InnoDB; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY); +INSERT INTO t2 VALUES (1); +INSERT INTO t2 VALUES (2); + +--connection default +SET gtid_domain_id= 2; +CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY); +INSERT INTO t3 VALUES (10); +INSERT INTO t3 VALUES (20); + +--connection server_1 +INSERT INTO t1 SELECT a, 'server_1' FROM t2; + +--connection default +INSERT INTO t1 SELECT a, 'default' FROM t3; + +--connection server_1 +INSERT INTO t1 SELECT a+2, '+server_1' FROM t2; + +# Crash the master server in the middle of writing an event group. +--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +wait +EOF + +FLUSH TABLES; +SET SESSION debug_dbug="+d,crash_before_writing_xid"; +--error 2006,2013 +INSERT INTO t1 SELECT a+4, '++server_1' FROM t2; + +--source include/wait_until_disconnected.inc +--connection default +--source include/wait_until_disconnected.inc + +--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +restart +EOF + +--connection default +--enable_reconnect +--source include/wait_until_connected_again.inc + +--connection server_1 +--enable_reconnect +--source include/wait_until_connected_again.inc + +INSERT INTO t1 VALUES (0, 1); +#--save_master_pos +--source include/save_master_gtid.inc + +--connection server_2 +# Start the slave replicating the events. +# The main thing to test here is that the slave will know that it +# needs to abort the partially received event group, so that the +# execution of format_description event will not wait infinitely +# for a commit of the incomplete group that never happens. + +--source include/start_slave.inc +#--sync_with_master +--source include/sync_with_master_gtid.inc + +SELECT * FROM t1 ORDER BY a; +SHOW STATUS LIKE 'Slave_open_temp_tables'; + +--connection server_1 +# This FLUSH can be removed once MDEV-6608 is fixed. +FLUSH LOGS; + + --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 9ba155bebb4..21234d7fd38 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -587,6 +587,30 @@ handle_rpl_parallel_thread(void *arg) events= next; continue; } + else if (events->typ == + rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART) + { + if (in_event_group) + { + /* + Master restarted (crashed) in the middle of an event group. + So we need to roll back and discard that event group. + */ + group_rgi->cleanup_context(thd, 1); + in_event_group= false; + finish_event_group(thd, group_rgi->gtid_sub_id, + events->entry_for_queued, group_rgi); + + group_rgi->next= rgis_to_free; + rgis_to_free= group_rgi; + thd->rgi_slave= group_rgi= NULL; + } + + events->next= qevs_to_free; + qevs_to_free= events; + events= next; + continue; + } DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); thd->rgi_slave= group_rgi= rgi; @@ -1617,6 +1641,54 @@ rpl_parallel::workers_idle() } +int +rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, + Format_description_log_event *fdev) +{ + uint32 idx; + rpl_parallel_thread *thr; + rpl_parallel_thread::queued_event *qev; + Relay_log_info *rli= rgi->rli; + + /* + We only need to queue the server restart if we still have a thread working + on a (potentially partial) event group. + + If the last thread we queued for has finished, then it cannot have any + partial event group that needs aborting. + + Thus there is no need for the full complexity of choose_thread(). We only + need to check if we have a current worker thread, and queue for it if so. + */ + idx= rpl_thread_idx; + thr= rpl_threads[idx]; + if (!thr) + return 0; + mysql_mutex_lock(&thr->LOCK_rpl_thread); + if (thr->current_owner != &rpl_threads[idx]) + { + /* No active worker thread, so no need to queue the master restart. */ + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 0; + } + + if (!(qev= thr->get_qev(fdev, 0, rli))) + { + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 1; + } + + qev->rgi= rgi; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART; + qev->entry_for_queued= this; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; + thr->enqueue(qev); + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 0; +} + + void rpl_parallel::wait_for_workers_idle(THD *thd) { @@ -1727,7 +1799,16 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Thus we need to wait for all prior events to execute to completion, in case they need access to any of the temporary tables. + + We also need to notify the worker thread running the prior incomplete + event group (if any), as such event group signifies an incompletely + written group cut short by a master crash, and must be rolled back. */ + if (current->queue_master_restart(serial_rgi, fdev)) + { + delete ev; + return 1; + } wait_for_workers_idle(rli->sql_driver_thd); } } diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index d59205cc72a..befe08b3d9b 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -76,10 +76,15 @@ struct rpl_parallel_thread { queued_event can hold either an event to be executed, or just a binlog position to be updated without any associated event. */ - enum queued_event_t { QUEUED_EVENT, QUEUED_POS_UPDATE } typ; + enum queued_event_t { + QUEUED_EVENT, + QUEUED_POS_UPDATE, + QUEUED_MASTER_RESTART + } typ; union { Log_event *ev; /* QUEUED_EVENT */ - rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE */ + rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE and + QUEUED_MASTER_RESTART */ }; rpl_group_info *rgi; inuse_relaylog *ir; @@ -224,8 +229,8 @@ struct rpl_parallel_entry { rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, bool reuse); - group_commit_orderer *get_gco(); - void free_gco(group_commit_orderer *gco); + int queue_master_restart(rpl_group_info *rgi, + Format_description_log_event *fdev); }; struct rpl_parallel { HASH domain_hash;