From f0d4b63bacfbc2e2d81167adf10765d6b27e6c80 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Sat, 19 Apr 2025 07:46:56 +0200 Subject: [PATCH] MDEV-34705: Binlog-in-engine: Implement refcounting outstanding OOB records Keep track of, for each binlog file, how many open transactions have out-of-band data starting in that file. Then at the start of each new binlog file, in the header page, record the file_no of the earliest file that this file might contain commit records with references back to OOB records in that earlier file. Use this in PURGE BINARY LOGS, so that when a dump thread (slave connection) is active in file number N, and that file (or a later one) may require looking back in an earlier file number M for out-of-band records, purge will stop already at file number M. This way, we avoid that purge accidentally deletes some binlog file that a dump thread would later get an error on because it needs to read out-of-band data. This patch also includes placeholder data for a similar facility for XA references. The actual implementation of support for XA is for later though. Signed-off-by: Kristian Nielsen --- .../purge_dump_thread-master.opt | 5 + .../binlog_in_engine/purge_dump_thread.result | 132 +++++++++ .../binlog_in_engine/purge_dump_thread.test | 207 +++++++++++++ .../suite/rpl/r/rpl_binlog_directory.result | 2 + .../suite/rpl/t/rpl_binlog_directory.test | 4 +- .../suite/sys_vars/r/sysvars_innodb.result | 4 +- sql/handler.h | 6 + sql/log.cc | 14 +- sql/sql_repl.cc | 24 ++ storage/innobase/fsp/fsp_binlog.cc | 256 +++++++++++++++- storage/innobase/handler/ha_innodb.cc | 12 +- storage/innobase/handler/innodb_binlog.cc | 280 +++++++++++++++--- storage/innobase/include/fsp_binlog.h | 91 +++++- storage/innobase/include/innodb_binlog.h | 14 +- 14 files changed, 982 insertions(+), 69 deletions(-) create mode 100644 mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt create mode 100644 mysql-test/suite/binlog_in_engine/purge_dump_thread.result create mode 100644 mysql-test/suite/binlog_in_engine/purge_dump_thread.test diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt b/mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt new file mode 100644 index 00000000000..8fbd6fd1b81 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt @@ -0,0 +1,5 @@ +--max-binlog-size=64k +--innodb-binlog-state-interval=16k +--log-bin +--binlog-cache-size=8192 +--binlog-stmt-cache-size=8192 diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread.result b/mysql-test/suite/binlog_in_engine/purge_dump_thread.result new file mode 100644 index 00000000000..7fd49437319 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread.result @@ -0,0 +1,132 @@ +include/master-slave.inc +[connection master] +RESET MASTER; +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); +*** Test iteration, RESTART=0 +connection master; +FLUSH BINARY LOGS; +connection master1; +*** Create a transaction with active OOB records (to be committed). +connection default; +*** Create a transaction with active OOB records (to be rolled back). +connection master; +FLUSH BINARY LOGS; +*** Generating 10 large transactions in 5 interleaved connections +connection master; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (0 + 2, 0, "Park point 1 for dump thread"); +include/save_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +122 +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +122 +connection master; +SET @old_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid"; +INSERT INTO t1 VALUES (0 + 2, 1, "Transaction to pause dump thread"); +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_LATER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +ERROR HY000: A purgeable log is in use, will not purge +connection master1; +COMMIT; +connection default; +ROLLBACK; +connection master; +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +ERROR HY000: A purgeable log is in use, will not purge +*** Allow the dump thread to proceed, and see that purge is now possible. +SET GLOBAL debug_dbug= @old_dbug; +SET debug_sync= 'now SIGNAL signal.continue'; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (0 + 2, 2, 'Transaction to get dump thread to the next file'); +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +134 120676 708 220262 +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +134 120676 708 220262 +connection master; +SET debug_sync= 'RESET'; +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +PURGE BINARY LOGS TO 'OOB1_LATER'; +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +*** Test iteration, RESTART=1 +connection master; +FLUSH BINARY LOGS; +connection master1; +*** Create a transaction with active OOB records (to be committed). +connection default; +*** Create a transaction with active OOB records (to be rolled back). +connection master; +FLUSH BINARY LOGS; +*** Generating 10 large transactions in 5 interleaved connections +connection master; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (100000 + 2, 0, "Park point 1 for dump thread"); +include/save_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +255 +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +255 +connection slave; +include/stop_slave.inc +connection master1; +COMMIT; +connection default; +ROLLBACK; +connection master; +include/rpl_restart_server.inc [server_number=1 parameters: --skip-slave-start] +connection master; +SET @old_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid"; +INSERT INTO t1 VALUES (100000 + 2, 1, "Transaction to pause dump thread"); +connection slave; +include/start_slave.inc +connection master; +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_LATER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +ERROR HY000: A purgeable log is in use, will not purge +*** Allow the dump thread to proceed, and see that purge is now possible. +SET GLOBAL debug_dbug= @old_dbug; +SET debug_sync= 'now SIGNAL signal.continue'; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (100000 + 2, 2, 'Transaction to get dump thread to the next file'); +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +267 13541352 1416 440519 +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +267 13541352 1416 440519 +connection master; +SET debug_sync= 'RESET'; +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +PURGE BINARY LOGS TO 'OOB1_LATER'; +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +connection master; +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread.test b/mysql-test/suite/binlog_in_engine/purge_dump_thread.test new file mode 100644 index 00000000000..94e95440e04 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread.test @@ -0,0 +1,207 @@ +--source include/have_debug.inc +--source include/have_debug_sync.inc +--source include/have_binlog_format_row.inc +--source include/master-slave.inc +--source include/have_innodb_binlog.inc + +--let $NUM_CONNECTIONS= 5 +# $NUM_TRANSACTIONS is total, not per connection. +--let $NUM_TRANSACTIONS=10 +--let $NUM_PIECES= 10 +--let $PIECE_SIZE= 2000 + +# Test that PURGE BINARY LOGS avoids purging files containing OOB records +# referenced from files that a dump thread is still active in. +# +# The test has --max-binlog-size=64k to have a larger number of binlog files +# to test with. The --binlog-cache-size is set to 8k, so more event data than +# that causes OOB binlogging. + +RESET MASTER; + +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); + +# Run twice. Once where the OOB references to earlier file numbers is kept +# track of in-memory. And once, where server is restarted so the references +# must be read from the file headers. +--let $restart= 0 +while ($restart <= 1) { + + --echo *** Test iteration, RESTART=$restart + --connection master + --let $D= `SELECT $restart*100000` + FLUSH BINARY LOGS; + + # Start a transaction that will have OOB references in this specific binlog file. + --let $oob1_start= query_get_value(SHOW MASTER STATUS, File, 1) + --connection master1 + --echo *** Create a transaction with active OOB records (to be committed). + --disable_query_log + BEGIN; + --let $i= 0 + while ($i < 10) { + eval INSERT INTO t1 VALUES ($D+1, $i, REPEAT(CHR(65 + ($i MOD 26)), 2000)); + inc $i; + } + --enable_query_log + # Leaving the transaction open, so the commit record will end up in a later + # binlog file and have a reference back that blocks purge. + + # Also test an OOB record for a transaction that is later rolled back. + --connection default + --echo *** Create a transaction with active OOB records (to be rolled back). + --disable_query_log + BEGIN; + --let $i= 0 + while ($i < 10) { + eval INSERT INTO t1 VALUES ($D+10, $i, REPEAT(CHR(65 + ($i MOD 26)), 2000)); + inc $i; + } + --enable_query_log + + --connection master + FLUSH BINARY LOGS; + --let $oob1_after= query_get_value(SHOW MASTER STATUS, File, 1) + + # Generate a bunch of more transactions that contain OOB and flex the + # OOB refcounting. + --echo *** Generating $NUM_TRANSACTIONS large transactions in $NUM_CONNECTIONS interleaved connections + --disable_query_log + let $t= 0; + while ($t < $NUM_TRANSACTIONS) { + let $b= $t; + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connect(con$i,localhost,root,,) + START TRANSACTION; + eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, 0, 'Initial $i'); + inc $i; + inc $t; + } + + let $p= 1; + while ($p <= $NUM_PIECES) { + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, $p, REPEAT(CHR(65 + ($p + $i MOD 26)), $PIECE_SIZE)); + inc $i; + } + inc $p; + } + + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, $NUM_PIECES+1, 'Last $i'); + COMMIT; + --disconnect con$i + inc $i; + } + } + --enable_query_log + + --connection master + --let $oob1_later= query_get_value(SHOW MASTER STATUS, File, 1) + FLUSH BINARY LOGS; + eval INSERT INTO t1 VALUES ($D + 2, 0, "Park point 1 for dump thread"); + + # Now get the dump thread to the current point. + --source include/save_master_gtid.inc + SELECT COUNT(*) FROM t1; + + --connection slave + --source include/sync_with_master_gtid.inc + SELECT COUNT(*) FROM t1; + + if ($restart) { + --connection slave + --source include/stop_slave.inc + + --connection master1 + # Commit the transaction with OOB references back to an earlier binlog + # file, so that the reference will be there also after server restart. + COMMIT; + --connection default + # Roll back the other transaction with OOB. + ROLLBACK; + + --connection master + --let $rpl_server_number=1 + --let $rpl_server_parameters= --skip-slave-start + --source include/rpl_restart_server.inc + } + + --connection master + SET @old_dbug= @@global.debug_dbug; + SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid"; + eval INSERT INTO t1 VALUES ($D + 2, 1, "Transaction to pause dump thread"); + --let $oob1_current= query_get_value(SHOW MASTER STATUS, File, 1) + + if ($restart) { + --connection slave + --source include/start_slave.inc + --connection master + } + + let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE Command = 'Binlog Dump' AND State = 'debug sync point: now'; + --source include/wait_condition.inc + + # At this point, we have a dump thread active in $oob1_current. But we still + # have an active OOB record in $oob1_start, so neither of $oob1_start or + # any other prior to $oob1_current must be purged. + # The file before $oob1_start is allowed to be purged, though. + --replace_result $oob1_start OOB1_START + eval PURGE BINARY LOGS TO '$oob1_start'; + --replace_result $oob1_after OOB1_AFTER + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_after'; + --replace_result $oob1_later OOB1_LATER + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_later'; + --replace_result $oob1_current OOB1_CURRENT + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_current'; + if (!$restart) { + --connection master1 + COMMIT; + --connection default + ROLLBACK; + --connection master + --replace_result $oob1_current OOB1_CURRENT + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_current'; + } + + --echo *** Allow the dump thread to proceed, and see that purge is now possible. + SET GLOBAL debug_dbug= @old_dbug; + SET debug_sync= 'now SIGNAL signal.continue'; + FLUSH BINARY LOGS; + eval INSERT INTO t1 VALUES ($D + 2, 2, 'Transaction to get dump thread to the next file'); + SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; + --source include/save_master_gtid.inc + --connection slave + --source include/sync_with_master_gtid.inc + SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; + + --connection master + SET debug_sync= 'RESET'; + # Now the dump thread is past $oob1_current, so all PURGE should succeed. + --replace_result $oob1_start OOB1_START + eval PURGE BINARY LOGS TO '$oob1_start'; + --replace_result $oob1_after OOB1_AFTER + eval PURGE BINARY LOGS TO '$oob1_after'; + --replace_result $oob1_later OOB1_LATER + eval PURGE BINARY LOGS TO '$oob1_later'; + --replace_result $oob1_current OOB1_CURRENT + eval PURGE BINARY LOGS TO '$oob1_current'; + + inc $restart; +} + +# Cleanup. +--connection master +DROP TABLE t1; +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_binlog_directory.result b/mysql-test/suite/rpl/r/rpl_binlog_directory.result index 80373a0614a..f58e9c470fd 100644 --- a/mysql-test/suite/rpl/r/rpl_binlog_directory.result +++ b/mysql-test/suite/rpl/r/rpl_binlog_directory.result @@ -13,6 +13,7 @@ INSERT INTO t1 VALUES (2, 11); include/rpl_stop_server.inc [server_number=1] include/rpl_start_server.inc [server_number=1] INSERT INTO t1 VALUES (3, 12); +include/save_master_gtid.inc show binary logs; Log_name File_size master-bin.000001 # @@ -29,6 +30,7 @@ a b 3 12 connection slave; include/start_slave.inc +include/sync_with_master_gtid.inc SELECT * FROM t1 ORDER BY a; a b 1 10 diff --git a/mysql-test/suite/rpl/t/rpl_binlog_directory.test b/mysql-test/suite/rpl/t/rpl_binlog_directory.test index f7c643dee8b..e13c9c88e72 100644 --- a/mysql-test/suite/rpl/t/rpl_binlog_directory.test +++ b/mysql-test/suite/rpl/t/rpl_binlog_directory.test @@ -35,7 +35,7 @@ INSERT INTO t1 VALUES (2, 11); --source include/rpl_start_server.inc INSERT INTO t1 VALUES (3, 12); ---save_master_pos +--source include/save_master_gtid.inc --source include/show_binary_logs.inc --echo *** Contents of master-bin.index (including directory path): --cat_file $master_datadir/master-bin.index @@ -43,7 +43,7 @@ SELECT * FROM t1 ORDER BY a; --connection slave --source include/start_slave.inc ---sync_with_master +--source include/sync_with_master_gtid.inc SELECT * FROM t1 ORDER BY a; # Clean up. diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result index f35b09faf2b..fd6ecbe5b40 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result +++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result @@ -96,8 +96,8 @@ SESSION_VALUE NULL DEFAULT_VALUE 2097152 VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BIGINT UNSIGNED -VARIABLE_COMMENT Interval (in bytes) at which to write the GTID binlog state to binlog files to speed up GTID lookups. Must be a multiple of innodb_page_size -NUMERIC_MIN_VALUE 65536 +VARIABLE_COMMENT Interval (in bytes) at which to write the GTID binlog state to binlog files to speed up GTID lookups. Must be a multiple of the binlog page size (4096 bytes) +NUMERIC_MIN_VALUE 8192 NUMERIC_MAX_VALUE 18446744073709551615 NUMERIC_BLOCK_SIZE 0 ENUM_VALUE_LIST NULL diff --git a/sql/handler.h b/sql/handler.h index 6a671f7e6c4..43bcbaa6df8 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1560,6 +1560,12 @@ struct handlerton */ bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); + /* + Call to reset (for new transactions) the engine_data from + binlog_oob_data(). Can also change the pointer to point to different data + (or set it to NULL). + */ + void (*binlog_oob_reset)(THD *thd, void **engine_data); /* Call to allow engine to release the engine_data from binlog_oob_data(). */ void (*binlog_oob_free)(THD *thd, void *engine_data); /* Obtain an object to allow reading from the binlog. */ diff --git a/sql/log.cc b/sql/log.cc index af1962281c7..dbe1beba993 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -390,12 +390,18 @@ public: else last_commit_pos_file.legacy_name[0]= 0; } + ~binlog_cache_mngr() + { + if (engine_binlog_info.engine_ptr) + (*opt_binlog_engine_hton->binlog_oob_free) + (thd, engine_binlog_info.engine_ptr); + } void reset(bool do_stmt, bool do_trx) { if (engine_binlog_info.engine_ptr) - (*opt_binlog_engine_hton->binlog_oob_free)(thd, - engine_binlog_info.engine_ptr); + (*opt_binlog_engine_hton->binlog_oob_reset) + (thd, &engine_binlog_info.engine_ptr); if (do_stmt) stmt_cache.reset(); if (do_trx) @@ -418,7 +424,9 @@ public: */ trx_cache.cache_log.write_function= binlog_spill_to_engine; trx_cache.cache_log.append_read_pos= (uchar *)this; - engine_binlog_info= {0, 0, 0}; + engine_binlog_info.out_of_band_offset= 0; + engine_binlog_info.gtid_offset= 0; + /* Preserve the engine_ptr for the engine to re-use, was reset above. */ } } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 6e931f806b7..c4611c5e80e 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -3391,6 +3391,30 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo) (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); DBUG_ASSERT(event_type != START_ENCRYPTION_EVENT); +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", + { + if (event_type == XID_EVENT) + { + net_flush(info->net); + const char act[]= + "now " + "wait_for signal.continue"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action( + info->thd, + STRING_WITH_LEN(act))); + + const char act2[]= + "now " + "signal signal.continued"; + DBUG_ASSERT(!debug_sync_set_action( + info->thd, + STRING_WITH_LEN(act2))); + } + }); +#endif + if (((info->errmsg= send_event_to_slave(info, event_type, nullptr, ev_offset, &info->error_gtid)))) return 1; diff --git a/storage/innobase/fsp/fsp_binlog.cc b/storage/innobase/fsp/fsp_binlog.cc index 7113103156d..ab0c12539d7 100644 --- a/storage/innobase/fsp/fsp_binlog.cc +++ b/storage/innobase/fsp/fsp_binlog.cc @@ -21,6 +21,7 @@ this program; if not, write to the Free Software Foundation, Inc., InnoDB implementation of binlog. *******************************************************/ +#include #include "ut0bitop.h" #include "fsp0fsp.h" #include "buf0flu.h" @@ -107,6 +108,9 @@ std::atomic binlog_cur_end_offset[2]; fsp_binlog_page_fifo *binlog_page_fifo; +/* Object to keep track of outstanding oob references in binlog files. */ +ibb_file_oob_refs ibb_file_hash; + fsp_binlog_page_entry * fsp_binlog_page_fifo::create_page(uint64_t file_no, uint32_t page_no) @@ -736,6 +740,225 @@ crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no, myf MyFlags) } +/* + Need specific constructor/initializer for struct ibb_tblspc_entry stored in + the ibb_file_hash. This is a work-around for C++ abstractions that makes it + non-standard behaviour to memcpy() std::atomic objects. +*/ +static void +ibb_file_hash_constructor(uchar *arg) +{ + new(arg + LF_HASH_OVERHEAD) ibb_tblspc_entry(); +} + + +static void +ibb_file_hash_destructor(uchar *arg) +{ + ibb_tblspc_entry *e= (ibb_tblspc_entry *)(arg + LF_HASH_OVERHEAD); + e->~ibb_tblspc_entry(); +} + + +static void +ibb_file_hash_initializer(LF_HASH *hash, void *dst, const void *src) +{ + ibb_tblspc_entry *src_e= (ibb_tblspc_entry *)src; + ibb_tblspc_entry *dst_e= (ibb_tblspc_entry *)dst; + dst_e->file_no= src_e->file_no; + dst_e->oob_refs.store(src_e->oob_refs.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dst_e->xa_refs.store(src_e->xa_refs.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dst_e->oob_ref_file_no.store(src_e->oob_ref_file_no.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dst_e->xa_ref_file_no.store(src_e->xa_ref_file_no.load(std::memory_order_relaxed), + std::memory_order_relaxed); +} + + +void +ibb_file_oob_refs::init() noexcept +{ + lf_hash_init(&hash, sizeof(ibb_tblspc_entry), LF_HASH_UNIQUE, + offsetof(ibb_tblspc_entry, file_no), + sizeof(ibb_tblspc_entry::file_no), nullptr, nullptr); + hash.alloc.constructor= ibb_file_hash_constructor; + hash.alloc.destructor= ibb_file_hash_destructor; + hash.initializer= ibb_file_hash_initializer; + earliest_oob_ref= ~(uint64_t)0; + earliest_xa_ref= ~(uint64_t)0; +} + + +void +ibb_file_oob_refs::destroy() noexcept +{ + lf_hash_destroy(&hash); +} + + +void +ibb_file_oob_refs::remove(uint64_t file_no, LF_PINS *pins) +{ + lf_hash_delete(&hash, pins, &file_no, sizeof(file_no)); +} + + +void +ibb_file_oob_refs::remove_up_to(uint64_t file_no, LF_PINS *pins) +{ + for (;;) + { + int res= lf_hash_delete(&hash, pins, &file_no, sizeof(file_no)); + if (res || file_no == 0) + break; + --file_no; + } +} + + +bool +ibb_file_oob_refs::oob_ref_inc(uint64_t file_no, LF_PINS *pins) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return false; + e->oob_refs.fetch_add(1, std::memory_order_acquire); + lf_hash_search_unpin(pins); + return true; +} + + +bool +ibb_file_oob_refs::oob_ref_dec(uint64_t file_no, LF_PINS *pins) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return true; + uint64_t refcnt= e->oob_refs.fetch_sub(1, std::memory_order_acquire) - 1; + lf_hash_search_unpin(pins); + ut_ad(refcnt != (uint64_t)0 - 1); + + if (refcnt == 0) + do_zero_refcnt_action(file_no, pins, false); + return false; +} + + +void +ibb_file_oob_refs::do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins, + bool active_moving) +{ + for (;;) + { + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return; + uint64_t refcnt= e->oob_refs.load(std::memory_order_acquire); + lf_hash_search_unpin(pins); + if (refcnt > 0) + return; + /* + Reference count reached zero. Check if this was the earliest_oob_ref + that reached zero, and if so move it to the next file. Repeat this + for consecutive refcount-is-zero file_no, in case N+1 reaches zero + before N does. + + As records are written into the active binlog file, the refcount can + reach zero temporarily and then go up again, so do not move the + earliest_oob_ref ahead yet. + + As the active is about to move to the next file, we check again, and + this time move the earliest_oob_ref if the refcount on the (previously) + active binlog file ended up at zero. + */ + uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); + ut_ad(file_no <= active + active_moving); + if (file_no >= active + active_moving) + return; + bool ok; + do + { + uint64_t read_file_no= earliest_oob_ref.load(std::memory_order_relaxed); + if (read_file_no != file_no) + break; + ok= earliest_oob_ref.compare_exchange_weak(read_file_no, file_no + 1, + std::memory_order_relaxed); + } while (!ok); + /* Handle any following file_no that may have dropped to zero earlier. */ + ++file_no; + } +} + + +bool +ibb_file_oob_refs::update_refs(uint64_t file_no, LF_PINS *pins, + uint64_t oob_ref, uint64_t xa_ref) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return false; + e->oob_ref_file_no.store(oob_ref, std::memory_order_relaxed); + e->xa_ref_file_no.store(xa_ref, std::memory_order_relaxed); + lf_hash_search_unpin(pins); + return true; +} + + +bool +ibb_file_oob_refs::get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins, + uint64_t *out_oob_ref_file_no) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + { + *out_oob_ref_file_no= ~(uint64_t)0; + return false; + } + *out_oob_ref_file_no= e->oob_ref_file_no.load(std::memory_order_relaxed); + lf_hash_search_unpin(pins); + return true; +} + + +bool +ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, uint64_t xa_ref, + LF_PINS *in_pins) +{ + bool err= false; + LF_PINS *pins= in_pins ? in_pins : lf_hash_get_pins(&ibb_file_hash.hash); + if (!pins) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return true; + } + ibb_tblspc_entry entry; + entry.file_no= file_no; + entry.oob_refs.store(0, std::memory_order_relaxed); + entry.xa_refs.store(0, std::memory_order_relaxed); + entry.oob_ref_file_no.store(oob_ref, std::memory_order_relaxed); + entry.xa_ref_file_no.store(xa_ref, std::memory_order_relaxed); + int res= lf_hash_insert(&ibb_file_hash.hash, pins, &entry); + if (res) + { + ut_ad(res < 0 /* Should not get unique violation, never insert twice */); + sql_print_error("InnoDB: Could not initialize in-memory structure for " + "binlog tablespace file number %" PRIu64 ", %s", file_no, + (res < 0 ? "out of memory" : "internal error")); + err= true; + } + if (!in_pins) + lf_hash_put_pins(pins); + return err; +} + + void binlog_write_up_to_now() noexcept { @@ -789,6 +1012,8 @@ fsp_binlog_extract_header_page(const byte *page_buf, out_header_data-> page_count= uint8korr(page_buf + 24); out_header_data-> start_lsn= uint8korr(page_buf + 32); out_header_data-> diff_state_interval= uint8korr(page_buf + 40); + out_header_data->oob_ref_file_no= uint8korr(page_buf + 48); + out_header_data->xa_ref_file_no= uint8korr(page_buf + 56); } @@ -839,6 +1064,7 @@ fsp_binlog_init() { mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr); pthread_cond_init(&active_binlog_cond, nullptr); + ibb_file_hash.init(); binlog_page_fifo= new fsp_binlog_page_fifo(); binlog_page_fifo->start_flush_thread(); } @@ -849,6 +1075,7 @@ fsp_binlog_shutdown() { binlog_page_fifo->stop_flush_thread(); delete binlog_page_fifo; + ibb_file_hash.destroy(); pthread_cond_destroy(&active_binlog_cond); mysql_mutex_destroy(&active_binlog_mutex); } @@ -927,7 +1154,8 @@ fsp_binlog_open(const char *file_name, pfs_os_file_t fh, /** Create a binlog tablespace file @param[in] file_no Index of the binlog tablespace @return DB_SUCCESS or error code */ -dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) +dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages, + LF_PINS *pins) { pfs_os_file_t fh; bool ret; @@ -962,6 +1190,13 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) return DB_ERROR; } + /* + Enter an initial entry in the hash for this binlog tablespace file. + It will be later updated with the appropriate values when the file + first gets used and the header page is written. + */ + ibb_record_in_file_hash(file_no, ~(uint64_t)0,~(uint64_t)0, pins); + binlog_page_fifo->create_tablespace(file_no, size_in_pages); os_file_close(fh); @@ -979,7 +1214,8 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) GTID state record, used for FLUSH BINARY LOGS. */ std::pair -fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) +fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type, + LF_PINS *pins) { uint32_t page_size= (uint32_t)ibb_page_size; uint32_t page_size_shift= ibb_page_size_shift; @@ -1038,9 +1274,16 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) /* Write the header page at the start of a binlog tablespace file. */ if (page_no == 0) { + /* Active is moving to next file, so check if oob refcount of previous + file is zero. + */ + if (UNIV_LIKELY(file_no > 0)) + ibb_file_hash.do_zero_refcnt_action(file_no - 1, pins, true); + lsn_t start_lsn= log_sys.get_lsn(std::memory_order_acquire); bool err= ibb_write_header_page(mtr, file_no, file_size_in_pages, - start_lsn, current_binlog_state_interval); + start_lsn, + current_binlog_state_interval, pins); ut_a(!err /* ToDo error handling */); page_no= 1; } @@ -1244,6 +1487,8 @@ fsp_binlog_flush() my_sync(fh, MYF(0)); binlog_page_fifo->unlock(); + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); uint32_t page_offset= binlog_cur_page_offset; if (page_offset > BINLOG_PAGE_DATA || page_offset < ibb_page_size - BINLOG_PAGE_DATA_END) @@ -1254,7 +1499,7 @@ fsp_binlog_flush() end-of-file of the entire binlog. */ mtr.start(); - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY, lf_pins); mtr.commit(); } @@ -1280,8 +1525,9 @@ fsp_binlog_flush() persisted across a server restart. */ mtr.start(); - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins); mtr.commit(); + lf_hash_put_pins(lf_pins); log_buffer_flush_to_disk(srv_flush_log_at_trx_commit & 1); return false; diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index cef9f17d9da..521f2f11b64 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -3759,10 +3759,10 @@ static int innodb_init_params() if (innodb_binlog_state_interval == 0 || innodb_binlog_state_interval != (ulonglong)1 << (63 - nlz(innodb_binlog_state_interval)) || - innodb_binlog_state_interval % (ulonglong)srv_page_size) { + innodb_binlog_state_interval % (ulonglong)ibb_page_size) { ib::error() << "innodb_binlog_state_interval must be a " - "power-of-two multiple of the innodb_page_size=" - << srv_page_size; + "power-of-two multiple of the innodb binlog page size=" + << ibb_page_size; DBUG_RETURN(HA_ERR_INITIALIZATION); } @@ -4132,6 +4132,7 @@ static int innodb_init(void* p) innobase_hton->binlog_init= innodb_binlog_init; innobase_hton->binlog_write_direct= innobase_binlog_write_direct; innobase_hton->binlog_oob_data= innodb_binlog_oob; + innobase_hton->binlog_oob_reset= innodb_reset_oob; innobase_hton->binlog_oob_free= innodb_free_oob; innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list; @@ -19931,9 +19932,10 @@ static MYSQL_SYSVAR_ULONGLONG(binlog_state_interval, innodb_binlog_state_interval, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, "Interval (in bytes) at which to write the GTID binlog state to binlog " - "files to speed up GTID lookups. Must be a multiple of innodb_page_size", + "files to speed up GTID lookups. Must be a multiple of the binlog page " + "size (4096 bytes)", NULL, NULL, 2*1024*1024, - UNIV_PAGE_SIZE_MAX, ULONGLONG_MAX, 0); + 8192, ULONGLONG_MAX, 0); static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(autoextend_increment), diff --git a/storage/innobase/handler/innodb_binlog.cc b/storage/innobase/handler/innodb_binlog.cc index 79c41b9a2ef..b90ea380990 100644 --- a/storage/innobase/handler/innodb_binlog.cc +++ b/storage/innobase/handler/innodb_binlog.cc @@ -117,12 +117,18 @@ struct binlog_oob_context { bool binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, - chunk_data_oob *oob_data); + chunk_data_oob *oob_data, LF_PINS *pins); uint64_t first_node_file_no; uint64_t first_node_offset; + LF_PINS *lf_pins; uint32_t node_list_len; uint32_t node_list_alloc_len; + /* + Set if we incremented refcount in first_node_file_no, so we need to + decrement again at commit record write or reset/rollback. + */ + bool pending_refcount; /* The node_list contains the root of each tree in the forest of perfect binary trees. @@ -249,6 +255,7 @@ public: struct chunk_data_cache : public chunk_data_base { IO_CACHE *cache; + binlog_oob_context *oob_ctx; size_t main_remain; size_t gtid_remain; uint32_t header_remain; @@ -270,6 +277,8 @@ struct chunk_data_cache : public chunk_data_base { binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; unsigned char *p; + ut_ad(c); + oob_ctx= c; if (c && c->node_list_len) { /* @@ -321,6 +330,13 @@ struct chunk_data_cache : public chunk_data_base { virtual std::pair copy_data(byte *p, uint32_t max_len) final { uint32_t size= 0; + + if (UNIV_LIKELY(oob_ctx != nullptr) && oob_ctx->pending_refcount) + { + ibb_file_hash.oob_ref_dec(oob_ctx->first_node_file_no, oob_ctx->lf_pins); + oob_ctx->pending_refcount= false; + } + /* Write header data, if any still available. */ if (header_remain > 0) { @@ -505,7 +521,7 @@ static int scan_for_binlogs(const char *binlog_dir, found_binlogs *binlog_files, bool error_if_missing) noexcept; static int innodb_binlog_discover(); static bool binlog_state_recover(); -static void innodb_binlog_autopurge(uint64_t first_open_file_no); +static void innodb_binlog_autopurge(uint64_t first_open_file_no, LF_PINS *pins); static int read_gtid_state_from_page(rpl_binlog_state_base *state, const byte *page, uint32_t page_no) noexcept; @@ -1144,6 +1160,7 @@ innodb_binlog_init_state() earliest_binlog_file_no= ~(uint64_t)0; total_binlog_used_size= 0; active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed); binlog_cur_page_no= 0; binlog_cur_page_offset= BINLOG_PAGE_DATA; current_binlog_state_interval= @@ -1184,9 +1201,12 @@ binlog_sync_initial() { chunk_data_flush dummy_data; mtr_t mtr; + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); mtr.start(); - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins); mtr.commit(); + lf_hash_put_pins(lf_pins); log_buffer_flush_to_disk(true); binlog_page_fifo->flush_up_to(0, 0); binlog_page_fifo->do_fdatasync(0); @@ -1342,7 +1362,7 @@ binlog_page_empty(const byte *page) static int find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, uint32_t *out_page_no, uint32_t *out_pos_in_page, - uint64_t *out_state_interval) + binlog_header_data *out_header_data) { const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; @@ -1351,11 +1371,11 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, uint32_t p_0, p_1, p_2, last_nonempty; byte *p, *page_end; bool ret; - binlog_header_data header_data; *out_page_no= 0; *out_pos_in_page= BINLOG_PAGE_DATA; - *out_state_interval= 0; + out_header_data->diff_state_interval= 0; + out_header_data->is_invalid= true; binlog_name_make(file_name, file_no); pfs_os_file_t fh= os_file_create(innodb_data_file_key, file_name, @@ -1371,27 +1391,27 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, os_file_close(fh); return -1; } - fsp_binlog_extract_header_page(page_buf, &header_data); - if (header_data.is_invalid) + fsp_binlog_extract_header_page(page_buf, out_header_data); + if (out_header_data->is_invalid) { sql_print_error("InnoDB: Invalid or corrupt file header in file " "'%s'", file_name); return -1; } - if (header_data.is_empty) { + if (out_header_data->is_empty) { ret= fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr); binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed); return (ret ? -1 : 0); } - if (header_data.file_no != file_no) + if (out_header_data->file_no != file_no) { sql_print_error("InnoDB: Inconsistent file header in file '%s', " - "wrong file_no %" PRIu64, file_name, header_data.file_no); + "wrong file_no %" PRIu64, file_name, + out_header_data->file_no); return -1; } - *out_state_interval= header_data.diff_state_interval; last_nonempty= 0; /* @@ -1471,7 +1491,7 @@ innodb_binlog_discover() const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; struct found_binlogs binlog_files; - uint64_t diff_state_interval; + binlog_header_data header; int res= scan_for_binlogs(innodb_binlog_directory, &binlog_files, false); if (res <= 0) @@ -1494,10 +1514,13 @@ innodb_binlog_discover() res= find_pos_in_binlog(binlog_files.last_file_no, binlog_files.last_size, page_buf.get(), &page_no, &pos_in_page, - &diff_state_interval); + &header); if (res < 0) { file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); current_binlog_state_interval= innodb_binlog_state_interval; sql_print_warning("Binlog number %llu could no be opened. Starting a new " "binlog file from number %llu", @@ -1508,8 +1531,12 @@ innodb_binlog_discover() if (res > 0) { /* Found start position in the last binlog file. */ file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no, + header.xa_ref_file_no)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); - current_binlog_state_interval= diff_state_interval; + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); + current_binlog_state_interval= header.diff_state_interval; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1525,10 +1552,13 @@ innodb_binlog_discover() binlog_files.prev_size, page_buf.get(), &prev_page_no, &prev_pos_in_page, - &diff_state_interval); + &header); if (res < 0) { file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); current_binlog_state_interval= innodb_binlog_state_interval; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; @@ -1538,8 +1568,12 @@ innodb_binlog_discover() return 1; } file_no= binlog_files.prev_file_no; + if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no, + header.xa_ref_file_no)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); - current_binlog_state_interval= diff_state_interval; + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); + current_binlog_state_interval= header.diff_state_interval; binlog_cur_page_no= prev_page_no; binlog_cur_page_offset= prev_pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1551,7 +1585,10 @@ innodb_binlog_discover() /* Just one empty binlog file found. */ file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); current_binlog_state_interval= innodb_binlog_state_interval; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; @@ -1563,6 +1600,7 @@ innodb_binlog_discover() /* No binlog files found, start from scratch. */ file_no= 0; earliest_binlog_file_no= 0; + ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed); total_binlog_used_size= 0; current_binlog_state_interval= innodb_binlog_state_interval; ib::info() << "Starting a new binlog from file number " << file_no << "."; @@ -1612,6 +1650,8 @@ innodb_binlog_prealloc_thread() #ifdef UNIV_PFS_THREAD pfs_register_thread(binlog_prealloc_thread_key); #endif + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); mysql_mutex_lock(&active_binlog_mutex); while (1) @@ -1633,12 +1673,13 @@ innodb_binlog_prealloc_thread() mysql_mutex_lock(&purge_binlog_mutex); uint32_t size_in_pages= innodb_binlog_size_in_pages; - dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages); + dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages, + lf_pins); if (earliest_binlog_file_no == ~(uint64_t)0) earliest_binlog_file_no= last_created; total_binlog_used_size+= (size_in_pages << ibb_page_size_shift); - innodb_binlog_autopurge(first_open); + innodb_binlog_autopurge(first_open, lf_pins); mysql_mutex_unlock(&purge_binlog_mutex); mysql_mutex_lock(&active_binlog_mutex); @@ -1649,6 +1690,8 @@ innodb_binlog_prealloc_thread() ut_ad(active < ~(uint64_t)0 || last_created == 0); if (active == ~(uint64_t)0) { active_binlog_file_no.store(last_created, std::memory_order_relaxed); + ibb_file_hash.earliest_oob_ref.store(last_created, + std::memory_order_relaxed); } if (first_open == ~(uint64_t)0) first_open_binlog_file_no= first_open= last_created; @@ -1680,6 +1723,7 @@ innodb_binlog_prealloc_thread() } mysql_mutex_unlock(&active_binlog_mutex); + lf_hash_put_pins(lf_pins); my_thread_end(); #ifdef UNIV_PFS_THREAD @@ -1690,7 +1734,8 @@ innodb_binlog_prealloc_thread() bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, - lsn_t start_lsn, uint64_t gtid_state_interval_in_pages) + lsn_t start_lsn, uint64_t gtid_state_interval_in_pages, + LF_PINS *pins) { fsp_binlog_page_entry *block; uint32_t used_bytes; @@ -1698,6 +1743,11 @@ ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, block= binlog_page_fifo->create_page(file_no, 0); ut_a(block /* ToDo: error handling? */); byte *ptr= &block->page_buf[0]; + uint64_t oob_ref_file_no= + ibb_file_hash.earliest_oob_ref.load(std::memory_order_relaxed); + uint64_t xa_ref_file_no= + ibb_file_hash.earliest_xa_ref.load(std::memory_order_relaxed); + ibb_file_hash.update_refs(file_no, pins, oob_ref_file_no, xa_ref_file_no); int4store(ptr, IBB_MAGIC); int4store(ptr + 4, ibb_page_size_shift); @@ -1707,7 +1757,9 @@ ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, int8store(ptr + 24, file_size_in_pages); int8store(ptr + 32, start_lsn); int8store(ptr + 40, gtid_state_interval_in_pages); - used_bytes= 48; + int8store(ptr + 48, oob_ref_file_no); + int8store(ptr + 56, xa_ref_file_no); + used_bytes= IBB_BINLOG_HEADER_SIZE; ut_ad(ibb_page_size >= IBB_HEADER_PAGE_SIZE); memset(ptr + used_bytes, 0, ibb_page_size - (used_bytes + BINLOG_PAGE_CHECKSUM)); /* @@ -2009,18 +2061,9 @@ binlog_state_recover() } -static void -innodb_binlog_write_cache(IO_CACHE *cache, - handler_binlog_event_group_info *binlog_info, mtr_t *mtr) -{ - chunk_data_cache chunk_data(cache, binlog_info); - fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT); -} - - /* Allocate a context for out-of-band binlogging. */ static binlog_oob_context * -alloc_oob_context(uint32 list_length) +alloc_oob_context(uint32 list_length= 10) { size_t needed= sizeof(binlog_oob_context) + list_length * sizeof(binlog_oob_context::node_info); @@ -2028,8 +2071,15 @@ alloc_oob_context(uint32 list_length) (binlog_oob_context *) ut_malloc(needed, mem_key_binlog); if (c) { + if (!(c->lf_pins= lf_hash_get_pins(&ibb_file_hash.hash))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + ut_free(c); + return nullptr; + } c->node_list_alloc_len= list_length; c->node_list_len= 0; + c->pending_refcount= false; } else my_error(ER_OUTOFMEMORY, MYF(0), needed); @@ -2038,9 +2088,38 @@ alloc_oob_context(uint32 list_length) } +static void +innodb_binlog_write_cache(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, mtr_t *mtr) +{ + binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; + if (!c) + binlog_info->engine_ptr= c= alloc_oob_context(); + ut_a(c); + chunk_data_cache chunk_data(cache, binlog_info); + + fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins); +} + + +static inline void +reset_oob_context(binlog_oob_context *c) +{ + if (c->pending_refcount) + { + ibb_file_hash.oob_ref_dec(c->first_node_file_no, c->lf_pins); + c->pending_refcount= false; + } + c->node_list_len= 0; +} + + static inline void free_oob_context(binlog_oob_context *c) { + ut_ad(!c->pending_refcount /* Should not have pending until free */); + reset_oob_context(c); /* Defensive programming, should be redundant */ + lf_hash_put_pins(c->lf_pins); ut_free(c); } @@ -2060,7 +2139,7 @@ ensure_oob_context(void **engine_data, uint32_t needed_len) needed_len*sizeof(binlog_oob_context::node_info)); new_c->node_list_alloc_len= needed_len; *engine_data= new_c; - free_oob_context(c); + ut_free(c); return new_c; } @@ -2130,7 +2209,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, { binlog_oob_context *c= (binlog_oob_context *)*engine_data; if (!c) - *engine_data= c= alloc_oob_context(10); + *engine_data= c= alloc_oob_context(); if (UNIV_UNLIKELY(!c)) return true; @@ -2144,7 +2223,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, c->node_list[i-2].file_no, c->node_list[i-2].offset, c->node_list[i-1].file_no, c->node_list[i-1].offset, (byte *)data, data_len); - if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data)) + if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins)) return true; c->node_list_len= i - 1; } @@ -2159,7 +2238,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, 0, 0, /* NULL left child signifies a leaf */ c->node_list[i-1].file_no, c->node_list[i-1].offset, (byte *)data, data_len); - if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data)) + if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins)) return true; c->node_list_len= i + 1; } @@ -2168,11 +2247,14 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, /* Special case i==0, like case 2 but no prior node to link to. */ binlog_oob_context::chunk_data_oob oob_data (new_idx, 0, 0, 0, 0, (byte *)data, data_len); - if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data)) + if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data, + c->lf_pins)) return true; c->first_node_file_no= c->node_list[i].file_no; c->first_node_offset= c->node_list[i].offset; c->node_list_len= 1; + c->pending_refcount= + ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins); } return false; @@ -2187,14 +2269,14 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, bool binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, - chunk_data_oob *oob_data) + chunk_data_oob *oob_data, LF_PINS *pins) { uint32_t new_height= left_node == right_node ? 1 : 1 + node_list[left_node].height; mtr_t mtr; mtr.start(); std::pair new_file_no_offset= - fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA); + fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA, pins); mtr.commit(); node_list[node].file_no= new_file_no_offset.first; node_list[node].offset= new_file_no_offset.second; @@ -2249,6 +2331,15 @@ binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len) } +void +innodb_reset_oob(THD *thd, void **engine_data) +{ + binlog_oob_context *c= (binlog_oob_context *)*engine_data; + if (c) + reset_oob_context(c); +} + + void innodb_free_oob(THD *thd, void *engine_data) { @@ -2700,7 +2791,8 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, if (page_no > (end_offset >> ibb_page_size_shift)) { ut_ad(!block); - return READ_NOT_FOUND; + if (file_no == active) + return READ_NOT_FOUND; } } @@ -3055,7 +3147,8 @@ bool innodb_reset_binlogs() { bool err= false; - + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); ut_a(innodb_binlog_inited >= 2); /* Close existing binlog tablespaces and stop the pre-alloc thread. */ @@ -3074,6 +3167,8 @@ innodb_reset_binlogs() binlog_page_fifo->lock_wait_for_idle(); binlog_page_fifo->reset(); + ibb_file_hash.remove_up_to(last_created_binlog_file_no, lf_pins); + /* Delete all binlog files in the directory. */ MY_DIR *dir= my_dir(innodb_binlog_directory, MYF(MY_WME)); if (!dir) @@ -3095,6 +3190,12 @@ innodb_reset_binlogs() binlog_name_make(full_path, file_no); if (my_delete(full_path, MYF(MY_WME))) err= true; + /* + Just as defensive coding, also remove any entry from the file hash + with this file_no. We would expect to have already deleted everything + in remove_up_to() above. + */ + ibb_file_hash.remove(file_no, lf_pins); } my_dirend(dir); } @@ -3110,10 +3211,82 @@ innodb_reset_binlogs() start_binlog_prealloc_thread(); binlog_sync_initial(); + lf_hash_put_pins(lf_pins); return err; } +/* + Given a limit_file_no that is still needed by a slave (dump thread). + The dump thread will need to read any oob records references from event + groups in that file_no, so it will then also need to read from any earlier + file_no referenced from limit_file_no. + + This function handles this dependency, by reading the header page (or + getting from the ibb_file_hash if available) to get any earlier file_no + containing such references. +*/ +static bool +purge_adjust_limit_file_no(handler_binlog_purge_info *purge_info, LF_PINS *pins) +{ + uint64_t limit_file_no= purge_info->limit_file_no; + if (limit_file_no == ~(uint64_t)0) + return false; + + uint64_t referenced_file_no; + if (ibb_file_hash.get_oob_ref_file_no(limit_file_no, pins, + &referenced_file_no)) + { + if (referenced_file_no < limit_file_no) + purge_info->limit_file_no= referenced_file_no; + else + ut_ad(referenced_file_no == limit_file_no || + referenced_file_no == ~(uint64_t)0); + return false; + } + + byte *page_buf= (byte *)ut_malloc(ibb_page_size, mem_key_binlog); + if (!page_buf) + { + my_error(ER_OUTOFMEMORY, MYF(0), ibb_page_size); + return true; + } + char filename[OS_FILE_MAX_PATH]; + binlog_name_make(filename, limit_file_no); + File fh= my_open(filename, O_RDONLY | O_BINARY, MYF(0)); + if (fh < (File)0) + { + my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno); + ut_free(page_buf); + return true; + } + int res= crc32_pread_page(fh, page_buf, 0, MYF(0)); + my_close(fh, MYF(0)); + if (res <= 0) + { + ut_free(page_buf); + my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno); + return true; + } + binlog_header_data header; + fsp_binlog_extract_header_page(page_buf, &header); + ut_free(page_buf); + if (header.is_invalid || header.is_empty) + { + my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno); + return true; + } + if (header.oob_ref_file_no < limit_file_no) + purge_info->limit_file_no= header.oob_ref_file_no; + else + ut_ad(header.oob_ref_file_no == limit_file_no || + header.oob_ref_file_no == ~(uint64_t)0); + ibb_record_in_file_hash(limit_file_no, header.oob_ref_file_no, + header.xa_ref_file_no, pins); + return false; +} + + /* The low-level function handling binlog purge. @@ -3142,7 +3315,8 @@ innodb_reset_binlogs() */ static int innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, - uint64_t limit_name_file_no, uint64_t *out_file_no) + uint64_t limit_name_file_no, LF_PINS *lf_pins, + uint64_t *out_file_no) noexcept { uint64_t limit_file_no= purge_info->limit_file_no; @@ -3211,6 +3385,7 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, need_active_flush= false; } + ibb_file_hash.remove(file_no, lf_pins); if (my_delete(filename, MYF(0))) { if (my_errno == ENOENT) @@ -3235,7 +3410,7 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, static void -innodb_binlog_autopurge(uint64_t first_open_file_no) +innodb_binlog_autopurge(uint64_t first_open_file_no, LF_PINS *pins) { handler_binlog_purge_info purge_info; #ifdef HAVE_REPLICATION @@ -3249,11 +3424,8 @@ innodb_binlog_autopurge(uint64_t first_open_file_no) !(purge_info.purge_by_size || purge_info.purge_by_date)) return; - /* - ToDo: Here, we need to move back the purge_info.limit_file_no to the - earliest file containing any oob data referenced from the supplied - purge_info.limit_file_no. - */ + if (purge_adjust_limit_file_no(&purge_info, pins)) + return; /* Don't purge any actively open tablespace files. */ uint64_t orig_limit_file_no= purge_info.limit_file_no; @@ -3266,7 +3438,7 @@ innodb_binlog_autopurge(uint64_t first_open_file_no) purge_info.purge_by_name= false; uint64_t file_no; - int res= innodb_binlog_purge_low(&purge_info, 0, &file_no); + int res= innodb_binlog_purge_low(&purge_info, 0, pins, &file_no); if (res) { if (!purge_warning_given) @@ -3327,13 +3499,23 @@ innodb_binlog_purge(handler_binlog_purge_info *purge_info) return LOG_INFO_EOF; } + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); + if (purge_adjust_limit_file_no(purge_info, lf_pins)) + { + lf_hash_put_pins(lf_pins); + return LOG_INFO_IO; + } + uint64_t orig_limit_file_no= purge_info->limit_file_no; purge_info->limit_file_no= std::min(orig_limit_file_no, limit_file_no); mysql_mutex_lock(&purge_binlog_mutex); uint64_t file_no; - int res= innodb_binlog_purge_low(purge_info, to_file_no, &file_no); + int res= innodb_binlog_purge_low(purge_info, to_file_no, lf_pins, &file_no); mysql_mutex_unlock(&purge_binlog_mutex); + lf_hash_put_pins(lf_pins); + if (res == 1) { static_assert(sizeof(purge_info->nonpurge_filename) >= BINLOG_NAME_MAX_LEN, diff --git a/storage/innobase/include/fsp_binlog.h b/storage/innobase/include/fsp_binlog.h index 93bdc3fc40b..bb8a0e1a3a1 100644 --- a/storage/innobase/include/fsp_binlog.h +++ b/storage/innobase/include/fsp_binlog.h @@ -27,6 +27,8 @@ InnoDB implementation of binlog. #include #include +#include "lf.h" + #include "univ.i" #include "mtr0mtr.h" @@ -203,6 +205,85 @@ public: }; +/* Structure of an entry in the hash of binlog tablespace files. */ +struct ibb_tblspc_entry { + uint64_t file_no; + /* + Active transactions/oob-event-groups that start in this binlog tablespace + file (including any user XA). + */ + std::atomicoob_refs; + /* + Active XA transactions whose oob start in this binlog tablespace file. + ToDo: Note that user XA is not yet implemented. + */ + std::atomicxa_refs; + /* + The earliest file number that this binlog tablespace file has oob + references into. + (This is a conservative estimate, references may not actually exist in + case their commit record went into a later file, or they ended up rolling + back). + Includes any XA oob records. + */ + std::atomicoob_ref_file_no; + /* + Earliest file number that we have XA references into. + ToDo: Note that user XA is not yet implemented. + */ + std::atomicxa_ref_file_no; + + ibb_tblspc_entry()= default; + ~ibb_tblspc_entry()= default; +}; + + +/* + Class keeping reference counts of oob records starting in different binlog + tablespace files. + Used to keep track of which files should not be purged because they contain + oob (start) records that are still referenced by needed binlog tablespace + files or by active transactions. +*/ +class ibb_file_oob_refs { +public: + /* Hash contains struct ibb_tblspc_entry keyed on file_no. */ + LF_HASH hash; + /* + Earliest file_no with start oob records that are still referenced by active + transactions / event groups. + */ + std::atomic earliest_oob_ref; + /* + Same, but restricted to those oob that constitute XA transactions. + Thus, this may be larger than earliest_oob_ref or even ~(uint64_t)0 in + case there are no active XA. + */ + std::atomic earliest_xa_ref; + +public: + /* Init the hash empty. */ + void init() noexcept; + void destroy() noexcept; + /* Delete an entry from the hash. */ + void remove(uint64_t file_no, LF_PINS *pins); + /* Delete all (consecutive) entries from file_no down. */ + void remove_up_to(uint64_t file_no, LF_PINS *pins); + /* Update an entry when an OOB record is started/completed. */ + bool oob_ref_inc(uint64_t file_no, LF_PINS *pins); + bool oob_ref_dec(uint64_t file_no, LF_PINS *pins); + /* Update earliest_oob_ref when refcount drops to zero. */ + void do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins, + bool active_moving); + /* Update the oob and xa file_no's active at start of this file_no. */ + bool update_refs(uint64_t file_no, LF_PINS *pins, + uint64_t oob_ref, uint64_t xa_ref); + /* Lookup the oob-referenced file_no from a file_no. */ + bool get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins, + uint64_t *out_oob_ref_file_no); +}; + + class binlog_chunk_reader { public: enum chunk_reader_status { @@ -328,6 +409,8 @@ extern std::atomic binlog_cur_written_offset[2]; extern std::atomic binlog_cur_end_offset[2]; extern fsp_binlog_page_fifo *binlog_page_fifo; +extern ibb_file_oob_refs ibb_file_hash; + static inline void fsp_binlog_release(fsp_binlog_page_entry *page) @@ -341,6 +424,8 @@ extern int crc32_pread_page(File fd, byte *buf, uint32_t page_no, myf MyFlags) noexcept; extern int crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no, myf MyFlags) noexcept; +extern bool ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, + uint64_t xa_ref, LF_PINS *in_pins=nullptr); extern void binlog_write_up_to_now() noexcept; extern void fsp_binlog_extract_header_page(const byte *page_buf, binlog_header_data *out_header_data) @@ -356,9 +441,11 @@ extern bool fsp_binlog_open(const char *file_name, pfs_os_file_t fh, uint64_t file_no, size_t file_size, uint32_t init_page, byte *partial_page); extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no, - uint32_t size_in_pages); + uint32_t size_in_pages, + LF_PINS *pins); extern std::pair fsp_binlog_write_rec( - struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type); + struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type, + LF_PINS *pins); extern bool fsp_binlog_flush(); #endif /* fsp_binlog_h */ diff --git a/storage/innobase/include/innodb_binlog.h b/storage/innobase/include/innodb_binlog.h index 20da1ba9f77..5698ca25367 100644 --- a/storage/innobase/include/innodb_binlog.h +++ b/storage/innobase/include/innodb_binlog.h @@ -45,6 +45,7 @@ struct handler_binlog_purge_info; Currently used for: - chunk_data_cache: A binlog trx cache to be binlogged as a commit record. - chunk_data_oob: An out-of-band piece of event group data. + - chunk_data_flush: For dummy filler data. */ struct chunk_data_base { /* @@ -74,6 +75,8 @@ struct chunk_data_flush : public chunk_data_base { }; +static constexpr size_t IBB_BINLOG_HEADER_SIZE= 64; + /* Data stored at the start of each binlog file. (The data is stored as little-engian values in the first page of the file; @@ -101,6 +104,13 @@ struct binlog_header_data { binlog file was created. */ uint64_t diff_state_interval; + /* The earliest file_no that we have oob references into. */ + uint64_t oob_ref_file_no; + /* + The earliest file_no that we have XA oob references into. + ToDo: This in preparation for when XA is implemented. + */ + uint64_t xa_ref_file_no; /* The log_2 of the page size (eg. ibb_page_size_shift). */ uint32_t page_size_shift; /* @@ -163,12 +173,14 @@ extern bool innodb_binlog_init(size_t binlog_size, const char *directory); extern void innodb_binlog_close(bool shutdown); extern bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, lsn_t start_lsn, - uint64_t gtid_state_interval_in_pages); + uint64_t gtid_state_interval_in_pages, + LF_PINS *pins); extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, fsp_binlog_page_entry * &block, uint32_t &page_no, uint32_t &page_offset, uint64_t file_no); extern bool innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); +extern void innodb_reset_oob(THD *thd, void **engine_data); extern void innodb_free_oob(THD *thd, void *engine_data); extern handler_binlog_reader *innodb_get_binlog_reader(); extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no);