mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
MDEV-31448: Killing a replica thread awaiting its GCO can hang/crash a parallel replica
The problem is that when a worker thread is (user) killed in wait_for_prior_commit, the event group may complete out-of-order since the wait for prior commit was aborted by the kill. This fix ensures that event groups will always complete in-order, even in the error case. This is done in finish_event_group() by doing an extra wait_for_prior_commit(), if necessary, that ignores kills. This fix supersedes the fix for MDEV-30780, so the earlier fix for that is reverted in this patch. Also fix that an error from wait_for_prior_commit() inside finish_event_group() would not signal the error to wakeup_subsequent_commits(). Based on earlier work by Brandon Nesterenko and Andrei Elkin, with some changes to simplify the semantics of wait_for_prior_commit() and make the code more robust to future changes. Reviewed-by: Andrei Elkin <andrei.elkin@mariadb.com> Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
@ -27,6 +27,9 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool;
|
|||||||
|
|
||||||
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
|
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
|
||||||
int err);
|
int err);
|
||||||
|
static void
|
||||||
|
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
|
||||||
|
rpl_parallel_entry *entry);
|
||||||
|
|
||||||
static int
|
static int
|
||||||
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
||||||
@ -151,15 +154,35 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
|||||||
int err;
|
int err;
|
||||||
|
|
||||||
thd->get_stmt_da()->set_overwrite_status(true);
|
thd->get_stmt_da()->set_overwrite_status(true);
|
||||||
|
|
||||||
|
if (unlikely(rgi->worker_error))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
In case a previous wait was killed, we need to re-register to be able to
|
||||||
|
repeat the wait.
|
||||||
|
|
||||||
|
And before doing that, we un-register any previous registration (in case
|
||||||
|
we got an error earlier and skipped waiting).
|
||||||
|
*/
|
||||||
|
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
|
||||||
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||||
|
register_wait_for_prior_event_group_commit(rgi, entry);
|
||||||
|
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Remove any left-over registration to wait for a prior commit to
|
Remove any left-over registration to wait for a prior commit to
|
||||||
complete. Normally, such wait would already have been removed at
|
complete. Normally, such wait would already have been removed at
|
||||||
this point by wait_for_prior_commit() called from within COMMIT
|
this point by wait_for_prior_commit() called from within COMMIT
|
||||||
processing. However, in case of MyISAM and no binlog, we might not
|
processing.
|
||||||
have any commit processing, and so we need to do the wait here,
|
|
||||||
before waking up any subsequent commits, to preserve correct
|
However, in case of MyISAM and no binlog, we might not have any commit
|
||||||
order of event execution. Also, in the error case we might have
|
processing, and so we need to do the wait here, before waking up any
|
||||||
skipped waiting and thus need to remove it explicitly.
|
subsequent commits, to preserve correct order of event execution.
|
||||||
|
|
||||||
|
Also, in the error case we might have skipped waiting and thus need to
|
||||||
|
remove it explicitly. Or the wait might have been killed and we need to
|
||||||
|
repeat the registration and the wait.
|
||||||
|
|
||||||
It is important in the non-error case to do a wait, not just an
|
It is important in the non-error case to do a wait, not just an
|
||||||
unregister. Because we might be last in a group-commit that is
|
unregister. Because we might be last in a group-commit that is
|
||||||
@ -172,8 +195,18 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
|||||||
all earlier event groups have also committed; this way no more
|
all earlier event groups have also committed; this way no more
|
||||||
mark_start_commit() calls can be made and it is safe to de-allocate
|
mark_start_commit() calls can be made and it is safe to de-allocate
|
||||||
the GCO.
|
the GCO.
|
||||||
|
|
||||||
|
Thus this final wait is done with kill ignored during the wait. This is
|
||||||
|
fine, at this point there is no active query or transaction to abort, and
|
||||||
|
the thread will continue as soon as earlier event groups complete.
|
||||||
|
|
||||||
|
Note though, that in the non-error case there is no guarantee that
|
||||||
|
finish_event_group() will be run in-order. For example, a successful
|
||||||
|
binlog group commit will wakeup all participating event groups
|
||||||
|
simultaneously so only thread scheduling will decide the order in which
|
||||||
|
finish_event_group() calls acquire LOCK_parallel_entry.
|
||||||
*/
|
*/
|
||||||
err= wfc->wait_for_prior_commit(thd);
|
err= wfc->wait_for_prior_commit(thd, false);
|
||||||
if (unlikely(err) && !rgi->worker_error)
|
if (unlikely(err) && !rgi->worker_error)
|
||||||
signal_error_to_sql_driver_thread(thd, rgi, err);
|
signal_error_to_sql_driver_thread(thd, rgi, err);
|
||||||
thd->wait_for_commit_ptr= NULL;
|
thd->wait_for_commit_ptr= NULL;
|
||||||
@ -242,8 +275,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
|||||||
not yet started should just skip their group, preparing for stop of the
|
not yet started should just skip their group, preparing for stop of the
|
||||||
SQL driver thread.
|
SQL driver thread.
|
||||||
*/
|
*/
|
||||||
if (unlikely(rgi->worker_error) &&
|
if (unlikely(rgi->worker_error) && entry->stop_on_error_sub_id > sub_id)
|
||||||
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
|
|
||||||
entry->stop_on_error_sub_id= sub_id;
|
entry->stop_on_error_sub_id= sub_id;
|
||||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||||
#ifdef ENABLED_DEBUG_SYNC
|
#ifdef ENABLED_DEBUG_SYNC
|
||||||
@ -820,12 +852,15 @@ do_retry:
|
|||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||||
register_wait_for_prior_event_group_commit(rgi, entry);
|
if (rgi->gtid_sub_id < entry->stop_on_error_sub_id
|
||||||
if (!(entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
|
|
||||||
#ifndef DBUG_OFF
|
#ifndef DBUG_OFF
|
||||||
(DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
|
|| DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)
|
||||||
#endif
|
#endif
|
||||||
rgi->gtid_sub_id < entry->stop_on_error_sub_id))
|
)
|
||||||
|
{
|
||||||
|
register_wait_for_prior_event_group_commit(rgi, entry);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
A failure of a preceeding "parent" transaction may not be
|
A failure of a preceeding "parent" transaction may not be
|
||||||
|
@ -7635,11 +7635,18 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
|
|||||||
with register_wait_for_prior_commit(). If the commit already completed,
|
with register_wait_for_prior_commit(). If the commit already completed,
|
||||||
returns immediately.
|
returns immediately.
|
||||||
|
|
||||||
|
If ALLOW_KILL is set to true (the default), the wait can be aborted by a
|
||||||
|
kill. In case of kill, the wait registration is still removed, so another
|
||||||
|
call of unregister_wait_for_prior_commit() is needed to later retry the
|
||||||
|
wait. If ALLOW_KILL is set to false, then kill will be ignored and this
|
||||||
|
function will not return until the prior commit (if any) has called
|
||||||
|
wakeup_subsequent_commits().
|
||||||
|
|
||||||
If thd->backup_commit_lock is set, release it while waiting for other threads
|
If thd->backup_commit_lock is set, release it while waiting for other threads
|
||||||
*/
|
*/
|
||||||
|
|
||||||
int
|
int
|
||||||
wait_for_commit::wait_for_prior_commit2(THD *thd)
|
wait_for_commit::wait_for_prior_commit2(THD *thd, bool allow_kill)
|
||||||
{
|
{
|
||||||
PSI_stage_info old_stage;
|
PSI_stage_info old_stage;
|
||||||
wait_for_commit *loc_waitee;
|
wait_for_commit *loc_waitee;
|
||||||
@ -7664,7 +7671,7 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
|
|||||||
&stage_waiting_for_prior_transaction_to_commit,
|
&stage_waiting_for_prior_transaction_to_commit,
|
||||||
&old_stage);
|
&old_stage);
|
||||||
while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) &&
|
while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) &&
|
||||||
likely(!thd->check_killed(1)))
|
(!allow_kill || likely(!thd->check_killed(1))))
|
||||||
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
|
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
|
||||||
if (!loc_waitee)
|
if (!loc_waitee)
|
||||||
{
|
{
|
||||||
|
@ -2144,14 +2144,14 @@ struct wait_for_commit
|
|||||||
bool commit_started;
|
bool commit_started;
|
||||||
|
|
||||||
void register_wait_for_prior_commit(wait_for_commit *waitee);
|
void register_wait_for_prior_commit(wait_for_commit *waitee);
|
||||||
int wait_for_prior_commit(THD *thd)
|
int wait_for_prior_commit(THD *thd, bool allow_kill=true)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
Quick inline check, to avoid function call and locking in the common case
|
Quick inline check, to avoid function call and locking in the common case
|
||||||
where no wakeup is registered, or a registered wait was already signalled.
|
where no wakeup is registered, or a registered wait was already signalled.
|
||||||
*/
|
*/
|
||||||
if (waitee.load(std::memory_order_acquire))
|
if (waitee.load(std::memory_order_acquire))
|
||||||
return wait_for_prior_commit2(thd);
|
return wait_for_prior_commit2(thd, allow_kill);
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (wakeup_error)
|
if (wakeup_error)
|
||||||
@ -2205,7 +2205,7 @@ struct wait_for_commit
|
|||||||
|
|
||||||
void wakeup(int wakeup_error);
|
void wakeup(int wakeup_error);
|
||||||
|
|
||||||
int wait_for_prior_commit2(THD *thd);
|
int wait_for_prior_commit2(THD *thd, bool allow_kill);
|
||||||
void wakeup_subsequent_commits2(int wakeup_error);
|
void wakeup_subsequent_commits2(int wakeup_error);
|
||||||
void unregister_wait_for_prior_commit2();
|
void unregister_wait_for_prior_commit2();
|
||||||
|
|
||||||
@ -4726,10 +4726,10 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
wait_for_commit *wait_for_commit_ptr;
|
wait_for_commit *wait_for_commit_ptr;
|
||||||
int wait_for_prior_commit()
|
int wait_for_prior_commit(bool allow_kill=true)
|
||||||
{
|
{
|
||||||
if (wait_for_commit_ptr)
|
if (wait_for_commit_ptr)
|
||||||
return wait_for_commit_ptr->wait_for_prior_commit(this);
|
return wait_for_commit_ptr->wait_for_prior_commit(this, allow_kill);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void wakeup_subsequent_commits(int wakeup_error)
|
void wakeup_subsequent_commits(int wakeup_error)
|
||||||
|
Reference in New Issue
Block a user