mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
Merge branch '10.6' into 10.9
This commit is contained in:
@@ -27,6 +27,9 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool;
|
||||
|
||||
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
|
||||
int err);
|
||||
static void
|
||||
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
|
||||
rpl_parallel_entry *entry);
|
||||
|
||||
static int
|
||||
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
||||
@@ -154,15 +157,35 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
||||
return;
|
||||
|
||||
thd->get_stmt_da()->set_overwrite_status(true);
|
||||
|
||||
if (unlikely(rgi->worker_error))
|
||||
{
|
||||
/*
|
||||
In case a previous wait was killed, we need to re-register to be able to
|
||||
repeat the wait.
|
||||
|
||||
And before doing that, we un-register any previous registration (in case
|
||||
we got an error earlier and skipped waiting).
|
||||
*/
|
||||
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
register_wait_for_prior_event_group_commit(rgi, entry);
|
||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||
}
|
||||
|
||||
/*
|
||||
Remove any left-over registration to wait for a prior commit to
|
||||
complete. Normally, such wait would already have been removed at
|
||||
this point by wait_for_prior_commit() called from within COMMIT
|
||||
processing. However, in case of MyISAM and no binlog, we might not
|
||||
have any commit processing, and so we need to do the wait here,
|
||||
before waking up any subsequent commits, to preserve correct
|
||||
order of event execution. Also, in the error case we might have
|
||||
skipped waiting and thus need to remove it explicitly.
|
||||
processing.
|
||||
|
||||
However, in case of MyISAM and no binlog, we might not have any commit
|
||||
processing, and so we need to do the wait here, before waking up any
|
||||
subsequent commits, to preserve correct order of event execution.
|
||||
|
||||
Also, in the error case we might have skipped waiting and thus need to
|
||||
remove it explicitly. Or the wait might have been killed and we need to
|
||||
repeat the registration and the wait.
|
||||
|
||||
It is important in the non-error case to do a wait, not just an
|
||||
unregister. Because we might be last in a group-commit that is
|
||||
@@ -175,8 +198,18 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
||||
all earlier event groups have also committed; this way no more
|
||||
mark_start_commit() calls can be made and it is safe to de-allocate
|
||||
the GCO.
|
||||
|
||||
Thus this final wait is done with kill ignored during the wait. This is
|
||||
fine, at this point there is no active query or transaction to abort, and
|
||||
the thread will continue as soon as earlier event groups complete.
|
||||
|
||||
Note though, that in the non-error case there is no guarantee that
|
||||
finish_event_group() will be run in-order. For example, a successful
|
||||
binlog group commit will wakeup all participating event groups
|
||||
simultaneously so only thread scheduling will decide the order in which
|
||||
finish_event_group() calls acquire LOCK_parallel_entry.
|
||||
*/
|
||||
err= wfc->wait_for_prior_commit(thd);
|
||||
err= wfc->wait_for_prior_commit(thd, false);
|
||||
if (unlikely(err) && !rgi->worker_error)
|
||||
signal_error_to_sql_driver_thread(thd, rgi, err);
|
||||
thd->wait_for_commit_ptr= NULL;
|
||||
@@ -245,8 +278,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
||||
not yet started should just skip their group, preparing for stop of the
|
||||
SQL driver thread.
|
||||
*/
|
||||
if (unlikely(rgi->worker_error) &&
|
||||
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
|
||||
if (unlikely(rgi->worker_error) && entry->stop_on_error_sub_id > sub_id)
|
||||
entry->stop_on_error_sub_id= sub_id;
|
||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||
#ifdef ENABLED_DEBUG_SYNC
|
||||
@@ -291,16 +323,11 @@ static void
|
||||
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
|
||||
{
|
||||
rgi->worker_error= err;
|
||||
/*
|
||||
In case we get an error during commit, inform following transactions that
|
||||
we aborted our commit.
|
||||
*/
|
||||
DBUG_EXECUTE_IF("hold_worker2_favor_worker3", {
|
||||
if (rgi->current_gtid.seq_no == 2002) {
|
||||
debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR cont_worker2"));
|
||||
}});
|
||||
|
||||
rgi->unmark_start_commit();
|
||||
rgi->cleanup_context(thd, true);
|
||||
rgi->rli->abort_slave= true;
|
||||
rgi->rli->stop_for_until= false;
|
||||
@@ -347,7 +374,7 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
|
||||
Do not start parallel execution of this event group until all prior groups
|
||||
have reached the commit phase that are not safe to run in parallel with.
|
||||
*/
|
||||
static bool
|
||||
static void
|
||||
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
|
||||
bool *did_enter_cond, PSI_stage_info *old_stage)
|
||||
{
|
||||
@@ -399,18 +426,45 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
|
||||
&entry->LOCK_parallel_entry);
|
||||
} while (wait_count > entry->count_committing_event_groups);
|
||||
}
|
||||
}
|
||||
|
||||
if (entry->force_abort && wait_count > entry->stop_count)
|
||||
|
||||
static bool
|
||||
do_stop_handling(rpl_group_info *rgi)
|
||||
{
|
||||
bool should_stop= false;
|
||||
rpl_parallel_entry *entry= rgi->parallel_entry;
|
||||
|
||||
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
||||
|
||||
if (unlikely(entry->force_abort) && rgi->gtid_sub_id > entry->stop_sub_id)
|
||||
{
|
||||
/*
|
||||
We are stopping (STOP SLAVE), and this event group is beyond the point
|
||||
where we can safely stop. So return a flag that will cause us to skip,
|
||||
rather than execute, the following events.
|
||||
We are stopping (STOP SLAVE), and this event group need not be applied
|
||||
before we can safely stop. So return a flag that will cause us to skip,
|
||||
rather than execute, the following events. Once all queued events have
|
||||
been skipped, the STOP SLAVE is complete (for this thread).
|
||||
*/
|
||||
return true;
|
||||
should_stop= true;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
|
||||
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
|
||||
{
|
||||
rgi->worker_error= 1;
|
||||
should_stop= true;
|
||||
}
|
||||
|
||||
if (likely(!should_stop))
|
||||
{
|
||||
/*
|
||||
Since we did not decide to stop, bump the largest_started_sub_id while
|
||||
still holding LOCK_parallel_entry.
|
||||
*/
|
||||
if (rgi->gtid_sub_id > entry->largest_started_sub_id)
|
||||
entry->largest_started_sub_id= rgi->gtid_sub_id;
|
||||
}
|
||||
|
||||
return should_stop;
|
||||
}
|
||||
|
||||
|
||||
@@ -457,15 +511,25 @@ do_ftwrl_wait(rpl_group_info *rgi,
|
||||
mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
|
||||
} while (sub_id > entry->pause_sub_id);
|
||||
|
||||
DBUG_EXECUTE_IF("delay_ftwrl_wait_gtid_0_x_100", {
|
||||
if (rgi->current_gtid.domain_id == 0 &&
|
||||
rgi->current_gtid.seq_no == 100) {
|
||||
/*
|
||||
Simulate delayed wakeup from the mysql_cond_wait(). To do this, we
|
||||
need to have the LOCK_parallel_entry mutex released during the wait.
|
||||
*/
|
||||
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
||||
debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN("now SIGNAL pause_wait_started WAIT_FOR pause_wait_continue"));
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
}
|
||||
});
|
||||
/*
|
||||
We do not call EXIT_COND() here, as this will be done later by our
|
||||
caller (since we set *did_enter_cond to true).
|
||||
*/
|
||||
}
|
||||
|
||||
if (sub_id > entry->largest_started_sub_id)
|
||||
entry->largest_started_sub_id= sub_id;
|
||||
|
||||
DBUG_RETURN(aborted);
|
||||
}
|
||||
|
||||
@@ -624,7 +688,17 @@ rpl_pause_for_ftwrl(THD *thd)
|
||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||
++e->need_sub_id_signal;
|
||||
if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
|
||||
{
|
||||
e->pause_sub_id= e->largest_started_sub_id;
|
||||
DBUG_EXECUTE_IF("pause_for_ftwrl_wait", {
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN("now "
|
||||
"SIGNAL pause_ftwrl_waiting "
|
||||
"WAIT_FOR pause_ftwrl_cont"));
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
});
|
||||
}
|
||||
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
|
||||
&stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
|
||||
thd->set_time_for_next_stage();
|
||||
@@ -860,10 +934,12 @@ do_retry:
|
||||
for (;;)
|
||||
{
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
register_wait_for_prior_event_group_commit(rgi, entry);
|
||||
if (entry->stop_on_error_sub_id != (uint64) ULONGLONG_MAX &&
|
||||
!DBUG_IF("simulate_mdev_12746") &&
|
||||
rgi->gtid_sub_id >= entry->stop_on_error_sub_id)
|
||||
if (rgi->gtid_sub_id < entry->stop_on_error_sub_id ||
|
||||
DBUG_IF("simulate_mdev_12746"))
|
||||
{
|
||||
register_wait_for_prior_event_group_commit(rgi, entry);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
A failure of a preceding "parent" transaction may not be
|
||||
@@ -1301,14 +1377,15 @@ handle_rpl_parallel_thread(void *arg)
|
||||
event_gtid_sub_id= rgi->gtid_sub_id;
|
||||
rgi->thd= thd;
|
||||
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
|
||||
DBUG_EXECUTE_IF("gco_wait_delay_gtid_0_x_99", {
|
||||
if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 99) {
|
||||
debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN("now SIGNAL gco_wait_paused WAIT_FOR gco_wait_cont"));
|
||||
} });
|
||||
|
||||
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
|
||||
{
|
||||
skip_event_group= true;
|
||||
rgi->worker_error= 1;
|
||||
}
|
||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||
do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
|
||||
skip_event_group= do_stop_handling(rgi);
|
||||
if (likely(!skip_event_group))
|
||||
skip_event_group= do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
|
||||
|
||||
@@ -2588,9 +2665,7 @@ rpl_parallel::find(uint32 domain_id, Relay_log_info *rli)
|
||||
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
|
||||
if (my_hash_insert(&domain_hash, (uchar *)e))
|
||||
{
|
||||
mysql_cond_destroy(&e->COND_parallel_entry);
|
||||
mysql_mutex_destroy(&e->LOCK_parallel_entry);
|
||||
my_free(e);
|
||||
free_rpl_parallel_entry(e);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
@@ -2634,20 +2709,18 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
|
||||
are also executed, so that we stop at a consistent point in the binlog
|
||||
stream (per replication domain).
|
||||
|
||||
All event groups wait for e->count_committing_event_groups to reach
|
||||
the value of group_commit_orderer::wait_count before starting to
|
||||
execute. Thus, at this point we know that any event group with a
|
||||
strictly larger wait_count are safe to skip, none of them can have
|
||||
started executing yet. So we set e->stop_count here and use it to
|
||||
decide in the worker threads whether to continue executing an event
|
||||
group or whether to skip it, when force_abort is set.
|
||||
At this point, we are holding LOCK_parallel_entry, and we know that no
|
||||
event group after e->largest_started_sub_id has started running yet. We
|
||||
record this value in e->stop_sub_id, and then each event group can check
|
||||
their own sub_id against it. If their sub_id is strictly larger, then
|
||||
that event group will be skipped.
|
||||
|
||||
If we stop due to reaching the START SLAVE UNTIL condition, then we
|
||||
need to continue executing any queued events up to that point.
|
||||
*/
|
||||
e->force_abort= true;
|
||||
e->stop_count= rli->stop_for_until ?
|
||||
e->count_queued_event_groups : e->count_committing_event_groups;
|
||||
e->stop_sub_id= rli->stop_for_until ?
|
||||
e->current_sub_id : e->largest_started_sub_id;
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
for (j= 0; j < e->rpl_thread_max; ++j)
|
||||
{
|
||||
@@ -2754,7 +2827,7 @@ rpl_parallel::stop_during_until()
|
||||
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
if (e->force_abort)
|
||||
e->stop_count= e->count_committing_event_groups;
|
||||
e->stop_sub_id= e->largest_started_sub_id;
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user