1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

Binlog-in-engine: Implement savepoint support

Support for SAVEPOINT, ROLLBACK TO SAVEPOINT, rolling back a failed
statement (keeping active transaction), and rolling back transaction.

For savepoints (and start-of-statement), if the binlog data to be rolled
back is still in the in-memory part of trx cache we can just truncate the
cache to the point.

But if we need to spill cache contents as out-of-band data containing one or
more savepoints/start-of-statement point, then split the spill at each point
and inform the engine of the savepoints.

In InnoDB, at savepoint set, save the state of the forest of perfect binary
trees being built. Then at rollback, restore the appropriate state.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-07-15 13:16:33 +02:00
parent 95ea6e15a6
commit 31ba7922a0
9 changed files with 981 additions and 32 deletions

View File

@@ -0,0 +1,403 @@
include/master-slave.inc
[connection master]
CREATE TABLE t1 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
SET @b= REPEAT('$', 0);
BEGIN;
INSERT INTO t1 VALUES (0, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (0, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (0, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (0, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (0, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (0, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (0, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (0, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=0 ORDER BY a;
a length(b)
1 0
2 0
6 0
7 0
8 0
BEGIN;
INSERT INTO t1 VALUES (0, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (0, 11, @b);
INSERT INTO t2 VALUES (0, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=0 AND a>=10 ORDER BY a;
a length(b)
10 0
SELECT a, length(b) FROM t2 WHERE i=0 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=0;
UPDATE t1 SET b='x' WHERE i=0;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (0, 101, @b), (0, 102, @b), (0, 103, @b), (0, 104, @b), (0, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=0 AND a > 100;
ERROR 23000: Duplicate entry '0-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=0 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=0 AND a >= 100 ORDER BY a;
a length(b)
111 0
112 0
113 0
114 0
115 0
SET @b= REPEAT('$', 10);
BEGIN;
INSERT INTO t1 VALUES (1, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (1, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (1, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (1, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (1, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (1, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (1, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (1, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=1 ORDER BY a;
a length(b)
1 10
2 10
6 10
7 10
8 10
BEGIN;
INSERT INTO t1 VALUES (1, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (1, 11, @b);
INSERT INTO t2 VALUES (1, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=1 AND a>=10 ORDER BY a;
a length(b)
10 10
SELECT a, length(b) FROM t2 WHERE i=1 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=1;
UPDATE t1 SET b='x' WHERE i=1;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (1, 101, @b), (1, 102, @b), (1, 103, @b), (1, 104, @b), (1, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=1 AND a > 100;
ERROR 23000: Duplicate entry '1-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=1 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=1 AND a >= 100 ORDER BY a;
a length(b)
111 10
112 10
113 10
114 10
115 10
SET @b= REPEAT('$', 100);
BEGIN;
INSERT INTO t1 VALUES (2, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (2, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (2, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (2, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (2, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (2, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (2, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (2, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=2 ORDER BY a;
a length(b)
1 100
2 100
6 100
7 100
8 100
BEGIN;
INSERT INTO t1 VALUES (2, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (2, 11, @b);
INSERT INTO t2 VALUES (2, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=2 AND a>=10 ORDER BY a;
a length(b)
10 100
SELECT a, length(b) FROM t2 WHERE i=2 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=2;
UPDATE t1 SET b='x' WHERE i=2;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (2, 101, @b), (2, 102, @b), (2, 103, @b), (2, 104, @b), (2, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=2 AND a > 100;
ERROR 23000: Duplicate entry '2-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=2 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=2 AND a >= 100 ORDER BY a;
a length(b)
111 100
112 100
113 100
114 100
115 100
SET @b= REPEAT('$', 642);
BEGIN;
INSERT INTO t1 VALUES (3, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (3, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (3, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (3, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (3, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (3, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (3, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (3, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=3 ORDER BY a;
a length(b)
1 642
2 642
6 642
7 642
8 642
BEGIN;
INSERT INTO t1 VALUES (3, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (3, 11, @b);
INSERT INTO t2 VALUES (3, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=3 AND a>=10 ORDER BY a;
a length(b)
10 642
SELECT a, length(b) FROM t2 WHERE i=3 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=3;
UPDATE t1 SET b='x' WHERE i=3;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (3, 101, @b), (3, 102, @b), (3, 103, @b), (3, 104, @b), (3, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=3 AND a > 100;
ERROR 23000: Duplicate entry '3-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=3 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=3 AND a >= 100 ORDER BY a;
a length(b)
111 642
112 642
113 642
114 642
115 642
SET @b= REPEAT('$', 3930);
BEGIN;
INSERT INTO t1 VALUES (4, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (4, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (4, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (4, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (4, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (4, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (4, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (4, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=4 ORDER BY a;
a length(b)
1 3930
2 3930
6 3930
7 3930
8 3930
BEGIN;
INSERT INTO t1 VALUES (4, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (4, 11, @b);
INSERT INTO t2 VALUES (4, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=4 AND a>=10 ORDER BY a;
a length(b)
10 3930
SELECT a, length(b) FROM t2 WHERE i=4 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=4;
UPDATE t1 SET b='x' WHERE i=4;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (4, 101, @b), (4, 102, @b), (4, 103, @b), (4, 104, @b), (4, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=4 AND a > 100;
ERROR 23000: Duplicate entry '4-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=4 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=4 AND a >= 100 ORDER BY a;
a length(b)
111 3930
112 3930
113 3930
114 3930
115 3930
SET @b= REPEAT('$', 16000);
BEGIN;
INSERT INTO t1 VALUES (5, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (5, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (5, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (5, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (5, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (5, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (5, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (5, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=5 ORDER BY a;
a length(b)
1 16000
2 16000
6 16000
7 16000
8 16000
BEGIN;
INSERT INTO t1 VALUES (5, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (5, 11, @b);
INSERT INTO t2 VALUES (5, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=5 AND a>=10 ORDER BY a;
a length(b)
10 16000
SELECT a, length(b) FROM t2 WHERE i=5 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=5;
UPDATE t1 SET b='x' WHERE i=5;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (5, 101, @b), (5, 102, @b), (5, 103, @b), (5, 104, @b), (5, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=5 AND a > 100;
ERROR 23000: Duplicate entry '5-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=5 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=5 AND a >= 100 ORDER BY a;
a length(b)
111 16000
112 16000
113 16000
114 16000
115 16000
SET @b= REPEAT('$', 40000);
BEGIN;
INSERT INTO t1 VALUES (6, 1, @b);
SAVEPOINT s1;
INSERT INTO t1 VALUES (6, 2, @b);
SAVEPOINT s2;
INSERT INTO t1 VALUES (6, 3, @b);
SAVEPOINT s3;
INSERT INTO t1 VALUES (6, 4, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (6, 5, @b);
ROLLBACK TO s2;
INSERT INTO t1 VALUES (6, 6, @b);
SAVEPOINT s4;
INSERT INTO t1 VALUES (6, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
INSERT INTO t1 VALUES (6, 8, @b);
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=6 ORDER BY a;
a length(b)
1 40000
2 40000
6 40000
7 40000
8 40000
BEGIN;
INSERT INTO t1 VALUES (6, 10, @b);
SAVEPOINT s10;
INSERT INTO t1 VALUES (6, 11, @b);
INSERT INTO t2 VALUES (6, 12, @b);
ROLLBACK TO s10;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=6 AND a>=10 ORDER BY a;
a length(b)
10 40000
SELECT a, length(b) FROM t2 WHERE i=6 ORDER BY a;
a length(b)
BEGIN;
UPDATE t1 SET a=a+1000 WHERE i=6;
UPDATE t1 SET b='x' WHERE i=6;
ROLLBACK;
BEGIN;
INSERT INTO t1
VALUES (6, 101, @b), (6, 102, @b), (6, 103, @b), (6, 104, @b), (6, 105, @b);
UPDATE t1 SET a=a-104 WHERE i=6 AND a > 100;
ERROR 23000: Duplicate entry '6-1' for key 'PRIMARY'
UPDATE t1 SET a=a+10 WHERE i=6 AND a > 100;
COMMIT;
SELECT a, length(b) FROM t1 WHERE i=6 AND a >= 100 ORDER BY a;
a length(b)
111 40000
112 40000
113 40000
114 40000
115 40000
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
*** Slave data checksums with master, all ok. ***
connection master;
DROP TABLE t1, t2;
include/rpl_end.inc

View File

@@ -0,0 +1,126 @@
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--source include/have_innodb_binlog.inc
CREATE TABLE t1 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
# ToDo CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=MyISAM;
CREATE TABLE t2 (i INT, a INT, b TEXT, PRIMARY KEY(i, a)) ENGINE=InnoDB;
# Add different amounts of data, to test various cases where event
# groups fit or do not fit in case, are binlogged / not binlogged as
# oob data.
--let $i = 0
while ($i <= 6) {
if ($i == 0) {
SET @b= REPEAT('$', 0);
}
if ($i == 1) {
SET @b= REPEAT('$', 10);
}
if ($i == 2) {
SET @b= REPEAT('$', 100);
}
if ($i == 3) {
SET @b= REPEAT('$', 642);
}
if ($i == 4) {
SET @b= REPEAT('$', 3930);
}
if ($i == 5) {
SET @b= REPEAT('$', 16000);
}
if ($i == 6) {
SET @b= REPEAT('$', 40000);
}
BEGIN;
eval INSERT INTO t1 VALUES ($i, 1, @b);
SAVEPOINT s1;
eval INSERT INTO t1 VALUES ($i, 2, @b);
SAVEPOINT s2;
eval INSERT INTO t1 VALUES ($i, 3, @b);
SAVEPOINT s3;
eval INSERT INTO t1 VALUES ($i, 4, @b);
ROLLBACK TO s2;
eval INSERT INTO t1 VALUES ($i, 5, @b);
ROLLBACK TO s2;
eval INSERT INTO t1 VALUES ($i, 6, @b);
SAVEPOINT s4;
eval INSERT INTO t1 VALUES ($i, 7, @b);
SAVEPOINT s5;
ROLLBACK TO s5;
eval INSERT INTO t1 VALUES ($i, 8, @b);
COMMIT;
eval SELECT a, length(b) FROM t1 WHERE i=$i ORDER BY a;
BEGIN;
eval INSERT INTO t1 VALUES ($i, 10, @b);
SAVEPOINT s10;
eval INSERT INTO t1 VALUES ($i, 11, @b);
eval INSERT INTO t2 VALUES ($i, 12, @b);
ROLLBACK TO s10;
COMMIT;
eval SELECT a, length(b) FROM t1 WHERE i=$i AND a>=10 ORDER BY a;
eval SELECT a, length(b) FROM t2 WHERE i=$i ORDER BY a;
# Test a full rollback.
BEGIN;
eval UPDATE t1 SET a=a+1000 WHERE i=$i;
eval UPDATE t1 SET b='x' WHERE i=$i;
ROLLBACK;
# Test a statement that fails and is rolled back but the remaining
# transaction is committed.
BEGIN;
eval INSERT INTO t1
VALUES ($i, 101, @b), ($i, 102, @b), ($i, 103, @b), ($i, 104, @b), ($i, 105, @b);
--error ER_DUP_ENTRY
eval UPDATE t1 SET a=a-104 WHERE i=$i AND a > 100;
eval UPDATE t1 SET a=a+10 WHERE i=$i AND a > 100;
COMMIT;
eval SELECT a, length(b) FROM t1 WHERE i=$i AND a >= 100 ORDER BY a;
inc $i;
}
# Seeing the events generated useful for debugging, but hard to maintain the
# .result file over time, better to check slave data vs. master.
#--let $binlog_file= binlog-000000.ibb
#--let $binlog_start= 4096
#--source include/show_binlog_events.inc
--let $master_checksum1= query_get_value(CHECKSUM TABLE t1, Checksum, 1)
--let $master_checksum2= query_get_value(CHECKSUM TABLE t2, Checksum, 1)
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
--let $slave_checksum1= query_get_value(CHECKSUM TABLE t1, Checksum, 1)
--let slave_checksum2= query_get_value(CHECKSUM TABLE t2, Checksum, 1)
--let $ok= 1
if ($master_checksum1 != $slave_checksum1) {
--let $ok= 0
}
if ($master_checksum2 != $slave_checksum2) {
--let $ok= 0
}
if (!$ok) {
--connection master
--echo *** Data on master: ***
SELECT i, a, length(b) FROM t1 ORDER BY i, a;
SELECT i, a, length(b) FROM t2 ORDER BY i, a;
--connection slave
--echo *** Data on slave: ***
SELECT i, a, length(b) FROM t1 ORDER BY i, a;
SELECT i, a, length(b) FROM t2 ORDER BY i, a;
--die Slave data differs from master. Master checksums $master_checksum1 $master_checksum2, but slave $slave_checksum1 $slave_checksum2
}
if ($ok) {
--echo *** Slave data checksums with master, all ok. ***
}
--connection master
DROP TABLE t1, t2;
--source include/rpl_end.inc

View File

@@ -1566,11 +1566,32 @@ struct handlerton
must be binlogged transactionally during COMMIT. The engine_data points to
a pointer location that the engine can set to maintain its own context
for the out-of-band data.
Optionally savepoints can be set at the point at the start of the write
(ie. before any written data), when stmt_start_data and/or savepoint_data
are non-NULL. Such a point can later be rolled back to by calling
binlog_savepoint_rollback(). (Only) if stmt_start_data or savepoint_data
is non-null can data_len be null (to set savepoint(s) and do nothing else).
*/
bool (*binlog_oob_data_ordered)(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
size_t data_len, void **engine_data,
void **stmt_start_data,
void **savepoint_data);
bool (*binlog_oob_data)(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
/*
Rollback to a prior point in out-of-band binlogged partial transaction
data, for savepoint support. The stmt_start_data and/or savepoint_data,
if non-NULL, correspond to the point set by an earlier binlog_oob_data()
call.
Exactly one of stmt_start_data or savepoint_data will be non-NULL,
corresponding to either rolling back to the start of the current statement,
or to an earlier set savepoint.
*/
void (*binlog_savepoint_rollback)(THD *thd, void **engine_data,
void **stmt_start_data,
void **savepoint_data);
/*
Call to reset (for new transactions) the engine_data from
binlog_oob_data(). Can also change the pointer to point to different data

View File

@@ -164,6 +164,19 @@ static SHOW_VAR binlog_status_vars_detail[]=
{NullS, NullS, SHOW_LONG}
};
/*
This struct, for --binlog-storage-engine=ENGINE, keeps track of savepoints
set in the current transaction that are still within the in-memory trx
cache (not yet spilled as out-of-band data into the binlog).
*/
struct binlog_savepoint_info {
binlog_savepoint_info *next;
void *engine_ptr;
my_off_t cache_offset;
};
/*
Variables for the binlog background thread.
Protected by the MYSQL_BIN_LOG::LOCK_binlog_background_thread mutex.
@@ -376,6 +389,9 @@ public:
stmt_cache(precompute_checksums),
trx_cache(precompute_checksums),
last_commit_pos_offset(0),
stmt_start_engine_ptr(nullptr),
cache_savepoint_list(nullptr),
cache_savepoint_next_ptr(&cache_savepoint_list),
engine_binlog_info {0, 0, 0},
using_xa(FALSE), xa_xid(0)
{
@@ -406,17 +422,25 @@ public:
stmt_cache.reset();
if (do_trx)
{
trx_cache.reset();
using_xa= FALSE;
if (opt_binlog_engine_hton)
{
trx_cache.reset_for_engine_binlog();
last_commit_pos_file.engine_file_no= ~(uint64_t)0;
}
else
{
trx_cache.reset();
last_commit_pos_file.legacy_name[0]= 0;
}
last_commit_pos_offset= 0;
using_xa= FALSE;
}
if (likely(opt_binlog_engine_hton) &&
likely(opt_binlog_engine_hton->binlog_oob_data))
{
stmt_start_engine_ptr= nullptr;
cache_savepoint_list= nullptr;
cache_savepoint_next_ptr= &cache_savepoint_list;
/*
Use a custom write_function to spill to the engine-implemented binlog.
And re-use the IO_CACHE::append_read_pos as a handle for our
@@ -462,6 +486,14 @@ public:
} last_commit_pos_file;
uint64_t last_commit_pos_offset;
/* Engine data pointer for start-of-statement savepoint. */
void *stmt_start_engine_ptr;
/*
List of pending savepoints still in the trx cache (for engine-implemented
binlogging).
*/
binlog_savepoint_info *cache_savepoint_list;
binlog_savepoint_info **cache_savepoint_next_ptr;
/* Context for engine-implemented binlogging. */
handler_binlog_event_group_info engine_binlog_info;
@@ -1685,11 +1717,11 @@ int LOGGER::set_handlers(ulonglong slow_log_printer,
*/
static void
binlog_trans_log_savepos(THD *thd, my_off_t *pos)
binlog_trans_log_savepos(THD *thd, binlog_cache_mngr *cache_mngr, my_off_t *pos)
{
DBUG_ENTER("binlog_trans_log_savepos");
// DBUG_ASSERT(!opt_binlog_engine_hton);
DBUG_ASSERT(pos != NULL);
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open());
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
@@ -1713,9 +1745,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
*/
static void
binlog_trans_log_truncate(THD *thd, my_off_t pos)
binlog_trans_log_truncate(THD *thd, binlog_savepoint_info *sv)
{
DBUG_ENTER("binlog_trans_log_truncate");
my_off_t pos= sv->cache_offset;
DBUG_PRINT("enter", ("pos: %lu", (ulong) pos));
DBUG_ASSERT(thd->binlog_get_cache_mngr() != NULL);
@@ -1723,7 +1756,61 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
DBUG_ASSERT(pos != ~(my_off_t) 0);
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
cache_mngr->trx_cache.restore_savepoint(pos);
binlog_cache_data *trx_cache= &cache_mngr->trx_cache;
if (!opt_binlog_engine_hton)
{
trx_cache->restore_savepoint(pos);
DBUG_VOID_RETURN;
}
/*
If the savepoint is still in the trx cache, then we can simply truncate
the cache.
If the savepoint was spilled as oob data, then we need to call into the
engine binlog to have it discard the to-be-rolled-back binlog data.
*/
IO_CACHE *cache= &trx_cache->cache_log;
if (pos >= cache->pos_in_file)
{
trx_cache->restore_savepoint(pos);
trx_cache->cache_log.write_function= binlog_spill_to_engine;
/* Remove any later in-cache savepoints. */
binlog_savepoint_info *sp= cache_mngr->cache_savepoint_list;
while (sp)
{
if (sp == sv)
{
sp->next= nullptr; /* Drop the tail of the list. */
cache_mngr->cache_savepoint_next_ptr= &sp->next;
break;
}
sp= sp->next;
}
/*
If the savepoint is at the start of the cache, then it might have been
already spilled to the engine binlog, then rolled back to (which would
leave the cache truncated to the point of that savepoint).
But otherwise, the savepoint is pending to be spilled to engine if
needed, and should be found in the list.
*/
DBUG_ASSERT(pos == cache->pos_in_file || sp != nullptr);
DBUG_VOID_RETURN;
}
/*
Truncate what's in the cache, then call into the engine to rollback to
the prior set savepoint.
*/
trx_cache->restore_savepoint(cache->pos_in_file);
trx_cache->reset_cache_for_engine(pos, binlog_spill_to_engine);
/* No pending savepoints in-cache anymore. */
cache_mngr->cache_savepoint_next_ptr= &cache_mngr->cache_savepoint_list;
cache_mngr->cache_savepoint_list= nullptr;
cache_mngr->engine_binlog_info.out_of_band_offset= sv->cache_offset;
(*opt_binlog_engine_hton->binlog_savepoint_rollback)
(thd, &cache_mngr->engine_binlog_info.engine_ptr, nullptr, &sv->engine_ptr);
DBUG_VOID_RETURN;
}
@@ -1737,7 +1824,7 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->savepoint_offset= sizeof(binlog_savepoint_info);
binlog_hton->close_connection= binlog_close_connection;
binlog_hton->savepoint_set= binlog_savepoint_set;
binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
@@ -2114,8 +2201,36 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
If rolling back a statement in a transaction, we truncate the
transaction cache to remove the statement.
*/
else
else if (!opt_binlog_engine_hton)
trx_cache.restore_prev_position();
else
{
IO_CACHE *cache= &trx_cache.cache_log;
my_off_t stmt_pos= trx_cache.get_prev_position();
/* Drop any pending savepoints in the cache beyond statement start. */
binlog_savepoint_info **sp_ptr= &cache_mngr->cache_savepoint_list;
for (;;)
{
binlog_savepoint_info *sp= *sp_ptr;
if (!sp || sp->cache_offset > stmt_pos)
break;
sp_ptr= &sp->next;
}
*sp_ptr= nullptr;
cache_mngr->cache_savepoint_next_ptr= sp_ptr;
if (stmt_pos >= cache->pos_in_file)
trx_cache.restore_prev_position();
else
{
trx_cache.set_prev_position(cache->pos_in_file);
trx_cache.restore_prev_position();
trx_cache.reset_cache_for_engine(stmt_pos, binlog_spill_to_engine);
cache_mngr->engine_binlog_info.out_of_band_offset= stmt_pos;
(*opt_binlog_engine_hton->binlog_savepoint_rollback)
(thd, &cache_mngr->engine_binlog_info.engine_ptr,
&cache_mngr->stmt_start_engine_ptr, nullptr);
}
}
DBUG_ASSERT(trx_cache.pending() == NULL);
DBUG_RETURN(error);
@@ -2638,8 +2753,26 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
or "RELEASE S" without the preceding "SAVEPOINT S" in the binary
log.
*/
if (likely(!(error= mysql_bin_log.write(&qinfo))))
binlog_trans_log_savepos(thd, (my_off_t*) sv);
if (unlikely((error= mysql_bin_log.write(&qinfo)) != 0))
DBUG_RETURN(error);
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
binlog_savepoint_info *sp_info= (binlog_savepoint_info*)sv;
binlog_trans_log_savepos(thd, cache_mngr, &sp_info->cache_offset);
if (opt_binlog_engine_hton)
{
/*
Add the savepoint to the list of pending savepoints in the trx cache.
If the savepoint gets spilled to the binlog as oob data, then we need
to create an (engine) binlog savepoint from it so that the engine can
roll back the oob data if needed.
As long as the savepoint is in the cache, we can simply roll it back
by truncating the cache.
*/
*cache_mngr->cache_savepoint_next_ptr= sp_info;
cache_mngr->cache_savepoint_next_ptr= &sp_info->next;
sp_info->next= nullptr;
sp_info->engine_ptr= nullptr;
}
DBUG_RETURN(error);
}
@@ -2671,7 +2804,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
binlog_trans_log_truncate(thd, *(my_off_t*)sv);
binlog_trans_log_truncate(thd, (binlog_savepoint_info *)sv);
/*
When a SAVEPOINT is executed inside a stored function/trigger we force the
@@ -6555,7 +6688,7 @@ binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len)
{
len-= (len % cache->buffer_length);
if (!len)
return false;
return 0;
}
/* ToDo: If len > the cache size (32k default), then split up the write in multiple oob writes to the engine. This can happen if there is a large single write to the IO_CACHE. Maybe the split could happen also in the engine, depending if I want to split the size here to the binlog_cache_size which is known here, or if I want to split it to an engine imposed max size. But since the commit record size is determined by the upper layer here, I think it makes sense to determine the oob record size here also. */
@@ -6563,14 +6696,113 @@ binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len)
binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos;
void **engine_ptr= &mngr->engine_binlog_info.engine_ptr;
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
my_off_t spill_end= cache->pos_in_file + len;
size_t sofar= 0;
void **stmt_start_ptr= nullptr;
void **savepoint_ptr= nullptr;
/*
If there are any pending savepoints (or a start-of-statement point) in the
cache data that we're now spilling to the engine binlog, set an engine
savepoint for each of them so that we can roll back such spilled data,
if required.
*/
if (data == cache->write_buffer)
{
my_off_t spill_start= cache->pos_in_file;
my_off_t stmt_pos= mngr->trx_cache.get_prev_position();
bool do_stmt_pos= stmt_pos != MY_OFF_T_UNDEF &&
stmt_pos >= spill_start && stmt_pos < spill_end;
binlog_savepoint_info *sp= mngr->cache_savepoint_list;
for (;;)
{
/*
Find the next spill point.
It maybe be the next savepoint in the list, it may be the saved
start-of-statement point, or (if they coincide) it may be both.
*/
my_off_t spill_pos;
void **next_stmt_start_ptr;
void **next_savepoint_ptr;
binlog_savepoint_info *next_sp;
if (do_stmt_pos && sp && stmt_pos == sp->cache_offset)
{
/* Double savepoint and start-of-statement point. */
spill_pos= stmt_pos;
next_stmt_start_ptr= &mngr->stmt_start_engine_ptr;
next_savepoint_ptr= &sp->engine_ptr;
next_sp= sp->next;
}
else if (do_stmt_pos && (!sp || stmt_pos < sp->cache_offset))
{
/* Spill the start-of-statement point next. */
spill_pos= stmt_pos;
next_stmt_start_ptr= &mngr->stmt_start_engine_ptr;
next_savepoint_ptr= nullptr;
next_sp= sp;
do_stmt_pos= false;
}
else if (sp)
{
/* Spill the next savepoint now. */
spill_pos= sp->cache_offset;
next_stmt_start_ptr= nullptr;
next_savepoint_ptr= &sp->engine_ptr;
next_sp= sp->next;
}
else
break;
DBUG_ASSERT(spill_pos >= spill_start);
if (spill_pos >= spill_end)
break;
DBUG_ASSERT(spill_start + sofar <= spill_pos);
size_t part_len= spill_pos - (spill_start + sofar);
if (part_len > 0 || stmt_start_ptr || savepoint_ptr)
{
mysql_mutex_lock(&LOCK_commit_ordered);
int res= (*opt_binlog_engine_hton->binlog_oob_data_ordered)
(mngr->thd, data + sofar, part_len,
engine_ptr, stmt_start_ptr, savepoint_ptr);
mysql_mutex_unlock(&LOCK_commit_ordered);
if (likely(!res))
res= (*opt_binlog_engine_hton->binlog_oob_data)
(mngr->thd, data + sofar, part_len, engine_ptr);
if (unlikely(res))
return res;
sofar+= part_len;
}
stmt_start_ptr= next_stmt_start_ptr;
savepoint_ptr= next_savepoint_ptr;
sp= next_sp;
}
mngr->cache_savepoint_list= sp; /* Remove any points spilled from cache. */
if (likely(sp == nullptr))
mngr->cache_savepoint_next_ptr= &mngr->cache_savepoint_list;
/*
We currently always spill the entire cache contents, which should mean
that at this point the remaining list of pending savepoints in the cache
is always empty.
Let's assert that this is so. However, if we ever want to partially
spill the cache and thus have remaining entries at this point, that is
fine, it is supported by the code and then this assertion can just be
removed.
*/
DBUG_ASSERT(sp == nullptr);
}
DBUG_ASSERT(sofar < len);
mysql_mutex_lock(&LOCK_commit_ordered);
bool res= (*opt_binlog_engine_hton->binlog_oob_data_ordered)(mngr->thd, data,
len, engine_ptr);
int res= (*opt_binlog_engine_hton->binlog_oob_data_ordered)
(mngr->thd, data + sofar, len - sofar, engine_ptr,
stmt_start_ptr, savepoint_ptr);
mysql_mutex_unlock(&LOCK_commit_ordered);
res|= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data,
len, engine_ptr);
if (likely(!res))
res= (*opt_binlog_engine_hton->binlog_oob_data)
(mngr->thd, data + sofar, len - sofar, engine_ptr);
mngr->engine_binlog_info.out_of_band_offset+= len;
cache->pos_in_file+= len;
cache->pos_in_file= spill_end;
return res;
}
@@ -6805,17 +7037,9 @@ THD::binlog_start_trans_and_stmt()
}
void THD::binlog_set_stmt_begin() {
binlog_cache_mngr *cache_mngr= binlog_get_cache_mngr();
/*
The call to binlog_trans_log_savepos() might create the cache_mngr
structure, if it didn't exist before, so we save the position
into an auto variable and then write it into the transaction
data for the binary log (i.e., cache_mngr).
*/
my_off_t pos= 0;
binlog_trans_log_savepos(this, &pos);
cache_mngr= binlog_get_cache_mngr();
binlog_cache_mngr *cache_mngr= binlog_setup_trx_data();
binlog_trans_log_savepos(this, cache_mngr, &pos);
cache_mngr->trx_cache.set_prev_position(pos);
}
@@ -13716,7 +13940,7 @@ void wsrep_register_binlog_handler(THD *thd, bool trx)
Set an implicit savepoint in order to be able to truncate a trx-cache.
*/
my_off_t pos= 0;
binlog_trans_log_savepos(thd, &pos);
binlog_trans_log_savepos(thd, cache_mngr, &pos);
cache_mngr->trx_cache.set_prev_position(pos);
/*

View File

@@ -105,6 +105,33 @@ public:
DBUG_ASSERT(empty());
}
void reset_for_engine_binlog()
{
bool cache_was_empty= empty();
truncate(cache_log.pos_in_file);
cache_log.pos_in_file= 0;
cache_log.request_pos= cache_log.write_pos= cache_log.buffer;
cache_log.write_end= cache_log.buffer + cache_log.buffer_length;
checksum_opt= BINLOG_CHECKSUM_ALG_OFF;
if (!cache_was_empty)
compute_statistics();
status= 0;
incident= FALSE;
before_stmt_pos= MY_OFF_T_UNDEF;
DBUG_ASSERT(empty());
}
void reset_cache_for_engine(my_off_t pos,
int (*fct)(struct st_io_cache *, const uchar *, size_t))
{
/* Bit fiddly here as we're abusing the IO_CACHE a bit for oob handling. */
cache_log.pos_in_file= pos;
cache_log.request_pos= cache_log.write_pos= cache_log.buffer;
cache_log.write_end=
(cache_log.buffer + cache_log.buffer_length - (pos & (IO_SIZE-1)));
cache_log.write_function= fct;
}
my_off_t get_byte_position() const
{
return my_b_tell(&cache_log);

View File

@@ -1845,7 +1845,7 @@ read_more_data:
enum chunk_reader_status res= fetch_current_page();
if (res == CHUNK_READER_EOF)
{
if (s.in_record)
if (s.in_record && s.file_no <= stop_file_no)
return read_error_corruption(s.file_no, s.page_no, "binlog tablespace "
"truncated in the middle of record");
else

View File

@@ -4138,6 +4138,7 @@ static int innodb_init(void* p)
innobase_hton->binlog_group_commit_ordered= ibb_group_commit;
innobase_hton->binlog_oob_data_ordered= innodb_binlog_oob_ordered;
innobase_hton->binlog_oob_data= innodb_binlog_oob;
innobase_hton->binlog_savepoint_rollback= ibb_savepoint_rollback;
innobase_hton->binlog_oob_reset= innodb_reset_oob;
innobase_hton->binlog_oob_free= innodb_free_oob;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader;

View File

@@ -97,6 +97,7 @@ mysql_pfs_key_t binlog_prealloc_thread_key;
/* Structure holding context for out-of-band chunks of binlogged event group. */
struct binlog_oob_context {
struct savepoint;
/*
Structure used to encapsulate the data to be binlogged in an out-of-band
chunk, for use by fsp_binlog_write_rec().
@@ -128,6 +129,10 @@ struct binlog_oob_context {
bool binlog_node(uint32_t node, uint64_t new_idx,
uint32_t left_node, uint32_t right_node,
chunk_data_oob *oob_data, LF_PINS *pins, mtr_t *mtr);
bool create_stmt_start_point();
savepoint *create_savepoint();
void rollback_to_savepoint(savepoint *savepoint);
void rollback_to_stmt_start();
/*
Pending binlog write for the ibb_pending_lsn_fifo.
@@ -140,6 +145,8 @@ struct binlog_oob_context {
uint64_t first_node_file_no;
uint64_t first_node_offset;
LF_PINS *lf_pins;
savepoint *stmt_start_point;
savepoint *savepoint_stack;
uint32_t node_list_len;
uint32_t node_list_alloc_len;
/*
@@ -161,6 +168,15 @@ struct binlog_oob_context {
uint64_t node_index;
uint32_t height;
} node_list [];
/* Saved oob state for implementing ROLLBACK TO SAVEPOINT. */
struct savepoint {
/* Maintain a stack of pending savepoints. */
savepoint *next;
uint32_t node_list_len;
uint32_t alloc_len;
struct node_info node_list[];
};
};
@@ -2141,6 +2157,8 @@ alloc_oob_context(uint32 list_length= 10)
ut_free(c);
return nullptr;
}
c->stmt_start_point= nullptr;
c->savepoint_stack= nullptr;
c->pending_file_no= ~(uint64_t)0;
c->node_list_alloc_len= list_length;
c->node_list_len= 0;
@@ -2174,6 +2192,14 @@ innodb_binlog_write_cache(IO_CACHE *cache,
static inline void
reset_oob_context(binlog_oob_context *c)
{
if (c->stmt_start_point)
c->stmt_start_point->node_list_len= 0;
while (c->savepoint_stack != nullptr)
{
binlog_oob_context::savepoint *next_savepoint= c->savepoint_stack->next;
ut_free(c->savepoint_stack);
c->savepoint_stack= next_savepoint;
}
c->pending_file_no= ~(uint64_t)0;
if (c->pending_refcount)
{
@@ -2189,6 +2215,7 @@ free_oob_context(binlog_oob_context *c)
{
ut_ad(!c->pending_refcount /* Should not have pending until free */);
reset_oob_context(c); /* Defensive programming, should be redundant */
ut_free(c->stmt_start_point);
lf_hash_put_pins(c->lf_pins);
ut_free(c);
}
@@ -2275,7 +2302,8 @@ ensure_oob_context(void **engine_data, uint32_t needed_len)
*/
bool
innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data)
void **engine_data, void **stm_start_data,
void **savepoint_data)
{
binlog_oob_context *c= (binlog_oob_context *)*engine_data;
if (!c)
@@ -2283,6 +2311,25 @@ innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, size_t data_len,
if (UNIV_UNLIKELY(!c))
return true;
if (stm_start_data)
{
if (c->create_stmt_start_point())
return true;
*stm_start_data= nullptr; /* We do not need to store any data there. */
if (data_len == 0 && !savepoint_data)
return false;
}
if (savepoint_data)
{
binlog_oob_context::savepoint *sv= c->create_savepoint();
if (!sv)
return true;
*((binlog_oob_context::savepoint **)savepoint_data)= sv;
if (data_len == 0)
return false;
}
ut_ad(data_len > 0);
mtr_t mtr;
uint32_t i= c->node_list_len;
uint64_t new_idx= i==0 ? 0 : c->node_list[i-1].node_index + 1;
@@ -2418,6 +2465,102 @@ binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len)
}
bool
binlog_oob_context::create_stmt_start_point()
{
if (!stmt_start_point || node_list_len > stmt_start_point->alloc_len)
{
ut_free(stmt_start_point);
size_t size= sizeof(savepoint) + node_list_len * sizeof(node_info);
stmt_start_point= (savepoint *) ut_malloc(size, mem_key_binlog);
if (!stmt_start_point)
{
my_error(ER_OUTOFMEMORY, MYF(0), size);
return true;
}
stmt_start_point->alloc_len= node_list_len;
}
stmt_start_point->node_list_len= node_list_len;
memcpy(stmt_start_point->node_list, node_list,
node_list_len * sizeof(node_info));
return false;
}
binlog_oob_context::savepoint *
binlog_oob_context::create_savepoint()
{
size_t size= sizeof(savepoint) + node_list_len * sizeof(node_info);
savepoint *s= (savepoint *) ut_malloc(size, mem_key_binlog);
if (!s)
{
my_error(ER_OUTOFMEMORY, MYF(0), size);
return nullptr;
}
s->next= savepoint_stack;
s->node_list_len= node_list_len;
memcpy(s->node_list, node_list, node_list_len * sizeof(node_info));
savepoint_stack= s;
return s;
}
void
binlog_oob_context::rollback_to_savepoint(savepoint *savepoint)
{
ut_a(node_list_alloc_len >= savepoint->node_list_len);
node_list_len= savepoint->node_list_len;
memcpy(node_list, savepoint->node_list,
savepoint->node_list_len * sizeof(node_info));
/* Remove any later savepoints from the stack. */
for (;;)
{
struct savepoint *s= savepoint_stack;
ut_ad(s != nullptr /* Should always find the savepoint on the stack. */);
if (UNIV_UNLIKELY(!s))
break;
if (s == savepoint)
break;
savepoint_stack= s->next;
ut_free(s);
}
}
void
binlog_oob_context::rollback_to_stmt_start()
{
ut_a(node_list_alloc_len >= stmt_start_point->node_list_len);
node_list_len= stmt_start_point->node_list_len;
memcpy(node_list, stmt_start_point->node_list,
stmt_start_point->node_list_len * sizeof(node_info));
}
void
ibb_savepoint_rollback(THD *thd, void **engine_data,
void **stmt_start_data, void **savepoint_data)
{
binlog_oob_context *c= (binlog_oob_context *)*engine_data;
ut_a(c != nullptr);
if (stmt_start_data)
{
ut_ad(savepoint_data == nullptr);
c->rollback_to_stmt_start();
}
if (savepoint_data)
{
ut_ad(stmt_start_data == nullptr);
binlog_oob_context::savepoint *savepoint=
(binlog_oob_context::savepoint *)*savepoint_data;
c->rollback_to_savepoint(savepoint);
}
}
void
innodb_reset_oob(THD *thd, void **engine_data)
{

View File

@@ -235,9 +235,13 @@ extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
fsp_binlog_page_entry * &block, uint32_t &page_no,
uint32_t &page_offset, uint64_t file_no);
extern bool innodb_binlog_oob_ordered(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
size_t data_len, void **engine_data,
void **stm_start_data,
void **savepoint_data);
extern bool innodb_binlog_oob(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
void ibb_savepoint_rollback(THD *thd, void **engine_data,
void **stmt_start_data, void **savepoint_data);
extern void innodb_reset_oob(THD *thd, void **engine_data);
extern void innodb_free_oob(THD *thd, void *engine_data);
extern handler_binlog_reader *innodb_get_binlog_reader(bool wait_durable);