mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
MDEV-31509: Lost data with FTWRL and STOP SLAVE
The largest_started_sub_id needs to be set under LOCK_parallel_entry together with testing stop_sub_id. However, in-between was the logic for do_ftwrl_wait(), which temporarily releases the mutex. This could lead to inconsistent stopping amongst worker threads and lost data. Fix by moving all the stop-related logic out from unrelated do_gco_wait() and do_ftwrl_wait() and into its own function do_stop_handling(). Reviewed-by: Andrei Elkin <andrei.elkin@mariadb.com> Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
@ -369,7 +369,7 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
|
||||
Do not start parallel execution of this event group until all prior groups
|
||||
have reached the commit phase that are not safe to run in parallel with.
|
||||
*/
|
||||
static bool
|
||||
static void
|
||||
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
|
||||
bool *did_enter_cond, PSI_stage_info *old_stage)
|
||||
{
|
||||
@ -421,8 +421,18 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
|
||||
&entry->LOCK_parallel_entry);
|
||||
} while (wait_count > entry->count_committing_event_groups);
|
||||
}
|
||||
}
|
||||
|
||||
if (entry->force_abort && rgi->gtid_sub_id > entry->stop_sub_id)
|
||||
|
||||
static bool
|
||||
do_stop_handling(rpl_group_info *rgi)
|
||||
{
|
||||
bool should_stop= false;
|
||||
rpl_parallel_entry *entry= rgi->parallel_entry;
|
||||
|
||||
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
||||
|
||||
if (unlikely(entry->force_abort) && rgi->gtid_sub_id > entry->stop_sub_id)
|
||||
{
|
||||
/*
|
||||
We are stopping (STOP SLAVE), and this event group need not be applied
|
||||
@ -430,10 +440,26 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
|
||||
rather than execute, the following events. Once all queued events have
|
||||
been skipped, the STOP SLAVE is complete (for this thread).
|
||||
*/
|
||||
return true;
|
||||
should_stop= true;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
|
||||
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
|
||||
{
|
||||
rgi->worker_error= 1;
|
||||
should_stop= true;
|
||||
}
|
||||
|
||||
if (likely(!should_stop))
|
||||
{
|
||||
/*
|
||||
Since we did not decide to stop, bump the largest_started_sub_id while
|
||||
still holding LOCK_parallel_entry.
|
||||
*/
|
||||
if (rgi->gtid_sub_id > entry->largest_started_sub_id)
|
||||
entry->largest_started_sub_id= rgi->gtid_sub_id;
|
||||
}
|
||||
|
||||
return should_stop;
|
||||
}
|
||||
|
||||
|
||||
@ -480,15 +506,25 @@ do_ftwrl_wait(rpl_group_info *rgi,
|
||||
mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
|
||||
} while (sub_id > entry->pause_sub_id);
|
||||
|
||||
DBUG_EXECUTE_IF("delay_ftwrl_wait_gtid_0_x_100", {
|
||||
if (rgi->current_gtid.domain_id == 0 &&
|
||||
rgi->current_gtid.seq_no == 100) {
|
||||
/*
|
||||
Simulate delayed wakeup from the mysql_cond_wait(). To do this, we
|
||||
need to have the LOCK_parallel_entry mutex released during the wait.
|
||||
*/
|
||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||
debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN("now SIGNAL pause_wait_started WAIT_FOR pause_wait_continue"));
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
}
|
||||
});
|
||||
/*
|
||||
We do not call EXIT_COND() here, as this will be done later by our
|
||||
caller (since we set *did_enter_cond to true).
|
||||
*/
|
||||
}
|
||||
|
||||
if (sub_id > entry->largest_started_sub_id)
|
||||
entry->largest_started_sub_id= sub_id;
|
||||
|
||||
DBUG_RETURN(aborted);
|
||||
}
|
||||
|
||||
@ -646,7 +682,17 @@ rpl_pause_for_ftwrl(THD *thd)
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
++e->need_sub_id_signal;
|
||||
if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
|
||||
{
|
||||
e->pause_sub_id= e->largest_started_sub_id;
|
||||
DBUG_EXECUTE_IF("pause_for_ftwrl_wait", {
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN("now "
|
||||
"SIGNAL pause_ftwrl_waiting "
|
||||
"WAIT_FOR pause_ftwrl_cont"));
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
});
|
||||
}
|
||||
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
|
||||
&stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
|
||||
thd->set_time_for_next_stage();
|
||||
@ -1284,14 +1330,15 @@ handle_rpl_parallel_thread(void *arg)
|
||||
event_gtid_sub_id= rgi->gtid_sub_id;
|
||||
rgi->thd= thd;
|
||||
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
|
||||
DBUG_EXECUTE_IF("gco_wait_delay_gtid_0_x_99", {
|
||||
if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 99) {
|
||||
debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN("now SIGNAL gco_wait_paused WAIT_FOR gco_wait_cont"));
|
||||
} });
|
||||
|
||||
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
|
||||
{
|
||||
skip_event_group= true;
|
||||
rgi->worker_error= 1;
|
||||
}
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
|
||||
skip_event_group= do_stop_handling(rgi);
|
||||
if (likely(!skip_event_group))
|
||||
skip_event_group= do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
|
||||
|
||||
|
Reference in New Issue
Block a user