1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

MDEV-13073 This part merges the Ali semisync related changes

and specifically the ack receiving functionality.
Semisync is turned to be static instead of plugin so its functions
are invoked at the same points as RUN_HOOKS.
The RUN_HOOKS and the observer interface remain to be removed by later
patch.

Todo:
  React on killed status by repl_semisync_master.wait_after_sync(). Currently
  Repl_semi_sync_master::commit_trx does not check the killed status.

  There were few bugfixes found that are present in mysql and its unclear
  whether/how they are covered. Those include:

  Bug#15985893: GTID SKIPPED EVENTS ON MASTER CAUSE SEMI SYNC TIME-OUTS
  Bug#17932935 CALLING IS_SEMI_SYNC_SLAVE() IN EACH FUNCTION CALL
                 HAS BAD PERFORMANCE
  Bug#20574628: SEMI-SYNC REPLICATION PERFORMANCE DEGRADES WITH A HIGH NUMBER OF THREADS
This commit is contained in:
Andrei Elkin
2017-11-22 17:10:34 +02:00
committed by Monty
parent abceaa7542
commit e972125f11
32 changed files with 1197 additions and 695 deletions

View File

@ -30,7 +30,8 @@
#include <my_dir.h>
#include "rpl_handler.h"
#include "debug_sync.h"
#include "log.h" // get_gtid_list_event
#include "semisync_master.h"
#include "semisync_slave.h"
enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE,
@ -160,6 +161,7 @@ struct binlog_send_info {
bool clear_initial_log_pos;
bool should_stop;
size_t dirlen;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
@ -315,14 +317,30 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
{
/* RUN_HOOK() must return zero when thd->semi_sync_slave */
DBUG_ASSERT(!info->thd->semi_sync_slave);
info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'";
ret= 1;
}
if (info->thd->semi_sync_slave)
{
repl_semisync_master.reserveSyncHeader(packet);
}
*ev_offset= packet->length();
return ret;
}
inline bool is_semi_sync_slave()
{
int null_value;
long long val= 0;
get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
return val;
}
static int send_file(THD *thd)
{
NET* net = &thd->net;
@ -875,73 +893,6 @@ get_binlog_list(MEM_ROOT *memroot)
DBUG_RETURN(current_list);
}
/*
Find the Gtid_list_log_event at the start of a binlog.
NULL for ok, non-NULL error message for error.
If ok, then the event is returned in *out_gtid_list. This can be NULL if we
get back to binlogs written by old server version without GTID support. If
so, it means we have reached the point to start from, as no GTID events can
exist in earlier binlogs.
*/
static const char *
get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
{
Format_description_log_event init_fdle(BINLOG_VERSION);
Format_description_log_event *fdle;
Log_event *ev;
const char *errormsg = NULL;
*out_gtid_list= NULL;
if (!(ev= Log_event::read_log_event(cache, &init_fdle,
opt_master_verify_checksum)) ||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
if (ev)
delete ev;
return "Could not read format description log event while looking for "
"GTID position in binlog";
}
fdle= static_cast<Format_description_log_event *>(ev);
for (;;)
{
Log_event_type typ;
ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum);
if (!ev)
{
errormsg= "Could not read GTID list event while looking for GTID "
"position in binlog";
break;
}
typ= ev->get_type_code();
if (typ == GTID_LIST_EVENT)
break; /* Done, found it */
if (typ == START_ENCRYPTION_EVENT)
{
if (fdle->start_decryption((Start_encryption_log_event*) ev))
errormsg= "Could not set up decryption for binlog.";
}
delete ev;
if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
typ == FORMAT_DESCRIPTION_EVENT || typ == START_ENCRYPTION_EVENT)
continue; /* Continue looking */
/* We did not find any Gtid_list_log_event, must be old binlog. */
ev= NULL;
break;
}
delete fdle;
*out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
return errormsg;
}
/*
Check if every GTID requested by the slave is contained in this (or a later)
@ -1673,6 +1624,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state;
bool need_sync= false;
if (event_type == GTID_LIST_EVENT &&
info->using_gtid_state && until_gtid_state)
@ -1984,7 +1936,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(info->thd, info->flags, packet, info->log_file_name, pos)))
(info->thd, info->flags, packet, info->log_file_name, pos)) ||
repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(),
info->log_file_name + info->dirlen,
pos, &need_sync))
{
info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
@ -2012,6 +1967,8 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
}
if (need_sync)
repl_semisync_master.flushNet(info->thd, packet->c_ptr());
return NULL; /* Success */
}
@ -2748,7 +2705,7 @@ static int send_one_binlog_file(binlog_send_info *info,
/** end of file or error */
return (int)end_pos;
}
info->dirlen= dirname_length(info->log_file_name);
/**
* send events from current position up to end_pos
*/
@ -2770,6 +2727,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
binlog_send_info *info= &infoobj;
bool has_transmit_started= false;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
@ -2792,6 +2750,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
info->error= ER_UNKNOWN_ERROR;
goto err;
}
has_transmit_started= true;
/* Check if the dump thread is created by a slave with semisync enabled. */
thd->semi_sync_slave = is_semi_sync_slave();
repl_semisync_master.dump_start(thd, log_ident, pos);
/*
heartbeat_period from @master_heartbeat_period user variable
@ -2908,7 +2871,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
(void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
if (has_transmit_started)
{
repl_semisync_master.dump_end(thd);
}
if (info->thd->killed == KILL_SLAVE_SAME_ID)
{
@ -3374,7 +3341,9 @@ int reset_slave(THD *thd, Master_info* mi)
else if (global_system_variables.log_warnings > 1)
sql_print_information("Deleted Master_info file '%s'.", fname);
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
(void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
if (rpl_semi_sync_slave_enabled)
repl_semisync_slave.resetSlave(mi);
err:
mi->unlock_slave_threads();
if (error)
@ -3876,11 +3845,14 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
return 1;
}
if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
next_log_number))
return 1;
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
return 0;
bool ret= 0;
/* Temporarily disable master semisync before reseting master. */
repl_semisync_master.beforeResetMaster();
ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
next_log_number);
(void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
repl_semisync_master.afterResetMaster();
return ret;
}