1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

WL#342 heartbeat

backporting from 6.0 code base to 5.1.
This commit is contained in:
Andrei Elkin
2009-09-29 14:16:23 +03:00
parent 14cf09c12a
commit c03549bf05
57 changed files with 1809 additions and 1209 deletions

View File

@ -336,6 +336,74 @@ Increase max_allowed_packet on master";
}
/**
An auxiliary function for calling in mysql_binlog_send
to initialize the heartbeat timeout in waiting for a binlogged event.
@param[in] thd THD to access a user variable
@return heartbeat period an ulonglong of nanoseconds
or zero if heartbeat was not demanded by slave
*/
static ulonglong get_heartbeat_period(THD * thd)
{
my_bool null_value;
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
user_var_entry *entry=
(user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
name.length);
return entry? entry->val_int(&null_value) : 0;
}
/*
Function prepares and sends repliation heartbeat event.
@param net net object of THD
@param packet buffer to store the heartbeat instance
@param event_coordinates binlog file name and position of the last
real event master sent from binlog
@note
Among three essential pieces of heartbeat data Log_event::when
is computed locally.
The error to send is serious and should force terminating
the dump thread.
*/
static int send_heartbeat_event(NET* net, String* packet,
const struct event_coordinates *coord)
{
DBUG_ENTER("send_heartbeat_event");
char header[LOG_EVENT_HEADER_LEN];
/*
'when' (the timestamp) is set to 0 so that slave could distinguish between
real and fake Rotate events (if necessary)
*/
memset(header, 0, 4); // when
header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
char* p= coord->file_name + dirname_length(coord->file_name);
uint ident_len = strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
packet->append(header, sizeof(header));
packet->append(p, ident_len); // log_file_name
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
net_flush(net))
{
DBUG_RETURN(-1);
}
packet->set("\0", 1, &my_charset_bin);
DBUG_RETURN(0);
}
/*
TODO: Clean up loop to only have one call to send_file()
*/
@ -361,7 +429,22 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
/*
heartbeat_period from @master_heartbeat_period user variable
*/
ulonglong heartbeat_period= get_heartbeat_period(thd);
struct timespec heartbeat_buf;
struct event_coordinates coord_buf;
struct timespec *heartbeat_ts= NULL;
struct event_coordinates *coord= NULL;
if (heartbeat_period != LL(0))
{
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
coord= &coord_buf;
coord->file_name= log_file_name; // initialization basing on what slave remembers
coord->pos= pos;
}
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
@ -555,6 +638,11 @@ impossible position";
goto err;
}
#endif
/*
log's filename does not change while it's active
*/
if (coord)
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
@ -650,26 +738,65 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
if (coord)
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
break;
case LOG_READ_EOF:
{
int ret;
ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
{
pthread_mutex_unlock(log_lock);
goto end;
}
if (!thd->killed)
{
/* Note that the following call unlocks lock_log */
mysql_bin_log.wait_for_update(thd, 0);
}
else
pthread_mutex_unlock(log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
#ifndef DBUG_OFF
ulong hb_info_counter= 0;
#endif
signal_cnt= mysql_bin_log.signal_cnt;
do
{
if (coord)
{
DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
}
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
if (ret == ETIMEDOUT || ret == ETIME)
{
#ifndef DBUG_OFF
if (hb_info_counter < 3)
{
sql_print_information("master sends heartbeat message");
hb_info_counter++;
if (hb_info_counter == 3)
sql_print_information("the rest of heartbeat info skipped ...");
}
#endif
if (send_heartbeat_event(net, packet, coord))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
pthread_mutex_unlock(log_lock);
goto err;
}
}
else
{
DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt ||
thd->killed);
DBUG_PRINT("wait",("binary log received update"));
}
} while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
pthread_mutex_unlock(log_lock);
}
break;
default:
pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
@ -753,6 +880,8 @@ impossible position";
packet->length(0);
packet->append('\0');
if (coord)
coord->file_name= log_file_name; // reset to the next
}
}
@ -1195,13 +1324,18 @@ bool change_master(THD* thd, Master_info* mi)
mi->port = lex_mi->port;
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->heartbeat_period = lex_mi->heartbeat_period;
else
mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
(slave_net_timeout/2.0));
mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->ssl_ca)
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
@ -1745,6 +1879,26 @@ public:
bool update(THD *thd, set_var *var);
};
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
{
DBUG_ENTER("fix_slave_net_timeout");
#ifdef HAVE_REPLICATION
pthread_mutex_lock(&LOCK_active_mi);
DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
slave_net_timeout,
(active_mi? active_mi->heartbeat_period : 0.0)));
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
"The currect value for master_heartbeat_period"
" exceeds the new value of `slave_net_timeout' sec."
" A sensible value for the period should be"
" less than the timeout.");
pthread_mutex_unlock(&LOCK_active_mi);
#endif
DBUG_VOID_RETURN;
}
static sys_var_chain vars = { NULL, NULL };
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
@ -1770,7 +1924,8 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir",
OPT_GLOBAL, SHOW_CHAR_PTR,
(uchar*) &slave_load_tmpdir);
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
&slave_net_timeout);
&slave_net_timeout,
fix_slave_net_timeout);
static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors",
OPT_GLOBAL, SHOW_CHAR,
(uchar*) slave_skip_error_names);
@ -1835,6 +1990,7 @@ int init_replication_sys_vars()
return 0;
}
#endif /* HAVE_REPLICATION */