mirror of
https://github.com/MariaDB/server.git
synced 2025-08-07 00:04:31 +03:00
MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication
The problem occurs in parallel replication in GTID mode, when we are using multiple replication domains. In this case, if the SQL thread stops, the slave GTID position may refer to a different point in the relay log for each domain. The bug was that when the SQL thread was stopped and restarted (but the IO thread was kept running), the SQL thread would resume applying the relay log from the point of the most advanced replication domain, silently skipping all earlier events within other domains. This caused replication corruption. This patch solves the problem by storing, when the SQL thread stops with multiple parallel replication domains active, the current GTID position. Additionally, the current position in the relay logs is moved back to a point known to be earlier than the current position of any replication domain. Then when the SQL thread restarts from the earlier position, GTIDs encountered are compared against the stored GTID position. Any GTID that was already applied before the stop is skipped to avoid duplicate apply. This patch should have no effect if multi-domain GTID parallel replication is not used. Similarly, if both SQL and IO thread are stopped and restarted, the patch has no effect, as in this case the existing relay logs are removed and re-fetched from the master at the current global @@gtid_slave_pos.
This commit is contained in:
@@ -1829,6 +1829,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Handle seeing a GTID during slave restart in GTID mode. If we stopped with
|
||||
different replication domains having reached different positions in the relay
|
||||
log, we need to skip event groups in domains that are further progressed.
|
||||
|
||||
Updates the state with the seen GTID, and returns true if this GTID should
|
||||
be skipped, false otherwise.
|
||||
*/
|
||||
bool
|
||||
process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
|
||||
{
|
||||
slave_connection_state::entry *gtid_entry;
|
||||
slave_connection_state *state= &rli->restart_gtid_pos;
|
||||
|
||||
if (likely(state->count() == 0) ||
|
||||
!(gtid_entry= state->find_entry(gtid->domain_id)))
|
||||
return false;
|
||||
if (gtid->server_id == gtid_entry->gtid.server_id)
|
||||
{
|
||||
uint64 seq_no= gtid_entry->gtid.seq_no;
|
||||
if (gtid->seq_no >= seq_no)
|
||||
{
|
||||
/*
|
||||
This domain has reached its start position. So remove it, so that
|
||||
further events will be processed normally.
|
||||
*/
|
||||
state->remove(>id_entry->gtid);
|
||||
}
|
||||
return gtid->seq_no <= seq_no;
|
||||
}
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
This is used when we get an error during processing in do_event();
|
||||
We will not queue any event to the thread, but we still need to wake it up
|
||||
@@ -1890,13 +1925,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
return -1;
|
||||
|
||||
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
|
||||
if (unlikely(!current) && typ != GTID_EVENT)
|
||||
is_group_event= Log_event::is_group_event(typ);
|
||||
if (unlikely(!current) && typ != GTID_EVENT &&
|
||||
!(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
|
||||
return -1;
|
||||
|
||||
/* ToDo: what to do with this lock?!? */
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
|
||||
if (typ == FORMAT_DESCRIPTION_EVENT)
|
||||
if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
|
||||
{
|
||||
Format_description_log_event *fdev=
|
||||
static_cast<Format_description_log_event *>(ev);
|
||||
@@ -1922,6 +1959,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (unlikely(typ == GTID_LIST_EVENT))
|
||||
{
|
||||
Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
|
||||
rpl_gtid *list= glev->list;
|
||||
uint32 count= glev->count;
|
||||
rli->update_relay_log_state(list, count);
|
||||
while (count)
|
||||
{
|
||||
process_gtid_for_restart_pos(rli, list);
|
||||
++list;
|
||||
--count;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Stop queueing additional event groups once the SQL thread is requested to
|
||||
@@ -1931,7 +1981,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
been partially queued, but after that we will just ignore any further
|
||||
events the SQL driver thread may try to queue, and eventually it will stop.
|
||||
*/
|
||||
is_group_event= Log_event::is_group_event(typ);
|
||||
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
|
||||
sql_thread_stopping= true;
|
||||
if (sql_thread_stopping)
|
||||
@@ -1944,8 +1993,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
|
||||
{
|
||||
if (typ == GTID_EVENT)
|
||||
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
||||
else
|
||||
{
|
||||
if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
|
||||
{
|
||||
if (!Log_event::is_part_of_group(typ))
|
||||
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
|
||||
if (typ == XID_EVENT ||
|
||||
(typ == QUERY_EVENT &&
|
||||
(((Query_log_event *)ev)->is_commit() ||
|
||||
((Query_log_event *)ev)->is_rollback())))
|
||||
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
||||
}
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (typ == GTID_EVENT)
|
||||
{
|
||||
rpl_gtid gtid;
|
||||
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
||||
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
|
||||
0 : gtid_ev->domain_id);
|
||||
@@ -1956,6 +2031,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
return 1;
|
||||
}
|
||||
current= e;
|
||||
|
||||
gtid.domain_id= gtid_ev->domain_id;
|
||||
gtid.server_id= gtid_ev->server_id;
|
||||
gtid.seq_no= gtid_ev->seq_no;
|
||||
rli->update_relay_log_state(>id, 1);
|
||||
if (process_gtid_for_restart_pos(rli, >id))
|
||||
{
|
||||
/*
|
||||
This domain has progressed further into the relay log before the last
|
||||
SQL thread restart. So we need to skip this event group to not doubly
|
||||
apply it.
|
||||
*/
|
||||
rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
|
||||
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
e= current;
|
||||
|
Reference in New Issue
Block a user