mirror of
https://github.com/MariaDB/server.git
synced 2025-07-27 18:02:13 +03:00
MDEV-26: Global transaction ID.
Fix problems related to reconnect. When we need to reconnect (i.e. explicit stop/start of just the IO thread by user, or automatic reconnect due to losing network connection with the master), it is a bit complex to correctly resume at the right point without causing duplicate or missing events in the relay log. The previous code had multiple problems in this regard. With this patch, the problem is solved as follows. The IO thread keeps track (in memory) of which GTID was last queued to the relay log. If it needs to reconnect, it resumes at that GTID position. It also counts the number of events received within the last, possibly partial, event group, and skips the same number of events after a reconnect, so that events already enqueued before the reconnect are not duplicated. (There is no need to keep any persistent state; whenever we restart slave threads after both of them being stopped (such as after server restart), we erase the relay logs and start over from the last GTID applied by SQL thread. But while the SQL thread is running, this patch is needed to get a correct relay log).
This commit is contained in:
131
sql/sql_repl.cc
131
sql/sql_repl.cc
@ -50,7 +50,7 @@ extern TYPELIB binlog_checksum_typelib;
|
||||
static int
|
||||
fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
|
||||
my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
|
||||
uint8 checksum_alg_arg)
|
||||
uint8 checksum_alg_arg, uint32 end_pos)
|
||||
{
|
||||
char header[LOG_EVENT_HEADER_LEN];
|
||||
ulong event_len;
|
||||
@ -70,7 +70,7 @@ fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
|
||||
int4store(header + EVENT_LEN_OFFSET, event_len);
|
||||
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
|
||||
// TODO: check what problems this may cause and fix them
|
||||
int4store(header + LOG_POS_OFFSET, 0);
|
||||
int4store(header + LOG_POS_OFFSET, end_pos);
|
||||
if (packet->append(header, sizeof(header)))
|
||||
{
|
||||
*errmsg= "Failed due to out-of-memory writing event";
|
||||
@ -146,7 +146,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
|
||||
if ((err= fake_event_header(packet, ROTATE_EVENT,
|
||||
ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
|
||||
errmsg, checksum_alg_arg)))
|
||||
errmsg, checksum_alg_arg, 0)))
|
||||
DBUG_RETURN(err);
|
||||
|
||||
int8store(buf+R_POS_OFFSET,position);
|
||||
@ -169,7 +169,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
|
||||
static int fake_gtid_list_event(NET* net, String* packet,
|
||||
Gtid_list_log_event *glev, const char** errmsg,
|
||||
uint8 checksum_alg_arg)
|
||||
uint8 checksum_alg_arg, uint32 current_pos)
|
||||
{
|
||||
my_bool do_checksum;
|
||||
int err;
|
||||
@ -185,7 +185,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
|
||||
}
|
||||
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
|
||||
str.length(), &do_checksum, &crc,
|
||||
errmsg, checksum_alg_arg)))
|
||||
errmsg, checksum_alg_arg, current_pos)))
|
||||
return err;
|
||||
|
||||
packet->append(str);
|
||||
@ -1406,7 +1406,7 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
|
||||
enum_gtid_until_state gtid_until_group,
|
||||
Log_event_type event_type, uint8 current_checksum_alg,
|
||||
ushort flags, const char **errmsg,
|
||||
rpl_binlog_state *until_binlog_state)
|
||||
rpl_binlog_state *until_binlog_state, uint32 current_pos)
|
||||
{
|
||||
switch (gtid_until_group)
|
||||
{
|
||||
@ -1437,7 +1437,8 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
|
||||
return true;
|
||||
Gtid_list_log_event glev(until_binlog_state,
|
||||
Gtid_list_log_event::FLAG_UNTIL_REACHED);
|
||||
if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg))
|
||||
if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg,
|
||||
current_pos))
|
||||
return true;
|
||||
*errmsg= NULL;
|
||||
return true;
|
||||
@ -1508,6 +1509,19 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
||||
return "Failed to read Gtid_log_event: corrupt binlog";
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
|
||||
{
|
||||
rpl_gtid *dbug_gtid;
|
||||
if ((dbug_gtid= until_binlog_state->find(10,1)) &&
|
||||
dbug_gtid->seq_no == 100)
|
||||
{
|
||||
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
|
||||
DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
return "DBUG-injected forced reconnect";
|
||||
}
|
||||
});
|
||||
|
||||
if (until_binlog_state->update(&event_gtid, false))
|
||||
{
|
||||
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
@ -1527,19 +1541,31 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
||||
if (event_gtid.server_id == gtid->server_id &&
|
||||
event_gtid.seq_no >= gtid->seq_no)
|
||||
{
|
||||
/*
|
||||
In strict mode, it is an error if the slave requests to start in
|
||||
a "hole" in the master's binlog: a GTID that does not exist, even
|
||||
though both the prior and subsequent seq_no exists for same
|
||||
domain_id and server_id.
|
||||
*/
|
||||
if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no)
|
||||
if (event_gtid.seq_no > gtid->seq_no)
|
||||
{
|
||||
my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
|
||||
*error_gtid= *gtid;
|
||||
return "The binlog on the master is missing the GTID requested "
|
||||
"by the slave (even though both a prior and a subsequent "
|
||||
"sequence number does exist), and GTID strict mode is enabled.";
|
||||
/*
|
||||
In strict mode, it is an error if the slave requests to start
|
||||
in a "hole" in the master's binlog: a GTID that does not
|
||||
exist, even though both the prior and subsequent seq_no exists
|
||||
for same domain_id and server_id.
|
||||
*/
|
||||
if (slave_gtid_strict_mode)
|
||||
{
|
||||
my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
|
||||
*error_gtid= *gtid;
|
||||
return "The binlog on the master is missing the GTID requested "
|
||||
"by the slave (even though both a prior and a subsequent "
|
||||
"sequence number does exist), and GTID strict mode is enabled.";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
Send a fake Gtid_list event to the slave.
|
||||
This allows the slave to update its current binlog position
|
||||
so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
|
||||
*/
|
||||
// send_fake_gtid_list_event(until_binlog_state);
|
||||
}
|
||||
/*
|
||||
Delete this entry if we have reached slave start position (so we
|
||||
@ -1797,6 +1823,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
#ifndef DBUG_OFF
|
||||
int left_events = max_binlog_dump_events;
|
||||
uint dbug_reconnect_counter= 0;
|
||||
#endif
|
||||
DBUG_ENTER("mysql_binlog_send");
|
||||
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
|
||||
@ -1830,6 +1857,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
until_gtid_state= &until_gtid_state_obj;
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
|
||||
{
|
||||
DBUG_SET("-d,binlog_force_reconnect_after_22_events");
|
||||
DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
|
||||
dbug_reconnect_counter= 22;
|
||||
});
|
||||
|
||||
/*
|
||||
We want to corrupt the first event, in Log_event::read_log_event().
|
||||
But we do not want the corruption to happen early, eg. when client does
|
||||
@ -2176,6 +2210,19 @@ impossible position";
|
||||
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
}
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
if (dbug_reconnect_counter > 0)
|
||||
{
|
||||
--dbug_reconnect_counter;
|
||||
if (dbug_reconnect_counter == 0)
|
||||
{
|
||||
errmsg= "DBUG-injected forced reconnect";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||
log_file_name, &log,
|
||||
mariadb_slave_capability, ev_offset,
|
||||
@ -2191,7 +2238,7 @@ impossible position";
|
||||
if (until_gtid_state &&
|
||||
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
|
||||
event_type, current_checksum_alg, flags, &errmsg,
|
||||
&until_binlog_state))
|
||||
&until_binlog_state, my_b_tell(&log)))
|
||||
{
|
||||
if (errmsg)
|
||||
{
|
||||
@ -2373,7 +2420,7 @@ impossible position";
|
||||
until_gtid_state &&
|
||||
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
|
||||
event_type, current_checksum_alg, flags, &errmsg,
|
||||
&until_binlog_state))
|
||||
&until_binlog_state, my_b_tell(&log)))
|
||||
{
|
||||
if (errmsg)
|
||||
{
|
||||
@ -2970,6 +3017,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
||||
char saved_host[HOSTNAME_LENGTH + 1];
|
||||
uint saved_port;
|
||||
char saved_log_name[FN_REFLEN];
|
||||
Master_info::enum_using_gtid saved_using_gtid;
|
||||
char master_info_file_tmp[FN_REFLEN];
|
||||
char relay_log_info_file_tmp[FN_REFLEN];
|
||||
my_off_t saved_log_pos;
|
||||
@ -3059,6 +3107,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
||||
saved_port= mi->port;
|
||||
strmake(saved_log_name, mi->master_log_name, FN_REFLEN - 1);
|
||||
saved_log_pos= mi->master_log_pos;
|
||||
saved_using_gtid= mi->using_gtid;
|
||||
|
||||
/*
|
||||
If the user specified host or port without binlog or position,
|
||||
@ -3291,6 +3340,11 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
||||
"master_log_pos='%ld'.", saved_host, saved_port, saved_log_name,
|
||||
(ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name,
|
||||
(ulong) mi->master_log_pos);
|
||||
if (saved_using_gtid != Master_info::USE_GTID_NO ||
|
||||
mi->using_gtid != Master_info::USE_GTID_NO)
|
||||
sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s",
|
||||
mi->using_gtid_astext(saved_using_gtid),
|
||||
mi->using_gtid_astext(mi->using_gtid));
|
||||
|
||||
/*
|
||||
If we don't write new coordinates to disk now, then old will remain in
|
||||
@ -3753,11 +3807,11 @@ rpl_deinit_gtid_slave_state()
|
||||
|
||||
|
||||
/*
|
||||
Format the current GTID state as a string, for use when connecting to a
|
||||
master server with GTID, or for returning the value of @@global.gtid_state.
|
||||
Format the current GTID state as a string, for returning the value of
|
||||
@@global.gtid_slave_pos.
|
||||
|
||||
If the flag use_binlog is true, then the contents of the binary log (if
|
||||
enabled) is merged into the current GTID state.
|
||||
enabled) is merged into the current GTID state (@@global.gtid_current_pos).
|
||||
*/
|
||||
int
|
||||
rpl_append_gtid_state(String *dest, bool use_binlog)
|
||||
@ -3770,10 +3824,35 @@ rpl_append_gtid_state(String *dest, bool use_binlog)
|
||||
(err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
|
||||
return err;
|
||||
|
||||
rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
|
||||
err= rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
|
||||
my_free(gtid_list);
|
||||
|
||||
return 0;
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Load the current GTID position into a slave_connection_state, for use when
|
||||
connecting to a master server with GTID.
|
||||
|
||||
If the flag use_binlog is true, then the contents of the binary log (if
|
||||
enabled) is merged into the current GTID state (master_use_gtid=current_pos).
|
||||
*/
|
||||
int
|
||||
rpl_load_gtid_state(slave_connection_state *state, bool use_binlog)
|
||||
{
|
||||
int err;
|
||||
rpl_gtid *gtid_list= NULL;
|
||||
uint32 num_gtids= 0;
|
||||
|
||||
if (use_binlog && opt_bin_log &&
|
||||
(err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
|
||||
return err;
|
||||
|
||||
err= state->load(&rpl_global_gtid_slave_state, gtid_list, num_gtids);
|
||||
my_free(gtid_list);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user