mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Merge MDEV-5754, MDEV-5769, and MDEV-5764 into 10.0
This commit is contained in:
@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
|
||||
rgi->is_error= true;
|
||||
rgi->cleanup_context(thd, true);
|
||||
rgi->rli->abort_slave= true;
|
||||
rgi->rli->stop_for_until= false;
|
||||
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
|
||||
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
|
||||
rgi->rli->relay_log.signal_update();
|
||||
@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
|
||||
|
||||
|
||||
void
|
||||
rpl_parallel::wait_for_done(THD *thd)
|
||||
rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
|
||||
{
|
||||
struct rpl_parallel_entry *e;
|
||||
rpl_parallel_thread *rpt;
|
||||
@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
|
||||
started executing yet. So we set e->stop_count here and use it to
|
||||
decide in the worker threads whether to continue executing an event
|
||||
group or whether to skip it, when force_abort is set.
|
||||
|
||||
If we stop due to reaching the START SLAVE UNTIL condition, then we
|
||||
need to continue executing any queued events up to that point.
|
||||
*/
|
||||
e->force_abort= true;
|
||||
e->stop_count= e->count_committing_event_groups;
|
||||
e->stop_count= rli->stop_for_until ?
|
||||
e->count_queued_event_groups : e->count_committing_event_groups;
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
for (j= 0; j < e->rpl_thread_max; ++j)
|
||||
{
|
||||
@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
This function handles the case where the SQL driver thread reached the
|
||||
START SLAVE UNTIL position; we stop queueing more events but continue
|
||||
processing remaining, already queued events; then use executes manual
|
||||
STOP SLAVE; then this function signals to worker threads that they
|
||||
should stop the processing of any remaining queued events.
|
||||
*/
|
||||
void
|
||||
rpl_parallel::stop_during_until()
|
||||
{
|
||||
struct rpl_parallel_entry *e;
|
||||
uint32 i;
|
||||
|
||||
for (i= 0; i < domain_hash.records; ++i)
|
||||
{
|
||||
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
if (e->force_abort)
|
||||
e->stop_count= e->count_committing_event_groups;
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
rpl_parallel::workers_idle()
|
||||
{
|
||||
@@ -1230,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
|
||||
do_event() is executed by the sql_driver_thd thread.
|
||||
It's main purpose is to find a thread that can execute the query.
|
||||
|
||||
@retval false ok, event was accepted
|
||||
@retval true error
|
||||
@retval 0 ok, event was accepted
|
||||
@retval 1 error
|
||||
@retval -1 event should be executed serially, in the sql driver thread
|
||||
*/
|
||||
|
||||
bool
|
||||
int
|
||||
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
ulonglong event_size)
|
||||
{
|
||||
@@ -1248,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
bool did_enter_cond= false;
|
||||
PSI_stage_info old_stage;
|
||||
|
||||
/* Handle master log name change, seen in Rotate_log_event. */
|
||||
typ= ev->get_type_code();
|
||||
if (unlikely(typ == ROTATE_EVENT))
|
||||
{
|
||||
Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev);
|
||||
if ((rev->server_id != global_system_variables.server_id ||
|
||||
rli->replicate_same_server_id) &&
|
||||
!rev->is_relay_log_event() &&
|
||||
!rli->is_in_group())
|
||||
{
|
||||
memcpy(rli->future_event_master_log_name,
|
||||
rev->new_log_ident, rev->ident_len+1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Execute queries non-parallel if slave_skip_counter is set, as it's is
|
||||
easier to skip queries in single threaded mode.
|
||||
*/
|
||||
if (rli->slave_skip_counter)
|
||||
return -1;
|
||||
|
||||
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
|
||||
if (unlikely(!current) && typ != GTID_EVENT)
|
||||
return -1;
|
||||
|
||||
/* ToDo: what to do with this lock?!? */
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
|
||||
@@ -1259,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
been partially queued, but after that we will just ignore any further
|
||||
events the SQL driver thread may try to queue, and eventually it will stop.
|
||||
*/
|
||||
if (((typ= ev->get_type_code()) == GTID_EVENT ||
|
||||
!(is_group_event= Log_event::is_group_event(typ))) &&
|
||||
rli->abort_slave)
|
||||
is_group_event= Log_event::is_group_event(typ);
|
||||
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
|
||||
sql_thread_stopping= true;
|
||||
if (sql_thread_stopping)
|
||||
{
|
||||
delete ev;
|
||||
/*
|
||||
Return false ("no error"); normal stop is not an error, and otherwise the
|
||||
error has already been recorded.
|
||||
Return "no error"; normal stop is not an error, and otherwise the error
|
||||
has already been recorded.
|
||||
*/
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (typ == GTID_EVENT || unlikely(!current))
|
||||
if (typ == GTID_EVENT)
|
||||
{
|
||||
uint32 domain_id;
|
||||
if (likely(typ == GTID_EVENT))
|
||||
@@ -1288,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
current= e;
|
||||
}
|
||||
@@ -1307,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
{
|
||||
/* This means we were killed. The error is already signalled. */
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
|
||||
@@ -1315,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (typ == GTID_EVENT)
|
||||
@@ -1328,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1366,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
e->current_gco= rgi->gco= gco;
|
||||
}
|
||||
@@ -1380,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
e->current_sub_id= rgi->gtid_sub_id;
|
||||
++e->count_queued_event_groups;
|
||||
}
|
||||
else if (!is_group_event || !e)
|
||||
else if (!is_group_event)
|
||||
{
|
||||
my_off_t log_pos;
|
||||
int err;
|
||||
@@ -1389,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
|
||||
Same for events not preceeded by GTID (we should not see those normally,
|
||||
but they might be from an old master).
|
||||
|
||||
The variable `e' is NULL for the case where the master did not
|
||||
have GTID, like a MariaDB 5.5 or MySQL master.
|
||||
*/
|
||||
qev->rgi= serial_rgi;
|
||||
/* Handle master log name change, seen in Rotate_log_event. */
|
||||
if (typ == ROTATE_EVENT)
|
||||
{
|
||||
Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev);
|
||||
if ((rev->server_id != global_system_variables.server_id ||
|
||||
rli->replicate_same_server_id) &&
|
||||
!rev->is_relay_log_event() &&
|
||||
!rli->is_in_group())
|
||||
{
|
||||
memcpy(rli->future_event_master_log_name,
|
||||
rev->new_log_ident, rev->ident_len+1);
|
||||
}
|
||||
}
|
||||
|
||||
tmp= serial_rgi->is_parallel_exec;
|
||||
serial_rgi->is_parallel_exec= true;
|
||||
err= rpt_handle_event(qev, NULL);
|
||||
serial_rgi->is_parallel_exec= tmp;
|
||||
log_pos= qev->ev->log_pos;
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
|
||||
log_pos= ev->log_pos;
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
||||
|
||||
if (err)
|
||||
{
|
||||
cur_thread->free_qev(qev);
|
||||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
/*
|
||||
Queue an empty event, so that the position will be updated in a
|
||||
@@ -1451,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
&did_enter_cond, &old_stage);
|
||||
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
||||
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
Reference in New Issue
Block a user