From 75dc2671011ba53e4f4531752c213ced7f9012ff Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Thu, 22 Oct 2015 10:28:51 +0200 Subject: [PATCH 1/3] Change Seconds_behind_master to be updated only at commit in parallel replication Before, the Seconds_behind_master was updated already when an event was queued for a worker thread to execute later. This might lead users to interpret a low value as the slave being almost up to date with the master, while in reality there might still be lots and lots of events still queued up waiting to be applied by the slave. See https://lists.launchpad.net/maria-developers/msg08958.html for more detailed discussions. --- sql/rpl_parallel.cc | 3 +++ sql/rpl_rli.cc | 13 +++++++++++++ sql/rpl_rli.h | 7 +++++++ sql/slave.cc | 7 ++++++- 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index cc5da77303c..920442cb76d 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -44,6 +44,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || + (ev->when == 0))) + rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time; mysql_mutex_lock(&rli->data_lock); /* Mutex will be released in apply_event_and_update_pos(). */ err= apply_event_and_update_pos(ev, thd, rgi, rpt); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 8a2a55fcde0..3b2fea93c3b 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1001,6 +1001,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, else if (group_master_log_pos < log_pos) group_master_log_pos= log_pos; } + + /* + In the parallel case, we only update the Seconds_Behind_Master at the + end of a transaction. 
In the non-parallel case, the value is updated as + soon as an event is read from the relay log; however this would be too + confusing for the user, seeing the slave reported as up-to-date when + potentially thousands of events are still queued up for worker threads + waiting for execution. + */ + if (rgi->last_master_timestamp && + rgi->last_master_timestamp > last_master_timestamp) + last_master_timestamp= rgi->last_master_timestamp; } else { @@ -1630,6 +1642,7 @@ rpl_group_info::reinit(Relay_log_info *rli) row_stmt_start_timestamp= 0; long_find_row_note_printed= false; did_mark_start_commit= false; + last_master_timestamp = 0; gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; commit_orderer.reinit(); } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 2d92f384ef3..1e8bb66ffbe 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -668,6 +668,13 @@ struct rpl_group_info /* Needs room for "Gtid D-S-N\x00". */ char gtid_info_buf[5+10+1+10+1+20+1]; + /* + The timestamp, from the master, of the commit event. + Used to do delayed update of rli->last_master_timestamp, for getting + reasonable values out of Seconds_Behind_Master in SHOW SLAVE STATUS. + */ + time_t last_master_timestamp; + /* Information to be able to re-try an event group in case of a deadlock or other temporary error. diff --git a/sql/slave.cc b/sql/slave.cc index 5af48b6a793..32f384d4e22 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3500,8 +3500,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, If it is an artificial event, or a relay log event (IO thread generated event) or ev->when is set to 0, we don't update the last_master_timestamp. + + In parallel replication, we might queue a large number of events, and + the user might be surprised to see a claim that the slave is up to date + long before those queued events are actually executed. 
*/ - if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + if (opt_slave_parallel_threads == 0 && + !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) { rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; DBUG_ASSERT(rli->last_master_timestamp >= 0); From 6d96fab7dd9ce6f384eebc9b73a9128a0b0ca1aa Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Thu, 28 May 2015 12:32:19 +0200 Subject: [PATCH 2/3] MDEV-7818: Deadlock occurring with parallel replication and FTWRL Preparation patch, moving the GCO wait into a separate function, in preparation for adding a separate wait phase for FLUSH TABLES WITH READ LOCK. --- sql/rpl_parallel.cc | 138 ++++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 62 deletions(-) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 920442cb76d..965480c57a3 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -275,6 +275,74 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi, } +/* + Do not start parallel execution of this event group until all prior groups + have reached the commit phase that are not safe to run in parallel with. 
+*/ +static bool +do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 wait_count; + + if (!gco->installed) + { + group_commit_orderer *prev_gco= gco->prev_gco; + if (prev_gco) + { + prev_gco->last_sub_id= gco->prior_sub_id; + prev_gco->next_gco= gco; + } + gco->installed= true; + } + wait_count= gco->wait_count; + if (wait_count > entry->count_committing_event_groups) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); + thd->ENTER_COND(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry, + &stage_waiting_for_prior_transaction_to_start_commit, + old_stage); + *did_enter_cond= true; + do + { + if (thd->check_killed() && !rgi->worker_error) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + /* + Even though we were killed, we need to continue waiting for the + prior event groups to signal that we can continue. Otherwise we + mess up the accounting for ordering. However, now that we have + marked the error, events will just be skipped rather than + executed, and things will progress quickly towards stop. + */ + } + mysql_cond_wait(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry); + } while (wait_count > entry->count_committing_event_groups); + } + + if (entry->force_abort && wait_count > entry->stop_count) + { + /* + We are stopping (STOP SLAVE), and this event group is beyond the point + where we can safely stop. So return a flag that will cause us to skip, + rather than execute, the following events. 
+ */ + return true; + } + else + return false; +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -768,7 +836,6 @@ handle_rpl_parallel_thread(void *arg) { bool did_enter_cond= false; PSI_stage_info old_stage; - uint64 wait_count; DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", { if (rgi->current_gtid.domain_id == 0 && @@ -806,72 +873,19 @@ handle_rpl_parallel_thread(void *arg) event_gtid_sub_id= rgi->gtid_sub_id; rgi->thd= thd; + mysql_mutex_lock(&entry->LOCK_parallel_entry); + skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage); + + if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) + skip_event_group= true; + if (likely(!skip_event_group)) + do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); + /* Register ourself to wait for the previous commit, if we need to do such registration _and_ that previous commit has not already occured. - - Also do not start parallel execution of this event group until all - prior groups have reached the commit phase that are not safe to run - in parallel with. 
*/ - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (!gco->installed) - { - group_commit_orderer *prev_gco= gco->prev_gco; - if (prev_gco) - { - prev_gco->last_sub_id= gco->prior_sub_id; - prev_gco->next_gco= gco; - } - gco->installed= true; - } - wait_count= gco->wait_count; - if (wait_count > entry->count_committing_event_groups) - { - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); - thd->ENTER_COND(&gco->COND_group_commit_orderer, - &entry->LOCK_parallel_entry, - &stage_waiting_for_prior_transaction_to_start_commit, - &old_stage); - did_enter_cond= true; - do - { - if (thd->check_killed() && !rgi->worker_error) - { - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); - thd->clear_error(); - thd->get_stmt_da()->reset_diagnostics_area(); - thd->send_kill_message(); - slave_output_error_info(rgi, thd); - signal_error_to_sql_driver_thread(thd, rgi, 1); - /* - Even though we were killed, we need to continue waiting for the - prior event groups to signal that we can continue. Otherwise we - mess up the accounting for ordering. However, now that we have - marked the error, events will just be skipped rather than - executed, and things will progress quickly towards stop. - */ - } - mysql_cond_wait(&gco->COND_group_commit_orderer, - &entry->LOCK_parallel_entry); - } while (wait_count > entry->count_committing_event_groups); - } - - if (entry->force_abort && wait_count > entry->stop_count) - { - /* - We are stopping (STOP SLAVE), and this event group is beyond the - point where we can safely stop. So set a flag that will cause us - to skip, rather than execute, the following events. 
- */ - skip_event_group= true; - } - else - skip_event_group= false; - - if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) - skip_event_group= true; register_wait_for_prior_event_group_commit(rgi, entry); unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, From ba02550166eb39c0375a6422ecaa4731421250b6 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Thu, 22 Oct 2015 11:18:34 +0200 Subject: [PATCH 3/3] MDEV-7818: Deadlock occurring with parallel replication and FTWRL Problem is that FLUSH TABLES WITH READ LOCK first blocks threads from starting new commits, then waits for running commits to complete. But in-order parallel replication needs commits to happen in a particular order, so this can easily deadlock. To fix this problem, this patch introduces a way to temporarily pause the parallel replication worker threads. Before starting FTWRL, we let all worker threads complete in-progress transactions, and then wait. Then we proceed to take the global read lock. Once the lock is obtained, we unpause the worker threads. Now commits are blocked from starting by the global read lock, so the deadlock will no longer occur. 
--- .../perfschema/r/stage_mdl_global.result | 1 + mysql-test/suite/rpl/r/rpl_parallel2.result | 92 +++++- mysql-test/suite/rpl/t/rpl_parallel2.test | 137 ++++++++- sql/mysqld.cc | 3 + sql/mysqld.h | 3 + sql/rpl_parallel.cc | 275 +++++++++++++++++- sql/rpl_parallel.h | 38 ++- sql/sql_parse.cc | 13 + 8 files changed, 537 insertions(+), 25 deletions(-) diff --git a/mysql-test/suite/perfschema/r/stage_mdl_global.result b/mysql-test/suite/perfschema/r/stage_mdl_global.result index 1a6f51a4acc..fdcaafa6201 100644 --- a/mysql-test/suite/perfschema/r/stage_mdl_global.result +++ b/mysql-test/suite/perfschema/r/stage_mdl_global.result @@ -6,6 +6,7 @@ user1 statement/sql/flush flush tables with read lock username event_name nesting_event_type username event_name nesting_event_type user1 stage/sql/init STATEMENT +user1 stage/sql/init STATEMENT user1 stage/sql/query end STATEMENT user1 stage/sql/closing tables STATEMENT user1 stage/sql/freeing items STATEMENT diff --git a/mysql-test/suite/rpl/r/rpl_parallel2.result b/mysql-test/suite/rpl/r/rpl_parallel2.result index de90bcd158f..2ca73738d84 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel2.result +++ b/mysql-test/suite/rpl/r/rpl_parallel2.result @@ -29,8 +29,98 @@ include/start_slave.inc SELECT * FROM t1 WHERE a >= 10 ORDER BY a; a b 10 0 +*** MDEV-7818: Deadlock occurring with parallel replication and FTWRL *** +CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1,0), (2,0), (3,0); +include/stop_slave.inc +SET @old_dbug= @@SESSION.debug_dbug; +SET @commit_id= 4242; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; +BEGIN; +UPDATE t2 SET b=b+1 WHERE a=2; +COMMIT; +BEGIN; +INSERT INTO t2 VALUES (4,10); +COMMIT; +SET SESSION debug_dbug= @old_dbug; +INSERT INTO t2 VALUES (5,0); +INSERT INTO t2 VALUES (6,0); +INSERT INTO t2 VALUES (7,0); +INSERT INTO t2 VALUES (8,0); +INSERT INTO t2 VALUES (9,0); +INSERT INTO t2 VALUES (10,0); +INSERT INTO t2 VALUES (11,0); +INSERT INTO t2 VALUES (12,0); +INSERT 
INTO t2 VALUES (13,0); +INSERT INTO t2 VALUES (14,0); +INSERT INTO t2 VALUES (15,0); +INSERT INTO t2 VALUES (16,0); +INSERT INTO t2 VALUES (17,0); +INSERT INTO t2 VALUES (18,0); +INSERT INTO t2 VALUES (19,0); +BEGIN; +SELECT * FROM t2 WHERE a=2 FOR UPDATE; +a b +2 0 +include/start_slave.inc +FLUSH TABLES WITH READ LOCK; +COMMIT; +STOP SLAVE; +SELECT * FROM t2 ORDER BY a; +a b +1 0 +2 1 +3 0 +4 10 +5 0 +6 0 +7 0 +8 0 +9 0 +10 0 +11 0 +12 0 +13 0 +14 0 +15 0 +16 0 +17 0 +18 0 +19 0 +UNLOCK TABLES; +include/wait_for_slave_to_stop.inc +include/start_slave.inc +SELECT * FROM t2 ORDER BY a; +a b +1 0 +2 1 +3 0 +4 10 +5 0 +6 0 +7 0 +8 0 +9 0 +10 0 +11 0 +12 0 +13 0 +14 0 +15 0 +16 0 +17 0 +18 0 +19 0 +*** MDEV-8318: Assertion `!pool->busy' failed in pool_mark_busy(rpl_parallel_thread_pool*) on concurrent FTWRL *** +LOCK TABLE t2 WRITE; +FLUSH TABLES WITH READ LOCK; +FLUSH TABLES WITH READ LOCK; +KILL QUERY CID; +ERROR 70100: Query execution was interrupted +UNLOCK TABLES; +UNLOCK TABLES; include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc -DROP TABLE t1; +DROP TABLE t1, t2; include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test index 47b0e87a6b6..50617c63024 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel2.test +++ b/mysql-test/suite/rpl/t/rpl_parallel2.test @@ -1,3 +1,5 @@ +--source include/have_debug.inc +--source include/have_innodb.inc --source include/have_binlog_format_statement.inc --let $rpl_topology=1->2 --source include/rpl_init.inc @@ -78,13 +80,144 @@ SET GLOBAL sql_slave_skip_counter= 1; SELECT * FROM t1 WHERE a >= 10 ORDER BY a; -# Clean up +--echo *** MDEV-7818: Deadlock occurring with parallel replication and FTWRL *** + +--connection server_1 +CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1,0), (2,0), (3,0); +--save_master_pos + +--connection server_2 +--sync_with_master +--source include/stop_slave.inc + 
+--connection server_1 +# Create a group commit with two transactions, will be used to provoke the +# problematic thread interaction with FTWRL on the slave. +SET @old_dbug= @@SESSION.debug_dbug; +SET @commit_id= 4242; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; + +BEGIN; +UPDATE t2 SET b=b+1 WHERE a=2; +COMMIT; + +BEGIN; +INSERT INTO t2 VALUES (4,10); +COMMIT; + +SET SESSION debug_dbug= @old_dbug; + +INSERT INTO t2 VALUES (5,0); +INSERT INTO t2 VALUES (6,0); +INSERT INTO t2 VALUES (7,0); +INSERT INTO t2 VALUES (8,0); +INSERT INTO t2 VALUES (9,0); +INSERT INTO t2 VALUES (10,0); +INSERT INTO t2 VALUES (11,0); +INSERT INTO t2 VALUES (12,0); +INSERT INTO t2 VALUES (13,0); +INSERT INTO t2 VALUES (14,0); +INSERT INTO t2 VALUES (15,0); +INSERT INTO t2 VALUES (16,0); +INSERT INTO t2 VALUES (17,0); +INSERT INTO t2 VALUES (18,0); +INSERT INTO t2 VALUES (19,0); +--save_master_pos + +--connection server_2 + +--connect (s1, 127.0.0.1, root,, test, $SLAVE_MYPORT,) +# Block one transaction on a row lock. +BEGIN; +SELECT * FROM t2 WHERE a=2 FOR UPDATE; + +--connection server_2 + +# Wait for slave thread of the other transaction to have the commit lock. +--source include/start_slave.inc +--let $wait_condition= SELECT COUNT(*) > 0 FROM information_schema.processlist WHERE state = "Waiting for prior transaction to commit" +--source include/wait_condition.inc + +--connect (s2, 127.0.0.1, root,, test, $SLAVE_MYPORT,) +send FLUSH TABLES WITH READ LOCK; +# The bug was that at this point we were deadlocked. +# The FTWRL command would wait forever for T2 to commit. +# T2 would wait for T1 to commit first, but T1 is waiting for +# the global read lock to be released. + +--connection s1 +# Release the lock that blocks T1 from replicating. 
+COMMIT; + +--connection s1 +send STOP SLAVE; + +--connection s2 +reap; + +--connection server_1 +SELECT * FROM t2 ORDER BY a; + +--connection s2 +UNLOCK TABLES; + +--connection s1 +reap; + +--connection server_2 +--source include/wait_for_slave_to_stop.inc +--source include/start_slave.inc +--sync_with_master + +SELECT * FROM t2 ORDER BY a; + + + +--echo *** MDEV-8318: Assertion `!pool->busy' failed in pool_mark_busy(rpl_parallel_thread_pool*) on concurrent FTWRL *** + +--connection server_1 +LOCK TABLE t2 WRITE; + + +--connect (m1,localhost,root,,test) +--connection m1 +--let $cid=`SELECT CONNECTION_ID()` +send FLUSH TABLES WITH READ LOCK; + +--connect (m2,localhost,root,,test) +# We cannot force the race with DEBUG_SYNC, because the race does not +# exist after fixing the bug. At best we could force a debug sync to +# time out, which is effectively just a sleep. +# So just put a small sleep here; it is enough to trigger the bug in +# most runs before the bug fix, and the code should work correctly +# however the thread scheduling happens. +--sleep 0.1 +send FLUSH TABLES WITH READ LOCK; + +--connection server_1 +--replace_result $cid CID +eval KILL QUERY $cid; + +--connection m1 +--error ER_QUERY_INTERRUPTED +reap; + +--connection server_1 +UNLOCK TABLES; + +--connection m2 +reap; +UNLOCK TABLES; + + +# Clean up. 
--connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; --source include/start_slave.inc --connection server_1 -DROP TABLE t1; +DROP TABLE t1, t2; --source include/rpl_end.inc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 204bf75909f..31d6348e679 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9525,6 +9525,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0}; PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0}; +PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0}; +PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0}; +PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0}; diff --git a/sql/mysqld.h b/sql/mysqld.h index 156e7f99bb5..8c953c1dd44 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -454,6 +454,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit; extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit; extern PSI_stage_info stage_waiting_for_room_in_worker_thread; extern PSI_stage_info stage_waiting_for_workers_idle; +extern PSI_stage_info stage_waiting_for_ftwrl; +extern PSI_stage_info 
stage_waiting_for_ftwrl_threads_to_pause; +extern PSI_stage_info stage_waiting_for_rpl_thread_pool; extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait; extern PSI_stage_info stage_gtid_wait_other_connection; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 965480c57a3..0f8c69f6a68 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -287,6 +287,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_count; + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (!gco->installed) { group_commit_orderer *prev_gco= gco->prev_gco; @@ -343,6 +345,214 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, } +static void +do_ftwrl_wait(rpl_group_info *rgi, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 sub_id= rgi->gtid_sub_id; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + /* + If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this + transaction is later than transactions that have priority to complete + before FTWRL. If so, wait here so that FTWRL can proceed and complete + first. + + (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes + this test false as required). 
+ */ + if (unlikely(sub_id > entry->pause_sub_id)) + { + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry, + &stage_waiting_for_ftwrl, old_stage); + *did_enter_cond= true; + do + { + if (entry->force_abort || rgi->worker_error) + break; + if (thd->check_killed()) + { + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + break; + } + mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + } while (sub_id > entry->pause_sub_id); + + /* + We do not call EXIT_COND() here, as this will be done later by our + caller (since we set *did_enter_cond to true). + */ + } + + if (sub_id > entry->largest_started_sub_id) + entry->largest_started_sub_id= sub_id; +} + + +static int +pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd) +{ + PSI_stage_info old_stage; + int res= 0; + + /* + Wait here while the queue is busy. This is done to make FLUSH TABLES WITH + READ LOCK work correctly, without incurring extra locking penalties in + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the + thread pool, and for this we need to make sure the pool will not go away + during the operation. The LOCK_rpl_thread_pool is not suitable for + this. It is taken by release_thread() while holding LOCK_rpl_thread; so it + must be released before locking any LOCK_rpl_thread lock, or a deadlock + can occur. + + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and + pool size changes with this condition wait. 
+ */ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (thd) + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool, + &stage_waiting_for_rpl_thread_pool, &old_stage); + while (pool->busy) + { + if (thd && thd->check_killed()) + { + thd->send_kill_message(); + res= 1; + break; + } + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + } + if (!res) + pool->busy= true; + if (thd) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + + return res; +} + + +static void +pool_mark_not_busy(rpl_parallel_thread_pool *pool) +{ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + DBUG_ASSERT(pool->busy); + pool->busy= false; + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); +} + + +void +rpl_unpause_after_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + + DBUG_ASSERT(pool->busy); + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + rpt->pause_for_ftwrl = false; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + e->pause_sub_id= (uint64)ULONGLONG_MAX; + mysql_cond_broadcast(&e->COND_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } + + pool_mark_not_busy(pool); +} + + +/* + Pause parallel replication workers for FTWRL: let event groups already started commit, and hold back later ones. Returns non-zero if killed while waiting. + + Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called. +*/ +int +rpl_pause_for_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + int err; + + /* + While the pool is marked busy (see pool_mark_busy()), the pool + cannot be shutdown/resized, so threads are guaranteed to not disappear. + + This is required to safely be able to access the individual threads below. 
+ (We cannot lock an individual thread while holding LOCK_rpl_thread_pool, + as this can deadlock against release_thread()). + */ + if ((err= pool_mark_busy(pool, thd))) + return err; + + for (i= 0; i < pool->count; ++i) + { + PSI_stage_info old_stage; + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + /* + Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not + de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl(). + */ + rpt->pause_for_ftwrl = true; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + ++e->need_sub_id_signal; + if (e->pause_sub_id == (uint64)ULONGLONG_MAX) + e->pause_sub_id= e->largest_started_sub_id; + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage); + while (e->pause_sub_id < (uint64)ULONGLONG_MAX && + e->last_committed_sub_id < e->pause_sub_id && + !err) + { + if (thd->check_killed()) + { + thd->send_kill_message(); + err= 1; + break; + } + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + }; + --e->need_sub_id_signal; + thd->EXIT_COND(&old_stage); + if (err) + break; + } + + if (err) + rpl_unpause_after_ftwrl(thd); + return err; +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -1037,17 +1247,40 @@ handle_rpl_parallel_thread(void *arg) */ rpt->batch_free(); - if ((events= rpt->event_queue) != NULL) + for (;;) { + if ((events= rpt->event_queue) != NULL) + { + /* + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give + us a new event. 
+ */ + rpt->dequeue1(events); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + goto more_events; + } + if (!rpt->pause_for_ftwrl || + (in_event_group && !group_rgi->parallel_entry->force_abort)) + break; /* - Take next group of events from the replication pool. - This is faster than having to wakeup the pool manager thread to give us - a new event. + We are currently in the delicate process of pausing parallel + replication while FLUSH TABLES WITH READ LOCK is starting. We must + not de-allocate the thread (setting rpt->current_owner= NULL) until + rpl_unpause_after_ftwrl() has woken us up. */ - rpt->dequeue1(events); + mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - goto more_events; + mysql_cond_wait(&rpt->current_entry->COND_parallel_entry, + &rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + /* + Now loop to check again for more events available, since we released + and re-acquired the LOCK_rpl_thread mutex. + */ } + rpt->inuse_relaylog_refcount_update(); if (in_event_group && group_rgi->parallel_entry->force_abort) @@ -1124,6 +1357,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_thread **new_list= NULL; rpl_parallel_thread *new_free_list= NULL; rpl_parallel_thread *rpt_array= NULL; + int res; + + if ((res= pool_mark_busy(pool, current_thd))) + return res; /* Allocate the new list of threads up-front. 
@@ -1172,7 +1409,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + while ((rpt= pool->free_list) == NULL) + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + pool->free_list= rpt->next; + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -1222,9 +1466,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); } - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - mysql_cond_broadcast(&pool->COND_rpl_thread_pool); - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + pool_mark_not_busy(pool); return 0; @@ -1248,6 +1490,7 @@ err: } my_free(new_list); } + pool_mark_not_busy(pool); return 1; } @@ -1511,7 +1754,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : count(0), threads(0), free_list(0), inited(false) + : threads(0), free_list(0), count(0), inited(false), busy(false) { } @@ -1519,9 +1762,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool() int rpl_parallel_thread_pool::init(uint32 size) { - count= 0; threads= NULL; free_list= NULL; + count= 0; + busy= false; mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, MY_MUTEX_INIT_SLOW); @@ -1562,7 +1806,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, rpl_parallel_thread *rpt; mysql_mutex_lock(&LOCK_rpl_thread_pool); - while ((rpt= free_list) == NULL) + while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); @@ -1773,6 +2017,7 @@ rpl_parallel::find(uint32 
domain_id) e->rpl_thread_max= count; e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; + e->pause_sub_id= (uint64)ULONGLONG_MAX; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -1974,7 +2219,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); - e->need_sub_id_signal= true; + ++e->need_sub_id_signal; thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, &stage_waiting_for_workers_idle, &old_stage); while (e->current_sub_id > e->last_committed_sub_id) @@ -1987,7 +2232,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) } mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); } - e->need_sub_id_signal= false; + --e->need_sub_id_signal; thd->EXIT_COND(&old_stage); if (err) return err; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 0c2e4270646..262bd86702f 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -70,6 +70,7 @@ struct rpl_parallel_thread { bool delay_start; bool running; bool stop; + bool pause_for_ftwrl; mysql_mutex_t LOCK_rpl_thread; mysql_cond_t COND_rpl_thread; mysql_cond_t COND_rpl_thread_queue; @@ -199,12 +200,18 @@ struct rpl_parallel_thread { struct rpl_parallel_thread_pool { - uint32 count; struct rpl_parallel_thread **threads; struct rpl_parallel_thread *free_list; mysql_mutex_t LOCK_rpl_thread_pool; mysql_cond_t COND_rpl_thread_pool; + uint32 count; bool inited; + /* + While FTWRL (or a pool size change) runs, this flag is set to make SQL thread or + STOP/START slave not try to start new activity while that operation + is in progress. 
+ */ + bool busy; rpl_parallel_thread_pool(); int init(uint32 size); @@ -219,6 +226,12 @@ struct rpl_parallel_entry { mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; uint32 domain_id; + /* + Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show + that they are waiting, so that finish_event_group knows to signal them + when last_committed_sub_id is increased. + */ + uint32 need_sub_id_signal; uint64 last_commit_id; bool active; /* @@ -227,12 +240,6 @@ struct rpl_parallel_entry { waiting for event groups to complete. */ bool force_abort; - /* - Set in wait_for_workers_idle() to show that it is waiting, so that - finish_event_group knows to signal it when last_committed_sub_id is - increased. - */ - bool need_sub_id_signal; /* At STOP SLAVE (force_abort=true), we do not want to process all events in the queue (which could unnecessarily delay stop, if a lot of events happen @@ -273,6 +280,15 @@ struct rpl_parallel_entry { queued for execution by a worker thread. */ uint64 current_sub_id; + /* + The largest sub_id that has started its transaction. Protected by + LOCK_parallel_entry. + + (Transactions can start out-of-order, so this value signifies that no + transactions with larger sub_id have started, but not necessarily that all + transactions with smaller sub_id have started). + */ + uint64 largest_started_sub_id; rpl_group_info *current_group_info; /* If we get an error in some event group, we set the sub_id of that event @@ -282,6 +298,12 @@ struct rpl_parallel_entry { The value is ULONGLONG_MAX when no error occured. */ uint64 stop_on_error_sub_id; + /* + During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than + this value must not start, but wait until the global read lock is released. + The value is set to ULONGLONG_MAX when no FTWRL is pending. + */ + uint64 pause_sub_id; /* Total count of event groups queued so far. 
*/ uint64 count_queued_event_groups; /* @@ -322,5 +344,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool; extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); +extern int rpl_pause_for_ftwrl(THD *thd); +extern void rpl_unpause_after_ftwrl(THD *thd); #endif /* RPL_PARALLEL_H */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index a665f0314e8..848689f313a 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4284,6 +4284,17 @@ end_with_restore_list: break; } + if (lex->type & REFRESH_READ_LOCK) + { + /* + We need to pause any parallel replication slave workers during FLUSH + TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as + worker threads can run in arbitrary order but need to commit in a + specific given order. + */ + if (rpl_pause_for_ftwrl(thd)) + goto error; + } /* reload_acl_and_cache() will tell us if we are allowed to write to the binlog or not. @@ -4314,6 +4325,8 @@ end_with_restore_list: if (!res) my_ok(thd); } + if (lex->type & REFRESH_READ_LOCK) + rpl_unpause_after_ftwrl(thd); break; }