diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index ecb9c133b07..7f6e3bc12a7 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -3069,13 +3069,21 @@ rpl_parallel::stop_during_until() } -bool -rpl_parallel::workers_idle(Relay_log_info *rli) +bool Relay_log_info::are_sql_threads_caught_up() { - mysql_mutex_assert_owner(&rli->data_lock); - return !rli->last_inuse_relaylog || - rli->last_inuse_relaylog->queued_count == - rli->last_inuse_relaylog->dequeued_count; + mysql_mutex_assert_owner(&data_lock); + if (!sql_thread_caught_up) + return false; + /* + The SQL thread sets @ref worker_threads_caught_up to `false` but not `true`. + Therefore, this place needs to check if it can now be `true`. + */ + if (!worker_threads_caught_up && ( // No need to re-check if already `true`. + !last_inuse_relaylog || // `nullptr` case + last_inuse_relaylog->queued_count == last_inuse_relaylog->dequeued_count + )) + worker_threads_caught_up= true; // Refresh + return worker_threads_caught_up; } diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index ef872dec66f..4eb4b00699c 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -510,8 +510,6 @@ struct rpl_parallel { void stop_during_until(); int wait_for_workers_idle(THD *thd); int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); - - static bool workers_idle(Relay_log_info *rli); }; diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index cf991584573..d1e2a0a1e94 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -258,6 +258,29 @@ public: Seconds_Behind_Master as zero while the SQL thread is so waiting. 
*/ bool sql_thread_caught_up; + /** + Simple setter for @ref worker_threads_caught_up; + sets it `false` to indicate new user events in queue + @pre @ref data_lock held to prevent race with are_sql_threads_caught_up() + */ + inline void unset_worker_threads_caught_up() + { + mysql_mutex_assert_owner(&data_lock); + worker_threads_caught_up= false; + } + /** + @return + `true` if both @ref sql_thread_caught_up and @ref worker_threads_caught_up + (refreshed from @ref last_inuse_relaylog as needed) are `true` + @pre Only meaningful if `mi->using_parallel()` + @pre @ref data_lock held to prevent race condition + @note + Parallel replication requires the idleness of the main SQL thread as well, + because after the thread sets its state to "busy" with `data_lock` held, + it enqueues events *without this lock*. Not to mention any event the main + thread processes itself without distribution, e.g., ignored ones. + */ + bool are_sql_threads_caught_up(); void clear_until_condition(); /** @@ -588,6 +611,22 @@ private: relay log. */ uint32 m_flags; + + /** + When `true`, this worker-thread counterpart of @ref sql_thread_caught_up + represents that __every__ worker thread is waiting for new events. + * The SQL driver thread sets this to `false` through + unset_worker_threads_caught_up() as it prepares an event + (either to enqueue it to a worker or, e.g., for ignored events, to process it itself) + * For the main driver or any worker thread to refresh this state immediately + when it finishes, the procedure would have to be a critical section. + To avoid depending on a mutex, this state instead only returns to `true` + as part of its reader, are_sql_threads_caught_up(). + `Seconds_Behind_Master` of SHOW SLAVE STATUS uses this method (which also + reads `sql_thread_caught_up`) to know when all SQL threads are waiting. 
+ @pre Only meaningful if `mi->using_parallel()` + */ + bool worker_threads_caught_up= true; }; diff --git a/sql/slave.cc b/sql/slave.cc index c5af3e52f5c..cbdbbf1de14 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3312,21 +3312,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, if (!stamp) idle= true; + else if (mi->using_parallel()) + idle= mi->rli.are_sql_threads_caught_up(); else - { idle= mi->rli.sql_thread_caught_up; - - /* - The idleness of the SQL thread is needed for the parallel slave - because events can be ignored before distribution to a worker thread. - That is, Seconds_Behind_Master should still be calculated and visible - while the slave is processing ignored events, such as those skipped - due to slave_skip_counter. - */ - if (mi->using_parallel() && idle && - !rpl_parallel::workers_idle(&mi->rli)) - idle= false; - } if (idle) time_diff= 0; else @@ -4441,20 +4430,19 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, if (rli->mi->using_parallel()) { /* - rli->sql_thread_caught_up is checked and negated here to ensure that + Relay_log_info::are_sql_threads_caught_up() + is checked and its states are negated here to ensure that the value of Seconds_Behind_Master in SHOW SLAVE STATUS is consistent with the update of last_master_timestamp. It was previously unset immediately after reading an event from the relay log; however, for the duration between that unset and the time that LMT would be updated could lead to spikes in SBM. - The check for queued_count == dequeued_count ensures the worker threads - are all idle (i.e. all events have been executed). + The check also ensures the worker threads + are all practically idle (i.e. all user events have been executed). 
*/ if ((unlikely(rli->last_master_timestamp == 0) || - (rli->sql_thread_caught_up && - (rli->last_inuse_relaylog->queued_count == - rli->last_inuse_relaylog->dequeued_count))) && + rli->are_sql_threads_caught_up()) && event_can_update_last_master_timestamp(ev)) { /* @@ -4469,6 +4457,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, rli->last_master_timestamp= ev->when; } rli->sql_thread_caught_up= false; + rli->unset_worker_threads_caught_up(); } int res= rli->parallel.do_event(serial_rgi, ev, event_size);