mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
merge with 5.3
sql/sql_insert.cc: CREATE ... IF NOT EXISTS may do nothing, but it is still not a failure. don't forget to my_ok it. ****** CREATE ... IF NOT EXISTS may do nothing, but it is still not a failure. don't forget to my_ok it. sql/sql_table.cc: small cleanup ****** small cleanup
This commit is contained in:
278
sql/sql_repl.cc
278
sql/sql_repl.cc
@@ -1,4 +1,5 @@
|
||||
/* Copyright (C) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
|
||||
/* Copyright (C) 2000, 2011, Oracle and/or its affiliates.
|
||||
Copyright (c) 2009-2011, Monty Program Ab
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@@ -41,6 +42,8 @@ static int binlog_dump_count = 0;
|
||||
*/
|
||||
uint sql_slave_skip_counter;
|
||||
|
||||
extern TYPELIB binlog_checksum_typelib;
|
||||
|
||||
/*
|
||||
fake_rotate_event() builds a fake (=which does not exist physically in any
|
||||
binlog) Rotate event, which contains the name of the binlog we are going to
|
||||
@@ -60,10 +63,21 @@ uint sql_slave_skip_counter;
|
||||
*/
|
||||
|
||||
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
ulonglong position, const char** errmsg)
|
||||
ulonglong position, const char** errmsg,
|
||||
uint8 checksum_alg_arg)
|
||||
{
|
||||
DBUG_ENTER("fake_rotate_event");
|
||||
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
|
||||
|
||||
/*
|
||||
this Rotate is to be sent with checksum if and only if
|
||||
slave's get_master_version_and_clock time handshake value
|
||||
of master's @@global.binlog_checksum was TRUE
|
||||
*/
|
||||
|
||||
my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
|
||||
/*
|
||||
'when' (the timestamp) is set to 0 so that slave could distinguish between
|
||||
real and fake Rotate events (if necessary)
|
||||
@@ -73,7 +87,8 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
|
||||
char* p = log_file_name+dirname_length(log_file_name);
|
||||
uint ident_len = (uint) strlen(p);
|
||||
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
|
||||
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
|
||||
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
|
||||
int4store(header + SERVER_ID_OFFSET, server_id);
|
||||
int4store(header + EVENT_LEN_OFFSET, event_len);
|
||||
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
|
||||
@@ -84,7 +99,19 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
|
||||
packet->append(header, sizeof(header));
|
||||
int8store(buf+R_POS_OFFSET,position);
|
||||
packet->append(buf, ROTATE_HEADER_LEN);
|
||||
packet->append(p,ident_len);
|
||||
packet->append(p, ident_len);
|
||||
|
||||
if (do_checksum)
|
||||
{
|
||||
char b[BINLOG_CHECKSUM_LEN];
|
||||
ha_checksum crc= my_checksum(0L, NULL, 0);
|
||||
crc= my_checksum(crc, (uchar*)header, sizeof(header));
|
||||
crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
|
||||
crc= my_checksum(crc, (uchar*)p, ident_len);
|
||||
int4store(b, crc);
|
||||
packet->append(b, sizeof(b));
|
||||
}
|
||||
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
*errmsg = "failed on my_net_write()";
|
||||
@@ -193,6 +220,86 @@ static int send_file(THD *thd)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Internal to mysql_binlog_send() routine that recalculates checksum for
|
||||
a FD event (asserted) that needs additional arranment prior sending to slave.
|
||||
*/
|
||||
inline void fix_checksum(String *packet, ulong ev_offset)
|
||||
{
|
||||
/* recalculate the crc for this event */
|
||||
uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
|
||||
ha_checksum crc= my_checksum(0L, NULL, 0);
|
||||
DBUG_ASSERT(data_len ==
|
||||
LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
|
||||
BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
|
||||
crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len -
|
||||
BINLOG_CHECKSUM_LEN);
|
||||
int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
|
||||
}
|
||||
|
||||
|
||||
static user_var_entry * get_binlog_checksum_uservar(THD * thd)
|
||||
{
|
||||
LEX_STRING name= { C_STRING_WITH_LEN("master_binlog_checksum")};
|
||||
user_var_entry *entry=
|
||||
(user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
|
||||
name.length);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
Function for calling in mysql_binlog_send
|
||||
to check if slave initiated checksum-handshake.
|
||||
|
||||
@param[in] thd THD to access a user variable
|
||||
|
||||
@return TRUE if handshake took place, FALSE otherwise
|
||||
*/
|
||||
|
||||
static bool is_slave_checksum_aware(THD * thd)
|
||||
{
|
||||
DBUG_ENTER("is_slave_checksum_aware");
|
||||
user_var_entry *entry= get_binlog_checksum_uservar(thd);
|
||||
DBUG_RETURN(entry? true : false);
|
||||
}
|
||||
|
||||
/**
|
||||
Function for calling in mysql_binlog_send
|
||||
to get the value of @@binlog_checksum of the master at
|
||||
time of checksum-handshake.
|
||||
|
||||
The value tells the master whether to compute or not, and the slave
|
||||
to verify or not the first artificial Rotate event's checksum.
|
||||
|
||||
@param[in] thd THD to access a user variable
|
||||
|
||||
@return value of @@binlog_checksum alg according to
|
||||
@c enum enum_binlog_checksum_alg
|
||||
*/
|
||||
|
||||
static uint8 get_binlog_checksum_value_at_connect(THD * thd)
|
||||
{
|
||||
uint8 ret;
|
||||
|
||||
DBUG_ENTER("get_binlog_checksum_value_at_connect");
|
||||
user_var_entry *entry= get_binlog_checksum_uservar(thd);
|
||||
if (!entry)
|
||||
{
|
||||
ret= BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_ASSERT(entry->type == STRING_RESULT);
|
||||
String str;
|
||||
uint dummy_errors;
|
||||
str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin,
|
||||
&dummy_errors);
|
||||
ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1;
|
||||
DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
|
||||
}
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
/*
|
||||
Adjust the position pointer in the binary log file for all running slaves
|
||||
|
||||
@@ -354,6 +461,9 @@ Increase max_allowed_packet on master";
|
||||
case LOG_READ_TRUNC:
|
||||
*errmsg = "binlog truncated in the middle of event";
|
||||
break;
|
||||
case LOG_READ_CHECKSUM_FAILURE:
|
||||
*errmsg = "event read from binlog did not pass crc check";
|
||||
break;
|
||||
default:
|
||||
*errmsg = "unknown error reading log event on the master";
|
||||
break;
|
||||
@@ -452,10 +562,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
mysql_cond_t *log_cond;
|
||||
|
||||
bool binlog_can_be_corrupted= FALSE;
|
||||
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
#ifndef DBUG_OFF
|
||||
int left_events = max_binlog_dump_events;
|
||||
#endif
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
DBUG_ENTER("mysql_binlog_send");
|
||||
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
|
||||
|
||||
@@ -571,7 +682,8 @@ impossible position";
|
||||
given that we want minimum modification of 4.0, we send the normal
|
||||
and fake Rotates.
|
||||
*/
|
||||
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
|
||||
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
|
||||
get_binlog_checksum_value_at_connect(thd)))
|
||||
{
|
||||
/*
|
||||
This error code is not perfect, as fake_rotate_event() does not
|
||||
@@ -607,8 +719,8 @@ impossible position";
|
||||
Try to find a Format_description_log_event at the beginning of
|
||||
the binlog
|
||||
*/
|
||||
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
|
||||
{
|
||||
if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
|
||||
{
|
||||
/*
|
||||
The packet has offsets equal to the normal offsets in a
|
||||
binlog event + ev_offset (the first ev_offset characters are
|
||||
@@ -619,6 +731,23 @@ impossible position";
|
||||
(*packet)[EVENT_TYPE_OFFSET+ev_offset]));
|
||||
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
|
||||
packet->length() - ev_offset);
|
||||
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
|
||||
current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
|
||||
current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
|
||||
if (!is_slave_checksum_aware(thd) &&
|
||||
current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
|
||||
{
|
||||
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
errmsg= "Slave can not handle replication events with the checksum "
|
||||
"that master is configured to log";
|
||||
sql_print_warning("Master is configured to log replication events "
|
||||
"with checksum, but will not send such events to "
|
||||
"slaves that cannot process them");
|
||||
goto err;
|
||||
}
|
||||
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
|
||||
LOG_EVENT_BINLOG_IN_USE_F);
|
||||
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
@@ -634,6 +763,12 @@ impossible position";
|
||||
*/
|
||||
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
|
||||
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
|
||||
|
||||
/* fix the checksum due to latest changes in header */
|
||||
if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
|
||||
fix_checksum(packet, ev_offset);
|
||||
|
||||
/* send it */
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
@@ -678,7 +813,8 @@ impossible position";
|
||||
goto err;
|
||||
|
||||
my_off_t prev_pos= pos;
|
||||
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
|
||||
while (!(error = Log_event::read_log_event(&log, packet, log_lock,
|
||||
current_checksum_alg)))
|
||||
{
|
||||
prev_pos= my_b_tell(&log);
|
||||
#ifndef DBUG_OFF
|
||||
@@ -696,7 +832,8 @@ impossible position";
|
||||
if (coord)
|
||||
coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
|
||||
|
||||
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
|
||||
event_type=
|
||||
(Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
|
||||
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
|
||||
{
|
||||
if (event_type == XID_EVENT)
|
||||
@@ -706,12 +843,35 @@ impossible position";
|
||||
"now "
|
||||
"wait_for signal.continue";
|
||||
DBUG_ASSERT(opt_debug_sync_timeout > 0);
|
||||
DBUG_ASSERT(!debug_sync_set_action(current_thd,
|
||||
DBUG_ASSERT(!debug_sync_set_action(thd,
|
||||
STRING_WITH_LEN(act)));
|
||||
const char act2[]=
|
||||
"now "
|
||||
"signal signal.continued";
|
||||
DBUG_ASSERT(!debug_sync_set_action(current_thd,
|
||||
STRING_WITH_LEN(act2)));
|
||||
}
|
||||
});
|
||||
if (event_type == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
|
||||
packet->length() - ev_offset);
|
||||
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
|
||||
current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
|
||||
current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
|
||||
if (!is_slave_checksum_aware(thd) &&
|
||||
current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
|
||||
{
|
||||
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
errmsg= "Slave can not handle replication events with the checksum "
|
||||
"that master is configured to log";
|
||||
sql_print_warning("Master is configured to log replication events "
|
||||
"with checksum, but will not send such events to "
|
||||
"slaves that cannot process them");
|
||||
goto err;
|
||||
}
|
||||
|
||||
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
|
||||
LOG_EVENT_BINLOG_IN_USE_F);
|
||||
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
|
||||
@@ -719,46 +879,50 @@ impossible position";
|
||||
else if (event_type == STOP_EVENT)
|
||||
binlog_can_be_corrupted= FALSE;
|
||||
|
||||
pos = my_b_tell(&log);
|
||||
if (RUN_HOOK(binlog_transmit, before_send_event,
|
||||
(thd, flags, packet, log_file_name, pos)))
|
||||
if (event_type != ANNOTATE_ROWS_EVENT ||
|
||||
(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
errmsg= "run 'before_send_event' hook failed";
|
||||
goto err;
|
||||
}
|
||||
pos = my_b_tell(&log);
|
||||
if (RUN_HOOK(binlog_transmit, before_send_event,
|
||||
(thd, flags, packet, log_file_name, pos)))
|
||||
{
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
errmsg= "run 'before_send_event' hook failed";
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
errmsg = "Failed on my_net_write()";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
|
||||
{
|
||||
errmsg = "Failed on my_net_write()";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
|
||||
{
|
||||
if (event_type == XID_EVENT)
|
||||
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
|
||||
{
|
||||
net_flush(net);
|
||||
}
|
||||
});
|
||||
if (event_type == XID_EVENT)
|
||||
{
|
||||
net_flush(net);
|
||||
}
|
||||
});
|
||||
|
||||
DBUG_PRINT("info", ("log event code %d", event_type));
|
||||
if (event_type == LOAD_EVENT)
|
||||
{
|
||||
if (send_file(thd))
|
||||
{
|
||||
errmsg = "failed in send_file()";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
DBUG_PRINT("info", ("log event code %d", event_type));
|
||||
if (event_type == LOAD_EVENT)
|
||||
{
|
||||
if (send_file(thd))
|
||||
{
|
||||
errmsg = "failed in send_file()";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
|
||||
{
|
||||
errmsg= "Failed to run hook 'after_send_event'";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
|
||||
{
|
||||
errmsg= "Failed to run hook 'after_send_event'";
|
||||
my_errno= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
/* reset transmit packet for next loop */
|
||||
@@ -771,7 +935,8 @@ impossible position";
|
||||
of a crash ?). treat any corruption as EOF
|
||||
*/
|
||||
if (binlog_can_be_corrupted &&
|
||||
error != LOG_READ_MEM && error != LOG_READ_EOF)
|
||||
(error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE &&
|
||||
error != LOG_READ_EOF))
|
||||
{
|
||||
my_b_seek(&log, prev_pos);
|
||||
error=LOG_READ_EOF;
|
||||
@@ -835,14 +1000,16 @@ impossible position";
|
||||
*/
|
||||
|
||||
mysql_mutex_lock(log_lock);
|
||||
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0)) {
|
||||
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
|
||||
current_checksum_alg)) {
|
||||
case 0:
|
||||
/* we read successfully, so we'll need to send it to the slave */
|
||||
mysql_mutex_unlock(log_lock);
|
||||
read_packet = 1;
|
||||
if (coord)
|
||||
coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
|
||||
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
|
||||
event_type=
|
||||
(Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
|
||||
break;
|
||||
|
||||
case LOG_READ_EOF:
|
||||
@@ -913,7 +1080,9 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (read_packet)
|
||||
if (read_packet &&
|
||||
(event_type != ANNOTATE_ROWS_EVENT ||
|
||||
(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)))
|
||||
{
|
||||
thd_proc_info(thd, "Sending binlog event to slave");
|
||||
pos = my_b_tell(&log);
|
||||
@@ -995,7 +1164,7 @@ impossible position";
|
||||
*/
|
||||
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
|
||||
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
|
||||
&errmsg))
|
||||
&errmsg, current_checksum_alg))
|
||||
{
|
||||
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
|
||||
goto err;
|
||||
@@ -1798,7 +1967,8 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
This code will fail on a mixed relay log (one which has Format_desc then
|
||||
Rotate then Format_desc).
|
||||
*/
|
||||
ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event);
|
||||
ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event,
|
||||
opt_master_verify_checksum);
|
||||
if (ev)
|
||||
{
|
||||
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
|
||||
@@ -1820,8 +1990,12 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
|
||||
for (event_count = 0;
|
||||
(ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0,
|
||||
description_event)); )
|
||||
description_event,
|
||||
opt_master_verify_checksum)); )
|
||||
{
|
||||
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
|
||||
description_event->checksum_alg= ev->checksum_alg;
|
||||
|
||||
if (event_count >= limit_start &&
|
||||
ev->net_send(protocol, linfo.log_file_name, pos))
|
||||
{
|
||||
|
Reference in New Issue
Block a user