mirror of
https://github.com/MariaDB/server.git
synced 2025-07-27 18:02:13 +03:00
Automerge from mysql-next-mr.
This commit is contained in:
483
sql/sql_repl.cc
483
sql/sql_repl.cc
@ -21,6 +21,7 @@
|
||||
#include "log_event.h"
|
||||
#include "rpl_filter.h"
|
||||
#include <my_dir.h>
|
||||
#include "rpl_handler.h"
|
||||
|
||||
int max_binlog_dump_events = 0; // unlimited
|
||||
my_bool opt_sporadic_binlog_dump_fail = 0;
|
||||
@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
Reset thread transmit packet buffer for event sending
|
||||
|
||||
This function allocates header bytes for event transmission, and
|
||||
should be called before store the event data to the packet buffer.
|
||||
*/
|
||||
static int reset_transmit_packet(THD *thd, ushort flags,
|
||||
ulong *ev_offset, const char **errmsg)
|
||||
{
|
||||
int ret= 0;
|
||||
String *packet= &thd->packet;
|
||||
|
||||
/* reserve and set default header */
|
||||
packet->length(0);
|
||||
packet->set("\0", 1, &my_charset_bin);
|
||||
|
||||
if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
|
||||
{
|
||||
*errmsg= "Failed to run hook 'reserve_header'";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
ret= 1;
|
||||
}
|
||||
*ev_offset= packet->length();
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int send_file(THD *thd)
|
||||
{
|
||||
NET* net = &thd->net;
|
||||
@ -336,6 +363,73 @@ Increase max_allowed_packet on master";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
An auxiliary function for calling in mysql_binlog_send
|
||||
to initialize the heartbeat timeout in waiting for a binlogged event.
|
||||
|
||||
@param[in] thd THD to access a user variable
|
||||
|
||||
@return heartbeat period an ulonglong of nanoseconds
|
||||
or zero if heartbeat was not demanded by slave
|
||||
*/
|
||||
static ulonglong get_heartbeat_period(THD * thd)
|
||||
{
|
||||
my_bool null_value;
|
||||
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
|
||||
user_var_entry *entry=
|
||||
(user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
|
||||
name.length);
|
||||
return entry? entry->val_int(&null_value) : 0;
|
||||
}
|
||||
|
||||
/*
|
||||
Function prepares and sends repliation heartbeat event.
|
||||
|
||||
@param net net object of THD
|
||||
@param packet buffer to store the heartbeat instance
|
||||
@param event_coordinates binlog file name and position of the last
|
||||
real event master sent from binlog
|
||||
|
||||
@note
|
||||
Among three essential pieces of heartbeat data Log_event::when
|
||||
is computed locally.
|
||||
The error to send is serious and should force terminating
|
||||
the dump thread.
|
||||
*/
|
||||
static int send_heartbeat_event(NET* net, String* packet,
|
||||
const struct event_coordinates *coord)
|
||||
{
|
||||
DBUG_ENTER("send_heartbeat_event");
|
||||
char header[LOG_EVENT_HEADER_LEN];
|
||||
/*
|
||||
'when' (the timestamp) is set to 0 so that slave could distinguish between
|
||||
real and fake Rotate events (if necessary)
|
||||
*/
|
||||
memset(header, 0, 4); // when
|
||||
|
||||
header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
|
||||
|
||||
char* p= coord->file_name + dirname_length(coord->file_name);
|
||||
|
||||
uint ident_len = strlen(p);
|
||||
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
|
||||
int4store(header + SERVER_ID_OFFSET, server_id);
|
||||
int4store(header + EVENT_LEN_OFFSET, event_len);
|
||||
int2store(header + FLAGS_OFFSET, 0);
|
||||
|
||||
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
|
||||
|
||||
packet->append(header, sizeof(header));
|
||||
packet->append(p, ident_len); // log_file_name
|
||||
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
|
||||
net_flush(net))
|
||||
{
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
TODO: Clean up loop to only have one call to send_file()
|
||||
*/
|
||||
@ -346,6 +440,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
LOG_INFO linfo;
|
||||
char *log_file_name = linfo.log_file_name;
|
||||
char search_file_name[FN_REFLEN], *name;
|
||||
|
||||
ulong ev_offset;
|
||||
|
||||
IO_CACHE log;
|
||||
File file = -1;
|
||||
String* packet = &thd->packet;
|
||||
@ -361,6 +458,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
|
||||
|
||||
bzero((char*) &log,sizeof(log));
|
||||
/*
|
||||
heartbeat_period from @master_heartbeat_period user variable
|
||||
*/
|
||||
ulonglong heartbeat_period= get_heartbeat_period(thd);
|
||||
struct timespec heartbeat_buf;
|
||||
struct event_coordinates coord_buf;
|
||||
struct timespec *heartbeat_ts= NULL;
|
||||
struct event_coordinates *coord= NULL;
|
||||
if (heartbeat_period != LL(0))
|
||||
{
|
||||
heartbeat_ts= &heartbeat_buf;
|
||||
set_timespec_nsec(*heartbeat_ts, 0);
|
||||
coord= &coord_buf;
|
||||
coord->file_name= log_file_name; // initialization basing on what slave remembers
|
||||
coord->pos= pos;
|
||||
}
|
||||
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
|
||||
thd->server_id, log_ident, (ulong)pos);
|
||||
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
|
||||
{
|
||||
errmsg= "Failed to run hook 'transmit_start'";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
|
||||
@ -416,11 +537,9 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
We need to start a packet with something other than 255
|
||||
to distinguish it from error
|
||||
*/
|
||||
packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
|
||||
/* reset transmit packet for the fake rotate event below */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
|
||||
/*
|
||||
Tell the client about the log name with a fake Rotate event;
|
||||
@ -460,7 +579,7 @@ impossible position";
|
||||
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
goto err;
|
||||
}
|
||||
packet->set("\0", 1, &my_charset_bin);
|
||||
|
||||
/*
|
||||
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
|
||||
this larger than the corresponding packet (query) sent
|
||||
@ -476,6 +595,11 @@ impossible position";
|
||||
log_lock = mysql_bin_log.get_log_lock();
|
||||
if (pos > BIN_LOG_HEADER_SIZE)
|
||||
{
|
||||
/* reset transmit packet for the event read from binary log
|
||||
file */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
|
||||
/*
|
||||
Try to find a Format_description_log_event at the beginning of
|
||||
the binlog
|
||||
@ -483,29 +607,30 @@ impossible position";
|
||||
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
|
||||
{
|
||||
/*
|
||||
The packet has offsets equal to the normal offsets in a binlog
|
||||
event +1 (the first character is \0).
|
||||
The packet has offsets equal to the normal offsets in a
|
||||
binlog event + ev_offset (the first ev_offset characters are
|
||||
the header (default \0)).
|
||||
*/
|
||||
DBUG_PRINT("info",
|
||||
("Looked for a Format_description_log_event, found event type %d",
|
||||
(*packet)[EVENT_TYPE_OFFSET+1]));
|
||||
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
|
||||
(*packet)[EVENT_TYPE_OFFSET+ev_offset]));
|
||||
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
|
||||
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
|
||||
LOG_EVENT_BINLOG_IN_USE_F);
|
||||
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
/*
|
||||
mark that this event with "log_pos=0", so the slave
|
||||
should not increment master's binlog position
|
||||
(rli->group_master_log_pos)
|
||||
*/
|
||||
int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
|
||||
int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
|
||||
/*
|
||||
if reconnect master sends FD event with `created' as 0
|
||||
to avoid destroying temp tables.
|
||||
*/
|
||||
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
|
||||
ST_CREATED_OFFSET+1, (ulong) 0);
|
||||
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
|
||||
/* send it */
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
@ -531,8 +656,6 @@ impossible position";
|
||||
Format_description_log_event will be found naturally if it is written.
|
||||
*/
|
||||
}
|
||||
/* reset the packet as we wrote to it in any case */
|
||||
packet->set("\0", 1, &my_charset_bin);
|
||||
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
|
||||
else
|
||||
{
|
||||
@ -544,6 +667,12 @@ impossible position";
|
||||
|
||||
while (!net->error && net->vio != 0 && !thd->killed)
|
||||
{
|
||||
Log_event_type event_type= UNKNOWN_EVENT;
|
||||
|
||||
/* reset the transmit packet for the event read from binary log
|
||||
file */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
|
||||
{
|
||||
#ifndef DBUG_OFF
|
||||
@ -555,16 +684,31 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
#endif
|
||||
/*
|
||||
log's filename does not change while it's active
|
||||
*/
|
||||
if (coord)
|
||||
coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
|
||||
|
||||
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
|
||||
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
|
||||
if (event_type == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
|
||||
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
|
||||
LOG_EVENT_BINLOG_IN_USE_F);
|
||||
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
}
|
||||
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
|
||||
else if (event_type == STOP_EVENT)
|
||||
binlog_can_be_corrupted= FALSE;
|
||||
|
||||
pos = my_b_tell(&log);
|
||||
if (RUN_HOOK(binlog_transmit, before_send_event,
|
||||
(thd, flags, packet, log_file_name, pos)))
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
errmsg= "run 'before_send_event' hook failed";
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
errmsg = "Failed on my_net_write()";
|
||||
@ -572,9 +716,8 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("log event code %d",
|
||||
(*packet)[LOG_EVENT_OFFSET+1] ));
|
||||
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
|
||||
DBUG_PRINT("info", ("log event code %d", event_type));
|
||||
if (event_type == LOAD_EVENT)
|
||||
{
|
||||
if (send_file(thd))
|
||||
{
|
||||
@ -583,7 +726,17 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
packet->set("\0", 1, &my_charset_bin);
|
||||
|
||||
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
|
||||
{
|
||||
errmsg= "Failed to run hook 'after_send_event'";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
|
||||
/* reset transmit packet for next loop */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -634,6 +787,11 @@ impossible position";
|
||||
}
|
||||
#endif
|
||||
|
||||
/* reset the transmit packet for the event read from binary log
|
||||
file */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
|
||||
/*
|
||||
No one will update the log while we are reading
|
||||
now, but we'll be quick and just read one record
|
||||
@ -650,34 +808,86 @@ impossible position";
|
||||
/* we read successfully, so we'll need to send it to the slave */
|
||||
pthread_mutex_unlock(log_lock);
|
||||
read_packet = 1;
|
||||
if (coord)
|
||||
coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
|
||||
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
|
||||
break;
|
||||
|
||||
case LOG_READ_EOF:
|
||||
{
|
||||
int ret;
|
||||
ulong signal_cnt;
|
||||
DBUG_PRINT("wait",("waiting for data in binary log"));
|
||||
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
|
||||
{
|
||||
pthread_mutex_unlock(log_lock);
|
||||
goto end;
|
||||
}
|
||||
if (!thd->killed)
|
||||
{
|
||||
/* Note that the following call unlocks lock_log */
|
||||
mysql_bin_log.wait_for_update(thd, 0);
|
||||
}
|
||||
else
|
||||
pthread_mutex_unlock(log_lock);
|
||||
DBUG_PRINT("wait",("binary log received update"));
|
||||
break;
|
||||
|
||||
default:
|
||||
#ifndef DBUG_OFF
|
||||
ulong hb_info_counter= 0;
|
||||
#endif
|
||||
signal_cnt= mysql_bin_log.signal_cnt;
|
||||
do
|
||||
{
|
||||
if (coord)
|
||||
{
|
||||
DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
|
||||
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
|
||||
}
|
||||
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
|
||||
DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
|
||||
if (ret == ETIMEDOUT || ret == ETIME)
|
||||
{
|
||||
#ifndef DBUG_OFF
|
||||
if (hb_info_counter < 3)
|
||||
{
|
||||
sql_print_information("master sends heartbeat message");
|
||||
hb_info_counter++;
|
||||
if (hb_info_counter == 3)
|
||||
sql_print_information("the rest of heartbeat info skipped ...");
|
||||
}
|
||||
#endif
|
||||
/* reset transmit packet for the heartbeat event */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
if (send_heartbeat_event(net, packet, coord))
|
||||
{
|
||||
errmsg = "Failed on my_net_write()";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
pthread_mutex_unlock(log_lock);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt ||
|
||||
thd->killed);
|
||||
DBUG_PRINT("wait",("binary log received update"));
|
||||
}
|
||||
} while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
|
||||
pthread_mutex_unlock(log_lock);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
pthread_mutex_unlock(log_lock);
|
||||
test_for_non_eof_log_read_errors(error, &errmsg);
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (read_packet)
|
||||
{
|
||||
thd_proc_info(thd, "Sending binlog event to slave");
|
||||
{
|
||||
thd_proc_info(thd, "Sending binlog event to slave");
|
||||
pos = my_b_tell(&log);
|
||||
if (RUN_HOOK(binlog_transmit, before_send_event,
|
||||
(thd, flags, packet, log_file_name, pos)))
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
errmsg= "run 'before_send_event' hook failed";
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
|
||||
{
|
||||
errmsg = "Failed on my_net_write()";
|
||||
@ -685,7 +895,7 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
|
||||
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
|
||||
if (event_type == LOAD_EVENT)
|
||||
{
|
||||
if (send_file(thd))
|
||||
{
|
||||
@ -694,11 +904,13 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
packet->set("\0", 1, &my_charset_bin);
|
||||
/*
|
||||
No need to net_flush because we will get to flush later when
|
||||
we hit EOF pretty quick
|
||||
*/
|
||||
|
||||
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
errmsg= "Failed to run hook 'after_send_event'";
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
log.error=0;
|
||||
@ -728,6 +940,10 @@ impossible position";
|
||||
end_io_cache(&log);
|
||||
(void) my_close(file, MYF(MY_WME));
|
||||
|
||||
/* reset transmit packet for the possible fake rotate event */
|
||||
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
|
||||
goto err;
|
||||
|
||||
/*
|
||||
Call fake_rotate_event() in case the previous log (the one which
|
||||
we have just finished reading) did not contain a Rotate event
|
||||
@ -745,8 +961,8 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
|
||||
packet->length(0);
|
||||
packet->append('\0');
|
||||
if (coord)
|
||||
coord->file_name= log_file_name; // reset to the next
|
||||
}
|
||||
}
|
||||
|
||||
@ -754,6 +970,7 @@ end:
|
||||
end_io_cache(&log);
|
||||
(void)my_close(file, MYF(MY_WME));
|
||||
|
||||
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
|
||||
my_eof(thd);
|
||||
thd_proc_info(thd, "Waiting to finalize termination");
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
@ -764,6 +981,7 @@ end:
|
||||
err:
|
||||
thd_proc_info(thd, "Waiting to finalize termination");
|
||||
end_io_cache(&log);
|
||||
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
|
||||
/*
|
||||
Exclude iteration through thread list
|
||||
this is needed for purge_logs() - it will iterate through
|
||||
@ -1058,6 +1276,7 @@ int reset_slave(THD *thd, Master_info* mi)
|
||||
goto err;
|
||||
}
|
||||
|
||||
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
|
||||
err:
|
||||
unlock_slave_threads(mi);
|
||||
if (error)
|
||||
@ -1131,26 +1350,40 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
int thread_mask;
|
||||
const char* errmsg= 0;
|
||||
bool need_relay_log_purge= 1;
|
||||
bool ret= FALSE;
|
||||
DBUG_ENTER("change_master");
|
||||
|
||||
lock_slave_threads(mi);
|
||||
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
|
||||
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
|
||||
if (thread_mask) // We refuse if any slave thread is running
|
||||
{
|
||||
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
|
||||
thd_proc_info(thd, "Changing master");
|
||||
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
|
||||
/*
|
||||
We need to check if there is an empty master_host. Otherwise
|
||||
change master succeeds, a master.info file is created containing
|
||||
empty master_host string and when issuing: start slave; an error
|
||||
is thrown stating that the server is not configured as slave.
|
||||
(See BUG#28796).
|
||||
*/
|
||||
if(lex_mi->host && !*lex_mi->host)
|
||||
{
|
||||
my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
// TODO: see if needs re-write
|
||||
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
|
||||
thread_mask))
|
||||
{
|
||||
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1189,13 +1422,46 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
mi->port = lex_mi->port;
|
||||
if (lex_mi->connect_retry)
|
||||
mi->connect_retry = lex_mi->connect_retry;
|
||||
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
|
||||
mi->heartbeat_period = lex_mi->heartbeat_period;
|
||||
else
|
||||
mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
|
||||
(slave_net_timeout/2.0));
|
||||
mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
|
||||
/*
|
||||
reset the last time server_id list if the current CHANGE MASTER
|
||||
is mentioning IGNORE_SERVER_IDS= (...)
|
||||
*/
|
||||
if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
|
||||
reset_dynamic(&mi->ignore_server_ids);
|
||||
for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
|
||||
{
|
||||
ulong s_id;
|
||||
get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
|
||||
if (s_id == ::server_id && replicate_same_server_id)
|
||||
{
|
||||
my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), s_id);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (bsearch((const ulong *) &s_id,
|
||||
mi->ignore_server_ids.buffer,
|
||||
mi->ignore_server_ids.elements, sizeof(ulong),
|
||||
(int (*) (const void*, const void*))
|
||||
change_master_server_id_cmp) == NULL)
|
||||
insert_dynamic(&mi->ignore_server_ids, (uchar*) &s_id);
|
||||
}
|
||||
}
|
||||
sort_dynamic(&mi->ignore_server_ids, (qsort_cmp) change_master_server_id_cmp);
|
||||
|
||||
if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
|
||||
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
|
||||
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
|
||||
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
|
||||
|
||||
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
|
||||
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
|
||||
mi->ssl_verify_server_cert=
|
||||
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
|
||||
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
|
||||
|
||||
if (lex_mi->ssl_ca)
|
||||
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
|
||||
@ -1218,9 +1484,11 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
if (lex_mi->relay_log_name)
|
||||
{
|
||||
need_relay_log_purge= 0;
|
||||
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
|
||||
char relay_log_name[FN_REFLEN];
|
||||
mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
|
||||
strmake(mi->rli.group_relay_log_name, relay_log_name,
|
||||
sizeof(mi->rli.group_relay_log_name)-1);
|
||||
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
|
||||
strmake(mi->rli.event_relay_log_name, relay_log_name,
|
||||
sizeof(mi->rli.event_relay_log_name)-1);
|
||||
}
|
||||
|
||||
@ -1267,8 +1535,8 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
if (flush_master_info(mi, 0))
|
||||
{
|
||||
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
if (need_relay_log_purge)
|
||||
{
|
||||
@ -1279,8 +1547,8 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
&errmsg))
|
||||
{
|
||||
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -1295,8 +1563,8 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
&msg, 0))
|
||||
{
|
||||
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
/*
|
||||
@ -1333,10 +1601,13 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
pthread_cond_broadcast(&mi->data_cond);
|
||||
pthread_mutex_unlock(&mi->rli.data_lock);
|
||||
|
||||
err:
|
||||
unlock_slave_threads(mi);
|
||||
thd_proc_info(thd, 0);
|
||||
my_ok(thd);
|
||||
DBUG_RETURN(FALSE);
|
||||
if (ret == FALSE)
|
||||
my_ok(thd);
|
||||
delete_dynamic(&lex_mi->repl_ignore_server_ids); //freeing of parser-time alloc
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
|
||||
@ -1357,7 +1628,11 @@ int reset_master(THD* thd)
|
||||
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
|
||||
return 1;
|
||||
}
|
||||
return mysql_bin_log.reset_logs(thd);
|
||||
|
||||
if (mysql_bin_log.reset_logs(thd))
|
||||
return 1;
|
||||
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
|
||||
@ -1395,6 +1670,7 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
bool ret = TRUE;
|
||||
IO_CACHE log;
|
||||
File file = -1;
|
||||
MYSQL_BIN_LOG *binary_log= NULL;
|
||||
DBUG_ENTER("mysql_show_binlog_events");
|
||||
|
||||
Log_event::init_show_field_list(&field_list);
|
||||
@ -1405,14 +1681,30 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
Format_description_log_event *description_event= new
|
||||
Format_description_log_event(3); /* MySQL 4.0 by default */
|
||||
|
||||
/*
|
||||
Wait for handlers to insert any pending information
|
||||
into the binlog. For e.g. ndb which updates the binlog asynchronously
|
||||
this is needed so that the uses sees all its own commands in the binlog
|
||||
*/
|
||||
ha_binlog_wait(thd);
|
||||
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
|
||||
thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
|
||||
|
||||
if (mysql_bin_log.is_open())
|
||||
/* select wich binary log to use: binlog or relay */
|
||||
if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
|
||||
{
|
||||
/*
|
||||
Wait for handlers to insert any pending information
|
||||
into the binlog. For e.g. ndb which updates the binlog asynchronously
|
||||
this is needed so that the uses sees all its own commands in the binlog
|
||||
*/
|
||||
ha_binlog_wait(thd);
|
||||
|
||||
binary_log= &mysql_bin_log;
|
||||
}
|
||||
else /* showing relay log contents */
|
||||
{
|
||||
if (!active_mi)
|
||||
DBUG_RETURN(TRUE);
|
||||
|
||||
binary_log= &(active_mi->rli.relay_log);
|
||||
}
|
||||
|
||||
if (binary_log->is_open())
|
||||
{
|
||||
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
|
||||
SELECT_LEX_UNIT *unit= &thd->lex->unit;
|
||||
@ -1420,7 +1712,7 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
|
||||
char search_file_name[FN_REFLEN], *name;
|
||||
const char *log_file_name = lex_mi->log_file_name;
|
||||
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
|
||||
pthread_mutex_t *log_lock = binary_log->get_log_lock();
|
||||
LOG_INFO linfo;
|
||||
Log_event* ev;
|
||||
|
||||
@ -1430,13 +1722,13 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
|
||||
name= search_file_name;
|
||||
if (log_file_name)
|
||||
mysql_bin_log.make_log_name(search_file_name, log_file_name);
|
||||
binary_log->make_log_name(search_file_name, log_file_name);
|
||||
else
|
||||
name=0; // Find first log
|
||||
|
||||
linfo.index_file_offset = 0;
|
||||
|
||||
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
|
||||
if (binary_log->find_log_pos(&linfo, name, 1))
|
||||
{
|
||||
errmsg = "Could not find target log";
|
||||
goto err;
|
||||
@ -1739,6 +2031,26 @@ public:
|
||||
bool update(THD *thd, set_var *var);
|
||||
};
|
||||
|
||||
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
|
||||
{
|
||||
DBUG_ENTER("fix_slave_net_timeout");
|
||||
#ifdef HAVE_REPLICATION
|
||||
pthread_mutex_lock(&LOCK_active_mi);
|
||||
DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
|
||||
slave_net_timeout,
|
||||
(active_mi? active_mi->heartbeat_period : 0.0)));
|
||||
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
|
||||
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
|
||||
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
|
||||
"The currect value for master_heartbeat_period"
|
||||
" exceeds the new value of `slave_net_timeout' sec."
|
||||
" A sensible value for the period should be"
|
||||
" less than the timeout.");
|
||||
pthread_mutex_unlock(&LOCK_active_mi);
|
||||
#endif
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
static sys_var_chain vars = { NULL, NULL };
|
||||
|
||||
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
|
||||
@ -1755,6 +2067,16 @@ static sys_var_const sys_relay_log_info_file(&vars, "relay_log_info_file",
|
||||
(uchar*) &relay_log_info_file);
|
||||
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
|
||||
&relay_log_purge);
|
||||
static sys_var_bool_ptr sys_relay_log_recovery(&vars, "relay_log_recovery",
|
||||
&relay_log_recovery);
|
||||
static sys_var_uint_ptr sys_sync_binlog_period(&vars, "sync_binlog",
|
||||
&sync_binlog_period);
|
||||
static sys_var_uint_ptr sys_sync_relaylog_period(&vars, "sync_relay_log",
|
||||
&sync_relaylog_period);
|
||||
static sys_var_uint_ptr sys_sync_relayloginfo_period(&vars, "sync_relay_log_info",
|
||||
&sync_relayloginfo_period);
|
||||
static sys_var_uint_ptr sys_sync_masterinfo_period(&vars, "sync_master_info",
|
||||
&sync_masterinfo_period);
|
||||
static sys_var_const sys_relay_log_space_limit(&vars,
|
||||
"relay_log_space_limit",
|
||||
OPT_GLOBAL, SHOW_LONGLONG,
|
||||
@ -1764,13 +2086,13 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir",
|
||||
OPT_GLOBAL, SHOW_CHAR_PTR,
|
||||
(uchar*) &slave_load_tmpdir);
|
||||
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
|
||||
&slave_net_timeout);
|
||||
&slave_net_timeout,
|
||||
fix_slave_net_timeout);
|
||||
static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors",
|
||||
OPT_GLOBAL, SHOW_CHAR,
|
||||
(uchar*) slave_skip_error_names);
|
||||
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
|
||||
&slave_trans_retries);
|
||||
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
|
||||
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
|
||||
|
||||
|
||||
@ -1812,12 +2134,6 @@ bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
|
||||
}
|
||||
|
||||
|
||||
bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
|
||||
{
|
||||
sync_binlog_period= (ulong) var->save_result.ulonglong_value;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int init_replication_sys_vars()
|
||||
{
|
||||
if (mysql_add_sys_var_chain(vars.first, my_long_options))
|
||||
@ -1829,6 +2145,5 @@ int init_replication_sys_vars()
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
#endif /* HAVE_REPLICATION */
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user