1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

Binlog-in-engine: Handle mixing transactional and non-transactional tables

When updating non-transactional tables inside a multi-statement transaction,
and binlog_direct_non_transactional_updates=1, then the non-transactional
updates are binlogged directly through the statement cache while the
transaction cache is still being added to in the main transaction.

Thus, move the engine_binlog_info out from binlog_cache_mngr and into the
individual stmt/trx binlog_cache_data, so that we can have separate
engine_binlog_info active for the statement and the transaction cache.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-07-23 00:19:30 +02:00
parent 7a67f72979
commit 585785c7bc
9 changed files with 612 additions and 71 deletions

View File

@@ -0,0 +1,415 @@
include/master-slave.inc
[connection master]
CREATE TABLE t1(a INT PRIMARY KEY, b INT, c LONGTEXT) ENGINE=InnoDB;
CREATE TABLE t2(a INT PRIMARY KEY, b INT, c LONGTEXT) ENGINE=Aria;
SET @c= REPEAT('*', 20);
SET SESSION binlog_format=statement;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 0, 0, @c), (2 + 0, 0, @c), (3 + 0, 0, @c);
INSERT INTO t2 VALUES (1 + 0, 1, @c), (2 + 0, 1, @c), (3 + 0, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+0;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+0;
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+0;
INSERT INTO t2 VALUES (4 + 0, 2, @c);
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 10, 0, @c), (2 + 10, 0, @c), (3 + 10, 0, @c);
INSERT INTO t2 VALUES (1 + 10, 1, @c), (2 + 10, 1, @c), (3 + 10, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+10;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+10;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+10;
INSERT INTO t2 VALUES (4 + 10, 2, @c);
COMMIT;
SET SESSION binlog_format=row;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 100, 0, @c), (2 + 100, 0, @c), (3 + 100, 0, @c);
INSERT INTO t2 VALUES (1 + 100, 1, @c), (2 + 100, 1, @c), (3 + 100, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+100;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+100;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+100;
INSERT INTO t2 VALUES (4 + 100, 2, @c);
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 110, 0, @c), (2 + 110, 0, @c), (3 + 110, 0, @c);
INSERT INTO t2 VALUES (1 + 110, 1, @c), (2 + 110, 1, @c), (3 + 110, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+110;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+110;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+110;
INSERT INTO t2 VALUES (4 + 110, 2, @c);
COMMIT;
SET @c= REPEAT('%', 1024);
SET SESSION binlog_format=statement;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 1000, 0, @c), (2 + 1000, 0, @c), (3 + 1000, 0, @c);
INSERT INTO t2 VALUES (1 + 1000, 1, @c), (2 + 1000, 1, @c), (3 + 1000, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+1000;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+1000;
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+1000;
INSERT INTO t2 VALUES (4 + 1000, 2, @c);
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 1010, 0, @c), (2 + 1010, 0, @c), (3 + 1010, 0, @c);
INSERT INTO t2 VALUES (1 + 1010, 1, @c), (2 + 1010, 1, @c), (3 + 1010, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+1010;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+1010;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+1010;
INSERT INTO t2 VALUES (4 + 1010, 2, @c);
COMMIT;
SET SESSION binlog_format=row;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 1100, 0, @c), (2 + 1100, 0, @c), (3 + 1100, 0, @c);
INSERT INTO t2 VALUES (1 + 1100, 1, @c), (2 + 1100, 1, @c), (3 + 1100, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+1100;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+1100;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+1100;
INSERT INTO t2 VALUES (4 + 1100, 2, @c);
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 1110, 0, @c), (2 + 1110, 0, @c), (3 + 1110, 0, @c);
INSERT INTO t2 VALUES (1 + 1110, 1, @c), (2 + 1110, 1, @c), (3 + 1110, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+1110;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+1110;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+1110;
INSERT INTO t2 VALUES (4 + 1110, 2, @c);
COMMIT;
SET @c= REPEAT('.', 18000);
SET SESSION binlog_format=statement;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 2000, 0, @c), (2 + 2000, 0, @c), (3 + 2000, 0, @c);
INSERT INTO t2 VALUES (1 + 2000, 1, @c), (2 + 2000, 1, @c), (3 + 2000, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+2000;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+2000;
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+2000;
INSERT INTO t2 VALUES (4 + 2000, 2, @c);
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 2010, 0, @c), (2 + 2010, 0, @c), (3 + 2010, 0, @c);
INSERT INTO t2 VALUES (1 + 2010, 1, @c), (2 + 2010, 1, @c), (3 + 2010, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+2010;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+2010;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+2010;
INSERT INTO t2 VALUES (4 + 2010, 2, @c);
COMMIT;
SET SESSION binlog_format=row;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 2100, 0, @c), (2 + 2100, 0, @c), (3 + 2100, 0, @c);
INSERT INTO t2 VALUES (1 + 2100, 1, @c), (2 + 2100, 1, @c), (3 + 2100, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+2100;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+2100;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+2100;
INSERT INTO t2 VALUES (4 + 2100, 2, @c);
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 2110, 0, @c), (2 + 2110, 0, @c), (3 + 2110, 0, @c);
INSERT INTO t2 VALUES (1 + 2110, 1, @c), (2 + 2110, 1, @c), (3 + 2110, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+2110;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+2110;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+2110;
INSERT INTO t2 VALUES (4 + 2110, 2, @c);
COMMIT;
SET @c= REPEAT('.', 40000);
SET SESSION binlog_format=statement;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 3000, 0, @c), (2 + 3000, 0, @c), (3 + 3000, 0, @c);
INSERT INTO t2 VALUES (1 + 3000, 1, @c), (2 + 3000, 1, @c), (3 + 3000, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+3000;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+3000;
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+3000;
INSERT INTO t2 VALUES (4 + 3000, 2, @c);
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 3010, 0, @c), (2 + 3010, 0, @c), (3 + 3010, 0, @c);
INSERT INTO t2 VALUES (1 + 3010, 1, @c), (2 + 3010, 1, @c), (3 + 3010, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+3010;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+3010;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+3010;
INSERT INTO t2 VALUES (4 + 3010, 2, @c);
COMMIT;
SET SESSION binlog_format=row;
SET SESSION binlog_direct_non_transactional_updates= 0;
INSERT INTO t1 VALUES (1 + 3100, 0, @c), (2 + 3100, 0, @c), (3 + 3100, 0, @c);
INSERT INTO t2 VALUES (1 + 3100, 1, @c), (2 + 3100, 1, @c), (3 + 3100, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+3100;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+3100;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+3100;
INSERT INTO t2 VALUES (4 + 3100, 2, @c);
COMMIT;
SET SESSION binlog_direct_non_transactional_updates= 1;
INSERT INTO t1 VALUES (1 + 3110, 0, @c), (2 + 3110, 0, @c), (3 + 3110, 0, @c);
INSERT INTO t2 VALUES (1 + 3110, 1, @c), (2 + 3110, 1, @c), (3 + 3110, 1, @c);
BEGIN;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+3110;
UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+3110;
UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+3110;
INSERT INTO t2 VALUES (4 + 3110, 2, @c);
COMMIT;
SELECT a, b, length(c) FROM t1 ORDER BY a;
a b length(c)
1 2 21
2 0 20
3 2 21
11 2 21
12 0 20
13 2 21
101 2 21
102 0 20
103 2 21
111 2 21
112 0 20
113 2 21
1001 2 1025
1002 0 1024
1003 2 1025
1011 2 1025
1012 0 1024
1013 2 1025
1101 2 1025
1102 0 1024
1103 2 1025
1111 2 1025
1112 0 1024
1113 2 1025
2001 2 18001
2002 0 18000
2003 2 18001
2011 2 18001
2012 0 18000
2013 2 18001
2101 2 18001
2102 0 18000
2103 2 18001
2111 2 18001
2112 0 18000
2113 2 18001
3001 2 40001
3002 0 40000
3003 2 40001
3011 2 40001
3012 0 40000
3013 2 40001
3101 2 40001
3102 0 40000
3103 2 40001
3111 2 40001
3112 0 40000
3113 2 40001
SELECT a, b, length(c) FROM t2 ORDER BY a;
a b length(c)
1 1 20
2 2 21
3 1 20
4 2 20
11 1 20
12 2 21
13 1 20
14 2 20
101 1 20
102 2 21
103 1 20
104 2 20
111 1 20
112 2 21
113 1 20
114 2 20
1001 1 1024
1002 2 1025
1003 1 1024
1004 2 1024
1011 1 1024
1012 2 1025
1013 1 1024
1014 2 1024
1101 1 1024
1102 2 1025
1103 1 1024
1104 2 1024
1111 1 1024
1112 2 1025
1113 1 1024
1114 2 1024
2001 1 18000
2002 2 18001
2003 1 18000
2004 2 18000
2011 1 18000
2012 2 18001
2013 1 18000
2014 2 18000
2101 1 18000
2102 2 18001
2103 1 18000
2104 2 18000
2111 1 18000
2112 2 18001
2113 1 18000
2114 2 18000
3001 1 40000
3002 2 40001
3003 1 40000
3004 2 40000
3011 1 40000
3012 2 40001
3013 1 40000
3014 2 40000
3101 1 40000
3102 2 40001
3103 1 40000
3104 2 40000
3111 1 40000
3112 2 40001
3113 1 40000
3114 2 40000
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
SELECT a, b, length(c) FROM t1 ORDER BY a;
a b length(c)
1 2 21
2 0 20
3 2 21
11 2 21
12 0 20
13 2 21
101 2 21
102 0 20
103 2 21
111 2 21
112 0 20
113 2 21
1001 2 1025
1002 0 1024
1003 2 1025
1011 2 1025
1012 0 1024
1013 2 1025
1101 2 1025
1102 0 1024
1103 2 1025
1111 2 1025
1112 0 1024
1113 2 1025
2001 2 18001
2002 0 18000
2003 2 18001
2011 2 18001
2012 0 18000
2013 2 18001
2101 2 18001
2102 0 18000
2103 2 18001
2111 2 18001
2112 0 18000
2113 2 18001
3001 2 40001
3002 0 40000
3003 2 40001
3011 2 40001
3012 0 40000
3013 2 40001
3101 2 40001
3102 0 40000
3103 2 40001
3111 2 40001
3112 0 40000
3113 2 40001
SELECT a, b, length(c) FROM t2 ORDER BY a;
a b length(c)
1 1 20
2 2 21
3 1 20
4 2 20
11 1 20
12 2 21
13 1 20
14 2 20
101 1 20
102 2 21
103 1 20
104 2 20
111 1 20
112 2 21
113 1 20
114 2 20
1001 1 1024
1002 2 1025
1003 1 1024
1004 2 1024
1011 1 1024
1012 2 1025
1013 1 1024
1014 2 1024
1101 1 1024
1102 2 1025
1103 1 1024
1104 2 1024
1111 1 1024
1112 2 1025
1113 1 1024
1114 2 1024
2001 1 18000
2002 2 18001
2003 1 18000
2004 2 18000
2011 1 18000
2012 2 18001
2013 1 18000
2014 2 18000
2101 1 18000
2102 2 18001
2103 1 18000
2104 2 18000
2111 1 18000
2112 2 18001
2113 1 18000
2114 2 18000
3001 1 40000
3002 2 40001
3003 1 40000
3004 2 40000
3011 1 40000
3012 2 40001
3013 1 40000
3014 2 40000
3101 1 40000
3102 2 40001
3103 1 40000
3104 2 40000
3111 1 40000
3112 2 40001
3113 1 40000
3114 2 40000
connection master;
DROP TABLE t1, t2;
CALL mtr.add_suppression('Statement is unsafe because it accesses a non-transactional table after accessing a transactional table');
include/rpl_end.inc

View File

@@ -0,0 +1,67 @@
--source include/master-slave.inc
--source include/have_binlog_format_mixed.inc
--source include/have_innodb_binlog.inc
CREATE TABLE t1(a INT PRIMARY KEY, b INT, c LONGTEXT) ENGINE=InnoDB;
CREATE TABLE t2(a INT PRIMARY KEY, b INT, c LONGTEXT) ENGINE=Aria;
--let $i= 0
while ($i <= 3) {
if ($i == 0) {
SET @c= REPEAT('*', 20);
}
if ($i == 1) {
SET @c= REPEAT('%', 1024);
}
if ($i == 2) {
SET @c= REPEAT('.', 18000);
}
if ($i == 3) {
SET @c= REPEAT('.', 40000);
}
--let $f= 0
while ($f <= 1) {
if ($f == 0) {
SET SESSION binlog_format=statement;
}
if ($f == 1) {
SET SESSION binlog_format=row;
}
--let $s= 0
while ($s <= 1) {
--let $k = `SELECT $i*1000 + $f*100 + $s*10`
eval SET SESSION binlog_direct_non_transactional_updates= $s;
eval INSERT INTO t1 VALUES (1 + $k, 0, @c), (2 + $k, 0, @c), (3 + $k, 0, @c);
eval INSERT INTO t2 VALUES (1 + $k, 1, @c), (2 + $k, 1, @c), (3 + $k, 1, @c);
BEGIN;
eval UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=1+$k;
eval UPDATE t2 SET b=2, c=CONCAT('!', c) WHERE a=2+$k;
eval UPDATE t1 SET b=2, c=CONCAT('!', c) WHERE a=3+$k;
eval INSERT INTO t2 VALUES (4 + $k, 2, @c);
COMMIT;
inc $s;
}
inc $f;
}
inc $i;
}
SELECT a, b, length(c) FROM t1 ORDER BY a;
SELECT a, b, length(c) FROM t2 ORDER BY a;
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
SELECT a, b, length(c) FROM t1 ORDER BY a;
SELECT a, b, length(c) FROM t2 ORDER BY a;
--connection master
DROP TABLE t1, t2;
CALL mtr.add_suppression('Statement is unsafe because it accesses a non-transactional table after accessing a transactional table');
--source include/rpl_end.inc

View File

@@ -1,7 +1,7 @@
include/master-slave.inc include/master-slave.inc
[connection master] [connection master]
CREATE TABLE t1 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB; CREATE TABLE t1 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB; CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=MyISAM;
SET @b= REPEAT('$', 0); SET @b= REPEAT('$', 0);
BEGIN; BEGIN;
INSERT INTO t1 VALUES (0, 1, @b); INSERT INTO t1 VALUES (0, 1, @b);
@@ -34,12 +34,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (0, 11, @b); INSERT INTO t1 VALUES (0, 11, @b);
INSERT INTO t2 VALUES (0, 12, @b); INSERT INTO t2 VALUES (0, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=0 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=0 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 0 10 0
SELECT a, length(b) FROM t2 WHERE i=0 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=0 ORDER BY a;
a length(b) a length(b)
12 0
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=0; UPDATE t1 SET a=a+1000 WHERE i=0;
UPDATE t1 SET b='x' WHERE i=0; UPDATE t1 SET b='x' WHERE i=0;
@@ -90,12 +93,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (1, 11, @b); INSERT INTO t1 VALUES (1, 11, @b);
INSERT INTO t2 VALUES (1, 12, @b); INSERT INTO t2 VALUES (1, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=1 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=1 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 10 10 10
SELECT a, length(b) FROM t2 WHERE i=1 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=1 ORDER BY a;
a length(b) a length(b)
12 10
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=1; UPDATE t1 SET a=a+1000 WHERE i=1;
UPDATE t1 SET b='x' WHERE i=1; UPDATE t1 SET b='x' WHERE i=1;
@@ -146,12 +152,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (2, 11, @b); INSERT INTO t1 VALUES (2, 11, @b);
INSERT INTO t2 VALUES (2, 12, @b); INSERT INTO t2 VALUES (2, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=2 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=2 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 100 10 100
SELECT a, length(b) FROM t2 WHERE i=2 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=2 ORDER BY a;
a length(b) a length(b)
12 100
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=2; UPDATE t1 SET a=a+1000 WHERE i=2;
UPDATE t1 SET b='x' WHERE i=2; UPDATE t1 SET b='x' WHERE i=2;
@@ -202,12 +211,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (3, 11, @b); INSERT INTO t1 VALUES (3, 11, @b);
INSERT INTO t2 VALUES (3, 12, @b); INSERT INTO t2 VALUES (3, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=3 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=3 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 642 10 642
SELECT a, length(b) FROM t2 WHERE i=3 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=3 ORDER BY a;
a length(b) a length(b)
12 642
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=3; UPDATE t1 SET a=a+1000 WHERE i=3;
UPDATE t1 SET b='x' WHERE i=3; UPDATE t1 SET b='x' WHERE i=3;
@@ -258,12 +270,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (4, 11, @b); INSERT INTO t1 VALUES (4, 11, @b);
INSERT INTO t2 VALUES (4, 12, @b); INSERT INTO t2 VALUES (4, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=4 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=4 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 3930 10 3930
SELECT a, length(b) FROM t2 WHERE i=4 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=4 ORDER BY a;
a length(b) a length(b)
12 3930
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=4; UPDATE t1 SET a=a+1000 WHERE i=4;
UPDATE t1 SET b='x' WHERE i=4; UPDATE t1 SET b='x' WHERE i=4;
@@ -314,12 +329,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (5, 11, @b); INSERT INTO t1 VALUES (5, 11, @b);
INSERT INTO t2 VALUES (5, 12, @b); INSERT INTO t2 VALUES (5, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=5 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=5 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 16000 10 16000
SELECT a, length(b) FROM t2 WHERE i=5 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=5 ORDER BY a;
a length(b) a length(b)
12 16000
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=5; UPDATE t1 SET a=a+1000 WHERE i=5;
UPDATE t1 SET b='x' WHERE i=5; UPDATE t1 SET b='x' WHERE i=5;
@@ -370,12 +388,15 @@ SAVEPOINT s10;
INSERT INTO t1 VALUES (6, 11, @b); INSERT INTO t1 VALUES (6, 11, @b);
INSERT INTO t2 VALUES (6, 12, @b); INSERT INTO t2 VALUES (6, 12, @b);
ROLLBACK TO s10; ROLLBACK TO s10;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
COMMIT; COMMIT;
SELECT a, length(b) FROM t1 WHERE i=6 AND a>=10 ORDER BY a; SELECT a, length(b) FROM t1 WHERE i=6 AND a>=10 ORDER BY a;
a length(b) a length(b)
10 40000 10 40000
SELECT a, length(b) FROM t2 WHERE i=6 ORDER BY a; SELECT a, length(b) FROM t2 WHERE i=6 ORDER BY a;
a length(b) a length(b)
12 40000
BEGIN; BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=6; UPDATE t1 SET a=a+1000 WHERE i=6;
UPDATE t1 SET b='x' WHERE i=6; UPDATE t1 SET b='x' WHERE i=6;

View File

@@ -3,8 +3,7 @@
--source include/have_innodb_binlog.inc --source include/have_innodb_binlog.inc
CREATE TABLE t1 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB; CREATE TABLE t1 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
# ToDo CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=MyISAM; CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=MyISAM;
CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
# Add different amounts of data, to test various cases where event # Add different amounts of data, to test various cases where event
# groups fit or do not fit in case, are binlogged / not binlogged as # groups fit or do not fit in case, are binlogged / not binlogged as

View File

@@ -1599,9 +1599,9 @@ struct handlerton
binlog_oob_data(). Can also change the pointer to point to different data binlog_oob_data(). Can also change the pointer to point to different data
(or set it to NULL). (or set it to NULL).
*/ */
void (*binlog_oob_reset)(THD *thd, void **engine_data); void (*binlog_oob_reset)(void **engine_data);
/* Call to allow engine to release the engine_data from binlog_oob_data(). */ /* Call to allow engine to release the engine_data from binlog_oob_data(). */
void (*binlog_oob_free)(THD *thd, void *engine_data); void (*binlog_oob_free)(void *engine_data);
/* /*
Obtain an object to allow reading from the binlog. Obtain an object to allow reading from the binlog.
The boolean argument wait_durable is set to true to require that The boolean argument wait_durable is set to true to require that

View File

@@ -394,7 +394,6 @@ public:
stmt_start_engine_ptr(nullptr), stmt_start_engine_ptr(nullptr),
cache_savepoint_list(nullptr), cache_savepoint_list(nullptr),
cache_savepoint_next_ptr(&cache_savepoint_list), cache_savepoint_next_ptr(&cache_savepoint_list),
engine_binlog_info {0, 0, 0},
using_xa(FALSE), xa_xid(0) using_xa(FALSE), xa_xid(0)
{ {
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
@@ -410,24 +409,37 @@ public:
} }
~binlog_cache_mngr() ~binlog_cache_mngr()
{ {
if (engine_binlog_info.engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_free)
(thd, engine_binlog_info.engine_ptr);
} }
void reset(bool do_stmt, bool do_trx) void reset(bool do_stmt, bool do_trx)
{ {
if (engine_binlog_info.engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_reset)
(thd, &engine_binlog_info.engine_ptr);
if (do_stmt) if (do_stmt)
stmt_cache.reset(); {
if (opt_binlog_engine_hton)
{
stmt_cache.reset_for_engine_binlog();
/*
Use a custom write_function to spill to the engine-implemented binlog.
And re-use the IO_CACHE::append_read_pos as a handle for our
write_function; it is unused when the cache is not SEQ_READ_APPEND.
*/
stmt_cache.cache_log.write_function= binlog_spill_to_engine;
stmt_cache.cache_log.append_read_pos= (uchar *)this;
}
else
stmt_cache.reset();
}
if (do_trx) if (do_trx)
{ {
if (opt_binlog_engine_hton) if (opt_binlog_engine_hton)
{ {
trx_cache.reset_for_engine_binlog(); trx_cache.reset_for_engine_binlog();
trx_cache.cache_log.write_function= binlog_spill_to_engine;
trx_cache.cache_log.append_read_pos= (uchar *)this;
last_commit_pos_file.engine_file_no= ~(uint64_t)0; last_commit_pos_file.engine_file_no= ~(uint64_t)0;
stmt_start_engine_ptr= nullptr;
cache_savepoint_list= nullptr;
cache_savepoint_next_ptr= &cache_savepoint_list;
} }
else else
{ {
@@ -437,23 +449,6 @@ public:
last_commit_pos_offset= 0; last_commit_pos_offset= 0;
using_xa= FALSE; using_xa= FALSE;
} }
if (likely(opt_binlog_engine_hton) &&
likely(opt_binlog_engine_hton->binlog_oob_data))
{
stmt_start_engine_ptr= nullptr;
cache_savepoint_list= nullptr;
cache_savepoint_next_ptr= &cache_savepoint_list;
/*
Use a custom write_function to spill to the engine-implemented binlog.
And re-use the IO_CACHE::append_read_pos as a handle for our
write_function; it is unused when the cache is not SEQ_READ_APPEND.
*/
trx_cache.cache_log.write_function= binlog_spill_to_engine;
trx_cache.cache_log.append_read_pos= (uchar *)this;
engine_binlog_info.out_of_band_offset= 0;
engine_binlog_info.gtid_offset= 0;
/* Preserve the engine_ptr for the engine to re-use, was reset above. */
}
} }
binlog_cache_data* get_binlog_cache_data(bool is_transactional) binlog_cache_data* get_binlog_cache_data(bool is_transactional)
@@ -496,8 +491,6 @@ public:
*/ */
binlog_savepoint_info *cache_savepoint_list; binlog_savepoint_info *cache_savepoint_list;
binlog_savepoint_info **cache_savepoint_next_ptr; binlog_savepoint_info **cache_savepoint_next_ptr;
/* Context for engine-implemented binlogging. */
handler_binlog_event_group_info engine_binlog_info;
/* /*
Flag set true if this transaction is committed with log_xid() as part of Flag set true if this transaction is committed with log_xid() as part of
@@ -1810,9 +1803,10 @@ binlog_trans_log_truncate(THD *thd, binlog_savepoint_info *sv)
/* No pending savepoints in-cache anymore. */ /* No pending savepoints in-cache anymore. */
cache_mngr->cache_savepoint_next_ptr= &cache_mngr->cache_savepoint_list; cache_mngr->cache_savepoint_next_ptr= &cache_mngr->cache_savepoint_list;
cache_mngr->cache_savepoint_list= nullptr; cache_mngr->cache_savepoint_list= nullptr;
cache_mngr->engine_binlog_info.out_of_band_offset= sv->cache_offset; cache_mngr->trx_cache.engine_binlog_info.out_of_band_offset= sv->cache_offset;
(*opt_binlog_engine_hton->binlog_savepoint_rollback) (*opt_binlog_engine_hton->binlog_savepoint_rollback)
(thd, &cache_mngr->engine_binlog_info.engine_ptr, nullptr, &sv->engine_ptr); (thd, &cache_mngr->trx_cache.engine_binlog_info.engine_ptr,
nullptr, &sv->engine_ptr);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@@ -1950,7 +1944,7 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
if (mysql_bin_log.write_event(end_ev, cache_data, &cache_data->cache_log)) if (mysql_bin_log.write_event(end_ev, cache_data, &cache_data->cache_log))
DBUG_RETURN(1); DBUG_RETURN(1);
if (cache_mngr->engine_binlog_info.out_of_band_offset) if (cache_data->engine_binlog_info.out_of_band_offset)
{ {
/* /*
This is a "large" transaction, where parts of the transaction were This is a "large" transaction, where parts of the transaction were
@@ -1960,7 +1954,7 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
group is consecutive out-of-band data and the commit record will group is consecutive out-of-band data and the commit record will
only contain the GTID event (depending on engine implementation). only contain the GTID event (depending on engine implementation).
*/ */
if (my_b_flush_io_cache(&cache_mngr->trx_cache.cache_log, 0)) if (my_b_flush_io_cache(&cache_data->cache_log, 0))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
@@ -2018,9 +2012,10 @@ binlog_get_cache(THD *thd, uint64_t file_no, uint64_t offset,
{ {
cache_mngr->last_commit_pos_file.engine_file_no= file_no; cache_mngr->last_commit_pos_file.engine_file_no= file_no;
cache_mngr->last_commit_pos_offset= offset; cache_mngr->last_commit_pos_offset= offset;
context= &cache_mngr->engine_binlog_info; binlog_cache_data *cache_data= !cache_mngr->trx_cache.empty() ?
cache= !cache_mngr->trx_cache.empty() ? &cache_mngr->trx_cache : &cache_mngr->stmt_cache;
&cache_mngr->trx_cache.cache_log : &cache_mngr->stmt_cache.cache_log; context= &cache_data->engine_binlog_info;
cache= &cache_data->cache_log;
gtid= thd->get_last_commit_gtid(); gtid= thd->get_last_commit_gtid();
} }
*out_cache= cache; *out_cache= cache;
@@ -2227,9 +2222,9 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
trx_cache.set_prev_position(cache->pos_in_file); trx_cache.set_prev_position(cache->pos_in_file);
trx_cache.restore_prev_position(); trx_cache.restore_prev_position();
trx_cache.reset_cache_for_engine(stmt_pos, binlog_spill_to_engine); trx_cache.reset_cache_for_engine(stmt_pos, binlog_spill_to_engine);
cache_mngr->engine_binlog_info.out_of_band_offset= stmt_pos; cache_mngr->trx_cache.engine_binlog_info.out_of_band_offset= stmt_pos;
(*opt_binlog_engine_hton->binlog_savepoint_rollback) (*opt_binlog_engine_hton->binlog_savepoint_rollback)
(thd, &cache_mngr->engine_binlog_info.engine_ptr, (thd, &cache_mngr->trx_cache.engine_binlog_info.engine_ptr,
&cache_mngr->stmt_start_engine_ptr, nullptr); &cache_mngr->stmt_start_engine_ptr, nullptr);
} }
} }
@@ -6695,7 +6690,9 @@ binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len)
} }
binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos; binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos;
void **engine_ptr= &mngr->engine_binlog_info.engine_ptr; binlog_cache_data *cache_data= unlikely(cache==&mngr->stmt_cache.cache_log) ?
&mngr->stmt_cache : &mngr->trx_cache;
void **engine_ptr= &cache_data->engine_binlog_info.engine_ptr;
mysql_mutex_assert_not_owner(&LOCK_commit_ordered); mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
size_t max_len= std::min(binlog_max_spill_size, (size_t)binlog_cache_size); size_t max_len= std::min(binlog_max_spill_size, (size_t)binlog_cache_size);
@@ -6826,7 +6823,7 @@ binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len)
sofar+= part_len; sofar+= part_len;
} while (sofar < len); } while (sofar < len);
mngr->engine_binlog_info.out_of_band_offset+= len; cache_data->engine_binlog_info.out_of_band_offset+= len;
cache->pos_in_file= spill_end; cache->pos_in_file= spill_end;
return false; return false;
@@ -6843,17 +6840,11 @@ static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd)
auto *cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr, auto *cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr,
sizeof(binlog_cache_mngr), sizeof(binlog_cache_mngr),
MYF(MY_ZEROFILL)); MYF(MY_ZEROFILL));
if (!cache_mngr || if (!cache_mngr)
open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_stmt_cache_size, MYF(MY_WME)))
{
my_free(cache_mngr);
return NULL; return NULL;
} IO_CACHE *stmt_cache= &cache_mngr->stmt_cache.cache_log;
IO_CACHE *trx_cache= &cache_mngr->trx_cache.cache_log;
my_bool res; my_bool res;
if (likely(opt_binlog_engine_hton) && if (opt_binlog_engine_hton)
likely(opt_binlog_engine_hton->binlog_oob_data))
{ {
/* /*
With binlog implementation in engine, we do not need to spill large With binlog implementation in engine, we do not need to spill large
@@ -6861,13 +6852,31 @@ static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd)
through the binlog as the transaction runs. Setting the file to INT_MIN through the binlog as the transaction runs. Setting the file to INT_MIN
makes IO_CACHE not attempt to create the temporary file. makes IO_CACHE not attempt to create the temporary file.
*/ */
res= init_io_cache(trx_cache, (File)INT_MIN, (size_t)binlog_cache_size, res= init_io_cache(stmt_cache, (File)INT_MIN,
(size_t)binlog_stmt_cache_size,
WRITE_CACHE, 0L, 0, MYF(MY_WME | MY_NABP)); WRITE_CACHE, 0L, 0, MYF(MY_WME | MY_NABP));
/* /*
Use a custom write_function to spill to the engine-implemented binlog. Use a custom write_function to spill to the engine-implemented binlog.
And re-use the IO_CACHE::append_read_pos as a handle for our And re-use the IO_CACHE::append_read_pos as a handle for our
write_function; it is unused when the cache is not SEQ_READ_APPEND. write_function; it is unused when the cache is not SEQ_READ_APPEND.
*/ */
stmt_cache->write_function= binlog_spill_to_engine;
stmt_cache->append_read_pos= (uchar *)cache_mngr;
}
else
res= open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_stmt_cache_size,
MYF(MY_WME));
if (unlikely(res))
{
my_free(cache_mngr);
return NULL;
}
IO_CACHE *trx_cache= &cache_mngr->trx_cache.cache_log;
if (opt_binlog_engine_hton)
{
res= init_io_cache(trx_cache, (File)INT_MIN, (size_t)binlog_cache_size,
WRITE_CACHE, 0L, 0, MYF(MY_WME | MY_NABP));
trx_cache->write_function= binlog_spill_to_engine; trx_cache->write_function= binlog_spill_to_engine;
trx_cache->append_read_pos= (uchar *)cache_mngr; trx_cache->append_read_pos= (uchar *)cache_mngr;
} }
@@ -8111,7 +8120,7 @@ err:
if (opt_binlog_engine_hton) if (opt_binlog_engine_hton)
{ {
handler_binlog_event_group_info *engine_context= handler_binlog_event_group_info *engine_context=
&cache_mngr->engine_binlog_info; &cache_data->engine_binlog_info;
engine_context->gtid_offset= my_b_tell(file); engine_context->gtid_offset= my_b_tell(file);
my_off_t binlog_total_bytes; my_off_t binlog_total_bytes;
MDL_request mdl_request; MDL_request mdl_request;
@@ -9239,6 +9248,14 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
else else
break; break;
} }
/*
If not going through log_and_order(), we are not going to go through
commit_ordered(), and the engine will not binlog for us as part of its
own internal transaction commit. So we will need to binlog explicitly.
(This occurs when mixing transactional and non-transactional DML in the
same event group).
*/
entry.auto_binlog= entry.auto_binlog && cache_mngr->using_xa;
if (cache_mngr->stmt_cache.has_incident() || if (cache_mngr->stmt_cache.has_incident() ||
cache_mngr->trx_cache.has_incident()) cache_mngr->trx_cache.has_incident())
@@ -10083,10 +10100,11 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{ {
set_current_thd(current->thd); set_current_thd(current->thd);
binlog_cache_mngr *cache_mngr= current->cache_mngr; binlog_cache_mngr *cache_mngr= current->cache_mngr;
IO_CACHE *file= binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_log(current->using_trx_cache); cache_mngr->get_binlog_cache_data(current->using_trx_cache);
IO_CACHE *file= &cache_data->cache_log;
handler_binlog_event_group_info *engine_context= handler_binlog_event_group_info *engine_context=
&cache_mngr->engine_binlog_info; &cache_data->engine_binlog_info;
if (likely(!current->error)) if (likely(!current->error))
current->error= (*opt_binlog_engine_hton->binlog_write_direct_ordered) current->error= (*opt_binlog_engine_hton->binlog_write_direct_ordered)
(file, engine_context, current->thd->get_last_commit_gtid()); (file, engine_context, current->thd->get_last_commit_gtid());
@@ -10165,10 +10183,11 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{ {
set_current_thd(current->thd); set_current_thd(current->thd);
binlog_cache_mngr *cache_mngr= current->cache_mngr; binlog_cache_mngr *cache_mngr= current->cache_mngr;
IO_CACHE *file= binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_log(current->using_trx_cache); cache_mngr->get_binlog_cache_data(current->using_trx_cache);
IO_CACHE *file= &cache_data->cache_log;
handler_binlog_event_group_info *engine_context= handler_binlog_event_group_info *engine_context=
&cache_mngr->engine_binlog_info; &cache_data->engine_binlog_info;
if (likely(!current->error)) if (likely(!current->error))
current->error= (*opt_binlog_engine_hton->binlog_write_direct) current->error= (*opt_binlog_engine_hton->binlog_write_direct)
(file, engine_context, current->thd->get_last_commit_gtid()); (file, engine_context, current->thd->get_last_commit_gtid());
@@ -10176,8 +10195,11 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
set_current_thd(leader->thd); set_current_thd(leader->thd);
} }
binlog_cache_data *cache_data=
last_in_queue->cache_mngr->get_binlog_cache_data
(last_in_queue->using_trx_cache);
(*opt_binlog_engine_hton->binlog_group_commit_ordered) (*opt_binlog_engine_hton->binlog_group_commit_ordered)
(last_in_queue->thd, &last_in_queue->cache_mngr->engine_binlog_info); (last_in_queue->thd, &cache_data->engine_binlog_info);
} }
else if (check_purge) else if (check_purge)
checkpoint_and_purge(binlog_id); checkpoint_and_purge(binlog_id);
@@ -10281,16 +10303,17 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_ASSERT((entry->using_stmt_cache && !mngr->stmt_cache.empty()) || DBUG_ASSERT((entry->using_stmt_cache && !mngr->stmt_cache.empty()) ||
(entry->using_trx_cache && !mngr->trx_cache.empty()) (entry->using_trx_cache && !mngr->trx_cache.empty())
/* Assert that empty transaction is handled elsewhere. */); /* Assert that empty transaction is handled elsewhere. */);
IO_CACHE *cache= (entry->using_trx_cache && !mngr->trx_cache.empty()) ? binlog_cache_data *cache_data=
&mngr->trx_cache.cache_log : &mngr->stmt_cache.cache_log; (entry->using_trx_cache && !mngr->trx_cache.empty()) ?
&mngr->trx_cache : &mngr->stmt_cache;
/* /*
The GTID event cannot go first since we only allocate the GTID at binlog The GTID event cannot go first since we only allocate the GTID at binlog
time. So write the GTID at the very end, and record its offset so that the time. So write the GTID at the very end, and record its offset so that the
engine can pick it out and binlog it at the start. engine can pick it out and binlog it at the start.
*/ */
mngr->engine_binlog_info.gtid_offset= my_b_tell(cache); cache_data->engine_binlog_info.gtid_offset= my_b_tell(&cache_data->cache_log);
if (write_gtid_event(entry->thd, cache, is_prepared_xa(entry->thd), if (write_gtid_event(entry->thd, &cache_data->cache_log,
false, is_prepared_xa(entry->thd), false,
entry->using_trx_cache, commit_id, entry->using_trx_cache, commit_id,
has_xid, entry->ro_1pc)) has_xid, entry->ro_1pc))
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);

