diff --git a/mysql-test/suite/rpl/r/rpl_parallel_seq.result b/mysql-test/suite/rpl/r/rpl_parallel_seq.result index 02287d54e33..faeb93fcf68 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel_seq.result +++ b/mysql-test/suite/rpl/r/rpl_parallel_seq.result @@ -82,6 +82,30 @@ SELECT @@global.gtid_binlog_state, @@global.gtid_slave_pos as "all through 101 h @@global.gtid_binlog_state all through 101 have been committed 0-1-101 0-1-101 connection slave; +include/stop_slave.inc +set @saved_mode= @@global.slave_parallel_mode; +set @@global.slave_parallel_mode = conservative; +include/start_slave.inc +connection master; +INSERT INTO ti SET a=2; +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +lock table ti write; +SET GLOBAL debug_dbug= "+d,halt_past_mark_start_commit"; +connection master; +INSERT INTO ti SET a=35570; +ALTER SEQUENCE s2 restart with 1; +include/save_master_gtid.inc +connection slave; +unlock tables; +SET debug_sync = "now SIGNAL past_mark_continue"; +include/sync_with_master_gtid.inc +include/stop_slave.inc +SET @@global.slave_parallel_mode = @saved_mode; +SET @@global.debug_dbug = @@GLOBAL.debug_dbug; +include/start_slave.inc +connection slave; flush tables with read lock; connection master; CREATE OR REPLACE SEQUENCE s3 ENGINE=innodb; diff --git a/mysql-test/suite/rpl/t/rpl_parallel_seq.test b/mysql-test/suite/rpl/t/rpl_parallel_seq.test index 9522a976b89..17870eb3b86 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel_seq.test +++ b/mysql-test/suite/rpl/t/rpl_parallel_seq.test @@ -128,6 +128,53 @@ SET DEBUG_SYNC = 'now SIGNAL continue_worker'; SELECT @@global.gtid_binlog_state, @@global.gtid_slave_pos as "all through 101 have been committed"; +# +# MDEV-35570 parallel slave ALTER-SEQUNCE attemted to binlog out-of-order. +# Let two transactions I_1 -> AS_2 where AS_2 depends on a commit parent I_1. +# Under the bug condition AS_2 may complete its work including binlogging +# while I_1 is slowly executing Xid_log_event. +# The test simulate the slowness, AS_2 must defer its completion. +# +--connection slave +--source include/stop_slave.inc +set @saved_mode= @@global.slave_parallel_mode; +set @@global.slave_parallel_mode = conservative; +--source include/start_slave.inc + +--connection master +INSERT INTO ti SET a=2; +--source include/save_master_gtid.inc + +--connection slave +--source include/sync_with_master_gtid.inc +# allow to proceed to sync with the 1st following WFPT2SC wait condtion +lock table ti write; +# allow to proceed into commit to sync with the 2nd following WFPC wait condition +--let $saved_dbug= @@GLOBAL.debug_dbug +SET GLOBAL debug_dbug= "+d,halt_past_mark_start_commit"; + +--connection master +INSERT INTO ti SET a=35570; +ALTER SEQUENCE s2 restart with 1; +--source include/save_master_gtid.inc + +--connection slave +--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to start commit" +--source include/wait_condition.inc +# the 1st wait release +unlock tables; + +--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit" +--source include/wait_condition.inc +# the 2nd wait release +SET debug_sync = "now SIGNAL past_mark_continue"; + +--source include/sync_with_master_gtid.inc +--source include/stop_slave.inc +SET @@global.slave_parallel_mode = @saved_mode; +--eval SET @@global.debug_dbug = $saved_dbug +--source include/start_slave.inc + # MDEV-31792 Assertion in MDL_context::acquire_lock upon parallel replication of CREATE SEQUENCE --let $iter = 3 diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 70b80e8884f..3f1d9f47924 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1537,6 +1537,14 @@ handle_rpl_parallel_thread(void *arg) else rgi->mark_start_commit(); DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit"); +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("halt_past_mark_start_commit", + { + DBUG_ASSERT(!debug_sync_set_action + (thd, STRING_WITH_LEN("now WAIT_FOR past_mark_continue"))); + DBUG_SET_INITIAL("-d,halt_past_mark_start_commit"); + };); +#endif } } diff --git a/sql/sql_sequence.cc b/sql/sql_sequence.cc index 273a22f9292..65d123b5eb7 100644 --- a/sql/sql_sequence.cc +++ b/sql/sql_sequence.cc @@ -960,119 +960,121 @@ bool Sql_cmd_alter_sequence::execute(THD *thd) SEQUENCE *seq; No_such_table_error_handler no_such_table_handler; DBUG_ENTER("Sql_cmd_alter_sequence::execute"); + { #if defined(HAVE_REPLICATION) - /* No wakeup():s of subsequent commits is allowed in this function. */ - wait_for_commit_raii suspend_wfc(thd); + /* No wakeup():s of subsequent commits is allowed in this function. */ + wait_for_commit_raii suspend_wfc(thd); #endif - if (check_access(thd, ALTER_ACL, first_table->db.str, - &first_table->grant.privilege, - &first_table->grant.m_internal, - 0, 0)) - DBUG_RETURN(TRUE); /* purecov: inspected */ + if (check_access(thd, ALTER_ACL, first_table->db.str, + &first_table->grant.privilege, + &first_table->grant.m_internal, + 0, 0)) + DBUG_RETURN(TRUE); /* purecov: inspected */ - if (check_grant(thd, ALTER_ACL, first_table, FALSE, 1, FALSE)) - DBUG_RETURN(TRUE); /* purecov: inspected */ + if (check_grant(thd, ALTER_ACL, first_table, FALSE, 1, FALSE)) + DBUG_RETURN(TRUE); /* purecov: inspected */ #ifdef WITH_WSREP - if (WSREP(thd) && wsrep_thd_is_local(thd)) - { - const bool used_engine= lex->create_info.used_fields & HA_CREATE_USED_ENGINE; - if (wsrep_check_sequence(thd, new_seq, used_engine)) - DBUG_RETURN(TRUE); - - if (wsrep_to_isolation_begin(thd, first_table->db.str, - first_table->table_name.str, - first_table)) + if (WSREP(thd) && wsrep_thd_is_local(thd)) { - DBUG_RETURN(TRUE); + const bool used_engine= lex->create_info.used_fields & HA_CREATE_USED_ENGINE; + if (wsrep_check_sequence(thd, new_seq, used_engine)) + DBUG_RETURN(TRUE); + + if (wsrep_to_isolation_begin(thd, first_table->db.str, + first_table->table_name.str, + first_table)) + { + DBUG_RETURN(TRUE); + } } - } #endif /* WITH_WSREP */ - if (if_exists()) - thd->push_internal_handler(&no_such_table_handler); - error= open_and_lock_tables(thd, first_table, FALSE, 0); - if (if_exists()) - { - trapped_errors= no_such_table_handler.safely_trapped_errors(); - thd->pop_internal_handler(); - } - if (unlikely(error)) - { - if (trapped_errors) + if (if_exists()) + thd->push_internal_handler(&no_such_table_handler); + error= open_and_lock_tables(thd, first_table, FALSE, 0); + if (if_exists()) { - StringBuffer tbl_name; - tbl_name.append(&first_table->db); - tbl_name.append('.'); - tbl_name.append(&first_table->table_name); - push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, - ER_UNKNOWN_SEQUENCES, - ER_THD(thd, ER_UNKNOWN_SEQUENCES), - tbl_name.c_ptr_safe()); - my_ok(thd); - DBUG_RETURN(FALSE); + trapped_errors= no_such_table_handler.safely_trapped_errors(); + thd->pop_internal_handler(); + } + if (unlikely(error)) + { + if (trapped_errors) + { + StringBuffer tbl_name; + tbl_name.append(&first_table->db); + tbl_name.append('.'); + tbl_name.append(&first_table->table_name); + push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, + ER_UNKNOWN_SEQUENCES, + ER_THD(thd, ER_UNKNOWN_SEQUENCES), + tbl_name.c_ptr_safe()); + my_ok(thd); + DBUG_RETURN(FALSE); + } + DBUG_RETURN(TRUE); } - DBUG_RETURN(TRUE); - } - table= first_table->table; - seq= table->s->sequence; + table= first_table->table; + seq= table->s->sequence; - seq->write_lock(table); - new_seq->reserved_until= seq->reserved_until; + seq->write_lock(table); + new_seq->reserved_until= seq->reserved_until; - /* Copy from old sequence those fields that the user didn't specified */ - if (!(new_seq->used_fields & seq_field_used_increment)) - new_seq->increment= seq->increment; - if (!(new_seq->used_fields & seq_field_used_min_value)) - new_seq->min_value= seq->min_value; - if (!(new_seq->used_fields & seq_field_used_max_value)) - new_seq->max_value= seq->max_value; - if (!(new_seq->used_fields & seq_field_used_start)) - new_seq->start= seq->start; - if (!(new_seq->used_fields & seq_field_used_cache)) - new_seq->cache= seq->cache; - if (!(new_seq->used_fields & seq_field_used_cycle)) - new_seq->cycle= seq->cycle; + /* Copy from old sequence those fields that the user didn't specified */ + if (!(new_seq->used_fields & seq_field_used_increment)) + new_seq->increment= seq->increment; + if (!(new_seq->used_fields & seq_field_used_min_value)) + new_seq->min_value= seq->min_value; + if (!(new_seq->used_fields & seq_field_used_max_value)) + new_seq->max_value= seq->max_value; + if (!(new_seq->used_fields & seq_field_used_start)) + new_seq->start= seq->start; + if (!(new_seq->used_fields & seq_field_used_cache)) + new_seq->cache= seq->cache; + if (!(new_seq->used_fields & seq_field_used_cycle)) + new_seq->cycle= seq->cycle; - /* If we should restart from a new value */ - if (new_seq->used_fields & seq_field_used_restart) - { - if (!(new_seq->used_fields & seq_field_used_restart_value)) - new_seq->restart= new_seq->start; - new_seq->reserved_until= new_seq->restart; - } + /* If we should restart from a new value */ + if (new_seq->used_fields & seq_field_used_restart) + { + if (!(new_seq->used_fields & seq_field_used_restart_value)) + new_seq->restart= new_seq->start; + new_seq->reserved_until= new_seq->restart; + } - /* Let check_and_adjust think all fields are used */ - new_seq->used_fields= ~0; - if (new_seq->check_and_adjust(0)) - { - my_error(ER_SEQUENCE_INVALID_DATA, MYF(0), - first_table->db.str, - first_table->table_name.str); - error= 1; + /* Let check_and_adjust think all fields are used */ + new_seq->used_fields= ~0; + if (new_seq->check_and_adjust(0)) + { + my_error(ER_SEQUENCE_INVALID_DATA, MYF(0), + first_table->db.str, + first_table->table_name.str); + error= 1; + seq->write_unlock(table); + goto end; + } + + if (likely(!(error= new_seq->write(table, 1)))) + { + /* Store the sequence values in table share */ + seq->copy(new_seq); + } + else + table->file->print_error(error, MYF(0)); seq->write_unlock(table); - goto end; + if (trans_commit_stmt(thd)) + error= 1; + if (trans_commit_implicit(thd)) + error= 1; + DBUG_EXECUTE_IF("hold_worker_on_schedule", + { + /* delay binlogging of a parent trx in rpl_parallel_seq */ + my_sleep(100000); + }); } - - if (likely(!(error= new_seq->write(table, 1)))) - { - /* Store the sequence values in table share */ - seq->copy(new_seq); - } - else - table->file->print_error(error, MYF(0)); - seq->write_unlock(table); - if (trans_commit_stmt(thd)) - error= 1; - if (trans_commit_implicit(thd)) - error= 1; - DBUG_EXECUTE_IF("hold_worker_on_schedule", - { - /* delay binlogging of a parent trx in rpl_parallel_seq */ - my_sleep(100000); - }); if (likely(!error)) error= write_bin_log(thd, 1, thd->query(), thd->query_length()); if (likely(!error))