mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
MDEV-26: Global transaction ID.
Implement START SLAVE UNTIL master_gtid_pos = "<GTID position>". Add test cases, including a test showing how to use this to promote a new master among a set of slaves.
This commit is contained in:
589
sql/sql_repl.cc
589
sql/sql_repl.cc
@@ -30,6 +30,14 @@
|
||||
#include "rpl_handler.h"
|
||||
#include "debug_sync.h"
|
||||
|
||||
|
||||
enum enum_gtid_until_state {
|
||||
GTID_UNTIL_NOT_DONE,
|
||||
GTID_UNTIL_STOP_AFTER_STANDALONE,
|
||||
GTID_UNTIL_STOP_AFTER_TRANSACTION
|
||||
};
|
||||
|
||||
|
||||
int max_binlog_dump_events = 0; // unlimited
|
||||
my_bool opt_sporadic_binlog_dump_fail = 0;
|
||||
#ifndef DBUG_OFF
|
||||
@@ -38,6 +46,74 @@ static int binlog_dump_count = 0;
|
||||
|
||||
extern TYPELIB binlog_checksum_typelib;
|
||||
|
||||
|
||||
static int
|
||||
fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
|
||||
my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
|
||||
uint8 checksum_alg_arg)
|
||||
{
|
||||
char header[LOG_EVENT_HEADER_LEN];
|
||||
ulong event_len;
|
||||
|
||||
*do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
|
||||
/*
|
||||
'when' (the timestamp) is set to 0 so that slave could distinguish between
|
||||
real and fake Rotate events (if necessary)
|
||||
*/
|
||||
memset(header, 0, 4);
|
||||
header[EVENT_TYPE_OFFSET] = (uchar)event_type;
|
||||
event_len= LOG_EVENT_HEADER_LEN + extra_len +
|
||||
(*do_checksum ? BINLOG_CHECKSUM_LEN : 0);
|
||||
int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
|
||||
int4store(header + EVENT_LEN_OFFSET, event_len);
|
||||
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
|
||||
// TODO: check what problems this may cause and fix them
|
||||
int4store(header + LOG_POS_OFFSET, 0);
|
||||
if (packet->append(header, sizeof(header)))
|
||||
{
|
||||
*errmsg= "Failed due to out-of-memory writing event";
|
||||
return -1;
|
||||
}
|
||||
if (*do_checksum)
|
||||
{
|
||||
*crc= my_checksum(0L, NULL, 0);
|
||||
*crc= my_checksum(*crc, (uchar*)header, sizeof(header));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg)
|
||||
{
|
||||
if (do_checksum)
|
||||
{
|
||||
char b[BINLOG_CHECKSUM_LEN];
|
||||
int4store(b, crc);
|
||||
if (packet->append(b, sizeof(b)))
|
||||
{
|
||||
*errmsg= "Failed due to out-of-memory writing event checksum";
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
fake_event_write(NET *net, String *packet, const char **errmsg)
|
||||
{
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
*errmsg = "failed on my_net_write()";
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
fake_rotate_event() builds a fake (=which does not exist physically in any
|
||||
binlog) Rotate event, which contains the name of the binlog we are going to
|
||||
@@ -61,59 +137,71 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
uint8 checksum_alg_arg)
|
||||
{
|
||||
DBUG_ENTER("fake_rotate_event");
|
||||
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
|
||||
|
||||
/*
|
||||
this Rotate is to be sent with checksum if and only if
|
||||
slave's get_master_version_and_clock time handshake value
|
||||
of master's @@global.binlog_checksum was TRUE
|
||||
*/
|
||||
|
||||
my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
|
||||
/*
|
||||
'when' (the timestamp) is set to 0 so that slave could distinguish between
|
||||
real and fake Rotate events (if necessary)
|
||||
*/
|
||||
memset(header, 0, 4);
|
||||
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
|
||||
|
||||
char buf[ROTATE_HEADER_LEN+100];
|
||||
my_bool do_checksum;
|
||||
int err;
|
||||
char* p = log_file_name+dirname_length(log_file_name);
|
||||
uint ident_len = (uint) strlen(p);
|
||||
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
|
||||
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
|
||||
int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
|
||||
int4store(header + EVENT_LEN_OFFSET, event_len);
|
||||
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
|
||||
ha_checksum crc;
|
||||
|
||||
// TODO: check what problems this may cause and fix them
|
||||
int4store(header + LOG_POS_OFFSET, 0);
|
||||
if ((err= fake_event_header(packet, ROTATE_EVENT,
|
||||
ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
|
||||
errmsg, checksum_alg_arg)))
|
||||
DBUG_RETURN(err);
|
||||
|
||||
packet->append(header, sizeof(header));
|
||||
int8store(buf+R_POS_OFFSET,position);
|
||||
packet->append(buf, ROTATE_HEADER_LEN);
|
||||
packet->append(p, ident_len);
|
||||
|
||||
if (do_checksum)
|
||||
{
|
||||
char b[BINLOG_CHECKSUM_LEN];
|
||||
ha_checksum crc= my_checksum(0L, NULL, 0);
|
||||
crc= my_checksum(crc, (uchar*)header, sizeof(header));
|
||||
crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
|
||||
crc= my_checksum(crc, (uchar*)p, ident_len);
|
||||
int4store(b, crc);
|
||||
packet->append(b, sizeof(b));
|
||||
}
|
||||
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
*errmsg = "failed on my_net_write()";
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
|
||||
(err= fake_event_write(net, packet, errmsg)))
|
||||
DBUG_RETURN(err);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
static int fake_gtid_list_event(NET* net, String* packet,
|
||||
Gtid_list_log_event *glev, const char** errmsg,
|
||||
uint8 checksum_alg_arg)
|
||||
{
|
||||
my_bool do_checksum;
|
||||
int err;
|
||||
ha_checksum crc;
|
||||
char buf[128];
|
||||
String str(buf, sizeof(buf), system_charset_info);
|
||||
|
||||
str.length(0);
|
||||
if (glev->to_packet(&str))
|
||||
{
|
||||
*errmsg= "Failed due to out-of-memory writing Gtid_list event";
|
||||
return -1;
|
||||
}
|
||||
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
|
||||
str.length(), &do_checksum, &crc,
|
||||
errmsg, checksum_alg_arg)))
|
||||
return err;
|
||||
|
||||
packet->append(str);
|
||||
if (do_checksum)
|
||||
{
|
||||
crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
|
||||
}
|
||||
|
||||
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
|
||||
(err= fake_event_write(net, packet, errmsg)))
|
||||
return err;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Reset thread transmit packet buffer for event sending
|
||||
|
||||
@@ -526,6 +614,27 @@ get_slave_connect_state(THD *thd, String *out_str)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Get the value of the @slave_until_gtid user variable into the supplied
|
||||
String (this is the GTID position specified for START SLAVE UNTIL
|
||||
master_gtid_pos='xxx').
|
||||
|
||||
Returns false if error (ie. slave did not set the variable and is not doing
|
||||
START SLAVE UNTIL mater_gtid_pos='xxx'), true if success.
|
||||
*/
|
||||
static bool
|
||||
get_slave_until_gtid(THD *thd, String *out_str)
|
||||
{
|
||||
bool null_value;
|
||||
|
||||
const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") };
|
||||
user_var_entry *entry=
|
||||
(user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
|
||||
name.length);
|
||||
return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Function prepares and sends repliation heartbeat event.
|
||||
|
||||
@@ -773,10 +882,10 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
|
||||
|
||||
static int
|
||||
check_slave_start_position(THD *thd, slave_connection_state *st,
|
||||
const char **errormsg, rpl_gtid *error_gtid)
|
||||
const char **errormsg, rpl_gtid *error_gtid,
|
||||
slave_connection_state *until_gtid_state)
|
||||
{
|
||||
uint32 i;
|
||||
bool found;
|
||||
int err;
|
||||
rpl_gtid **delete_list= NULL;
|
||||
uint32 delete_idx= 0;
|
||||
@@ -791,9 +900,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
|
||||
rpl_gtid master_replication_gtid;
|
||||
rpl_gtid start_gtid;
|
||||
|
||||
if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
|
||||
slave_gtid->server_id,
|
||||
&master_gtid)) &&
|
||||
if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
|
||||
slave_gtid->server_id,
|
||||
&master_gtid) &&
|
||||
master_gtid.seq_no >= slave_gtid->seq_no)
|
||||
continue;
|
||||
|
||||
@@ -814,6 +923,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
|
||||
slave_gtid->seq_no != master_replication_gtid.seq_no)
|
||||
{
|
||||
rpl_gtid domain_gtid;
|
||||
rpl_gtid *until_gtid;
|
||||
|
||||
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
|
||||
&domain_gtid))
|
||||
@@ -832,6 +942,27 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
|
||||
++missing_domains;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (until_gtid_state &&
|
||||
( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
|
||||
(mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
|
||||
until_gtid->server_id,
|
||||
&master_gtid) &&
|
||||
master_gtid.seq_no >= until_gtid->seq_no)))
|
||||
{
|
||||
/*
|
||||
The slave requested to start from a position that is not (yet) in
|
||||
our binlog, but it also specified an UNTIL condition that _is_ in
|
||||
our binlog (or a missing UNTIL, which means stop at the very
|
||||
beginning). So the stop position is before the start position, and
|
||||
we just delete the entry from the UNTIL hash to mark that this
|
||||
domain has already reached the UNTIL condition.
|
||||
*/
|
||||
if(until_gtid)
|
||||
until_gtid_state->remove(until_gtid);
|
||||
continue;
|
||||
}
|
||||
|
||||
*errormsg= "Requested slave GTID state not found in binlog";
|
||||
*error_gtid= *slave_gtid;
|
||||
err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
|
||||
@@ -951,7 +1082,8 @@ end:
|
||||
the requested GTID that was already purged.
|
||||
*/
|
||||
static const char *
|
||||
gtid_find_binlog_file(slave_connection_state *state, char *out_name)
|
||||
gtid_find_binlog_file(slave_connection_state *state, char *out_name,
|
||||
slave_connection_state *until_gtid_state)
|
||||
{
|
||||
MEM_ROOT memroot;
|
||||
binlog_file_entry *list;
|
||||
@@ -1003,42 +1135,60 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name)
|
||||
|
||||
if (!glev || contains_all_slave_gtid(state, glev))
|
||||
{
|
||||
uint32 i;
|
||||
|
||||
strmake(out_name, buf, FN_REFLEN);
|
||||
|
||||
/*
|
||||
As a special case, we allow to start from binlog file N if the
|
||||
requested GTID is the last event (in the corresponding domain) in
|
||||
binlog file (N-1), but then we need to remove that GTID from the slave
|
||||
state, rather than skipping events waiting for it to turn up.
|
||||
*/
|
||||
for (i= 0; i < glev->count; ++i)
|
||||
if (glev)
|
||||
{
|
||||
const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
|
||||
if (!gtid)
|
||||
{
|
||||
/*
|
||||
contains_all_slave_gtid() returns false if there is any domain in
|
||||
Gtid_list_event which is not in the requested slave position.
|
||||
uint32 i;
|
||||
|
||||
We may delete a domain from the slave state inside this loop, but
|
||||
we only do this when it is the very last GTID logged for that
|
||||
domain in earlier binlogs, and then we can not encounter it in any
|
||||
further GTIDs in the Gtid_list.
|
||||
*/
|
||||
DBUG_ASSERT(0);
|
||||
continue;
|
||||
}
|
||||
if (gtid->server_id == glev->list[i].server_id &&
|
||||
gtid->seq_no == glev->list[i].seq_no)
|
||||
/*
|
||||
As a special case, we allow to start from binlog file N if the
|
||||
requested GTID is the last event (in the corresponding domain) in
|
||||
binlog file (N-1), but then we need to remove that GTID from the slave
|
||||
state, rather than skipping events waiting for it to turn up.
|
||||
|
||||
If slave is doing START SLAVE UNTIL, check for any UNTIL conditions
|
||||
that are already included in a previous binlog file. Delete any such
|
||||
from the UNTIL hash, to mark that such domains have already reached
|
||||
their UNTIL condition.
|
||||
*/
|
||||
for (i= 0; i < glev->count; ++i)
|
||||
{
|
||||
/*
|
||||
The slave requested to start from the very beginning of this
|
||||
domain in this binlog file. So delete the entry from the state,
|
||||
we do not need to skip anything.
|
||||
*/
|
||||
state->remove(gtid);
|
||||
const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
|
||||
if (!gtid)
|
||||
{
|
||||
/*
|
||||
Contains_all_slave_gtid() returns false if there is any domain in
|
||||
Gtid_list_event which is not in the requested slave position.
|
||||
|
||||
We may delete a domain from the slave state inside this loop, but
|
||||
we only do this when it is the very last GTID logged for that
|
||||
domain in earlier binlogs, and then we can not encounter it in any
|
||||
further GTIDs in the Gtid_list.
|
||||
*/
|
||||
DBUG_ASSERT(0);
|
||||
} else if (gtid->server_id == glev->list[i].server_id &&
|
||||
gtid->seq_no == glev->list[i].seq_no)
|
||||
{
|
||||
/*
|
||||
The slave requested to start from the very beginning of this
|
||||
domain in this binlog file. So delete the entry from the state,
|
||||
we do not need to skip anything.
|
||||
*/
|
||||
state->remove(gtid);
|
||||
}
|
||||
|
||||
if (until_gtid_state &&
|
||||
(gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
|
||||
gtid->server_id == glev->list[i].server_id &&
|
||||
gtid->seq_no <= glev->list[i].seq_no)
|
||||
{
|
||||
/*
|
||||
We've already reached the stop position in UNTIL for this domain,
|
||||
since it is before the start position.
|
||||
*/
|
||||
until_gtid_state->remove(gtid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1163,6 +1313,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
|
||||
goto end;
|
||||
}
|
||||
status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
|
||||
current_checksum_alg,
|
||||
>id_list, &list_len);
|
||||
if (status)
|
||||
{
|
||||
@@ -1256,6 +1407,49 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
|
||||
enum_gtid_until_state gtid_until_group,
|
||||
Log_event_type event_type, uint8 current_checksum_alg,
|
||||
ushort flags, const char **errmsg,
|
||||
rpl_binlog_state *until_binlog_state)
|
||||
{
|
||||
switch (gtid_until_group)
|
||||
{
|
||||
case GTID_UNTIL_NOT_DONE:
|
||||
return false;
|
||||
case GTID_UNTIL_STOP_AFTER_STANDALONE:
|
||||
if (Log_event::is_part_of_group(event_type))
|
||||
return false;
|
||||
break;
|
||||
case GTID_UNTIL_STOP_AFTER_TRANSACTION:
|
||||
if (event_type != XID_EVENT &&
|
||||
(event_type != QUERY_EVENT ||
|
||||
!Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
|
||||
packet->length()-*ev_offset,
|
||||
current_checksum_alg)))
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
The last event group has been sent, now the START SLAVE UNTIL condition
|
||||
has been reached.
|
||||
|
||||
Send a last fake Gtid_list_log_event with a flag set to mark that we
|
||||
stop due to UNTIL condition.
|
||||
*/
|
||||
if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
|
||||
return true;
|
||||
Gtid_list_log_event glev(until_binlog_state,
|
||||
Gtid_list_log_event::FLAG_UNTIL_REACHED);
|
||||
if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg))
|
||||
return true;
|
||||
*errmsg= NULL;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Helper function for mysql_binlog_send() to write an event down the slave
|
||||
connection.
|
||||
@@ -1268,37 +1462,113 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
||||
IO_CACHE *log, int mariadb_slave_capability,
|
||||
ulong ev_offset, uint8 current_checksum_alg,
|
||||
bool using_gtid_state, slave_connection_state *gtid_state,
|
||||
enum_gtid_skip_type *gtid_skip_group)
|
||||
enum_gtid_skip_type *gtid_skip_group,
|
||||
slave_connection_state *until_gtid_state,
|
||||
enum_gtid_until_state *gtid_until_group,
|
||||
rpl_binlog_state *until_binlog_state)
|
||||
{
|
||||
my_off_t pos;
|
||||
size_t len= packet->length();
|
||||
|
||||
/* Skip GTID event groups until we reach slave position within a domain_id. */
|
||||
if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
|
||||
if (event_type == GTID_LIST_EVENT && using_gtid_state &&
|
||||
(gtid_state->count() > 0 || until_gtid_state))
|
||||
{
|
||||
rpl_gtid *gtid_list;
|
||||
uint32 list_len;
|
||||
bool err;
|
||||
|
||||
if (ev_offset > len ||
|
||||
Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
|
||||
current_checksum_alg,
|
||||
>id_list, &list_len))
|
||||
return "Failed to read Gtid_list_log_event: corrupt binlog";
|
||||
err= until_binlog_state->load(gtid_list, list_len);
|
||||
my_free(gtid_list);
|
||||
if (err)
|
||||
return "Failed in internal GTID book-keeping: Out of memory";
|
||||
}
|
||||
|
||||
/* Skip GTID event groups until we reach slave position within a domain_id. */
|
||||
if (event_type == GTID_EVENT && using_gtid_state)
|
||||
{
|
||||
uint32 server_id, domain_id;
|
||||
uint64 seq_no;
|
||||
uchar flags2;
|
||||
rpl_gtid *gtid;
|
||||
|
||||
if (ev_offset > len ||
|
||||
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
|
||||
current_checksum_alg,
|
||||
&domain_id, &server_id, &seq_no, &flags2))
|
||||
return "Failed to read Gtid_log_event: corrupt binlog";
|
||||
gtid= gtid_state->find(domain_id);
|
||||
if (gtid != NULL)
|
||||
if (gtid_state->count() > 0 || until_gtid_state)
|
||||
{
|
||||
/* Skip this event group if we have not yet reached slave start pos. */
|
||||
if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
|
||||
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
|
||||
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
||||
/*
|
||||
Delete this entry if we have reached slave start position (so we will
|
||||
not skip subsequent events and won't have to look them up and check).
|
||||
*/
|
||||
if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
|
||||
gtid_state->remove(gtid);
|
||||
rpl_gtid event_gtid;
|
||||
|
||||
if (ev_offset > len ||
|
||||
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
|
||||
current_checksum_alg,
|
||||
&event_gtid.domain_id, &event_gtid.server_id,
|
||||
&event_gtid.seq_no, &flags2))
|
||||
return "Failed to read Gtid_log_event: corrupt binlog";
|
||||
|
||||
if (until_binlog_state->update(&event_gtid))
|
||||
return "Failed in internal GTID book-keeping: Out of memory";
|
||||
|
||||
if (gtid_state->count() > 0)
|
||||
{
|
||||
gtid= gtid_state->find(event_gtid.domain_id);
|
||||
if (gtid != NULL)
|
||||
{
|
||||
/* Skip this event group if we have not yet reached slave start pos. */
|
||||
if (event_gtid.server_id != gtid->server_id ||
|
||||
event_gtid.seq_no <= gtid->seq_no)
|
||||
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
|
||||
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
||||
/*
|
||||
Delete this entry if we have reached slave start position (so we will
|
||||
not skip subsequent events and won't have to look them up and check).
|
||||
*/
|
||||
if (event_gtid.server_id == gtid->server_id &&
|
||||
event_gtid.seq_no >= gtid->seq_no)
|
||||
gtid_state->remove(gtid);
|
||||
}
|
||||
}
|
||||
|
||||
if (until_gtid_state)
|
||||
{
|
||||
gtid= until_gtid_state->find(event_gtid.domain_id);
|
||||
if (gtid == NULL)
|
||||
{
|
||||
/*
|
||||
This domain already reached the START SLAVE UNTIL stop condition,
|
||||
so skip this event group.
|
||||
*/
|
||||
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
|
||||
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
||||
}
|
||||
else if (event_gtid.server_id == gtid->server_id &&
|
||||
event_gtid.seq_no >= gtid->seq_no)
|
||||
{
|
||||
/*
|
||||
We have reached the stop condition.
|
||||
Delete this domain_id from the hash, so we will skip all further
|
||||
events in this domain and eventually stop when all domains are
|
||||
done.
|
||||
*/
|
||||
uint64 until_seq_no= gtid->seq_no;
|
||||
until_gtid_state->remove(gtid);
|
||||
if (until_gtid_state->count() == 0)
|
||||
*gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
|
||||
GTID_UNTIL_STOP_AFTER_STANDALONE :
|
||||
GTID_UNTIL_STOP_AFTER_TRANSACTION);
|
||||
if (event_gtid.seq_no > until_seq_no)
|
||||
{
|
||||
/*
|
||||
The GTID in START SLAVE UNTIL condition is missing in our binlog.
|
||||
This should normally not happen (user error), but since we can be
|
||||
sure that we are now beyond the position that the UNTIL condition
|
||||
should be in, we can just stop now. And we also need to skip this
|
||||
event group (as it is beyond the UNTIL condition).
|
||||
*/
|
||||
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
|
||||
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1446,6 +1716,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
||||
return NULL; /* Success */
|
||||
}
|
||||
|
||||
|
||||
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
ushort flags)
|
||||
{
|
||||
@@ -1465,12 +1736,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
mysql_mutex_t *log_lock;
|
||||
mysql_cond_t *log_cond;
|
||||
int mariadb_slave_capability;
|
||||
char str_buf[256];
|
||||
char str_buf[128];
|
||||
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
|
||||
bool using_gtid_state;
|
||||
slave_connection_state gtid_state, return_gtid_state;
|
||||
char str_buf2[128];
|
||||
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
|
||||
slave_connection_state gtid_state, until_gtid_state_obj;
|
||||
slave_connection_state *until_gtid_state= NULL;
|
||||
rpl_gtid error_gtid;
|
||||
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
|
||||
enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
|
||||
rpl_binlog_state until_binlog_state;
|
||||
|
||||
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
@@ -1502,6 +1778,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
connect_gtid_state.length(0);
|
||||
using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
|
||||
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
|
||||
if (using_gtid_state &&
|
||||
get_slave_until_gtid(thd, &slave_until_gtid_str))
|
||||
until_gtid_state= &until_gtid_state_obj;
|
||||
|
||||
/*
|
||||
We want to corrupt the first event, in Log_event::read_log_event().
|
||||
But we do not want the corruption to happen early, eg. when client does
|
||||
@@ -1557,13 +1837,23 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
if (until_gtid_state &&
|
||||
until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
|
||||
slave_until_gtid_str.length()))
|
||||
{
|
||||
errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
|
||||
"position sent from slave";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
if ((error= check_slave_start_position(thd, >id_state, &errmsg,
|
||||
&error_gtid)))
|
||||
&error_gtid, until_gtid_state)))
|
||||
{
|
||||
my_errno= error;
|
||||
goto err;
|
||||
}
|
||||
if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name)))
|
||||
if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name,
|
||||
until_gtid_state)))
|
||||
{
|
||||
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
goto err;
|
||||
@@ -1753,6 +2043,15 @@ impossible position";
|
||||
/* The Format_description_log_event event will be found naturally. */
|
||||
}
|
||||
|
||||
/*
|
||||
Handle the case of START SLAVE UNTIL with an UNTIL condition already
|
||||
fulfilled at the start position.
|
||||
|
||||
We will send one event, the format_description, and then stop.
|
||||
*/
|
||||
if (until_gtid_state && until_gtid_state->count() == 0)
|
||||
gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
|
||||
|
||||
/* seek to the requested position, to start the requested dump */
|
||||
my_b_seek(&log, pos); // Seek will done on next read
|
||||
|
||||
@@ -1833,12 +2132,26 @@ impossible position";
|
||||
log_file_name, &log,
|
||||
mariadb_slave_capability, ev_offset,
|
||||
current_checksum_alg, using_gtid_state,
|
||||
>id_state, >id_skip_group)))
|
||||
>id_state, >id_skip_group,
|
||||
until_gtid_state, >id_until_group,
|
||||
&until_binlog_state)))
|
||||
{
|
||||
errmsg= tmp_msg;
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
if (until_gtid_state &&
|
||||
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
|
||||
event_type, current_checksum_alg, flags, &errmsg,
|
||||
&until_binlog_state))
|
||||
{
|
||||
if (errmsg)
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
goto end;
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
|
||||
{
|
||||
@@ -1992,18 +2305,34 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (read_packet &&
|
||||
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||
log_file_name, &log,
|
||||
mariadb_slave_capability, ev_offset,
|
||||
current_checksum_alg,
|
||||
using_gtid_state, >id_state,
|
||||
>id_skip_group)))
|
||||
if (read_packet)
|
||||
{
|
||||
errmsg= tmp_msg;
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||
log_file_name, &log,
|
||||
mariadb_slave_capability, ev_offset,
|
||||
current_checksum_alg,
|
||||
using_gtid_state, >id_state,
|
||||
>id_skip_group, until_gtid_state,
|
||||
>id_until_group, &until_binlog_state)))
|
||||
{
|
||||
errmsg= tmp_msg;
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
if (
|
||||
until_gtid_state &&
|
||||
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
|
||||
event_type, current_checksum_alg, flags, &errmsg,
|
||||
&until_binlog_state))
|
||||
{
|
||||
if (errmsg)
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
log.error=0;
|
||||
}
|
||||
@@ -2167,6 +2496,26 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
lock_slave_threads(mi); // this allows us to cleanly read slave_running
|
||||
// Get a mask of _stopped_ threads
|
||||
init_thread_mask(&thread_mask,mi,1 /* inverse */);
|
||||
|
||||
if (thd->lex->mi.gtid_pos_str.str)
|
||||
{
|
||||
if (thread_mask != (SLAVE_IO|SLAVE_SQL))
|
||||
{
|
||||
slave_errno= ER_SLAVE_WAS_RUNNING;
|
||||
goto err;
|
||||
}
|
||||
if (thd->lex->slave_thd_opt)
|
||||
{
|
||||
slave_errno= ER_BAD_SLAVE_UNTIL_COND;
|
||||
goto err;
|
||||
}
|
||||
if (!mi->using_gtid)
|
||||
{
|
||||
slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Below we will start all stopped threads. But if the user wants to
|
||||
start only one thread, do as if the other thread was running (as we
|
||||
@@ -2213,10 +2562,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
|
||||
sizeof(mi->rli.until_log_name)-1);
|
||||
}
|
||||
else if (thd->lex->mi.gtid_pos_str.str)
|
||||
{
|
||||
if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
|
||||
thd->lex->mi.gtid_pos_str.length))
|
||||
{
|
||||
slave_errno= ER_INCORRECT_GTID_STATE;
|
||||
mysql_mutex_unlock(&mi->rli.data_lock);
|
||||
goto err;
|
||||
}
|
||||
mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
|
||||
}
|
||||
else
|
||||
mi->rli.clear_until_condition();
|
||||
|
||||
if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
|
||||
if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
|
||||
mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
|
||||
{
|
||||
/* Preparing members for effective until condition checking */
|
||||
const char *p= fn_ext(mi->rli.until_log_name);
|
||||
@@ -2239,7 +2600,10 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
/* mark the cached result of the UNTIL comparison as "undefined" */
|
||||
mi->rli.until_log_names_cmp_result=
|
||||
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
|
||||
}
|
||||
|
||||
if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
|
||||
{
|
||||
/* Issuing warning then started without --skip-slave-start */
|
||||
if (!opt_skip_slave_start)
|
||||
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
|
||||
@@ -2271,6 +2635,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
ER(ER_SLAVE_WAS_RUNNING));
|
||||
}
|
||||
|
||||
err:
|
||||
unlock_slave_threads(mi);
|
||||
|
||||
if (slave_errno)
|
||||
|
Reference in New Issue
Block a user