diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt b/mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt new file mode 100644 index 00000000000..8fbd6fd1b81 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread-master.opt @@ -0,0 +1,5 @@ +--max-binlog-size=64k +--innodb-binlog-state-interval=16k +--log-bin +--binlog-cache-size=8192 +--binlog-stmt-cache-size=8192 diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread.result b/mysql-test/suite/binlog_in_engine/purge_dump_thread.result new file mode 100644 index 00000000000..7fd49437319 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread.result @@ -0,0 +1,132 @@ +include/master-slave.inc +[connection master] +RESET MASTER; +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); +*** Test iteration, RESTART=0 +connection master; +FLUSH BINARY LOGS; +connection master1; +*** Create a transaction with active OOB records (to be committed). +connection default; +*** Create a transaction with active OOB records (to be rolled back). +connection master; +FLUSH BINARY LOGS; +*** Generating 10 large transactions in 5 interleaved connections +connection master; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (0 + 2, 0, "Park point 1 for dump thread"); +include/save_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +122 +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +122 +connection master; +SET @old_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid"; +INSERT INTO t1 VALUES (0 + 2, 1, "Transaction to pause dump thread"); +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_LATER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +ERROR HY000: A purgeable log is in use, will not purge +connection master1; +COMMIT; +connection default; +ROLLBACK; +connection master; +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +ERROR HY000: A purgeable log is in use, will not purge +*** Allow the dump thread to proceed, and see that purge is now possible. +SET GLOBAL debug_dbug= @old_dbug; +SET debug_sync= 'now SIGNAL signal.continue'; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (0 + 2, 2, 'Transaction to get dump thread to the next file'); +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +134 120676 708 220262 +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +134 120676 708 220262 +connection master; +SET debug_sync= 'RESET'; +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +PURGE BINARY LOGS TO 'OOB1_LATER'; +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +*** Test iteration, RESTART=1 +connection master; +FLUSH BINARY LOGS; +connection master1; +*** Create a transaction with active OOB records (to be committed). +connection default; +*** Create a transaction with active OOB records (to be rolled back). +connection master; +FLUSH BINARY LOGS; +*** Generating 10 large transactions in 5 interleaved connections +connection master; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (100000 + 2, 0, "Park point 1 for dump thread"); +include/save_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +255 +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*) FROM t1; +COUNT(*) +255 +connection slave; +include/stop_slave.inc +connection master1; +COMMIT; +connection default; +ROLLBACK; +connection master; +include/rpl_restart_server.inc [server_number=1 parameters: --skip-slave-start] +connection master; +SET @old_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid"; +INSERT INTO t1 VALUES (100000 + 2, 1, "Transaction to pause dump thread"); +connection slave; +include/start_slave.inc +connection master; +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_LATER'; +ERROR HY000: A purgeable log is in use, will not purge +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +ERROR HY000: A purgeable log is in use, will not purge +*** Allow the dump thread to proceed, and see that purge is now possible. +SET GLOBAL debug_dbug= @old_dbug; +SET debug_sync= 'now SIGNAL signal.continue'; +FLUSH BINARY LOGS; +INSERT INTO t1 VALUES (100000 + 2, 2, 'Transaction to get dump thread to the next file'); +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +267 13541352 1416 440519 +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +267 13541352 1416 440519 +connection master; +SET debug_sync= 'RESET'; +PURGE BINARY LOGS TO 'OOB1_START'; +PURGE BINARY LOGS TO 'OOB1_AFTER'; +PURGE BINARY LOGS TO 'OOB1_LATER'; +PURGE BINARY LOGS TO 'OOB1_CURRENT'; +connection master; +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread.test b/mysql-test/suite/binlog_in_engine/purge_dump_thread.test new file mode 100644 index 00000000000..94e95440e04 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread.test @@ -0,0 +1,207 @@ +--source include/have_debug.inc +--source include/have_debug_sync.inc +--source include/have_binlog_format_row.inc +--source include/master-slave.inc +--source include/have_innodb_binlog.inc + +--let $NUM_CONNECTIONS= 5 +# $NUM_TRANSACTIONS is total, not per connection. +--let $NUM_TRANSACTIONS=10 +--let $NUM_PIECES= 10 +--let $PIECE_SIZE= 2000 + +# Test that PURGE BINARY LOGS avoids purging files containing OOB records +# referenced from files that a dump thread is still active in. +# +# The test has --max-binlog-size=64k to have a larger number of binlog files +# to test with. The --binlog-cache-size is set to 8k, so more event data than +# that causes OOB binlogging. + +RESET MASTER; + +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); + +# Run twice. Once where the OOB references to earlier file numbers is kept +# track of in-memory. And once, where server is restarted so the references +# must be read from the file headers. +--let $restart= 0 +while ($restart <= 1) { + + --echo *** Test iteration, RESTART=$restart + --connection master + --let $D= `SELECT $restart*100000` + FLUSH BINARY LOGS; + + # Start a transaction that will have OOB references in this specific binlog file. + --let $oob1_start= query_get_value(SHOW MASTER STATUS, File, 1) + --connection master1 + --echo *** Create a transaction with active OOB records (to be committed). + --disable_query_log + BEGIN; + --let $i= 0 + while ($i < 10) { + eval INSERT INTO t1 VALUES ($D+1, $i, REPEAT(CHR(65 + ($i MOD 26)), 2000)); + inc $i; + } + --enable_query_log + # Leaving the transaction open, so the commit record will end up in a later + # binlog file and have a reference back that blocks purge. + + # Also test an OOB record for a transaction that is later rolled back. + --connection default + --echo *** Create a transaction with active OOB records (to be rolled back). + --disable_query_log + BEGIN; + --let $i= 0 + while ($i < 10) { + eval INSERT INTO t1 VALUES ($D+10, $i, REPEAT(CHR(65 + ($i MOD 26)), 2000)); + inc $i; + } + --enable_query_log + + --connection master + FLUSH BINARY LOGS; + --let $oob1_after= query_get_value(SHOW MASTER STATUS, File, 1) + + # Generate a bunch of more transactions that contain OOB and flex the + # OOB refcounting. + --echo *** Generating $NUM_TRANSACTIONS large transactions in $NUM_CONNECTIONS interleaved connections + --disable_query_log + let $t= 0; + while ($t < $NUM_TRANSACTIONS) { + let $b= $t; + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connect(con$i,localhost,root,,) + START TRANSACTION; + eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, 0, 'Initial $i'); + inc $i; + inc $t; + } + + let $p= 1; + while ($p <= $NUM_PIECES) { + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, $p, REPEAT(CHR(65 + ($p + $i MOD 26)), $PIECE_SIZE)); + inc $i; + } + inc $p; + } + + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, $NUM_PIECES+1, 'Last $i'); + COMMIT; + --disconnect con$i + inc $i; + } + } + --enable_query_log + + --connection master + --let $oob1_later= query_get_value(SHOW MASTER STATUS, File, 1) + FLUSH BINARY LOGS; + eval INSERT INTO t1 VALUES ($D + 2, 0, "Park point 1 for dump thread"); + + # Now get the dump thread to the current point. + --source include/save_master_gtid.inc + SELECT COUNT(*) FROM t1; + + --connection slave + --source include/sync_with_master_gtid.inc + SELECT COUNT(*) FROM t1; + + if ($restart) { + --connection slave + --source include/stop_slave.inc + + --connection master1 + # Commit the transaction with OOB references back to an earlier binlog + # file, so that the reference will be there also after server restart. + COMMIT; + --connection default + # Roll back the other transaction with OOB. + ROLLBACK; + + --connection master + --let $rpl_server_number=1 + --let $rpl_server_parameters= --skip-slave-start + --source include/rpl_restart_server.inc + } + + --connection master + SET @old_dbug= @@global.debug_dbug; + SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid"; + eval INSERT INTO t1 VALUES ($D + 2, 1, "Transaction to pause dump thread"); + --let $oob1_current= query_get_value(SHOW MASTER STATUS, File, 1) + + if ($restart) { + --connection slave + --source include/start_slave.inc + --connection master + } + + let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE Command = 'Binlog Dump' AND State = 'debug sync point: now'; + --source include/wait_condition.inc + + # At this point, we have a dump thread active in $oob1_current. But we still + # have an active OOB record in $oob1_start, so neither of $oob1_start or + # any other prior to $oob1_current must be purged. + # The file before $oob1_start is allowed to be purged, though. + --replace_result $oob1_start OOB1_START + eval PURGE BINARY LOGS TO '$oob1_start'; + --replace_result $oob1_after OOB1_AFTER + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_after'; + --replace_result $oob1_later OOB1_LATER + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_later'; + --replace_result $oob1_current OOB1_CURRENT + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_current'; + if (!$restart) { + --connection master1 + COMMIT; + --connection default + ROLLBACK; + --connection master + --replace_result $oob1_current OOB1_CURRENT + --error ER_LOG_IN_USE + eval PURGE BINARY LOGS TO '$oob1_current'; + } + + --echo *** Allow the dump thread to proceed, and see that purge is now possible. + SET GLOBAL debug_dbug= @old_dbug; + SET debug_sync= 'now SIGNAL signal.continue'; + FLUSH BINARY LOGS; + eval INSERT INTO t1 VALUES ($D + 2, 2, 'Transaction to get dump thread to the next file'); + SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; + --source include/save_master_gtid.inc + --connection slave + --source include/sync_with_master_gtid.inc + SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; + + --connection master + SET debug_sync= 'RESET'; + # Now the dump thread is past $oob1_current, so all PURGE should succeed. + --replace_result $oob1_start OOB1_START + eval PURGE BINARY LOGS TO '$oob1_start'; + --replace_result $oob1_after OOB1_AFTER + eval PURGE BINARY LOGS TO '$oob1_after'; + --replace_result $oob1_later OOB1_LATER + eval PURGE BINARY LOGS TO '$oob1_later'; + --replace_result $oob1_current OOB1_CURRENT + eval PURGE BINARY LOGS TO '$oob1_current'; + + inc $restart; +} + +# Cleanup. +--connection master +DROP TABLE t1; +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_binlog_directory.result b/mysql-test/suite/rpl/r/rpl_binlog_directory.result index 80373a0614a..f58e9c470fd 100644 --- a/mysql-test/suite/rpl/r/rpl_binlog_directory.result +++ b/mysql-test/suite/rpl/r/rpl_binlog_directory.result @@ -13,6 +13,7 @@ INSERT INTO t1 VALUES (2, 11); include/rpl_stop_server.inc [server_number=1] include/rpl_start_server.inc [server_number=1] INSERT INTO t1 VALUES (3, 12); +include/save_master_gtid.inc show binary logs; Log_name File_size master-bin.000001 # @@ -29,6 +30,7 @@ a b 3 12 connection slave; include/start_slave.inc +include/sync_with_master_gtid.inc SELECT * FROM t1 ORDER BY a; a b 1 10 diff --git a/mysql-test/suite/rpl/t/rpl_binlog_directory.test b/mysql-test/suite/rpl/t/rpl_binlog_directory.test index f7c643dee8b..e13c9c88e72 100644 --- a/mysql-test/suite/rpl/t/rpl_binlog_directory.test +++ b/mysql-test/suite/rpl/t/rpl_binlog_directory.test @@ -35,7 +35,7 @@ INSERT INTO t1 VALUES (2, 11); --source include/rpl_start_server.inc INSERT INTO t1 VALUES (3, 12); ---save_master_pos +--source include/save_master_gtid.inc --source include/show_binary_logs.inc --echo *** Contents of master-bin.index (including directory path): --cat_file $master_datadir/master-bin.index @@ -43,7 +43,7 @@ SELECT * FROM t1 ORDER BY a; --connection slave --source include/start_slave.inc ---sync_with_master +--source include/sync_with_master_gtid.inc SELECT * FROM t1 ORDER BY a; # Clean up. diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result index f35b09faf2b..fd6ecbe5b40 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result +++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result @@ -96,8 +96,8 @@ SESSION_VALUE NULL DEFAULT_VALUE 2097152 VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BIGINT UNSIGNED -VARIABLE_COMMENT Interval (in bytes) at which to write the GTID binlog state to binlog files to speed up GTID lookups. Must be a multiple of innodb_page_size -NUMERIC_MIN_VALUE 65536 +VARIABLE_COMMENT Interval (in bytes) at which to write the GTID binlog state to binlog files to speed up GTID lookups. Must be a multiple of the binlog page size (4096 bytes) +NUMERIC_MIN_VALUE 8192 NUMERIC_MAX_VALUE 18446744073709551615 NUMERIC_BLOCK_SIZE 0 ENUM_VALUE_LIST NULL diff --git a/sql/handler.h b/sql/handler.h index 6a671f7e6c4..43bcbaa6df8 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1560,6 +1560,12 @@ struct handlerton */ bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); + /* + Call to reset (for new transactions) the engine_data from + binlog_oob_data(). Can also change the pointer to point to different data + (or set it to NULL). + */ + void (*binlog_oob_reset)(THD *thd, void **engine_data); /* Call to allow engine to release the engine_data from binlog_oob_data(). */ void (*binlog_oob_free)(THD *thd, void *engine_data); /* Obtain an object to allow reading from the binlog. */ diff --git a/sql/log.cc b/sql/log.cc index af1962281c7..dbe1beba993 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -390,12 +390,18 @@ public: else last_commit_pos_file.legacy_name[0]= 0; } + ~binlog_cache_mngr() + { + if (engine_binlog_info.engine_ptr) + (*opt_binlog_engine_hton->binlog_oob_free) + (thd, engine_binlog_info.engine_ptr); + } void reset(bool do_stmt, bool do_trx) { if (engine_binlog_info.engine_ptr) - (*opt_binlog_engine_hton->binlog_oob_free)(thd, - engine_binlog_info.engine_ptr); + (*opt_binlog_engine_hton->binlog_oob_reset) + (thd, &engine_binlog_info.engine_ptr); if (do_stmt) stmt_cache.reset(); if (do_trx) @@ -418,7 +424,9 @@ public: */ trx_cache.cache_log.write_function= binlog_spill_to_engine; trx_cache.cache_log.append_read_pos= (uchar *)this; - engine_binlog_info= {0, 0, 0}; + engine_binlog_info.out_of_band_offset= 0; + engine_binlog_info.gtid_offset= 0; + /* Preserve the engine_ptr for the engine to re-use, was reset above. */ } } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 6e931f806b7..c4611c5e80e 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -3391,6 +3391,30 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo) (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); DBUG_ASSERT(event_type != START_ENCRYPTION_EVENT); +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", + { + if (event_type == XID_EVENT) + { + net_flush(info->net); + const char act[]= + "now " + "wait_for signal.continue"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action( + info->thd, + STRING_WITH_LEN(act))); + + const char act2[]= + "now " + "signal signal.continued"; + DBUG_ASSERT(!debug_sync_set_action( + info->thd, + STRING_WITH_LEN(act2))); + } + }); +#endif + if (((info->errmsg= send_event_to_slave(info, event_type, nullptr, ev_offset, &info->error_gtid)))) return 1; diff --git a/storage/innobase/fsp/fsp_binlog.cc b/storage/innobase/fsp/fsp_binlog.cc index 7113103156d..ab0c12539d7 100644 --- a/storage/innobase/fsp/fsp_binlog.cc +++ b/storage/innobase/fsp/fsp_binlog.cc @@ -21,6 +21,7 @@ this program; if not, write to the Free Software Foundation, Inc., InnoDB implementation of binlog. *******************************************************/ +#include #include "ut0bitop.h" #include "fsp0fsp.h" #include "buf0flu.h" @@ -107,6 +108,9 @@ std::atomic binlog_cur_end_offset[2]; fsp_binlog_page_fifo *binlog_page_fifo; +/* Object to keep track of outstanding oob references in binlog files. */ +ibb_file_oob_refs ibb_file_hash; + fsp_binlog_page_entry * fsp_binlog_page_fifo::create_page(uint64_t file_no, uint32_t page_no) @@ -736,6 +740,225 @@ crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no, myf MyFlags) } +/* + Need specific constructor/initializer for struct ibb_tblspc_entry stored in + the ibb_file_hash. This is a work-around for C++ abstractions that makes it + non-standard behaviour to memcpy() std::atomic objects. +*/ +static void +ibb_file_hash_constructor(uchar *arg) +{ + new(arg + LF_HASH_OVERHEAD) ibb_tblspc_entry(); +} + + +static void +ibb_file_hash_destructor(uchar *arg) +{ + ibb_tblspc_entry *e= (ibb_tblspc_entry *)(arg + LF_HASH_OVERHEAD); + e->~ibb_tblspc_entry(); +} + + +static void +ibb_file_hash_initializer(LF_HASH *hash, void *dst, const void *src) +{ + ibb_tblspc_entry *src_e= (ibb_tblspc_entry *)src; + ibb_tblspc_entry *dst_e= (ibb_tblspc_entry *)dst; + dst_e->file_no= src_e->file_no; + dst_e->oob_refs.store(src_e->oob_refs.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dst_e->xa_refs.store(src_e->xa_refs.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dst_e->oob_ref_file_no.store(src_e->oob_ref_file_no.load(std::memory_order_relaxed), + std::memory_order_relaxed); + dst_e->xa_ref_file_no.store(src_e->xa_ref_file_no.load(std::memory_order_relaxed), + std::memory_order_relaxed); +} + + +void +ibb_file_oob_refs::init() noexcept +{ + lf_hash_init(&hash, sizeof(ibb_tblspc_entry), LF_HASH_UNIQUE, + offsetof(ibb_tblspc_entry, file_no), + sizeof(ibb_tblspc_entry::file_no), nullptr, nullptr); + hash.alloc.constructor= ibb_file_hash_constructor; + hash.alloc.destructor= ibb_file_hash_destructor; + hash.initializer= ibb_file_hash_initializer; + earliest_oob_ref= ~(uint64_t)0; + earliest_xa_ref= ~(uint64_t)0; +} + + +void +ibb_file_oob_refs::destroy() noexcept +{ + lf_hash_destroy(&hash); +} + + +void +ibb_file_oob_refs::remove(uint64_t file_no, LF_PINS *pins) +{ + lf_hash_delete(&hash, pins, &file_no, sizeof(file_no)); +} + + +void +ibb_file_oob_refs::remove_up_to(uint64_t file_no, LF_PINS *pins) +{ + for (;;) + { + int res= lf_hash_delete(&hash, pins, &file_no, sizeof(file_no)); + if (res || file_no == 0) + break; + --file_no; + } +} + + +bool +ibb_file_oob_refs::oob_ref_inc(uint64_t file_no, LF_PINS *pins) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return false; + e->oob_refs.fetch_add(1, std::memory_order_acquire); + lf_hash_search_unpin(pins); + return true; +} + + +bool +ibb_file_oob_refs::oob_ref_dec(uint64_t file_no, LF_PINS *pins) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return true; + uint64_t refcnt= e->oob_refs.fetch_sub(1, std::memory_order_acquire) - 1; + lf_hash_search_unpin(pins); + ut_ad(refcnt != (uint64_t)0 - 1); + + if (refcnt == 0) + do_zero_refcnt_action(file_no, pins, false); + return false; +} + + +void +ibb_file_oob_refs::do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins, + bool active_moving) +{ + for (;;) + { + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return; + uint64_t refcnt= e->oob_refs.load(std::memory_order_acquire); + lf_hash_search_unpin(pins); + if (refcnt > 0) + return; + /* + Reference count reached zero. Check if this was the earliest_oob_ref + that reached zero, and if so move it to the next file. Repeat this + for consecutive refcount-is-zero file_no, in case N+1 reaches zero + before N does. + + As records are written into the active binlog file, the refcount can + reach zero temporarily and then go up again, so do not move the + earliest_oob_ref ahead yet. + + As the active is about to move to the next file, we check again, and + this time move the earliest_oob_ref if the refcount on the (previously) + active binlog file ended up at zero. + */ + uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); + ut_ad(file_no <= active + active_moving); + if (file_no >= active + active_moving) + return; + bool ok; + do + { + uint64_t read_file_no= earliest_oob_ref.load(std::memory_order_relaxed); + if (read_file_no != file_no) + break; + ok= earliest_oob_ref.compare_exchange_weak(read_file_no, file_no + 1, + std::memory_order_relaxed); + } while (!ok); + /* Handle any following file_no that may have dropped to zero earlier. */ + ++file_no; + } +} + + +bool +ibb_file_oob_refs::update_refs(uint64_t file_no, LF_PINS *pins, + uint64_t oob_ref, uint64_t xa_ref) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + return false; + e->oob_ref_file_no.store(oob_ref, std::memory_order_relaxed); + e->xa_ref_file_no.store(xa_ref, std::memory_order_relaxed); + lf_hash_search_unpin(pins); + return true; +} + + +bool +ibb_file_oob_refs::get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins, + uint64_t *out_oob_ref_file_no) +{ + ibb_tblspc_entry *e= + (ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no)); + if (!e) + { + *out_oob_ref_file_no= ~(uint64_t)0; + return false; + } + *out_oob_ref_file_no= e->oob_ref_file_no.load(std::memory_order_relaxed); + lf_hash_search_unpin(pins); + return true; +} + + +bool +ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, uint64_t xa_ref, + LF_PINS *in_pins) +{ + bool err= false; + LF_PINS *pins= in_pins ? in_pins : lf_hash_get_pins(&ibb_file_hash.hash); + if (!pins) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return true; + } + ibb_tblspc_entry entry; + entry.file_no= file_no; + entry.oob_refs.store(0, std::memory_order_relaxed); + entry.xa_refs.store(0, std::memory_order_relaxed); + entry.oob_ref_file_no.store(oob_ref, std::memory_order_relaxed); + entry.xa_ref_file_no.store(xa_ref, std::memory_order_relaxed); + int res= lf_hash_insert(&ibb_file_hash.hash, pins, &entry); + if (res) + { + ut_ad(res < 0 /* Should not get unique violation, never insert twice */); + sql_print_error("InnoDB: Could not initialize in-memory structure for " + "binlog tablespace file number %" PRIu64 ", %s", file_no, + (res < 0 ? "out of memory" : "internal error")); + err= true; + } + if (!in_pins) + lf_hash_put_pins(pins); + return err; +} + + void binlog_write_up_to_now() noexcept { @@ -789,6 +1012,8 @@ fsp_binlog_extract_header_page(const byte *page_buf, out_header_data-> page_count= uint8korr(page_buf + 24); out_header_data-> start_lsn= uint8korr(page_buf + 32); out_header_data-> diff_state_interval= uint8korr(page_buf + 40); + out_header_data->oob_ref_file_no= uint8korr(page_buf + 48); + out_header_data->xa_ref_file_no= uint8korr(page_buf + 56); } @@ -839,6 +1064,7 @@ fsp_binlog_init() { mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr); pthread_cond_init(&active_binlog_cond, nullptr); + ibb_file_hash.init(); binlog_page_fifo= new fsp_binlog_page_fifo(); binlog_page_fifo->start_flush_thread(); } @@ -849,6 +1075,7 @@ fsp_binlog_shutdown() { binlog_page_fifo->stop_flush_thread(); delete binlog_page_fifo; + ibb_file_hash.destroy(); pthread_cond_destroy(&active_binlog_cond); mysql_mutex_destroy(&active_binlog_mutex); } @@ -927,7 +1154,8 @@ fsp_binlog_open(const char *file_name, pfs_os_file_t fh, /** Create a binlog tablespace file @param[in] file_no Index of the binlog tablespace @return DB_SUCCESS or error code */ -dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) +dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages, + LF_PINS *pins) { pfs_os_file_t fh; bool ret; @@ -962,6 +1190,13 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) return DB_ERROR; } + /* + Enter an initial entry in the hash for this binlog tablespace file. + It will be later updated with the appropriate values when the file + first gets used and the header page is written. + */ + ibb_record_in_file_hash(file_no, ~(uint64_t)0,~(uint64_t)0, pins); + binlog_page_fifo->create_tablespace(file_no, size_in_pages); os_file_close(fh); @@ -979,7 +1214,8 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) GTID state record, used for FLUSH BINARY LOGS. */ std::pair -fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) +fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type, + LF_PINS *pins) { uint32_t page_size= (uint32_t)ibb_page_size; uint32_t page_size_shift= ibb_page_size_shift; @@ -1038,9 +1274,16 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) /* Write the header page at the start of a binlog tablespace file. */ if (page_no == 0) { + /* Active is moving to next file, so check if oob refcount of previous + file is zero. + */ + if (UNIV_LIKELY(file_no > 0)) + ibb_file_hash.do_zero_refcnt_action(file_no - 1, pins, true); + lsn_t start_lsn= log_sys.get_lsn(std::memory_order_acquire); bool err= ibb_write_header_page(mtr, file_no, file_size_in_pages, - start_lsn, current_binlog_state_interval); + start_lsn, + current_binlog_state_interval, pins); ut_a(!err /* ToDo error handling */); page_no= 1; } @@ -1244,6 +1487,8 @@ fsp_binlog_flush() my_sync(fh, MYF(0)); binlog_page_fifo->unlock(); + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); uint32_t page_offset= binlog_cur_page_offset; if (page_offset > BINLOG_PAGE_DATA || page_offset < ibb_page_size - BINLOG_PAGE_DATA_END) @@ -1254,7 +1499,7 @@ fsp_binlog_flush() end-of-file of the entire binlog. */ mtr.start(); - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY, lf_pins); mtr.commit(); } @@ -1280,8 +1525,9 @@ fsp_binlog_flush() persisted across a server restart. */ mtr.start(); - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins); mtr.commit(); + lf_hash_put_pins(lf_pins); log_buffer_flush_to_disk(srv_flush_log_at_trx_commit & 1); return false; diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index cef9f17d9da..521f2f11b64 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -3759,10 +3759,10 @@ static int innodb_init_params() if (innodb_binlog_state_interval == 0 || innodb_binlog_state_interval != (ulonglong)1 << (63 - nlz(innodb_binlog_state_interval)) || - innodb_binlog_state_interval % (ulonglong)srv_page_size) { + innodb_binlog_state_interval % (ulonglong)ibb_page_size) { ib::error() << "innodb_binlog_state_interval must be a " - "power-of-two multiple of the innodb_page_size=" - << srv_page_size; + "power-of-two multiple of the innodb binlog page size=" + << ibb_page_size; DBUG_RETURN(HA_ERR_INITIALIZATION); } @@ -4132,6 +4132,7 @@ static int innodb_init(void* p) innobase_hton->binlog_init= innodb_binlog_init; innobase_hton->binlog_write_direct= innobase_binlog_write_direct; innobase_hton->binlog_oob_data= innodb_binlog_oob; + innobase_hton->binlog_oob_reset= innodb_reset_oob; innobase_hton->binlog_oob_free= innodb_free_oob; innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list; @@ -19931,9 +19932,10 @@ static MYSQL_SYSVAR_ULONGLONG(binlog_state_interval, innodb_binlog_state_interval, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, "Interval (in bytes) at which to write the GTID binlog state to binlog " - "files to speed up GTID lookups. Must be a multiple of innodb_page_size", + "files to speed up GTID lookups. Must be a multiple of the binlog page " + "size (4096 bytes)", NULL, NULL, 2*1024*1024, - UNIV_PAGE_SIZE_MAX, ULONGLONG_MAX, 0); + 8192, ULONGLONG_MAX, 0); static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(autoextend_increment), diff --git a/storage/innobase/handler/innodb_binlog.cc b/storage/innobase/handler/innodb_binlog.cc index 79c41b9a2ef..b90ea380990 100644 --- a/storage/innobase/handler/innodb_binlog.cc +++ b/storage/innobase/handler/innodb_binlog.cc @@ -117,12 +117,18 @@ struct binlog_oob_context { bool binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, - chunk_data_oob *oob_data); + chunk_data_oob *oob_data, LF_PINS *pins); uint64_t first_node_file_no; uint64_t first_node_offset; + LF_PINS *lf_pins; uint32_t node_list_len; uint32_t node_list_alloc_len; + /* + Set if we incremented refcount in first_node_file_no, so we need to + decrement again at commit record write or reset/rollback. + */ + bool pending_refcount; /* The node_list contains the root of each tree in the forest of perfect binary trees. @@ -249,6 +255,7 @@ public: struct chunk_data_cache : public chunk_data_base { IO_CACHE *cache; + binlog_oob_context *oob_ctx; size_t main_remain; size_t gtid_remain; uint32_t header_remain; @@ -270,6 +277,8 @@ struct chunk_data_cache : public chunk_data_base { binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; unsigned char *p; + ut_ad(c); + oob_ctx= c; if (c && c->node_list_len) { /* @@ -321,6 +330,13 @@ struct chunk_data_cache : public chunk_data_base { virtual std::pair copy_data(byte *p, uint32_t max_len) final { uint32_t size= 0; + + if (UNIV_LIKELY(oob_ctx != nullptr) && oob_ctx->pending_refcount) + { + ibb_file_hash.oob_ref_dec(oob_ctx->first_node_file_no, oob_ctx->lf_pins); + oob_ctx->pending_refcount= false; + } + /* Write header data, if any still available. */ if (header_remain > 0) { @@ -505,7 +521,7 @@ static int scan_for_binlogs(const char *binlog_dir, found_binlogs *binlog_files, bool error_if_missing) noexcept; static int innodb_binlog_discover(); static bool binlog_state_recover(); -static void innodb_binlog_autopurge(uint64_t first_open_file_no); +static void innodb_binlog_autopurge(uint64_t first_open_file_no, LF_PINS *pins); static int read_gtid_state_from_page(rpl_binlog_state_base *state, const byte *page, uint32_t page_no) noexcept; @@ -1144,6 +1160,7 @@ innodb_binlog_init_state() earliest_binlog_file_no= ~(uint64_t)0; total_binlog_used_size= 0; active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed); binlog_cur_page_no= 0; binlog_cur_page_offset= BINLOG_PAGE_DATA; current_binlog_state_interval= @@ -1184,9 +1201,12 @@ binlog_sync_initial() { chunk_data_flush dummy_data; mtr_t mtr; + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); mtr.start(); - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins); mtr.commit(); + lf_hash_put_pins(lf_pins); log_buffer_flush_to_disk(true); binlog_page_fifo->flush_up_to(0, 0); binlog_page_fifo->do_fdatasync(0); @@ -1342,7 +1362,7 @@ binlog_page_empty(const byte *page) static int find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, uint32_t *out_page_no, uint32_t *out_pos_in_page, - uint64_t *out_state_interval) + binlog_header_data *out_header_data) { const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; @@ -1351,11 +1371,11 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, uint32_t p_0, p_1, p_2, last_nonempty; byte *p, *page_end; bool ret; - binlog_header_data header_data; *out_page_no= 0; *out_pos_in_page= BINLOG_PAGE_DATA; - *out_state_interval= 0; + out_header_data->diff_state_interval= 0; + out_header_data->is_invalid= true; binlog_name_make(file_name, file_no); pfs_os_file_t fh= os_file_create(innodb_data_file_key, file_name, @@ -1371,27 +1391,27 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, os_file_close(fh); return -1; } - fsp_binlog_extract_header_page(page_buf, &header_data); - if (header_data.is_invalid) + fsp_binlog_extract_header_page(page_buf, out_header_data); + if (out_header_data->is_invalid) { sql_print_error("InnoDB: Invalid or corrupt file header in file " "'%s'", file_name); return -1; } - if (header_data.is_empty) { + if (out_header_data->is_empty) { ret= fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr); binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed); return (ret ? -1 : 0); } - if (header_data.file_no != file_no) + if (out_header_data->file_no != file_no) { sql_print_error("InnoDB: Inconsistent file header in file '%s', " - "wrong file_no %" PRIu64, file_name, header_data.file_no); + "wrong file_no %" PRIu64, file_name, + out_header_data->file_no); return -1; } - *out_state_interval= header_data.diff_state_interval; last_nonempty= 0; /* @@ -1471,7 +1491,7 @@ innodb_binlog_discover() const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; struct found_binlogs binlog_files; - uint64_t diff_state_interval; + binlog_header_data header; int res= scan_for_binlogs(innodb_binlog_directory, &binlog_files, false); if (res <= 0) @@ -1494,10 +1514,13 @@ innodb_binlog_discover() res= find_pos_in_binlog(binlog_files.last_file_no, binlog_files.last_size, page_buf.get(), &page_no, &pos_in_page, - &diff_state_interval); + &header); if (res < 0) { file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); current_binlog_state_interval= innodb_binlog_state_interval; sql_print_warning("Binlog number %llu could no be opened. Starting a new " "binlog file from number %llu", @@ -1508,8 +1531,12 @@ innodb_binlog_discover() if (res > 0) { /* Found start position in the last binlog file. */ file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no, + header.xa_ref_file_no)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); - current_binlog_state_interval= diff_state_interval; + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); + current_binlog_state_interval= header.diff_state_interval; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1525,10 +1552,13 @@ innodb_binlog_discover() binlog_files.prev_size, page_buf.get(), &prev_page_no, &prev_pos_in_page, - &diff_state_interval); + &header); if (res < 0) { file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); current_binlog_state_interval= innodb_binlog_state_interval; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; @@ -1538,8 +1568,12 @@ innodb_binlog_discover() return 1; } file_no= binlog_files.prev_file_no; + if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no, + header.xa_ref_file_no)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); - current_binlog_state_interval= diff_state_interval; + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); + current_binlog_state_interval= header.diff_state_interval; binlog_cur_page_no= prev_page_no; binlog_cur_page_offset= prev_pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1551,7 +1585,10 @@ innodb_binlog_discover() /* Just one empty binlog file found. */ file_no= binlog_files.last_file_no; + if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) + return -1; active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); current_binlog_state_interval= innodb_binlog_state_interval; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; @@ -1563,6 +1600,7 @@ innodb_binlog_discover() /* No binlog files found, start from scratch. */ file_no= 0; earliest_binlog_file_no= 0; + ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed); total_binlog_used_size= 0; current_binlog_state_interval= innodb_binlog_state_interval; ib::info() << "Starting a new binlog from file number " << file_no << "."; @@ -1612,6 +1650,8 @@ innodb_binlog_prealloc_thread() #ifdef UNIV_PFS_THREAD pfs_register_thread(binlog_prealloc_thread_key); #endif + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); mysql_mutex_lock(&active_binlog_mutex); while (1) @@ -1633,12 +1673,13 @@ innodb_binlog_prealloc_thread() mysql_mutex_lock(&purge_binlog_mutex); uint32_t size_in_pages= innodb_binlog_size_in_pages; - dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages); + dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages, + lf_pins); if (earliest_binlog_file_no == ~(uint64_t)0) earliest_binlog_file_no= last_created; total_binlog_used_size+= (size_in_pages << ibb_page_size_shift); - innodb_binlog_autopurge(first_open); + innodb_binlog_autopurge(first_open, lf_pins); mysql_mutex_unlock(&purge_binlog_mutex); mysql_mutex_lock(&active_binlog_mutex); @@ -1649,6 +1690,8 @@ innodb_binlog_prealloc_thread() ut_ad(active < ~(uint64_t)0 || last_created == 0); if (active == ~(uint64_t)0) { active_binlog_file_no.store(last_created, std::memory_order_relaxed); + ibb_file_hash.earliest_oob_ref.store(last_created, + std::memory_order_relaxed); } if (first_open == ~(uint64_t)0) first_open_binlog_file_no= first_open= last_created; @@ -1680,6 +1723,7 @@ innodb_binlog_prealloc_thread() } mysql_mutex_unlock(&active_binlog_mutex); + lf_hash_put_pins(lf_pins); my_thread_end(); #ifdef UNIV_PFS_THREAD @@ -1690,7 +1734,8 @@ innodb_binlog_prealloc_thread() bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, - lsn_t start_lsn, uint64_t gtid_state_interval_in_pages) + lsn_t start_lsn, uint64_t gtid_state_interval_in_pages, + LF_PINS *pins) { fsp_binlog_page_entry *block; uint32_t used_bytes; @@ -1698,6 +1743,11 @@ ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, block= binlog_page_fifo->create_page(file_no, 0); ut_a(block /* ToDo: error handling? */); byte *ptr= &block->page_buf[0]; + uint64_t oob_ref_file_no= + ibb_file_hash.earliest_oob_ref.load(std::memory_order_relaxed); + uint64_t xa_ref_file_no= + ibb_file_hash.earliest_xa_ref.load(std::memory_order_relaxed); + ibb_file_hash.update_refs(file_no, pins, oob_ref_file_no, xa_ref_file_no); int4store(ptr, IBB_MAGIC); int4store(ptr + 4, ibb_page_size_shift); @@ -1707,7 +1757,9 @@ ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, int8store(ptr + 24, file_size_in_pages); int8store(ptr + 32, start_lsn); int8store(ptr + 40, gtid_state_interval_in_pages); - used_bytes= 48; + int8store(ptr + 48, oob_ref_file_no); + int8store(ptr + 56, xa_ref_file_no); + used_bytes= IBB_BINLOG_HEADER_SIZE; ut_ad(ibb_page_size >= IBB_HEADER_PAGE_SIZE); memset(ptr + used_bytes, 0, ibb_page_size - (used_bytes + BINLOG_PAGE_CHECKSUM)); /* @@ -2009,18 +2061,9 @@ binlog_state_recover() } -static void -innodb_binlog_write_cache(IO_CACHE *cache, - handler_binlog_event_group_info *binlog_info, mtr_t *mtr) -{ - chunk_data_cache chunk_data(cache, binlog_info); - fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT); -} - - /* Allocate a context for out-of-band binlogging. */ static binlog_oob_context * -alloc_oob_context(uint32 list_length) +alloc_oob_context(uint32 list_length= 10) { size_t needed= sizeof(binlog_oob_context) + list_length * sizeof(binlog_oob_context::node_info); @@ -2028,8 +2071,15 @@ alloc_oob_context(uint32 list_length) (binlog_oob_context *) ut_malloc(needed, mem_key_binlog); if (c) { + if (!(c->lf_pins= lf_hash_get_pins(&ibb_file_hash.hash))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + ut_free(c); + return nullptr; + } c->node_list_alloc_len= list_length; c->node_list_len= 0; + c->pending_refcount= false; } else my_error(ER_OUTOFMEMORY, MYF(0), needed); @@ -2038,9 +2088,38 @@ alloc_oob_context(uint32 list_length) } +static void +innodb_binlog_write_cache(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, mtr_t *mtr) +{ + binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; + if (!c) + binlog_info->engine_ptr= c= alloc_oob_context(); + ut_a(c); + chunk_data_cache chunk_data(cache, binlog_info); + + fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins); +} + + +static inline void +reset_oob_context(binlog_oob_context *c) +{ + if (c->pending_refcount) + { + ibb_file_hash.oob_ref_dec(c->first_node_file_no, c->lf_pins); + c->pending_refcount= false; + } + c->node_list_len= 0; +} + + static inline void free_oob_context(binlog_oob_context *c) { + ut_ad(!c->pending_refcount /* Should not have pending until free */); + reset_oob_context(c); /* Defensive programming, should be redundant */ + lf_hash_put_pins(c->lf_pins); ut_free(c); } @@ -2060,7 +2139,7 @@ ensure_oob_context(void **engine_data, uint32_t needed_len) needed_len*sizeof(binlog_oob_context::node_info)); new_c->node_list_alloc_len= needed_len; *engine_data= new_c; - free_oob_context(c); + ut_free(c); return new_c; } @@ -2130,7 +2209,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, { binlog_oob_context *c= (binlog_oob_context *)*engine_data; if (!c) - *engine_data= c= alloc_oob_context(10); + *engine_data= c= alloc_oob_context(); if (UNIV_UNLIKELY(!c)) return true; @@ -2144,7 +2223,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, c->node_list[i-2].file_no, c->node_list[i-2].offset, c->node_list[i-1].file_no, c->node_list[i-1].offset, (byte *)data, data_len); - if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data)) + if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins)) return true; c->node_list_len= i - 1; } @@ -2159,7 +2238,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, 0, 0, /* NULL left child signifies a leaf */ c->node_list[i-1].file_no, c->node_list[i-1].offset, (byte *)data, data_len); - if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data)) + if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins)) return true; c->node_list_len= i + 1; } @@ -2168,11 +2247,14 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, /* Special case i==0, like case 2 but no prior node to link to. */ binlog_oob_context::chunk_data_oob oob_data (new_idx, 0, 0, 0, 0, (byte *)data, data_len); - if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data)) + if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data, + c->lf_pins)) return true; c->first_node_file_no= c->node_list[i].file_no; c->first_node_offset= c->node_list[i].offset; c->node_list_len= 1; + c->pending_refcount= + ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins); } return false; @@ -2187,14 +2269,14 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, bool binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, - chunk_data_oob *oob_data) + chunk_data_oob *oob_data, LF_PINS *pins) { uint32_t new_height= left_node == right_node ? 1 : 1 + node_list[left_node].height; mtr_t mtr; mtr.start(); std::pair new_file_no_offset= - fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA); + fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA, pins); mtr.commit(); node_list[node].file_no= new_file_no_offset.first; node_list[node].offset= new_file_no_offset.second; @@ -2249,6 +2331,15 @@ binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len) } +void +innodb_reset_oob(THD *thd, void **engine_data) +{ + binlog_oob_context *c= (binlog_oob_context *)*engine_data; + if (c) + reset_oob_context(c); +} + + void innodb_free_oob(THD *thd, void *engine_data) { @@ -2700,7 +2791,8 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, if (page_no > (end_offset >> ibb_page_size_shift)) { ut_ad(!block); - return READ_NOT_FOUND; + if (file_no == active) + return READ_NOT_FOUND; } } @@ -3055,7 +3147,8 @@ bool innodb_reset_binlogs() { bool err= false; - + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); ut_a(innodb_binlog_inited >= 2); /* Close existing binlog tablespaces and stop the pre-alloc thread. */ @@ -3074,6 +3167,8 @@ innodb_reset_binlogs() binlog_page_fifo->lock_wait_for_idle(); binlog_page_fifo->reset(); + ibb_file_hash.remove_up_to(last_created_binlog_file_no, lf_pins); + /* Delete all binlog files in the directory. */ MY_DIR *dir= my_dir(innodb_binlog_directory, MYF(MY_WME)); if (!dir) @@ -3095,6 +3190,12 @@ innodb_reset_binlogs() binlog_name_make(full_path, file_no); if (my_delete(full_path, MYF(MY_WME))) err= true; + /* + Just as defensive coding, also remove any entry from the file hash + with this file_no. We would expect to have already deleted everything + in remove_up_to() above. + */ + ibb_file_hash.remove(file_no, lf_pins); } my_dirend(dir); } @@ -3110,10 +3211,82 @@ innodb_reset_binlogs() start_binlog_prealloc_thread(); binlog_sync_initial(); + lf_hash_put_pins(lf_pins); return err; } +/* + Given a limit_file_no that is still needed by a slave (dump thread). + The dump thread will need to read any oob records references from event + groups in that file_no, so it will then also need to read from any earlier + file_no referenced from limit_file_no. + + This function handles this dependency, by reading the header page (or + getting from the ibb_file_hash if available) to get any earlier file_no + containing such references. +*/ +static bool +purge_adjust_limit_file_no(handler_binlog_purge_info *purge_info, LF_PINS *pins) +{ + uint64_t limit_file_no= purge_info->limit_file_no; + if (limit_file_no == ~(uint64_t)0) + return false; + + uint64_t referenced_file_no; + if (ibb_file_hash.get_oob_ref_file_no(limit_file_no, pins, + &referenced_file_no)) + { + if (referenced_file_no < limit_file_no) + purge_info->limit_file_no= referenced_file_no; + else + ut_ad(referenced_file_no == limit_file_no || + referenced_file_no == ~(uint64_t)0); + return false; + } + + byte *page_buf= (byte *)ut_malloc(ibb_page_size, mem_key_binlog); + if (!page_buf) + { + my_error(ER_OUTOFMEMORY, MYF(0), ibb_page_size); + return true; + } + char filename[OS_FILE_MAX_PATH]; + binlog_name_make(filename, limit_file_no); + File fh= my_open(filename, O_RDONLY | O_BINARY, MYF(0)); + if (fh < (File)0) + { + my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno); + ut_free(page_buf); + return true; + } + int res= crc32_pread_page(fh, page_buf, 0, MYF(0)); + my_close(fh, MYF(0)); + if (res <= 0) + { + ut_free(page_buf); + my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno); + return true; + } + binlog_header_data header; + fsp_binlog_extract_header_page(page_buf, &header); + ut_free(page_buf); + if (header.is_invalid || header.is_empty) + { + my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno); + return true; + } + if (header.oob_ref_file_no < limit_file_no) + purge_info->limit_file_no= header.oob_ref_file_no; + else + ut_ad(header.oob_ref_file_no == limit_file_no || + header.oob_ref_file_no == ~(uint64_t)0); + ibb_record_in_file_hash(limit_file_no, header.oob_ref_file_no, + header.xa_ref_file_no, pins); + return false; +} + + /* The low-level function handling binlog purge. @@ -3142,7 +3315,8 @@ innodb_reset_binlogs() */ static int innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, - uint64_t limit_name_file_no, uint64_t *out_file_no) + uint64_t limit_name_file_no, LF_PINS *lf_pins, + uint64_t *out_file_no) noexcept { uint64_t limit_file_no= purge_info->limit_file_no; @@ -3211,6 +3385,7 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, need_active_flush= false; } + ibb_file_hash.remove(file_no, lf_pins); if (my_delete(filename, MYF(0))) { if (my_errno == ENOENT) @@ -3235,7 +3410,7 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, static void -innodb_binlog_autopurge(uint64_t first_open_file_no) +innodb_binlog_autopurge(uint64_t first_open_file_no, LF_PINS *pins) { handler_binlog_purge_info purge_info; #ifdef HAVE_REPLICATION @@ -3249,11 +3424,8 @@ innodb_binlog_autopurge(uint64_t first_open_file_no) !(purge_info.purge_by_size || purge_info.purge_by_date)) return; - /* - ToDo: Here, we need to move back the purge_info.limit_file_no to the - earliest file containing any oob data referenced from the supplied - purge_info.limit_file_no. - */ + if (purge_adjust_limit_file_no(&purge_info, pins)) + return; /* Don't purge any actively open tablespace files. */ uint64_t orig_limit_file_no= purge_info.limit_file_no; @@ -3266,7 +3438,7 @@ innodb_binlog_autopurge(uint64_t first_open_file_no) purge_info.purge_by_name= false; uint64_t file_no; - int res= innodb_binlog_purge_low(&purge_info, 0, &file_no); + int res= innodb_binlog_purge_low(&purge_info, 0, pins, &file_no); if (res) { if (!purge_warning_given) @@ -3327,13 +3499,23 @@ innodb_binlog_purge(handler_binlog_purge_info *purge_info) return LOG_INFO_EOF; } + LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash); + ut_a(lf_pins); + if (purge_adjust_limit_file_no(purge_info, lf_pins)) + { + lf_hash_put_pins(lf_pins); + return LOG_INFO_IO; + } + uint64_t orig_limit_file_no= purge_info->limit_file_no; purge_info->limit_file_no= std::min(orig_limit_file_no, limit_file_no); mysql_mutex_lock(&purge_binlog_mutex); uint64_t file_no; - int res= innodb_binlog_purge_low(purge_info, to_file_no, &file_no); + int res= innodb_binlog_purge_low(purge_info, to_file_no, lf_pins, &file_no); mysql_mutex_unlock(&purge_binlog_mutex); + lf_hash_put_pins(lf_pins); + if (res == 1) { static_assert(sizeof(purge_info->nonpurge_filename) >= BINLOG_NAME_MAX_LEN, diff --git a/storage/innobase/include/fsp_binlog.h b/storage/innobase/include/fsp_binlog.h index 93bdc3fc40b..bb8a0e1a3a1 100644 --- a/storage/innobase/include/fsp_binlog.h +++ b/storage/innobase/include/fsp_binlog.h @@ -27,6 +27,8 @@ InnoDB implementation of binlog. #include #include +#include "lf.h" + #include "univ.i" #include "mtr0mtr.h" @@ -203,6 +205,85 @@ public: }; +/* Structure of an entry in the hash of binlog tablespace files. */ +struct ibb_tblspc_entry { + uint64_t file_no; + /* + Active transactions/oob-event-groups that start in this binlog tablespace + file (including any user XA). + */ + std::atomicoob_refs; + /* + Active XA transactions whose oob start in this binlog tablespace file. + ToDo: Note that user XA is not yet implemented. + */ + std::atomicxa_refs; + /* + The earliest file number that this binlog tablespace file has oob + references into. + (This is a conservative estimate, references may not actually exist in + case their commit record went into a later file, or they ended up rolling + back). + Includes any XA oob records. + */ + std::atomicoob_ref_file_no; + /* + Earliest file number that we have XA references into. + ToDo: Note that user XA is not yet implemented. + */ + std::atomicxa_ref_file_no; + + ibb_tblspc_entry()= default; + ~ibb_tblspc_entry()= default; +}; + + +/* + Class keeping reference counts of oob records starting in different binlog + tablespace files. + Used to keep track of which files should not be purged because they contain + oob (start) records that are still referenced by needed binlog tablespace + files or by active transactions. +*/ +class ibb_file_oob_refs { +public: + /* Hash contains struct ibb_tblspc_entry keyed on file_no. */ + LF_HASH hash; + /* + Earliest file_no with start oob records that are still referenced by active + transactions / event groups. + */ + std::atomic earliest_oob_ref; + /* + Same, but restricted to those oob that constitute XA transactions. + Thus, this may be larger than earliest_oob_ref or even ~(uint64_t)0 in + case there are no active XA. + */ + std::atomic earliest_xa_ref; + +public: + /* Init the hash empty. */ + void init() noexcept; + void destroy() noexcept; + /* Delete an entry from the hash. */ + void remove(uint64_t file_no, LF_PINS *pins); + /* Delete all (consecutive) entries from file_no down. */ + void remove_up_to(uint64_t file_no, LF_PINS *pins); + /* Update an entry when an OOB record is started/completed. */ + bool oob_ref_inc(uint64_t file_no, LF_PINS *pins); + bool oob_ref_dec(uint64_t file_no, LF_PINS *pins); + /* Update earliest_oob_ref when refcount drops to zero. */ + void do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins, + bool active_moving); + /* Update the oob and xa file_no's active at start of this file_no. */ + bool update_refs(uint64_t file_no, LF_PINS *pins, + uint64_t oob_ref, uint64_t xa_ref); + /* Lookup the oob-referenced file_no from a file_no. */ + bool get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins, + uint64_t *out_oob_ref_file_no); +}; + + class binlog_chunk_reader { public: enum chunk_reader_status { @@ -328,6 +409,8 @@ extern std::atomic binlog_cur_written_offset[2]; extern std::atomic binlog_cur_end_offset[2]; extern fsp_binlog_page_fifo *binlog_page_fifo; +extern ibb_file_oob_refs ibb_file_hash; + static inline void fsp_binlog_release(fsp_binlog_page_entry *page) @@ -341,6 +424,8 @@ extern int crc32_pread_page(File fd, byte *buf, uint32_t page_no, myf MyFlags) noexcept; extern int crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no, myf MyFlags) noexcept; +extern bool ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, + uint64_t xa_ref, LF_PINS *in_pins=nullptr); extern void binlog_write_up_to_now() noexcept; extern void fsp_binlog_extract_header_page(const byte *page_buf, binlog_header_data *out_header_data) @@ -356,9 +441,11 @@ extern bool fsp_binlog_open(const char *file_name, pfs_os_file_t fh, uint64_t file_no, size_t file_size, uint32_t init_page, byte *partial_page); extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no, - uint32_t size_in_pages); + uint32_t size_in_pages, + LF_PINS *pins); extern std::pair fsp_binlog_write_rec( - struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type); + struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type, + LF_PINS *pins); extern bool fsp_binlog_flush(); #endif /* fsp_binlog_h */ diff --git a/storage/innobase/include/innodb_binlog.h b/storage/innobase/include/innodb_binlog.h index 20da1ba9f77..5698ca25367 100644 --- a/storage/innobase/include/innodb_binlog.h +++ b/storage/innobase/include/innodb_binlog.h @@ -45,6 +45,7 @@ struct handler_binlog_purge_info; Currently used for: - chunk_data_cache: A binlog trx cache to be binlogged as a commit record. - chunk_data_oob: An out-of-band piece of event group data. + - chunk_data_flush: For dummy filler data. */ struct chunk_data_base { /* @@ -74,6 +75,8 @@ struct chunk_data_flush : public chunk_data_base { }; +static constexpr size_t IBB_BINLOG_HEADER_SIZE= 64; + /* Data stored at the start of each binlog file. (The data is stored as little-engian values in the first page of the file; @@ -101,6 +104,13 @@ struct binlog_header_data { binlog file was created. */ uint64_t diff_state_interval; + /* The earliest file_no that we have oob references into. */ + uint64_t oob_ref_file_no; + /* + The earliest file_no that we have XA oob references into. + ToDo: This in preparation for when XA is implemented. + */ + uint64_t xa_ref_file_no; /* The log_2 of the page size (eg. ibb_page_size_shift). */ uint32_t page_size_shift; /* @@ -163,12 +173,14 @@ extern bool innodb_binlog_init(size_t binlog_size, const char *directory); extern void innodb_binlog_close(bool shutdown); extern bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, lsn_t start_lsn, - uint64_t gtid_state_interval_in_pages); + uint64_t gtid_state_interval_in_pages, + LF_PINS *pins); extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, fsp_binlog_page_entry * &block, uint32_t &page_no, uint32_t &page_offset, uint64_t file_no); extern bool innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); +extern void innodb_reset_oob(THD *thd, void **engine_data); extern void innodb_free_oob(THD *thd, void *engine_data); extern handler_binlog_reader *innodb_get_binlog_reader(); extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no);