mirror of
https://github.com/MariaDB/server.git
synced 2025-07-27 18:02:13 +03:00
10.0-base -> 10.0-monty
This commit is contained in:
274
sql/sql_repl.cc
274
sql/sql_repl.cc
@ -1,5 +1,5 @@
|
||||
/* Copyright (c) 2000, 2012, Oracle and/or its affiliates.
|
||||
Copyright (c) 2008, 2011, Monty Program Ab
|
||||
Copyright (c) 2008, 2012, Monty Program Ab
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@ -34,14 +34,6 @@ my_bool opt_sporadic_binlog_dump_fail = 0;
|
||||
static int binlog_dump_count = 0;
|
||||
#endif
|
||||
|
||||
/**
|
||||
a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES,
|
||||
INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without
|
||||
taking all the mutexes needed to access active_mi->rli->slave_skip_counter
|
||||
properly.
|
||||
*/
|
||||
uint sql_slave_skip_counter;
|
||||
|
||||
extern TYPELIB binlog_checksum_typelib;
|
||||
|
||||
/*
|
||||
@ -491,6 +483,27 @@ static ulonglong get_heartbeat_period(THD * thd)
|
||||
return entry? entry->val_int(&null_value) : 0;
|
||||
}
|
||||
|
||||
/*
|
||||
Lookup the capabilities of the slave, which it announces by setting a value
|
||||
MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability.
|
||||
|
||||
Older MariaDB slaves, and other MySQL slaves, do not set
|
||||
@mariadb_slave_capability, corresponding to a capability of
|
||||
MARIA_SLAVE_CAPABILITY_UNKNOWN (0).
|
||||
*/
|
||||
static int
|
||||
get_mariadb_slave_capability(THD *thd)
|
||||
{
|
||||
bool null_value;
|
||||
const LEX_STRING name= { C_STRING_WITH_LEN("mariadb_slave_capability") };
|
||||
const user_var_entry *entry=
|
||||
(user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
|
||||
name.length);
|
||||
return entry ?
|
||||
(int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Function prepares and sends repliation heartbeat event.
|
||||
|
||||
@ -563,14 +576,68 @@ static int send_heartbeat_event(NET* net, String* packet,
|
||||
static const char *
|
||||
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
||||
Log_event_type event_type, char *log_file_name,
|
||||
IO_CACHE *log)
|
||||
IO_CACHE *log, int mariadb_slave_capability,
|
||||
ulong ev_offset, uint8 current_checksum_alg)
|
||||
{
|
||||
my_off_t pos;
|
||||
|
||||
/* Do not send annotate_rows events unless slave requested it. */
|
||||
if (event_type == ANNOTATE_ROWS_EVENT &&
|
||||
!(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
|
||||
return NULL;
|
||||
if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
|
||||
{
|
||||
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
|
||||
{
|
||||
/* This slave can tolerate events omitted from the binlog stream. */
|
||||
return NULL;
|
||||
}
|
||||
else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE)
|
||||
{
|
||||
/*
|
||||
The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as
|
||||
it will not log them in its own binary log). However, it understands the
|
||||
event and will just ignore it, and it would break if we omitted it,
|
||||
leaving a hole in the binlog stream. So just send the event as-is.
|
||||
*/
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
The slave does not understand ANNOTATE_ROWS_EVENT.
|
||||
|
||||
Older MariaDB slaves (and MySQL slaves) will break replication if there
|
||||
are holes in the binlog stream (they will miscompute the binlog offset
|
||||
and request the wrong position when reconnecting).
|
||||
|
||||
So replace the event with a dummy event of the same size that will be
|
||||
a no-operation on the slave.
|
||||
*/
|
||||
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
|
||||
return "Failed to replace row annotate event with dummy: too small event.";
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Do not send binlog checkpoint events to a slave that does not understand it.
|
||||
*/
|
||||
if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
|
||||
mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT)
|
||||
{
|
||||
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
|
||||
{
|
||||
/* This slave can tolerate events omitted from the binlog stream. */
|
||||
return NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy
|
||||
event instead, with same length so slave does not get confused about
|
||||
binlog positions.
|
||||
*/
|
||||
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
|
||||
return "Failed to replace binlog checkpoint event with dummy: "
|
||||
"too small event.";
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Skip events with the @@skip_replication flag set, if slave requested
|
||||
@ -628,6 +695,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
NET* net = &thd->net;
|
||||
mysql_mutex_t *log_lock;
|
||||
mysql_cond_t *log_cond;
|
||||
int mariadb_slave_capability;
|
||||
|
||||
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
@ -653,7 +721,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
heartbeat_ts= &heartbeat_buf;
|
||||
set_timespec_nsec(*heartbeat_ts, 0);
|
||||
}
|
||||
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
|
||||
mariadb_slave_capability= get_mariadb_slave_capability(thd);
|
||||
if (global_system_variables.log_warnings > 1)
|
||||
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
|
||||
thd->server_id, log_ident, (ulong)pos);
|
||||
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
|
||||
{
|
||||
@ -765,7 +835,7 @@ impossible position";
|
||||
this larger than the corresponding packet (query) sent
|
||||
from client to master.
|
||||
*/
|
||||
thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
|
||||
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
|
||||
|
||||
/*
|
||||
We can set log_lock now, it does not move (it's a member of
|
||||
@ -938,7 +1008,9 @@ impossible position";
|
||||
}
|
||||
|
||||
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||
log_file_name, &log)))
|
||||
log_file_name, &log,
|
||||
mariadb_slave_capability, ev_offset,
|
||||
current_checksum_alg)))
|
||||
{
|
||||
errmsg= tmp_msg;
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
@ -1096,7 +1168,9 @@ impossible position";
|
||||
|
||||
if (read_packet &&
|
||||
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||
log_file_name, &log)))
|
||||
log_file_name, &log,
|
||||
mariadb_slave_capability, ev_offset,
|
||||
current_checksum_alg)))
|
||||
{
|
||||
errmsg= tmp_msg;
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
@ -1223,15 +1297,28 @@ err:
|
||||
|
||||
@retval 0 success
|
||||
@retval 1 error
|
||||
@retval -1 fatal error
|
||||
*/
|
||||
|
||||
int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
{
|
||||
int slave_errno= 0;
|
||||
int thread_mask;
|
||||
char master_info_file_tmp[FN_REFLEN];
|
||||
char relay_log_info_file_tmp[FN_REFLEN];
|
||||
DBUG_ENTER("start_slave");
|
||||
|
||||
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
|
||||
DBUG_RETURN(1);
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
create_logfile_name_with_suffix(master_info_file_tmp,
|
||||
sizeof(master_info_file_tmp),
|
||||
master_info_file, 0, &mi->connection_name);
|
||||
create_logfile_name_with_suffix(relay_log_info_file_tmp,
|
||||
sizeof(relay_log_info_file_tmp),
|
||||
relay_log_info_file, 0,
|
||||
&mi->connection_name);
|
||||
|
||||
lock_slave_threads(mi); // this allows us to cleanly read slave_running
|
||||
// Get a mask of _stopped_ threads
|
||||
init_thread_mask(&thread_mask,mi,1 /* inverse */);
|
||||
@ -1245,7 +1332,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
thread_mask&= thd->lex->slave_thd_opt;
|
||||
if (thread_mask) //some threads are stopped, start them
|
||||
{
|
||||
if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
|
||||
if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
|
||||
thread_mask))
|
||||
slave_errno=ER_MASTER_INFO;
|
||||
else if (server_id_supplied && *mi->host)
|
||||
@ -1319,10 +1406,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
|
||||
if (!slave_errno)
|
||||
slave_errno = start_slave_threads(0 /*no mutex */,
|
||||
1 /* wait for start */,
|
||||
mi,
|
||||
master_info_file,relay_log_info_file,
|
||||
thread_mask);
|
||||
1 /* wait for start */,
|
||||
mi,
|
||||
master_info_file_tmp,
|
||||
relay_log_info_file_tmp,
|
||||
thread_mask);
|
||||
}
|
||||
else
|
||||
slave_errno = ER_BAD_SLAVE;
|
||||
@ -1339,11 +1427,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
if (slave_errno)
|
||||
{
|
||||
if (net_report)
|
||||
my_message(slave_errno, ER(slave_errno), MYF(0));
|
||||
DBUG_RETURN(1);
|
||||
my_error(slave_errno, MYF(0),
|
||||
(int) mi->connection_name.length,
|
||||
mi->connection_name.str);
|
||||
DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1);
|
||||
}
|
||||
else if (net_report)
|
||||
my_ok(thd);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
@ -1361,17 +1449,17 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
|
||||
|
||||
@retval 0 success
|
||||
@retval 1 error
|
||||
@retval -1 error
|
||||
*/
|
||||
|
||||
int stop_slave(THD* thd, Master_info* mi, bool net_report )
|
||||
{
|
||||
DBUG_ENTER("stop_slave");
|
||||
|
||||
int slave_errno;
|
||||
if (!thd)
|
||||
thd = current_thd;
|
||||
DBUG_ENTER("stop_slave");
|
||||
DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));
|
||||
|
||||
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
|
||||
DBUG_RETURN(1);
|
||||
DBUG_RETURN(-1);
|
||||
THD_STAGE_INFO(thd, stage_killing_slave);
|
||||
int thread_mask;
|
||||
lock_slave_threads(mi);
|
||||
@ -1406,8 +1494,6 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
|
||||
my_message(slave_errno, ER(slave_errno), MYF(0));
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
else if (net_report)
|
||||
my_ok(thd);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
@ -1431,15 +1517,18 @@ int reset_slave(THD *thd, Master_info* mi)
|
||||
int thread_mask= 0, error= 0;
|
||||
uint sql_errno=ER_UNKNOWN_ERROR;
|
||||
const char* errmsg= "Unknown error occured while reseting slave";
|
||||
char master_info_file_tmp[FN_REFLEN];
|
||||
char relay_log_info_file_tmp[FN_REFLEN];
|
||||
DBUG_ENTER("reset_slave");
|
||||
|
||||
lock_slave_threads(mi);
|
||||
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
|
||||
if (thread_mask) // We refuse if any slave thread is running
|
||||
{
|
||||
sql_errno= ER_SLAVE_MUST_STOP;
|
||||
error=1;
|
||||
goto err;
|
||||
unlock_slave_threads(mi);
|
||||
my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
|
||||
mi->connection_name.str);
|
||||
DBUG_RETURN(ER_SLAVE_MUST_STOP);
|
||||
}
|
||||
|
||||
ha_reset_slave(thd);
|
||||
@ -1466,22 +1555,35 @@ int reset_slave(THD *thd, Master_info* mi)
|
||||
|
||||
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
|
||||
end_master_info(mi);
|
||||
|
||||
// and delete these two files
|
||||
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
|
||||
create_logfile_name_with_suffix(master_info_file_tmp,
|
||||
sizeof(master_info_file_tmp),
|
||||
master_info_file, 0, &mi->connection_name);
|
||||
create_logfile_name_with_suffix(relay_log_info_file_tmp,
|
||||
sizeof(relay_log_info_file_tmp),
|
||||
relay_log_info_file, 0, &mi->connection_name);
|
||||
|
||||
fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
|
||||
if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
|
||||
mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
|
||||
{
|
||||
error=1;
|
||||
goto err;
|
||||
}
|
||||
else if (global_system_variables.log_warnings > 1)
|
||||
sql_print_information("Deleted Master_info file '%s'.", fname);
|
||||
|
||||
// delete relay_log_info_file
|
||||
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
|
||||
fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32);
|
||||
if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
|
||||
mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
|
||||
{
|
||||
error=1;
|
||||
goto err;
|
||||
}
|
||||
else if (global_system_variables.log_warnings > 1)
|
||||
sql_print_information("Deleted Master_info file '%s'.", fname);
|
||||
|
||||
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
|
||||
err:
|
||||
@ -1561,20 +1663,12 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
char saved_host[HOSTNAME_LENGTH + 1];
|
||||
uint saved_port;
|
||||
char saved_log_name[FN_REFLEN];
|
||||
char master_info_file_tmp[FN_REFLEN];
|
||||
char relay_log_info_file_tmp[FN_REFLEN];
|
||||
my_off_t saved_log_pos;
|
||||
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
|
||||
DBUG_ENTER("change_master");
|
||||
|
||||
lock_slave_threads(mi);
|
||||
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
|
||||
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
|
||||
if (thread_mask) // We refuse if any slave thread is running
|
||||
{
|
||||
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
|
||||
THD_STAGE_INFO(thd, stage_changing_master);
|
||||
/*
|
||||
We need to check if there is an empty master_host. Otherwise
|
||||
change master succeeds, a master.info file is created containing
|
||||
@ -1582,17 +1676,61 @@ bool change_master(THD* thd, Master_info* mi)
|
||||
is thrown stating that the server is not configured as slave.
|
||||
(See BUG#28796).
|
||||
*/
|
||||
if(lex_mi->host && !*lex_mi->host)
|
||||
if (lex_mi->host && !*lex_mi->host)
|
||||
{
|
||||
my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
// TODO: see if needs re-write
|
||||
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
|
||||
if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name,
|
||||
lex_mi->host,
|
||||
lex_mi->port))
|
||||
DBUG_RETURN(TRUE);
|
||||
|
||||
lock_slave_threads(mi);
|
||||
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
|
||||
if (thread_mask) // We refuse if any slave thread is running
|
||||
{
|
||||
my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
|
||||
mi->connection_name.str);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
|
||||
THD_STAGE_INFO(thd, stage_changing_master);
|
||||
|
||||
create_logfile_name_with_suffix(master_info_file_tmp,
|
||||
sizeof(master_info_file_tmp),
|
||||
master_info_file, 0, &mi->connection_name);
|
||||
create_logfile_name_with_suffix(relay_log_info_file_tmp,
|
||||
sizeof(relay_log_info_file_tmp),
|
||||
relay_log_info_file, 0, &mi->connection_name);
|
||||
|
||||
/* if new Master_info doesn't exists, add it */
|
||||
if (!master_info_index->get_master_info(&mi->connection_name,
|
||||
MYSQL_ERROR::WARN_LEVEL_NOTE))
|
||||
{
|
||||
if (master_info_index->add_master_info(mi, TRUE))
|
||||
{
|
||||
my_error(ER_MASTER_INFO, MYF(0),
|
||||
(int) lex_mi->connection_name.length,
|
||||
lex_mi->connection_name.str);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
if (global_system_variables.log_warnings > 1)
|
||||
sql_print_information("Master: '%.*s' Master_info_file: '%s' "
|
||||
"Relay_info_file: '%s'",
|
||||
(int) mi->connection_name.length,
|
||||
mi->connection_name.str,
|
||||
master_info_file_tmp, relay_log_info_file_tmp);
|
||||
|
||||
if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0,
|
||||
thread_mask))
|
||||
{
|
||||
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
|
||||
my_error(ER_MASTER_INFO, MYF(0),
|
||||
(int) lex_mi->connection_name.length,
|
||||
lex_mi->connection_name.str);
|
||||
ret= TRUE;
|
||||
goto err;
|
||||
}
|
||||
@ -1860,7 +1998,7 @@ int reset_master(THD* thd)
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (mysql_bin_log.reset_logs(thd))
|
||||
if (mysql_bin_log.reset_logs(thd, 1))
|
||||
return 1;
|
||||
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
|
||||
return 0;
|
||||
@ -1886,6 +2024,7 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
File file = -1;
|
||||
MYSQL_BIN_LOG *binary_log= NULL;
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
Master_info *mi= 0;
|
||||
LOG_INFO linfo;
|
||||
|
||||
DBUG_ENTER("mysql_show_binlog_events");
|
||||
@ -1915,10 +2054,15 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
}
|
||||
else /* showing relay log contents */
|
||||
{
|
||||
if (!active_mi)
|
||||
mysql_mutex_lock(&LOCK_active_mi);
|
||||
if (!(mi= master_info_index->
|
||||
get_master_info(&thd->variables.default_master_connection,
|
||||
MYSQL_ERROR::WARN_LEVEL_ERROR)))
|
||||
{
|
||||
mysql_mutex_unlock(&LOCK_active_mi);
|
||||
DBUG_RETURN(TRUE);
|
||||
|
||||
binary_log= &(active_mi->rli.relay_log);
|
||||
}
|
||||
binary_log= &(mi->rli.relay_log);
|
||||
}
|
||||
|
||||
if (binary_log->is_open())
|
||||
@ -1932,6 +2076,13 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
mysql_mutex_t *log_lock = binary_log->get_log_lock();
|
||||
Log_event* ev;
|
||||
|
||||
if (mi)
|
||||
{
|
||||
/* We can unlock the mutex as we have a lock on the file */
|
||||
mysql_mutex_unlock(&LOCK_active_mi);
|
||||
mi= 0;
|
||||
}
|
||||
|
||||
unit->set_limit(thd->lex->current_select);
|
||||
limit_start= unit->offset_limit_cnt;
|
||||
limit_end= unit->select_limit_cnt;
|
||||
@ -2002,7 +2153,7 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
description_event->checksum_alg= ev->checksum_alg;
|
||||
|
||||
if (event_count >= limit_start &&
|
||||
ev->net_send(protocol, linfo.log_file_name, pos))
|
||||
ev->net_send(thd, protocol, linfo.log_file_name, pos))
|
||||
{
|
||||
errmsg = "Net error";
|
||||
delete ev;
|
||||
@ -2026,6 +2177,9 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
|
||||
mysql_mutex_unlock(log_lock);
|
||||
}
|
||||
else if (mi)
|
||||
mysql_mutex_unlock(&LOCK_active_mi);
|
||||
|
||||
// Check that linfo is still on the function scope.
|
||||
DEBUG_SYNC(thd, "after_show_binlog_events");
|
||||
|
||||
|
Reference in New Issue
Block a user