mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
MDEV-225: Replace with dummy events an event that is not understood by a slave to which it should be sent
Add function to replace arbitrary event with dummy event. Add code which uses this to fix the bug that enabling row_annotate events on the master breaks slaves which do not request such events. Add that slaves set a variable @mariadb_slave_capability to inform the master in a robust way about which events it can, and cannot, handle. Add tests.
This commit is contained in:
@ -1807,6 +1807,19 @@ static Exit_status check_master_version()
|
|||||||
"Master returned '%s'", mysql_error(mysql));
|
"Master returned '%s'", mysql_error(mysql));
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Announce our capabilities to the server, so it will send us all the events
|
||||||
|
that we know about.
|
||||||
|
*/
|
||||||
|
if (mysql_query(mysql, "SET @mariadb_slave_capability="
|
||||||
|
STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE)))
|
||||||
|
{
|
||||||
|
error("Could not inform master about capability. Master returned '%s'",
|
||||||
|
mysql_error(mysql));
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
|
||||||
delete glob_description_event;
|
delete glob_description_event;
|
||||||
switch (version) {
|
switch (version) {
|
||||||
case 3:
|
case 3:
|
||||||
|
@ -225,6 +225,8 @@ INSERT INTO global_suppressions VALUES
|
|||||||
("Slave I/O: The slave I/O thread stops because a fatal error is encountered when it tried to SET @master_binlog_checksum on master.*"),
|
("Slave I/O: The slave I/O thread stops because a fatal error is encountered when it tried to SET @master_binlog_checksum on master.*"),
|
||||||
("Slave I/O: Get master BINLOG_CHECKSUM failed with error.*"),
|
("Slave I/O: Get master BINLOG_CHECKSUM failed with error.*"),
|
||||||
("Slave I/O: Notifying master by SET @master_binlog_checksum= @@global.binlog_checksum failed with error.*"),
|
("Slave I/O: Notifying master by SET @master_binlog_checksum= @@global.binlog_checksum failed with error.*"),
|
||||||
|
("Slave I/O: Setting master-side filtering of @@skip_replication failed with error:.*"),
|
||||||
|
("Slave I/O: Setting @mariadb_slave_capability failed with error:.*"),
|
||||||
("THE_LAST_SUPPRESSION")||
|
("THE_LAST_SUPPRESSION")||
|
||||||
|
|
||||||
|
|
||||||
|
98
mysql-test/suite/rpl/r/rpl_mariadb_slave_capability.result
Normal file
98
mysql-test/suite/rpl/r/rpl_mariadb_slave_capability.result
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
include/master-slave.inc
|
||||||
|
[connection master]
|
||||||
|
set @old_master_binlog_checksum= @@global.binlog_checksum;
|
||||||
|
set @old_slave_dbug= @@global.debug_dbug;
|
||||||
|
CREATE TABLE t1 (a INT PRIMARY KEY);
|
||||||
|
INSERT INTO t1 VALUES (0);
|
||||||
|
# Test slave with no capability gets dummy event, which is ignored.
|
||||||
|
include/stop_slave.inc
|
||||||
|
SET @@global.debug_dbug='+d,simulate_slave_capability_none';
|
||||||
|
include/start_slave.inc
|
||||||
|
ALTER TABLE t1 ORDER BY a;
|
||||||
|
SET SESSION binlog_annotate_row_events = ON;
|
||||||
|
DELETE FROM t1;
|
||||||
|
INSERT INTO t1 /* A comment just to make the annotate event sufficiently long that the dummy event will need to get padded with spaces so that we can test that this works */ VALUES(1);
|
||||||
|
show binlog events in 'master-bin.000001' from <binlog_start> limit 0, 10;
|
||||||
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
|
master-bin.000001 # Query # # BEGIN
|
||||||
|
master-bin.000001 # Annotate_rows # # DELETE FROM t1
|
||||||
|
master-bin.000001 # Table_map # # table_id: # (test.t1)
|
||||||
|
master-bin.000001 # Delete_rows # # table_id: # flags: STMT_END_F
|
||||||
|
master-bin.000001 # Query # # COMMIT
|
||||||
|
master-bin.000001 # Query # # BEGIN
|
||||||
|
master-bin.000001 # Annotate_rows # # INSERT INTO t1 /* A comment just to make the annotate event sufficiently long that the dummy event will need to get padded with spaces so that we can test that this works */ VALUES(1)
|
||||||
|
master-bin.000001 # Table_map # # table_id: # (test.t1)
|
||||||
|
master-bin.000001 # Write_rows # # table_id: # flags: STMT_END_F
|
||||||
|
master-bin.000001 # Query # # COMMIT
|
||||||
|
SELECT * FROM t1;
|
||||||
|
a
|
||||||
|
1
|
||||||
|
show relaylog events in 'slave-relay-bin.000003' from <binlog_start> limit 0,10;
|
||||||
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
|
slave-relay-bin.000003 # Query # # BEGIN
|
||||||
|
slave-relay-bin.000003 # User var # # @`!dummyvar`=NULL
|
||||||
|
slave-relay-bin.000003 # Table_map # # table_id: # (test.t1)
|
||||||
|
slave-relay-bin.000003 # Delete_rows # # table_id: # flags: STMT_END_F
|
||||||
|
slave-relay-bin.000003 # Query # # COMMIT
|
||||||
|
slave-relay-bin.000003 # Query # # BEGIN
|
||||||
|
slave-relay-bin.000003 # Query # # # Dummy event replacing event type 160 that slave cannot handle.
|
||||||
|
slave-relay-bin.000003 # Table_map # # table_id: # (test.t1)
|
||||||
|
slave-relay-bin.000003 # Write_rows # # table_id: # flags: STMT_END_F
|
||||||
|
slave-relay-bin.000003 # Query # # COMMIT
|
||||||
|
set @@global.debug_dbug= @old_slave_dbug;
|
||||||
|
# Test dummy event is checksummed correctly.
|
||||||
|
set @@global.binlog_checksum = CRC32;
|
||||||
|
TRUNCATE t1;
|
||||||
|
INSERT INTO t1 VALUES(2);
|
||||||
|
show binlog events in 'master-bin.000002' from <binlog_start> limit 0, 5;
|
||||||
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
|
master-bin.000002 # Query # # BEGIN
|
||||||
|
master-bin.000002 # Annotate_rows # # INSERT INTO t1 VALUES(2)
|
||||||
|
master-bin.000002 # Table_map # # table_id: # (test.t1)
|
||||||
|
master-bin.000002 # Write_rows # # table_id: # flags: STMT_END_F
|
||||||
|
master-bin.000002 # Query # # COMMIT
|
||||||
|
SELECT * FROM t1;
|
||||||
|
a
|
||||||
|
2
|
||||||
|
show relaylog events in 'slave-relay-bin.000005' from <binlog_start> limit 3,5;
|
||||||
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
|
slave-relay-bin.000005 # Query # # BEGIN
|
||||||
|
slave-relay-bin.000005 # Query # # # Dummy ev
|
||||||
|
slave-relay-bin.000005 # Table_map # # table_id: # (test.t1)
|
||||||
|
slave-relay-bin.000005 # Write_rows # # table_id: # flags: STMT_END_F
|
||||||
|
slave-relay-bin.000005 # Query # # COMMIT
|
||||||
|
# Test that slave which cannot tolerate holes in binlog stream but
|
||||||
|
# knows the event does not get dummy event
|
||||||
|
include/stop_slave.inc
|
||||||
|
SET @@global.debug_dbug='+d,simulate_slave_capability_old_53';
|
||||||
|
include/start_slave.inc
|
||||||
|
ALTER TABLE t1 ORDER BY a;
|
||||||
|
UPDATE t1 SET a = 3;
|
||||||
|
show binlog events in 'master-bin.000002' from <binlog_start> limit 0, 5;
|
||||||
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
|
master-bin.000002 # Query # # BEGIN
|
||||||
|
master-bin.000002 # Annotate_rows # # UPDATE t1 SET a = 3
|
||||||
|
master-bin.000002 # Table_map # # table_id: # (test.t1)
|
||||||
|
master-bin.000002 # Update_rows # # table_id: # flags: STMT_END_F
|
||||||
|
master-bin.000002 # Query # # COMMIT
|
||||||
|
SELECT * FROM t1;
|
||||||
|
a
|
||||||
|
3
|
||||||
|
show relaylog events in 'slave-relay-bin.000006' from <binlog_start> limit 0,5;
|
||||||
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
|
slave-relay-bin.000006 # Query # # BEGIN
|
||||||
|
slave-relay-bin.000006 # Annotate_rows # # UPDATE t1 SET a = 3
|
||||||
|
slave-relay-bin.000006 # Table_map # # table_id: # (test.t1)
|
||||||
|
slave-relay-bin.000006 # Update_rows # # table_id: # flags: STMT_END_F
|
||||||
|
slave-relay-bin.000006 # Query # # COMMIT
|
||||||
|
select @@global.log_slave_updates;
|
||||||
|
@@global.log_slave_updates
|
||||||
|
1
|
||||||
|
select @@global.replicate_annotate_row_events;
|
||||||
|
@@global.replicate_annotate_row_events
|
||||||
|
0
|
||||||
|
set @@global.debug_dbug= @old_slave_dbug;
|
||||||
|
Clean up.
|
||||||
|
set @@global.binlog_checksum = @old_master_binlog_checksum;
|
||||||
|
DROP TABLE t1;
|
||||||
|
include/rpl_end.inc
|
104
mysql-test/suite/rpl/t/rpl_mariadb_slave_capability.test
Normal file
104
mysql-test/suite/rpl/t/rpl_mariadb_slave_capability.test
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
--source include/master-slave.inc
|
||||||
|
--source include/have_debug.inc
|
||||||
|
--source include/have_binlog_format_row.inc
|
||||||
|
|
||||||
|
connection master;
|
||||||
|
|
||||||
|
set @old_master_binlog_checksum= @@global.binlog_checksum;
|
||||||
|
set @old_slave_dbug= @@global.debug_dbug;
|
||||||
|
CREATE TABLE t1 (a INT PRIMARY KEY);
|
||||||
|
INSERT INTO t1 VALUES (0);
|
||||||
|
|
||||||
|
sync_slave_with_master;
|
||||||
|
connection slave;
|
||||||
|
|
||||||
|
--echo # Test slave with no capability gets dummy event, which is ignored.
|
||||||
|
--source include/stop_slave.inc
|
||||||
|
SET @@global.debug_dbug='+d,simulate_slave_capability_none';
|
||||||
|
--source include/start_slave.inc
|
||||||
|
connection master;
|
||||||
|
# Add a dummy event just to have something to sync_slave_with_master on.
|
||||||
|
# Otherwise we occasionally get different $relaylog_start, depending on
|
||||||
|
# whether Format_description_log_event was written to relay log or not
|
||||||
|
# at the time of SHOW SLAVE STATUS.
|
||||||
|
ALTER TABLE t1 ORDER BY a;
|
||||||
|
sync_slave_with_master;
|
||||||
|
connection slave;
|
||||||
|
let $relaylog_start= query_get_value(SHOW SLAVE STATUS, Relay_Log_Pos, 1);
|
||||||
|
|
||||||
|
connection master;
|
||||||
|
SET SESSION binlog_annotate_row_events = ON;
|
||||||
|
let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1);
|
||||||
|
let $binlog_start= query_get_value(SHOW MASTER STATUS, Position, 1);
|
||||||
|
# A short event, to test when we need to use user_var_event for dummy event.
|
||||||
|
DELETE FROM t1;
|
||||||
|
INSERT INTO t1 /* A comment just to make the annotate event sufficiently long that the dummy event will need to get padded with spaces so that we can test that this works */ VALUES(1);
|
||||||
|
let $binlog_limit= 0, 10;
|
||||||
|
--source include/show_binlog_events.inc
|
||||||
|
sync_slave_with_master;
|
||||||
|
connection slave;
|
||||||
|
|
||||||
|
SELECT * FROM t1;
|
||||||
|
let $binlog_file= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1);
|
||||||
|
let $binlog_start= $relaylog_start;
|
||||||
|
let $binlog_limit=0,10;
|
||||||
|
--source include/show_relaylog_events.inc
|
||||||
|
set @@global.debug_dbug= @old_slave_dbug;
|
||||||
|
|
||||||
|
--echo # Test dummy event is checksummed correctly.
|
||||||
|
|
||||||
|
connection master;
|
||||||
|
set @@global.binlog_checksum = CRC32;
|
||||||
|
TRUNCATE t1;
|
||||||
|
let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1);
|
||||||
|
let $binlog_start= query_get_value(SHOW MASTER STATUS, Position, 1);
|
||||||
|
INSERT INTO t1 VALUES(2);
|
||||||
|
let $binlog_limit= 0, 5;
|
||||||
|
--source include/show_binlog_events.inc
|
||||||
|
sync_slave_with_master;
|
||||||
|
connection slave;
|
||||||
|
|
||||||
|
SELECT * FROM t1;
|
||||||
|
let $binlog_file= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1);
|
||||||
|
let $binlog_start= 0;
|
||||||
|
let $binlog_limit=3,5;
|
||||||
|
--source include/show_relaylog_events.inc
|
||||||
|
|
||||||
|
--echo # Test that slave which cannot tolerate holes in binlog stream but
|
||||||
|
--echo # knows the event does not get dummy event
|
||||||
|
|
||||||
|
--source include/stop_slave.inc
|
||||||
|
SET @@global.debug_dbug='+d,simulate_slave_capability_old_53';
|
||||||
|
--source include/start_slave.inc
|
||||||
|
connection master;
|
||||||
|
ALTER TABLE t1 ORDER BY a;
|
||||||
|
sync_slave_with_master;
|
||||||
|
connection slave;
|
||||||
|
let $relaylog_start= query_get_value(SHOW SLAVE STATUS, Relay_Log_Pos, 1);
|
||||||
|
|
||||||
|
connection master;
|
||||||
|
let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1);
|
||||||
|
let $binlog_start= query_get_value(SHOW MASTER STATUS, Position, 1);
|
||||||
|
UPDATE t1 SET a = 3;
|
||||||
|
let $binlog_limit= 0, 5;
|
||||||
|
--source include/show_binlog_events.inc
|
||||||
|
sync_slave_with_master;
|
||||||
|
connection slave;
|
||||||
|
|
||||||
|
SELECT * FROM t1;
|
||||||
|
let $binlog_file= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1);
|
||||||
|
let $binlog_start= $relaylog_start;
|
||||||
|
let $binlog_limit=0,5;
|
||||||
|
--source include/show_relaylog_events.inc
|
||||||
|
|
||||||
|
select @@global.log_slave_updates;
|
||||||
|
select @@global.replicate_annotate_row_events;
|
||||||
|
|
||||||
|
set @@global.debug_dbug= @old_slave_dbug;
|
||||||
|
|
||||||
|
--echo Clean up.
|
||||||
|
connection master;
|
||||||
|
set @@global.binlog_checksum = @old_master_binlog_checksum;
|
||||||
|
DROP TABLE t1;
|
||||||
|
sync_slave_with_master;
|
||||||
|
--source include/rpl_end.inc
|
109
sql/log_event.cc
109
sql/log_event.cc
@ -3300,6 +3300,115 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Replace a binlog event read into a packet with a dummy event. Either a
|
||||||
|
Query_log_event that has just a comment, or if that will not fit in the
|
||||||
|
space used for the event to be replaced, then a NULL user_var event.
|
||||||
|
|
||||||
|
This is used when sending binlog data to a slave which does not understand
|
||||||
|
this particular event and which is too old to support informational events
|
||||||
|
or holes in the event stream.
|
||||||
|
|
||||||
|
This allows to write such events into the binlog on the master and still be
|
||||||
|
able to replicate against old slaves without them breaking.
|
||||||
|
|
||||||
|
Clears the flag LOG_EVENT_THREAD_SPECIFIC_F and set LOG_EVENT_SUPPRESS_USE_F.
|
||||||
|
Overwrites the type with QUERY_EVENT (or USER_VAR_EVENT), and replaces the
|
||||||
|
body with a minimal query / NULL user var.
|
||||||
|
|
||||||
|
Returns zero on success, -1 if error due to too little space in original
|
||||||
|
event. A minimum of 25 bytes (19 bytes fixed header + 6 bytes in the body)
|
||||||
|
is needed in any event to be replaced with a dummy event.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
Query_log_event::dummy_event(String *packet, ulong ev_offset,
|
||||||
|
uint8 checksum_alg)
|
||||||
|
{
|
||||||
|
uchar *p= (uchar *)packet->ptr() + ev_offset;
|
||||||
|
size_t data_len= packet->length() - ev_offset;
|
||||||
|
uint16 flags;
|
||||||
|
static const size_t min_user_var_event_len=
|
||||||
|
LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + 1 + UV_VAL_IS_NULL; // 25
|
||||||
|
static const size_t min_query_event_len=
|
||||||
|
LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN + 1 + 1; // 34
|
||||||
|
|
||||||
|
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
|
||||||
|
data_len-= BINLOG_CHECKSUM_LEN;
|
||||||
|
else
|
||||||
|
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
|
||||||
|
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
|
||||||
|
|
||||||
|
if (data_len < min_user_var_event_len)
|
||||||
|
/* Cannot replace with dummy, event too short. */
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
flags= uint2korr(p + FLAGS_OFFSET);
|
||||||
|
flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
|
||||||
|
flags|= LOG_EVENT_SUPPRESS_USE_F;
|
||||||
|
int2store(p + FLAGS_OFFSET, flags);
|
||||||
|
|
||||||
|
if (data_len < min_query_event_len)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Have to use dummy user_var event for such a short packet.
|
||||||
|
|
||||||
|
This works, but the event will be considered part of an event group with
|
||||||
|
the following event. So for example @@global.sql_slave_skip_counter=1
|
||||||
|
will skip not only the dummy event, but also the immediately following
|
||||||
|
event.
|
||||||
|
|
||||||
|
We write a NULL user var with the name @`!dummyvar` (or as much
|
||||||
|
as that as will fit within the size of the original event - so
|
||||||
|
possibly just @`!`).
|
||||||
|
*/
|
||||||
|
static const char var_name[]= "!dummyvar";
|
||||||
|
uint name_len= data_len - (min_user_var_event_len - 1);
|
||||||
|
|
||||||
|
p[EVENT_TYPE_OFFSET]= USER_VAR_EVENT;
|
||||||
|
int4store(p + LOG_EVENT_HEADER_LEN, name_len);
|
||||||
|
memcpy(p + LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE, var_name, name_len);
|
||||||
|
p[LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + name_len]= 1; // indicates NULL
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Use a dummy query event, just a comment.
|
||||||
|
*/
|
||||||
|
static const char message[]=
|
||||||
|
"# Dummy event replacing event type %u that slave cannot handle.";
|
||||||
|
char buf[sizeof(message)+1]; /* +1, as %u can expand to 3 digits. */
|
||||||
|
uchar old_type= p[EVENT_TYPE_OFFSET];
|
||||||
|
uchar *q= p + LOG_EVENT_HEADER_LEN;
|
||||||
|
size_t comment_len, len;
|
||||||
|
|
||||||
|
p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
|
||||||
|
int4store(q + Q_THREAD_ID_OFFSET, 0);
|
||||||
|
int4store(q + Q_EXEC_TIME_OFFSET, 0);
|
||||||
|
q[Q_DB_LEN_OFFSET]= 0;
|
||||||
|
int2store(q + Q_ERR_CODE_OFFSET, 0);
|
||||||
|
int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
|
||||||
|
q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
|
||||||
|
q+= Q_DATA_OFFSET + 1;
|
||||||
|
len= my_snprintf(buf, sizeof(buf), message, old_type);
|
||||||
|
comment_len= data_len - (min_query_event_len - 1);
|
||||||
|
if (comment_len <= len)
|
||||||
|
memcpy(q, buf, comment_len);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memcpy(q, buf, len);
|
||||||
|
memset(q+len, ' ', comment_len - len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
|
||||||
|
{
|
||||||
|
ha_checksum crc= my_checksum(0L, p, data_len);
|
||||||
|
int4store(p + data_len, crc);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#ifdef MYSQL_CLIENT
|
#ifdef MYSQL_CLIENT
|
||||||
/**
|
/**
|
||||||
Query_log_event::print().
|
Query_log_event::print().
|
||||||
|
@ -573,6 +573,43 @@ enum enum_binlog_checksum_alg {
|
|||||||
#define BINLOG_CHECKSUM_LEN CHECKSUM_CRC32_SIGNATURE_LEN
|
#define BINLOG_CHECKSUM_LEN CHECKSUM_CRC32_SIGNATURE_LEN
|
||||||
#define BINLOG_CHECKSUM_ALG_DESC_LEN 1 /* 1 byte checksum alg descriptor */
|
#define BINLOG_CHECKSUM_ALG_DESC_LEN 1 /* 1 byte checksum alg descriptor */
|
||||||
|
|
||||||
|
/*
|
||||||
|
These are capability numbers for MariaDB slave servers.
|
||||||
|
|
||||||
|
Newer MariaDB slaves set this to inform the master about their capabilities.
|
||||||
|
This allows the master to decide which events it can send to the slave
|
||||||
|
without breaking replication on old slaves that maybe do not understand
|
||||||
|
all events from newer masters.
|
||||||
|
|
||||||
|
As new releases are backwards compatible, a given capability implies also
|
||||||
|
all capabilities with smaller number.
|
||||||
|
|
||||||
|
Older MariaDB slaves and other MySQL slave servers do not set this, so they
|
||||||
|
are recorded with capability 0.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* MySQL or old MariaDB slave with no announced capability. */
|
||||||
|
#define MARIA_SLAVE_CAPABILITY_UNKNOWN 0
|
||||||
|
/* MariaDB >= 5.3, which understands ANNOTATE_ROWS_EVENT. */
|
||||||
|
#define MARIA_SLAVE_CAPABILITY_ANNOTATE 1
|
||||||
|
/*
|
||||||
|
MariaDB >= 5.5. This version has the capability to tolerate events omitted
|
||||||
|
from the binlog stream without breaking replication (MySQL slaves fail
|
||||||
|
because they mis-compute the offsets into the master's binlog).
|
||||||
|
*/
|
||||||
|
#define MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES 2
|
||||||
|
/* MariaDB > 5.5, which knows about binlog_checkpoint_log_event. */
|
||||||
|
#define MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT 3
|
||||||
|
/*
|
||||||
|
MariaDB server which understands MySQL 5.6 ignorable events. This server
|
||||||
|
can tolerate receiving any event with the LOG_EVENT_IGNORABLE_F flag set.
|
||||||
|
*/
|
||||||
|
#define MARIA_SLAVE_CAPABILITY_IGNORABLE 4
|
||||||
|
|
||||||
|
/* Our capability. */
|
||||||
|
#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@enum Log_event_type
|
@enum Log_event_type
|
||||||
|
|
||||||
@ -1827,6 +1864,7 @@ public:
|
|||||||
my_free(data_buf);
|
my_free(data_buf);
|
||||||
}
|
}
|
||||||
Log_event_type get_type_code() { return QUERY_EVENT; }
|
Log_event_type get_type_code() { return QUERY_EVENT; }
|
||||||
|
static int dummy_event(String *packet, ulong ev_offset, uint8 checksum_alg);
|
||||||
#ifdef MYSQL_SERVER
|
#ifdef MYSQL_SERVER
|
||||||
bool write(IO_CACHE* file);
|
bool write(IO_CACHE* file);
|
||||||
virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; }
|
virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; }
|
||||||
|
32
sql/slave.cc
32
sql/slave.cc
@ -1753,6 +1753,38 @@ past_checksum:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Announce MariaDB slave capabilities. */
|
||||||
|
DBUG_EXECUTE_IF("simulate_slave_capability_none", goto after_set_capability;);
|
||||||
|
{
|
||||||
|
const char *q=
|
||||||
|
DBUG_EVALUATE_IF("simulate_slave_capability_old_53",
|
||||||
|
"SET @mariadb_slave_capability="
|
||||||
|
STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_ANNOTATE),
|
||||||
|
"SET @mariadb_slave_capability="
|
||||||
|
STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE));
|
||||||
|
if (mysql_real_query(mysql, q, strlen(q)))
|
||||||
|
{
|
||||||
|
err_code= mysql_errno(mysql);
|
||||||
|
if (is_network_error(err_code))
|
||||||
|
{
|
||||||
|
mi->report(ERROR_LEVEL, err_code,
|
||||||
|
"Setting @mariadb_slave_capability failed with error: %s",
|
||||||
|
mysql_error(mysql));
|
||||||
|
goto network_err;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Fatal error */
|
||||||
|
errmsg= "The slave I/O thread stops because a fatal error is "
|
||||||
|
"encountered when it tries to set @mariadb_slave_capability.";
|
||||||
|
sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
after_set_capability:
|
||||||
|
|
||||||
err:
|
err:
|
||||||
if (errmsg)
|
if (errmsg)
|
||||||
{
|
{
|
||||||
|
@ -491,6 +491,27 @@ static ulonglong get_heartbeat_period(THD * thd)
|
|||||||
return entry? entry->val_int(&null_value) : 0;
|
return entry? entry->val_int(&null_value) : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Lookup the capabilities of the slave, which it announces by setting a value
|
||||||
|
MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability.
|
||||||
|
|
||||||
|
Older MariaDB slaves, and other MySQL slaves, do not set
|
||||||
|
@mariadb_slave_capability, corresponding to a capability of
|
||||||
|
MARIA_SLAVE_CAPABILITY_UNKNOWN (0).
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
get_mariadb_slave_capability(THD *thd)
|
||||||
|
{
|
||||||
|
bool null_value;
|
||||||
|
const LEX_STRING name= { C_STRING_WITH_LEN("mariadb_slave_capability") };
|
||||||
|
const user_var_entry *entry=
|
||||||
|
(user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
|
||||||
|
name.length);
|
||||||
|
return entry ?
|
||||||
|
(int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Function prepares and sends repliation heartbeat event.
|
Function prepares and sends repliation heartbeat event.
|
||||||
|
|
||||||
@ -563,14 +584,44 @@ static int send_heartbeat_event(NET* net, String* packet,
|
|||||||
static const char *
|
static const char *
|
||||||
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
||||||
Log_event_type event_type, char *log_file_name,
|
Log_event_type event_type, char *log_file_name,
|
||||||
IO_CACHE *log)
|
IO_CACHE *log, int mariadb_slave_capability,
|
||||||
|
ulong ev_offset, uint8 current_checksum_alg)
|
||||||
{
|
{
|
||||||
my_off_t pos;
|
my_off_t pos;
|
||||||
|
|
||||||
/* Do not send annotate_rows events unless slave requested it. */
|
/* Do not send annotate_rows events unless slave requested it. */
|
||||||
if (event_type == ANNOTATE_ROWS_EVENT &&
|
if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
|
||||||
!(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
|
{
|
||||||
return NULL;
|
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
|
||||||
|
{
|
||||||
|
/* This slave can tolerate events omitted from the binlog stream. */
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as
|
||||||
|
it will not log them in its own binary log). However, it understands the
|
||||||
|
event and will just ignore it, and it would break if we omitted it,
|
||||||
|
leaving a hole in the binlog stream. So just send the event as-is.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
The slave does not understand ANNOTATE_ROWS_EVENT.
|
||||||
|
|
||||||
|
Older MariaDB slaves (and MySQL slaves) will break replication if there
|
||||||
|
are holes in the binlog stream (they will miscompute the binlog offset
|
||||||
|
and request the wrong position when reconnecting).
|
||||||
|
|
||||||
|
So replace the event with a dummy event of the same size that will be
|
||||||
|
a no-operation on the slave.
|
||||||
|
*/
|
||||||
|
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
|
||||||
|
return "Failed to replace row annotate event with dummy: too small event.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Skip events with the @@skip_replication flag set, if slave requested
|
Skip events with the @@skip_replication flag set, if slave requested
|
||||||
@ -628,6 +679,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
|||||||
NET* net = &thd->net;
|
NET* net = &thd->net;
|
||||||
mysql_mutex_t *log_lock;
|
mysql_mutex_t *log_lock;
|
||||||
mysql_cond_t *log_cond;
|
mysql_cond_t *log_cond;
|
||||||
|
int mariadb_slave_capability;
|
||||||
|
|
||||||
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
|
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
|
||||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||||
@ -653,6 +705,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
|||||||
heartbeat_ts= &heartbeat_buf;
|
heartbeat_ts= &heartbeat_buf;
|
||||||
set_timespec_nsec(*heartbeat_ts, 0);
|
set_timespec_nsec(*heartbeat_ts, 0);
|
||||||
}
|
}
|
||||||
|
mariadb_slave_capability= get_mariadb_slave_capability(thd);
|
||||||
if (global_system_variables.log_warnings > 1)
|
if (global_system_variables.log_warnings > 1)
|
||||||
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
|
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
|
||||||
thd->server_id, log_ident, (ulong)pos);
|
thd->server_id, log_ident, (ulong)pos);
|
||||||
@ -939,7 +992,9 @@ impossible position";
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||||
log_file_name, &log)))
|
log_file_name, &log,
|
||||||
|
mariadb_slave_capability, ev_offset,
|
||||||
|
current_checksum_alg)))
|
||||||
{
|
{
|
||||||
errmsg= tmp_msg;
|
errmsg= tmp_msg;
|
||||||
my_errno= ER_UNKNOWN_ERROR;
|
my_errno= ER_UNKNOWN_ERROR;
|
||||||
@ -1097,7 +1152,9 @@ impossible position";
|
|||||||
|
|
||||||
if (read_packet &&
|
if (read_packet &&
|
||||||
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
|
||||||
log_file_name, &log)))
|
log_file_name, &log,
|
||||||
|
mariadb_slave_capability, ev_offset,
|
||||||
|
current_checksum_alg)))
|
||||||
{
|
{
|
||||||
errmsg= tmp_msg;
|
errmsg= tmp_msg;
|
||||||
my_errno= ER_UNKNOWN_ERROR;
|
my_errno= ER_UNKNOWN_ERROR;
|
||||||
|
Reference in New Issue
Block a user