mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
MDEV-34705: Inplement starting from a specific GTID position
To find the target position, we first loop backwards over binlog files, reading the initial GTID state written at the start to find the file to start in. We then binary search on the differential GTID states written every --innodb-binlog-state-interval bytes. This patch does only minimal changes to the dump thread code in sql_repl.cc to be able to send out binlog data to the client. Some re-factoring/cleanup should be done in a follow-up patch to more cleanly separate the two code paths, avoid a lot of if-statements and make the binlog-in-engine code path free of much of the cruft from the legacy binlog implementation. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
237
sql/sql_repl.cc
237
sql/sql_repl.cc
@@ -304,6 +304,52 @@ static int fake_gtid_list_event(binlog_send_info *info,
|
||||
}
|
||||
|
||||
|
||||
static int fake_format_description_event(binlog_send_info *info,
|
||||
Format_description_log_event *fdev,
|
||||
const char **errmsg,
|
||||
uint32 current_pos)
|
||||
{
|
||||
my_bool do_checksum;
|
||||
int err;
|
||||
ha_checksum crc;
|
||||
char buf[320];
|
||||
String str(buf, sizeof(buf), system_charset_info);
|
||||
String* packet= info->packet;
|
||||
|
||||
str.length(0);
|
||||
fdev->dont_set_created= true;
|
||||
if (fdev->to_packet(&str))
|
||||
{
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
*errmsg= "Failed due to out-of-memory writing Format_description event";
|
||||
return -1;
|
||||
}
|
||||
if ((err= fake_event_header(packet, FORMAT_DESCRIPTION_EVENT,
|
||||
str.length(), &do_checksum, &crc,
|
||||
errmsg, BINLOG_CHECKSUM_ALG_CRC32,
|
||||
current_pos)))
|
||||
{
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
return err;
|
||||
}
|
||||
|
||||
packet->append(str);
|
||||
if (do_checksum)
|
||||
{
|
||||
crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
|
||||
}
|
||||
|
||||
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
|
||||
(err= fake_event_write(info->net, packet, errmsg)))
|
||||
{
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
return err;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Reset thread transmit packet buffer for event sending
|
||||
|
||||
@@ -1317,6 +1363,11 @@ end:
|
||||
}
|
||||
|
||||
|
||||
static const char *gtid_too_old_errmsg=
|
||||
"Could not find GTID state requested by slave in any binlog "
|
||||
"files. Probably the slave state is too old and required binlog files "
|
||||
"have been purged.";
|
||||
|
||||
/*
|
||||
Helper function for gtid_find_binlog_pos() below.
|
||||
Check a binlog file against a slave position. Use a GTID index if present.
|
||||
@@ -1410,6 +1461,52 @@ end:
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Do some checks on each GTID in the starting GTID state found when searching
|
||||
for the starting GTID position in the binlog.
|
||||
*/
|
||||
static void
|
||||
found_pos_check_gtid(const rpl_gtid *found_gtid, slave_connection_state *state,
|
||||
slave_connection_state *until_gtid_state)
|
||||
{
|
||||
const rpl_gtid *gtid= state->find(found_gtid->domain_id);
|
||||
if (!gtid)
|
||||
{
|
||||
/*
|
||||
Contains_all_slave_gtid() returns false if there is any domain in
|
||||
Gtid_list_event which is not in the requested slave position.
|
||||
|
||||
We may delete a domain from the slave state inside this loop, but
|
||||
we only do this when it is the very last GTID logged for that
|
||||
domain in earlier binlogs, and then we can not encounter it in any
|
||||
further GTIDs in the Gtid_list.
|
||||
*/
|
||||
DBUG_ASSERT(0);
|
||||
} else if (gtid->server_id == found_gtid->server_id &&
|
||||
gtid->seq_no == found_gtid->seq_no)
|
||||
{
|
||||
/*
|
||||
The slave requested to start from the very beginning of this
|
||||
domain in this binlog file. So delete the entry from the state,
|
||||
we do not need to skip anything.
|
||||
*/
|
||||
state->remove(gtid);
|
||||
}
|
||||
|
||||
if (until_gtid_state &&
|
||||
(gtid= until_gtid_state->find(found_gtid->domain_id)) &&
|
||||
gtid->server_id == found_gtid->server_id &&
|
||||
gtid->seq_no <= found_gtid->seq_no)
|
||||
{
|
||||
/*
|
||||
We've already reached the stop position in UNTIL for this domain,
|
||||
since it is before the start position.
|
||||
*/
|
||||
until_gtid_state->remove(gtid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Find the name of the binlog file to start reading for a slave that connects
|
||||
using GTID state.
|
||||
@@ -1506,43 +1603,7 @@ gtid_find_binlog_pos(slave_connection_state *state, char *out_name,
|
||||
their UNTIL condition.
|
||||
*/
|
||||
for (i= 0; i < count; ++i)
|
||||
{
|
||||
const rpl_gtid *gtid= state->find(gtids[i].domain_id);
|
||||
if (!gtid)
|
||||
{
|
||||
/*
|
||||
Contains_all_slave_gtid() returns false if there is any domain in
|
||||
Gtid_list_event which is not in the requested slave position.
|
||||
|
||||
We may delete a domain from the slave state inside this loop, but
|
||||
we only do this when it is the very last GTID logged for that
|
||||
domain in earlier binlogs, and then we can not encounter it in any
|
||||
further GTIDs in the Gtid_list.
|
||||
*/
|
||||
DBUG_ASSERT(0);
|
||||
} else if (gtid->server_id == gtids[i].server_id &&
|
||||
gtid->seq_no == gtids[i].seq_no)
|
||||
{
|
||||
/*
|
||||
The slave requested to start from the very beginning of this
|
||||
domain in this binlog file. So delete the entry from the state,
|
||||
we do not need to skip anything.
|
||||
*/
|
||||
state->remove(gtid);
|
||||
}
|
||||
|
||||
if (until_gtid_state &&
|
||||
(gtid= until_gtid_state->find(gtids[i].domain_id)) &&
|
||||
gtid->server_id == gtids[i].server_id &&
|
||||
gtid->seq_no <= gtids[i].seq_no)
|
||||
{
|
||||
/*
|
||||
We've already reached the stop position in UNTIL for this domain,
|
||||
since it is before the start position.
|
||||
*/
|
||||
until_gtid_state->remove(gtid);
|
||||
}
|
||||
}
|
||||
found_pos_check_gtid(&(gtids[i]), state, until_gtid_state);
|
||||
}
|
||||
|
||||
goto end;
|
||||
@@ -1551,9 +1612,7 @@ gtid_find_binlog_pos(slave_connection_state *state, char *out_name,
|
||||
}
|
||||
|
||||
/* We reached the end without finding anything. */
|
||||
errormsg= "Could not find GTID state requested by slave in any binlog "
|
||||
"files. Probably the slave state is too old and required binlog files "
|
||||
"have been purged.";
|
||||
errormsg= gtid_too_old_errmsg;
|
||||
|
||||
end:
|
||||
if (glev)
|
||||
@@ -1567,6 +1626,26 @@ end:
|
||||
}
|
||||
|
||||
|
||||
static const char *
|
||||
gtid_find_engine_pos(handler_binlog_reader *binlog_reader,
|
||||
slave_connection_state *pos,
|
||||
slave_connection_state *until_gtid_pos,
|
||||
rpl_binlog_state *until_binlog_state)
|
||||
{
|
||||
int res= binlog_reader->init_gtid_pos(pos, until_binlog_state);
|
||||
if (res < 0)
|
||||
return "Error while looking up GTID position in engine binlog";
|
||||
if (res == 0)
|
||||
return gtid_too_old_errmsg;
|
||||
until_binlog_state->iterate(
|
||||
[pos, until_gtid_pos] (const rpl_gtid *gtid) -> bool {
|
||||
found_pos_check_gtid(gtid, pos, until_gtid_pos);
|
||||
return false;
|
||||
});
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
gtid_index_lookup_pos(const char *name, uint32 offset, uint32 *out_start_seek,
|
||||
slave_connection_state *out_gtid_state)
|
||||
@@ -2342,6 +2421,8 @@ static int init_binlog_sender(binlog_send_info *info,
|
||||
{
|
||||
LEX_CSTRING *engine_name= hton_name(opt_binlog_engine_hton);
|
||||
my_error(ER_CANNOT_INIT_ENGINE_BINLOG_READER, MYF(0), engine_name->str);
|
||||
info->errmsg= "Error while initializing engine binlog reader";
|
||||
info->error= ER_CANNOT_INIT_ENGINE_BINLOG_READER;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -2369,6 +2450,14 @@ static int init_binlog_sender(binlog_send_info *info,
|
||||
info->is_until_before_gtids= get_slave_gtid_until_before_gtids(thd);
|
||||
}
|
||||
}
|
||||
else if (opt_binlog_engine_hton)
|
||||
{
|
||||
my_error(ER_ENGINE_BINLOG_REQUIRES_GTID, MYF(0));
|
||||
info->errmsg=
|
||||
"Slave must enable GTID mode when master uses --binlog-storage-engine";
|
||||
info->error= ER_ENGINE_BINLOG_REQUIRES_GTID;
|
||||
return 1;
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
|
||||
{
|
||||
@@ -2431,14 +2520,31 @@ static int init_binlog_sender(binlog_send_info *info,
|
||||
info->error= error;
|
||||
return 1;
|
||||
}
|
||||
if ((info->errmsg= gtid_find_binlog_pos(&info->gtid_state,
|
||||
search_file_name,
|
||||
info->until_gtid_state,
|
||||
&info->until_binlog_state,
|
||||
&found_in_index, &start_seek)))
|
||||
|
||||
if (opt_binlog_engine_hton)
|
||||
{
|
||||
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
return 1;
|
||||
if ((info->errmsg= gtid_find_engine_pos(info->engine_binlog_reader,
|
||||
&info->gtid_state,
|
||||
info->until_gtid_state,
|
||||
&info->until_binlog_state)))
|
||||
{
|
||||
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
return 1;
|
||||
}
|
||||
found_in_index= true;
|
||||
start_seek= 0; /* Not used when binlog implemented in engine. */
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((info->errmsg= gtid_find_binlog_pos(&info->gtid_state,
|
||||
search_file_name,
|
||||
info->until_gtid_state,
|
||||
&info->until_binlog_state,
|
||||
&found_in_index, &start_seek)))
|
||||
{
|
||||
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (found_in_index)
|
||||
@@ -2461,7 +2567,8 @@ static int init_binlog_sender(binlog_send_info *info,
|
||||
}
|
||||
linfo->index_file_offset= 0;
|
||||
|
||||
if (mysql_bin_log.find_log_pos(linfo, name, 1))
|
||||
if (!opt_binlog_engine_hton &&
|
||||
mysql_bin_log.find_log_pos(linfo, name, 1))
|
||||
{
|
||||
info->errmsg= "Could not find first log file name in binary "
|
||||
"log index file";
|
||||
@@ -2474,7 +2581,8 @@ static int init_binlog_sender(binlog_send_info *info,
|
||||
// note: publish that we use file, before we open it
|
||||
thd->set_current_linfo(linfo);
|
||||
|
||||
if (check_start_offset(info, linfo->log_file_name, *pos))
|
||||
if (!opt_binlog_engine_hton &&
|
||||
check_start_offset(info, linfo->log_file_name, *pos))
|
||||
return 1;
|
||||
|
||||
if (*pos > BIN_LOG_HEADER_SIZE)
|
||||
@@ -2918,11 +3026,13 @@ static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
|
||||
ulong ev_offset;
|
||||
|
||||
String *packet= info->packet;
|
||||
if (!opt_binlog_engine_hton) {
|
||||
linfo->pos= my_b_tell(log);
|
||||
info->last_pos= my_b_tell(log);
|
||||
|
||||
log->end_of_file= end_pos;
|
||||
while (linfo->pos < end_pos)
|
||||
}
|
||||
while (opt_binlog_engine_hton || linfo->pos < end_pos)
|
||||
{
|
||||
if (should_stop(info))
|
||||
return 0;
|
||||
@@ -3052,7 +3162,8 @@ static int send_one_binlog_file(binlog_send_info *info,
|
||||
mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
|
||||
|
||||
/* seek to the requested position, to start the requested dump */
|
||||
if (start_pos != BIN_LOG_HEADER_SIZE)
|
||||
if (!opt_binlog_engine_hton &&
|
||||
start_pos != BIN_LOG_HEADER_SIZE)
|
||||
{
|
||||
my_b_seek(log, start_pos);
|
||||
linfo->pos= start_pos;
|
||||
@@ -3066,7 +3177,8 @@ static int send_one_binlog_file(binlog_send_info *info,
|
||||
* get end pos of current log file, this function
|
||||
* will wait if there is nothing available
|
||||
*/
|
||||
my_off_t end_pos= get_binlog_end_pos(info, log, linfo);
|
||||
my_off_t end_pos= opt_binlog_engine_hton ?
|
||||
UINT32_MAX : get_binlog_end_pos(info, log, linfo);
|
||||
if (end_pos <= 1)
|
||||
{
|
||||
/** end of file or error */
|
||||
@@ -3141,6 +3253,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
|
||||
while (!should_stop(info))
|
||||
{
|
||||
/* ToDo: do some re-factoring/cleanup so that the code path for binlog-in-engine becomes separate from the legacy code path, sharing common code but avoiding much of the old cruft. */
|
||||
if (opt_binlog_engine_hton) {
|
||||
/* Build a legacy Format_description event for slave. */
|
||||
if (!(info->fdev= new Format_description_log_event
|
||||
(4, 0, BINLOG_CHECKSUM_ALG_OFF)))
|
||||
{
|
||||
info->errmsg= "Out of memory initializing format_description event";
|
||||
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
goto err;
|
||||
}
|
||||
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
|
||||
fake_format_description_event(info, info->fdev, &info->errmsg, pos))
|
||||
{
|
||||
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
Tell the client about the log name with a fake Rotate event;
|
||||
this is needed even if we also send a Format_description_log_event
|
||||
@@ -3192,6 +3323,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
goto err;
|
||||
}
|
||||
|
||||
} /* !opt_binlog_engine_hton */
|
||||
|
||||
/*
|
||||
We want to corrupt the first event that will be sent to the slave.
|
||||
But we do not want the corruption to happen early, eg. when client does
|
||||
|
Reference in New Issue
Block a user