View File

@@ -27,6 +27,7 @@ class binlog_cache_data
{ {
public: public:
binlog_cache_data(bool precompute_checksums): binlog_cache_data(bool precompute_checksums):
engine_binlog_info {0, 0, 0},
before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0), before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0),
incident(FALSE), precompute_checksums(precompute_checksums), incident(FALSE), precompute_checksums(precompute_checksums),
saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
@@ -45,6 +46,9 @@ public:
~binlog_cache_data() ~binlog_cache_data()
{ {
DBUG_ASSERT(empty()); DBUG_ASSERT(empty());
if (engine_binlog_info.engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_free)
(engine_binlog_info.engine_ptr);
close_cached_file(&cache_log); close_cached_file(&cache_log);
} }
@@ -108,6 +112,14 @@ public:
void reset_for_engine_binlog() void reset_for_engine_binlog()
{ {
bool cache_was_empty= empty(); bool cache_was_empty= empty();
if (engine_binlog_info.engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_reset)
(&engine_binlog_info.engine_ptr);
engine_binlog_info.out_of_band_offset= 0;
engine_binlog_info.gtid_offset= 0;
/* Preserve the engine_ptr for the engine to re-use, was reset above. */
truncate(cache_log.pos_in_file); truncate(cache_log.pos_in_file);
cache_log.pos_in_file= 0; cache_log.pos_in_file= 0;
cache_log.request_pos= cache_log.write_pos= cache_log.buffer; cache_log.request_pos= cache_log.write_pos= cache_log.buffer;
@@ -198,6 +210,8 @@ public:
Cache to store data before copying it to the binary log. Cache to store data before copying it to the binary log.
*/ */
IO_CACHE cache_log; IO_CACHE cache_log;
/* Context for engine-implemented binlogging. */
handler_binlog_event_group_info engine_binlog_info;
protected: protected:
/* /*

View File

@@ -291,6 +291,7 @@ public:
struct chunk_data_cache : public chunk_data_base { struct chunk_data_cache : public chunk_data_base {
IO_CACHE *cache; IO_CACHE *cache;
binlog_oob_context *oob_ctx; binlog_oob_context *oob_ctx;
my_off_t main_start;
size_t main_remain; size_t main_remain;
size_t gtid_remain; size_t gtid_remain;
uint32_t header_remain; uint32_t header_remain;
@@ -300,6 +301,7 @@ struct chunk_data_cache : public chunk_data_base {
chunk_data_cache(IO_CACHE *cache_arg, chunk_data_cache(IO_CACHE *cache_arg,
handler_binlog_event_group_info *binlog_info) handler_binlog_event_group_info *binlog_info)
: cache(cache_arg), : cache(cache_arg),
main_start(binlog_info->out_of_band_offset),
main_remain((size_t)(binlog_info->gtid_offset - main_remain((size_t)(binlog_info->gtid_offset -
binlog_info->out_of_band_offset)), binlog_info->out_of_band_offset)),
header_sofar(0) header_sofar(0)
@@ -381,7 +383,7 @@ struct chunk_data_cache : public chunk_data_base {
ut_a(!res2 /* ToDo: Error handling */); ut_a(!res2 /* ToDo: Error handling */);
gtid_remain-= size2; gtid_remain-= size2;
if (gtid_remain == 0) if (gtid_remain == 0)
my_b_seek(cache, 0); /* Move to read the rest of the events. */ my_b_seek(cache, main_start); /* Move to read the rest of the events. */
max_len-= size2; max_len-= size2;
size+= size2; size+= size2;
if (max_len == 0) if (max_len == 0)
@@ -2578,7 +2580,7 @@ ibb_savepoint_rollback(THD *thd, void **engine_data,
void void
innodb_reset_oob(THD *thd, void **engine_data) innodb_reset_oob(void **engine_data)
{ {
binlog_oob_context *c= (binlog_oob_context *)*engine_data; binlog_oob_context *c= (binlog_oob_context *)*engine_data;
if (c) if (c)
@@ -2587,7 +2589,7 @@ innodb_reset_oob(THD *thd, void **engine_data)
void void
innodb_free_oob(THD *thd, void *engine_data) innodb_free_oob(void *engine_data)
{ {
free_oob_context((binlog_oob_context *)engine_data); free_oob_context((binlog_oob_context *)engine_data);
} }

View File

@@ -243,8 +243,8 @@ extern bool innodb_binlog_oob(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data); size_t data_len, void **engine_data);
void ibb_savepoint_rollback(THD *thd, void **engine_data, void ibb_savepoint_rollback(THD *thd, void **engine_data,
void **stmt_start_data, void **savepoint_data); void **stmt_start_data, void **savepoint_data);
extern void innodb_reset_oob(THD *thd, void **engine_data); extern void innodb_reset_oob(void **engine_data);
extern void innodb_free_oob(THD *thd, void *engine_data); extern void innodb_free_oob(void *engine_data);
extern handler_binlog_reader *innodb_get_binlog_reader(bool wait_durable); extern handler_binlog_reader *innodb_get_binlog_reader(bool wait_durable);
extern void ibb_wait_durable_offset(uint64_t file_no, uint64_t wait_offset); extern void ibb_wait_durable_offset(uint64_t file_no, uint64_t wait_offset);
extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no); extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no);