1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34705: Binlog-in-engine: Implement refcounting outstanding OOB records

Keep track of, for each binlog file, how many open transactions have
out-of-band data starting in that file. Then at the start of each new binlog
file, in the header page, record the file_no of the earliest file that this
file might contain commit records with references back to OOB records in
that earlier file.

Use this in PURGE BINARY LOGS, so that when a dump thread (slave connection)
is active in file number N, and that file (or a later one) may require
looking back in an earlier file number M for out-of-band records, purge will
stop already at file number M. This way, we avoid that purge accidentally
deletes some binlog file that a dump thread would later get an error on
because it needs to read out-of-band data.

This patch also includes placeholder data for a similar facility for XA
references. The actual implementation of support for XA is for later though.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-04-19 07:46:56 +02:00
parent d496e5278d
commit f0d4b63bac
14 changed files with 982 additions and 69 deletions

View File

@@ -0,0 +1,5 @@
--max-binlog-size=64k
--innodb-binlog-state-interval=16k
--log-bin
--binlog-cache-size=8192
--binlog-stmt-cache-size=8192

View File

@@ -0,0 +1,132 @@
include/master-slave.inc
[connection master]
RESET MASTER;
CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (0, 0, 'Start');
*** Test iteration, RESTART=0
connection master;
FLUSH BINARY LOGS;
connection master1;
*** Create a transaction with active OOB records (to be committed).
connection default;
*** Create a transaction with active OOB records (to be rolled back).
connection master;
FLUSH BINARY LOGS;
*** Generating 10 large transactions in 5 interleaved connections
connection master;
FLUSH BINARY LOGS;
INSERT INTO t1 VALUES (0 + 2, 0, "Park point 1 for dump thread");
include/save_master_gtid.inc
SELECT COUNT(*) FROM t1;
COUNT(*)
122
connection slave;
include/sync_with_master_gtid.inc
SELECT COUNT(*) FROM t1;
COUNT(*)
122
connection master;
SET @old_dbug= @@global.debug_dbug;
SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid";
INSERT INTO t1 VALUES (0 + 2, 1, "Transaction to pause dump thread");
PURGE BINARY LOGS TO 'OOB1_START';
PURGE BINARY LOGS TO 'OOB1_AFTER';
ERROR HY000: A purgeable log is in use, will not purge
PURGE BINARY LOGS TO 'OOB1_LATER';
ERROR HY000: A purgeable log is in use, will not purge
PURGE BINARY LOGS TO 'OOB1_CURRENT';
ERROR HY000: A purgeable log is in use, will not purge
connection master1;
COMMIT;
connection default;
ROLLBACK;
connection master;
PURGE BINARY LOGS TO 'OOB1_CURRENT';
ERROR HY000: A purgeable log is in use, will not purge
*** Allow the dump thread to proceed, and see that purge is now possible.
SET GLOBAL debug_dbug= @old_dbug;
SET debug_sync= 'now SIGNAL signal.continue';
FLUSH BINARY LOGS;
INSERT INTO t1 VALUES (0 + 2, 2, 'Transaction to get dump thread to the next file');
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
134 120676 708 220262
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
134 120676 708 220262
connection master;
SET debug_sync= 'RESET';
PURGE BINARY LOGS TO 'OOB1_START';
PURGE BINARY LOGS TO 'OOB1_AFTER';
PURGE BINARY LOGS TO 'OOB1_LATER';
PURGE BINARY LOGS TO 'OOB1_CURRENT';
*** Test iteration, RESTART=1
connection master;
FLUSH BINARY LOGS;
connection master1;
*** Create a transaction with active OOB records (to be committed).
connection default;
*** Create a transaction with active OOB records (to be rolled back).
connection master;
FLUSH BINARY LOGS;
*** Generating 10 large transactions in 5 interleaved connections
connection master;
FLUSH BINARY LOGS;
INSERT INTO t1 VALUES (100000 + 2, 0, "Park point 1 for dump thread");
include/save_master_gtid.inc
SELECT COUNT(*) FROM t1;
COUNT(*)
255
connection slave;
include/sync_with_master_gtid.inc
SELECT COUNT(*) FROM t1;
COUNT(*)
255
connection slave;
include/stop_slave.inc
connection master1;
COMMIT;
connection default;
ROLLBACK;
connection master;
include/rpl_restart_server.inc [server_number=1 parameters: --skip-slave-start]
connection master;
SET @old_dbug= @@global.debug_dbug;
SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid";
INSERT INTO t1 VALUES (100000 + 2, 1, "Transaction to pause dump thread");
connection slave;
include/start_slave.inc
connection master;
PURGE BINARY LOGS TO 'OOB1_START';
PURGE BINARY LOGS TO 'OOB1_AFTER';
ERROR HY000: A purgeable log is in use, will not purge
PURGE BINARY LOGS TO 'OOB1_LATER';
ERROR HY000: A purgeable log is in use, will not purge
PURGE BINARY LOGS TO 'OOB1_CURRENT';
ERROR HY000: A purgeable log is in use, will not purge
*** Allow the dump thread to proceed, and see that purge is now possible.
SET GLOBAL debug_dbug= @old_dbug;
SET debug_sync= 'now SIGNAL signal.continue';
FLUSH BINARY LOGS;
INSERT INTO t1 VALUES (100000 + 2, 2, 'Transaction to get dump thread to the next file');
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
267 13541352 1416 440519
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
267 13541352 1416 440519
connection master;
SET debug_sync= 'RESET';
PURGE BINARY LOGS TO 'OOB1_START';
PURGE BINARY LOGS TO 'OOB1_AFTER';
PURGE BINARY LOGS TO 'OOB1_LATER';
PURGE BINARY LOGS TO 'OOB1_CURRENT';
connection master;
DROP TABLE t1;
include/rpl_end.inc

View File

@@ -0,0 +1,207 @@
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--source include/have_innodb_binlog.inc
--let $NUM_CONNECTIONS= 5
# $NUM_TRANSACTIONS is total, not per connection.
--let $NUM_TRANSACTIONS=10
--let $NUM_PIECES= 10
--let $PIECE_SIZE= 2000
# Test that PURGE BINARY LOGS avoids purging files containing OOB records
# referenced from files that a dump thread is still active in.
#
# The test has --max-binlog-size=64k to have a larger number of binlog files
# to test with. The --binlog-cache-size is set to 8k, so more event data than
# that causes OOB binlogging.
RESET MASTER;
CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (0, 0, 'Start');
# Run twice. Once where the OOB references to earlier file numbers is kept
# track of in-memory. And once, where server is restarted so the references
# must be read from the file headers.
--let $restart= 0
while ($restart <= 1) {
--echo *** Test iteration, RESTART=$restart
--connection master
--let $D= `SELECT $restart*100000`
FLUSH BINARY LOGS;
# Start a transaction that will have OOB references in this specific binlog file.
--let $oob1_start= query_get_value(SHOW MASTER STATUS, File, 1)
--connection master1
--echo *** Create a transaction with active OOB records (to be committed).
--disable_query_log
BEGIN;
--let $i= 0
while ($i < 10) {
eval INSERT INTO t1 VALUES ($D+1, $i, REPEAT(CHR(65 + ($i MOD 26)), 2000));
inc $i;
}
--enable_query_log
# Leaving the transaction open, so the commit record will end up in a later
# binlog file and have a reference back that blocks purge.
# Also test an OOB record for a transaction that is later rolled back.
--connection default
--echo *** Create a transaction with active OOB records (to be rolled back).
--disable_query_log
BEGIN;
--let $i= 0
while ($i < 10) {
eval INSERT INTO t1 VALUES ($D+10, $i, REPEAT(CHR(65 + ($i MOD 26)), 2000));
inc $i;
}
--enable_query_log
--connection master
FLUSH BINARY LOGS;
--let $oob1_after= query_get_value(SHOW MASTER STATUS, File, 1)
# Generate a bunch of more transactions that contain OOB and flex the
# OOB refcounting.
--echo *** Generating $NUM_TRANSACTIONS large transactions in $NUM_CONNECTIONS interleaved connections
--disable_query_log
let $t= 0;
while ($t < $NUM_TRANSACTIONS) {
let $b= $t;
let $i= 1;
while ($i <= $NUM_CONNECTIONS) {
--connect(con$i,localhost,root,,)
START TRANSACTION;
eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, 0, 'Initial $i');
inc $i;
inc $t;
}
let $p= 1;
while ($p <= $NUM_PIECES) {
let $i= 1;
while ($i <= $NUM_CONNECTIONS) {
--connection con$i
eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, $p, REPEAT(CHR(65 + ($p + $i MOD 26)), $PIECE_SIZE));
inc $i;
}
inc $p;
}
let $i= 1;
while ($i <= $NUM_CONNECTIONS) {
--connection con$i
eval INSERT INTO t1 VALUES ($D + 1000 + $b + $i, $NUM_PIECES+1, 'Last $i');
COMMIT;
--disconnect con$i
inc $i;
}
}
--enable_query_log
--connection master
--let $oob1_later= query_get_value(SHOW MASTER STATUS, File, 1)
FLUSH BINARY LOGS;
eval INSERT INTO t1 VALUES ($D + 2, 0, "Park point 1 for dump thread");
# Now get the dump thread to the current point.
--source include/save_master_gtid.inc
SELECT COUNT(*) FROM t1;
--connection slave
--source include/sync_with_master_gtid.inc
SELECT COUNT(*) FROM t1;
if ($restart) {
--connection slave
--source include/stop_slave.inc
--connection master1
# Commit the transaction with OOB references back to an earlier binlog
# file, so that the reference will be there also after server restart.
COMMIT;
--connection default
# Roll back the other transaction with OOB.
ROLLBACK;
--connection master
--let $rpl_server_number=1
--let $rpl_server_parameters= --skip-slave-start
--source include/rpl_restart_server.inc
}
--connection master
SET @old_dbug= @@global.debug_dbug;
SET GLOBAL debug_dbug= "+d,dump_thread_wait_before_send_xid";
eval INSERT INTO t1 VALUES ($D + 2, 1, "Transaction to pause dump thread");
--let $oob1_current= query_get_value(SHOW MASTER STATUS, File, 1)
if ($restart) {
--connection slave
--source include/start_slave.inc
--connection master
}
let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE Command = 'Binlog Dump' AND State = 'debug sync point: now';
--source include/wait_condition.inc
# At this point, we have a dump thread active in $oob1_current. But we still
# have an active OOB record in $oob1_start, so neither of $oob1_start or
# any other prior to $oob1_current must be purged.
# The file before $oob1_start is allowed to be purged, though.
--replace_result $oob1_start OOB1_START
eval PURGE BINARY LOGS TO '$oob1_start';
--replace_result $oob1_after OOB1_AFTER
--error ER_LOG_IN_USE
eval PURGE BINARY LOGS TO '$oob1_after';
--replace_result $oob1_later OOB1_LATER
--error ER_LOG_IN_USE
eval PURGE BINARY LOGS TO '$oob1_later';
--replace_result $oob1_current OOB1_CURRENT
--error ER_LOG_IN_USE
eval PURGE BINARY LOGS TO '$oob1_current';
if (!$restart) {
--connection master1
COMMIT;
--connection default
ROLLBACK;
--connection master
--replace_result $oob1_current OOB1_CURRENT
--error ER_LOG_IN_USE
eval PURGE BINARY LOGS TO '$oob1_current';
}
--echo *** Allow the dump thread to proceed, and see that purge is now possible.
SET GLOBAL debug_dbug= @old_dbug;
SET debug_sync= 'now SIGNAL signal.continue';
FLUSH BINARY LOGS;
eval INSERT INTO t1 VALUES ($D + 2, 2, 'Transaction to get dump thread to the next file');
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
--connection master
SET debug_sync= 'RESET';
# Now the dump thread is past $oob1_current, so all PURGE should succeed.
--replace_result $oob1_start OOB1_START
eval PURGE BINARY LOGS TO '$oob1_start';
--replace_result $oob1_after OOB1_AFTER
eval PURGE BINARY LOGS TO '$oob1_after';
--replace_result $oob1_later OOB1_LATER
eval PURGE BINARY LOGS TO '$oob1_later';
--replace_result $oob1_current OOB1_CURRENT
eval PURGE BINARY LOGS TO '$oob1_current';
inc $restart;
}
# Cleanup.
--connection master
DROP TABLE t1;
--source include/rpl_end.inc

View File

@@ -13,6 +13,7 @@ INSERT INTO t1 VALUES (2, 11);
include/rpl_stop_server.inc [server_number=1] include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1] include/rpl_start_server.inc [server_number=1]
INSERT INTO t1 VALUES (3, 12); INSERT INTO t1 VALUES (3, 12);
include/save_master_gtid.inc
show binary logs; show binary logs;
Log_name File_size Log_name File_size
master-bin.000001 # master-bin.000001 #
@@ -29,6 +30,7 @@ a b
3 12 3 12
connection slave; connection slave;
include/start_slave.inc include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a; SELECT * FROM t1 ORDER BY a;
a b a b
1 10 1 10

View File

@@ -35,7 +35,7 @@ INSERT INTO t1 VALUES (2, 11);
--source include/rpl_start_server.inc --source include/rpl_start_server.inc
INSERT INTO t1 VALUES (3, 12); INSERT INTO t1 VALUES (3, 12);
--save_master_pos --source include/save_master_gtid.inc
--source include/show_binary_logs.inc --source include/show_binary_logs.inc
--echo *** Contents of master-bin.index (including directory path): --echo *** Contents of master-bin.index (including directory path):
--cat_file $master_datadir/master-bin.index --cat_file $master_datadir/master-bin.index
@@ -43,7 +43,7 @@ SELECT * FROM t1 ORDER BY a;
--connection slave --connection slave
--source include/start_slave.inc --source include/start_slave.inc
--sync_with_master --source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a; SELECT * FROM t1 ORDER BY a;
# Clean up. # Clean up.

View File

@@ -96,8 +96,8 @@ SESSION_VALUE NULL
DEFAULT_VALUE 2097152 DEFAULT_VALUE 2097152
VARIABLE_SCOPE GLOBAL VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Interval (in bytes) at which to write the GTID binlog state to binlog files to speed up GTID lookups. Must be a multiple of innodb_page_size VARIABLE_COMMENT Interval (in bytes) at which to write the GTID binlog state to binlog files to speed up GTID lookups. Must be a multiple of the binlog page size (4096 bytes)
NUMERIC_MIN_VALUE 65536 NUMERIC_MIN_VALUE 8192
NUMERIC_MAX_VALUE 18446744073709551615 NUMERIC_MAX_VALUE 18446744073709551615
NUMERIC_BLOCK_SIZE 0 NUMERIC_BLOCK_SIZE 0
ENUM_VALUE_LIST NULL ENUM_VALUE_LIST NULL

View File

@@ -1560,6 +1560,12 @@ struct handlerton
*/ */
bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len, bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data); void **engine_data);
/*
Call to reset (for new transactions) the engine_data from
binlog_oob_data(). Can also change the pointer to point to different data
(or set it to NULL).
*/
void (*binlog_oob_reset)(THD *thd, void **engine_data);
/* Call to allow engine to release the engine_data from binlog_oob_data(). */ /* Call to allow engine to release the engine_data from binlog_oob_data(). */
void (*binlog_oob_free)(THD *thd, void *engine_data); void (*binlog_oob_free)(THD *thd, void *engine_data);
/* Obtain an object to allow reading from the binlog. */ /* Obtain an object to allow reading from the binlog. */

View File

@@ -390,12 +390,18 @@ public:
else else
last_commit_pos_file.legacy_name[0]= 0; last_commit_pos_file.legacy_name[0]= 0;
} }
~binlog_cache_mngr()
{
if (engine_binlog_info.engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_free)
(thd, engine_binlog_info.engine_ptr);
}
void reset(bool do_stmt, bool do_trx) void reset(bool do_stmt, bool do_trx)
{ {
if (engine_binlog_info.engine_ptr) if (engine_binlog_info.engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_free)(thd, (*opt_binlog_engine_hton->binlog_oob_reset)
engine_binlog_info.engine_ptr); (thd, &engine_binlog_info.engine_ptr);
if (do_stmt) if (do_stmt)
stmt_cache.reset(); stmt_cache.reset();
if (do_trx) if (do_trx)
@@ -418,7 +424,9 @@ public:
*/ */
trx_cache.cache_log.write_function= binlog_spill_to_engine; trx_cache.cache_log.write_function= binlog_spill_to_engine;
trx_cache.cache_log.append_read_pos= (uchar *)this; trx_cache.cache_log.append_read_pos= (uchar *)this;
engine_binlog_info= {0, 0, 0}; engine_binlog_info.out_of_band_offset= 0;
engine_binlog_info.gtid_offset= 0;
/* Preserve the engine_ptr for the engine to re-use, was reset above. */
} }
} }

View File

@@ -3391,6 +3391,30 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo)
(Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
DBUG_ASSERT(event_type != START_ENCRYPTION_EVENT); DBUG_ASSERT(event_type != START_ENCRYPTION_EVENT);
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
if (event_type == XID_EVENT)
{
net_flush(info->net);
const char act[]=
"now "
"wait_for signal.continue";
DBUG_ASSERT(debug_sync_service);
DBUG_ASSERT(!debug_sync_set_action(
info->thd,
STRING_WITH_LEN(act)));
const char act2[]=
"now "
"signal signal.continued";
DBUG_ASSERT(!debug_sync_set_action(
info->thd,
STRING_WITH_LEN(act2)));
}
});
#endif
if (((info->errmsg= send_event_to_slave(info, event_type, nullptr, if (((info->errmsg= send_event_to_slave(info, event_type, nullptr,
ev_offset, &info->error_gtid)))) ev_offset, &info->error_gtid))))
return 1; return 1;

View File

@@ -21,6 +21,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
InnoDB implementation of binlog. InnoDB implementation of binlog.
*******************************************************/ *******************************************************/
#include <type_traits>
#include "ut0bitop.h" #include "ut0bitop.h"
#include "fsp0fsp.h" #include "fsp0fsp.h"
#include "buf0flu.h" #include "buf0flu.h"
@@ -107,6 +108,9 @@ std::atomic<uint64_t> binlog_cur_end_offset[2];
fsp_binlog_page_fifo *binlog_page_fifo; fsp_binlog_page_fifo *binlog_page_fifo;
/* Object to keep track of outstanding oob references in binlog files. */
ibb_file_oob_refs ibb_file_hash;
fsp_binlog_page_entry * fsp_binlog_page_entry *
fsp_binlog_page_fifo::create_page(uint64_t file_no, uint32_t page_no) fsp_binlog_page_fifo::create_page(uint64_t file_no, uint32_t page_no)
@@ -736,6 +740,225 @@ crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no, myf MyFlags)
} }
/*
Need specific constructor/initializer for struct ibb_tblspc_entry stored in
the ibb_file_hash. This is a work-around for C++ abstractions that makes it
non-standard behaviour to memcpy() std::atomic objects.
*/
static void
ibb_file_hash_constructor(uchar *arg)
{
new(arg + LF_HASH_OVERHEAD) ibb_tblspc_entry();
}
static void
ibb_file_hash_destructor(uchar *arg)
{
ibb_tblspc_entry *e= (ibb_tblspc_entry *)(arg + LF_HASH_OVERHEAD);
e->~ibb_tblspc_entry();
}
static void
ibb_file_hash_initializer(LF_HASH *hash, void *dst, const void *src)
{
ibb_tblspc_entry *src_e= (ibb_tblspc_entry *)src;
ibb_tblspc_entry *dst_e= (ibb_tblspc_entry *)dst;
dst_e->file_no= src_e->file_no;
dst_e->oob_refs.store(src_e->oob_refs.load(std::memory_order_relaxed),
std::memory_order_relaxed);
dst_e->xa_refs.store(src_e->xa_refs.load(std::memory_order_relaxed),
std::memory_order_relaxed);
dst_e->oob_ref_file_no.store(src_e->oob_ref_file_no.load(std::memory_order_relaxed),
std::memory_order_relaxed);
dst_e->xa_ref_file_no.store(src_e->xa_ref_file_no.load(std::memory_order_relaxed),
std::memory_order_relaxed);
}
void
ibb_file_oob_refs::init() noexcept
{
lf_hash_init(&hash, sizeof(ibb_tblspc_entry), LF_HASH_UNIQUE,
offsetof(ibb_tblspc_entry, file_no),
sizeof(ibb_tblspc_entry::file_no), nullptr, nullptr);
hash.alloc.constructor= ibb_file_hash_constructor;
hash.alloc.destructor= ibb_file_hash_destructor;
hash.initializer= ibb_file_hash_initializer;
earliest_oob_ref= ~(uint64_t)0;
earliest_xa_ref= ~(uint64_t)0;
}
void
ibb_file_oob_refs::destroy() noexcept
{
lf_hash_destroy(&hash);
}
void
ibb_file_oob_refs::remove(uint64_t file_no, LF_PINS *pins)
{
lf_hash_delete(&hash, pins, &file_no, sizeof(file_no));
}
void
ibb_file_oob_refs::remove_up_to(uint64_t file_no, LF_PINS *pins)
{
for (;;)
{
int res= lf_hash_delete(&hash, pins, &file_no, sizeof(file_no));
if (res || file_no == 0)
break;
--file_no;
}
}
bool
ibb_file_oob_refs::oob_ref_inc(uint64_t file_no, LF_PINS *pins)
{
ibb_tblspc_entry *e=
(ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no));
if (!e)
return false;
e->oob_refs.fetch_add(1, std::memory_order_acquire);
lf_hash_search_unpin(pins);
return true;
}
bool
ibb_file_oob_refs::oob_ref_dec(uint64_t file_no, LF_PINS *pins)
{
ibb_tblspc_entry *e=
(ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no));
if (!e)
return true;
uint64_t refcnt= e->oob_refs.fetch_sub(1, std::memory_order_acquire) - 1;
lf_hash_search_unpin(pins);
ut_ad(refcnt != (uint64_t)0 - 1);
if (refcnt == 0)
do_zero_refcnt_action(file_no, pins, false);
return false;
}
void
ibb_file_oob_refs::do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins,
bool active_moving)
{
for (;;)
{
ibb_tblspc_entry *e=
(ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no));
if (!e)
return;
uint64_t refcnt= e->oob_refs.load(std::memory_order_acquire);
lf_hash_search_unpin(pins);
if (refcnt > 0)
return;
/*
Reference count reached zero. Check if this was the earliest_oob_ref
that reached zero, and if so move it to the next file. Repeat this
for consecutive refcount-is-zero file_no, in case N+1 reaches zero
before N does.
As records are written into the active binlog file, the refcount can
reach zero temporarily and then go up again, so do not move the
earliest_oob_ref ahead yet.
As the active is about to move to the next file, we check again, and
this time move the earliest_oob_ref if the refcount on the (previously)
active binlog file ended up at zero.
*/
uint64_t active= active_binlog_file_no.load(std::memory_order_acquire);
ut_ad(file_no <= active + active_moving);
if (file_no >= active + active_moving)
return;
bool ok;
do
{
uint64_t read_file_no= earliest_oob_ref.load(std::memory_order_relaxed);
if (read_file_no != file_no)
break;
ok= earliest_oob_ref.compare_exchange_weak(read_file_no, file_no + 1,
std::memory_order_relaxed);
} while (!ok);
/* Handle any following file_no that may have dropped to zero earlier. */
++file_no;
}
}
bool
ibb_file_oob_refs::update_refs(uint64_t file_no, LF_PINS *pins,
uint64_t oob_ref, uint64_t xa_ref)
{
ibb_tblspc_entry *e=
(ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no));
if (!e)
return false;
e->oob_ref_file_no.store(oob_ref, std::memory_order_relaxed);
e->xa_ref_file_no.store(xa_ref, std::memory_order_relaxed);
lf_hash_search_unpin(pins);
return true;
}
bool
ibb_file_oob_refs::get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins,
uint64_t *out_oob_ref_file_no)
{
ibb_tblspc_entry *e=
(ibb_tblspc_entry *)lf_hash_search(&hash, pins, &file_no, sizeof(file_no));
if (!e)
{
*out_oob_ref_file_no= ~(uint64_t)0;
return false;
}
*out_oob_ref_file_no= e->oob_ref_file_no.load(std::memory_order_relaxed);
lf_hash_search_unpin(pins);
return true;
}
bool
ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, uint64_t xa_ref,
LF_PINS *in_pins)
{
bool err= false;
LF_PINS *pins= in_pins ? in_pins : lf_hash_get_pins(&ibb_file_hash.hash);
if (!pins)
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
return true;
}
ibb_tblspc_entry entry;
entry.file_no= file_no;
entry.oob_refs.store(0, std::memory_order_relaxed);
entry.xa_refs.store(0, std::memory_order_relaxed);
entry.oob_ref_file_no.store(oob_ref, std::memory_order_relaxed);
entry.xa_ref_file_no.store(xa_ref, std::memory_order_relaxed);
int res= lf_hash_insert(&ibb_file_hash.hash, pins, &entry);
if (res)
{
ut_ad(res < 0 /* Should not get unique violation, never insert twice */);
sql_print_error("InnoDB: Could not initialize in-memory structure for "
"binlog tablespace file number %" PRIu64 ", %s", file_no,
(res < 0 ? "out of memory" : "internal error"));
err= true;
}
if (!in_pins)
lf_hash_put_pins(pins);
return err;
}
void void
binlog_write_up_to_now() noexcept binlog_write_up_to_now() noexcept
{ {
@@ -789,6 +1012,8 @@ fsp_binlog_extract_header_page(const byte *page_buf,
out_header_data-> page_count= uint8korr(page_buf + 24); out_header_data-> page_count= uint8korr(page_buf + 24);
out_header_data-> start_lsn= uint8korr(page_buf + 32); out_header_data-> start_lsn= uint8korr(page_buf + 32);
out_header_data-> diff_state_interval= uint8korr(page_buf + 40); out_header_data-> diff_state_interval= uint8korr(page_buf + 40);
out_header_data->oob_ref_file_no= uint8korr(page_buf + 48);
out_header_data->xa_ref_file_no= uint8korr(page_buf + 56);
} }
@@ -839,6 +1064,7 @@ fsp_binlog_init()
{ {
mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr); mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr);
pthread_cond_init(&active_binlog_cond, nullptr); pthread_cond_init(&active_binlog_cond, nullptr);
ibb_file_hash.init();
binlog_page_fifo= new fsp_binlog_page_fifo(); binlog_page_fifo= new fsp_binlog_page_fifo();
binlog_page_fifo->start_flush_thread(); binlog_page_fifo->start_flush_thread();
} }
@@ -849,6 +1075,7 @@ fsp_binlog_shutdown()
{ {
binlog_page_fifo->stop_flush_thread(); binlog_page_fifo->stop_flush_thread();
delete binlog_page_fifo; delete binlog_page_fifo;
ibb_file_hash.destroy();
pthread_cond_destroy(&active_binlog_cond); pthread_cond_destroy(&active_binlog_cond);
mysql_mutex_destroy(&active_binlog_mutex); mysql_mutex_destroy(&active_binlog_mutex);
} }
@@ -927,7 +1154,8 @@ fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
/** Create a binlog tablespace file /** Create a binlog tablespace file
@param[in] file_no Index of the binlog tablespace @param[in] file_no Index of the binlog tablespace
@return DB_SUCCESS or error code */ @return DB_SUCCESS or error code */
dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages,
LF_PINS *pins)
{ {
pfs_os_file_t fh; pfs_os_file_t fh;
bool ret; bool ret;
@@ -962,6 +1190,13 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages)
return DB_ERROR; return DB_ERROR;
} }
/*
Enter an initial entry in the hash for this binlog tablespace file.
It will be later updated with the appropriate values when the file
first gets used and the header page is written.
*/
ibb_record_in_file_hash(file_no, ~(uint64_t)0,~(uint64_t)0, pins);
binlog_page_fifo->create_tablespace(file_no, size_in_pages); binlog_page_fifo->create_tablespace(file_no, size_in_pages);
os_file_close(fh); os_file_close(fh);
@@ -979,7 +1214,8 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages)
GTID state record, used for FLUSH BINARY LOGS. GTID state record, used for FLUSH BINARY LOGS.
*/ */
std::pair<uint64_t, uint64_t> std::pair<uint64_t, uint64_t>
fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type,
LF_PINS *pins)
{ {
uint32_t page_size= (uint32_t)ibb_page_size; uint32_t page_size= (uint32_t)ibb_page_size;
uint32_t page_size_shift= ibb_page_size_shift; uint32_t page_size_shift= ibb_page_size_shift;
@@ -1038,9 +1274,16 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
/* Write the header page at the start of a binlog tablespace file. */ /* Write the header page at the start of a binlog tablespace file. */
if (page_no == 0) if (page_no == 0)
{ {
/* Active is moving to next file, so check if oob refcount of previous
file is zero.
*/
if (UNIV_LIKELY(file_no > 0))
ibb_file_hash.do_zero_refcnt_action(file_no - 1, pins, true);
lsn_t start_lsn= log_sys.get_lsn(std::memory_order_acquire); lsn_t start_lsn= log_sys.get_lsn(std::memory_order_acquire);
bool err= ibb_write_header_page(mtr, file_no, file_size_in_pages, bool err= ibb_write_header_page(mtr, file_no, file_size_in_pages,
start_lsn, current_binlog_state_interval); start_lsn,
current_binlog_state_interval, pins);
ut_a(!err /* ToDo error handling */); ut_a(!err /* ToDo error handling */);
page_no= 1; page_no= 1;
} }
@@ -1244,6 +1487,8 @@ fsp_binlog_flush()
my_sync(fh, MYF(0)); my_sync(fh, MYF(0));
binlog_page_fifo->unlock(); binlog_page_fifo->unlock();
LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash);
ut_a(lf_pins);
uint32_t page_offset= binlog_cur_page_offset; uint32_t page_offset= binlog_cur_page_offset;
if (page_offset > BINLOG_PAGE_DATA || if (page_offset > BINLOG_PAGE_DATA ||
page_offset < ibb_page_size - BINLOG_PAGE_DATA_END) page_offset < ibb_page_size - BINLOG_PAGE_DATA_END)
@@ -1254,7 +1499,7 @@ fsp_binlog_flush()
end-of-file of the entire binlog. end-of-file of the entire binlog.
*/ */
mtr.start(); mtr.start();
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY); fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY, lf_pins);
mtr.commit(); mtr.commit();
} }
@@ -1280,8 +1525,9 @@ fsp_binlog_flush()
persisted across a server restart. persisted across a server restart.
*/ */
mtr.start(); mtr.start();
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER); fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins);
mtr.commit(); mtr.commit();
lf_hash_put_pins(lf_pins);
log_buffer_flush_to_disk(srv_flush_log_at_trx_commit & 1); log_buffer_flush_to_disk(srv_flush_log_at_trx_commit & 1);
return false; return false;

View File

@@ -3759,10 +3759,10 @@ static int innodb_init_params()
if (innodb_binlog_state_interval == 0 || if (innodb_binlog_state_interval == 0 ||
innodb_binlog_state_interval != innodb_binlog_state_interval !=
(ulonglong)1 << (63 - nlz(innodb_binlog_state_interval)) || (ulonglong)1 << (63 - nlz(innodb_binlog_state_interval)) ||
innodb_binlog_state_interval % (ulonglong)srv_page_size) { innodb_binlog_state_interval % (ulonglong)ibb_page_size) {
ib::error() << "innodb_binlog_state_interval must be a " ib::error() << "innodb_binlog_state_interval must be a "
"power-of-two multiple of the innodb_page_size=" "power-of-two multiple of the innodb binlog page size="
<< srv_page_size; << ibb_page_size;
DBUG_RETURN(HA_ERR_INITIALIZATION); DBUG_RETURN(HA_ERR_INITIALIZATION);
} }
@@ -4132,6 +4132,7 @@ static int innodb_init(void* p)
innobase_hton->binlog_init= innodb_binlog_init; innobase_hton->binlog_init= innodb_binlog_init;
innobase_hton->binlog_write_direct= innobase_binlog_write_direct; innobase_hton->binlog_write_direct= innobase_binlog_write_direct;
innobase_hton->binlog_oob_data= innodb_binlog_oob; innobase_hton->binlog_oob_data= innodb_binlog_oob;
innobase_hton->binlog_oob_reset= innodb_reset_oob;
innobase_hton->binlog_oob_free= innodb_free_oob; innobase_hton->binlog_oob_free= innodb_free_oob;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list; innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list;
@@ -19931,9 +19932,10 @@ static MYSQL_SYSVAR_ULONGLONG(binlog_state_interval,
innodb_binlog_state_interval, innodb_binlog_state_interval,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Interval (in bytes) at which to write the GTID binlog state to binlog " "Interval (in bytes) at which to write the GTID binlog state to binlog "
"files to speed up GTID lookups. Must be a multiple of innodb_page_size", "files to speed up GTID lookups. Must be a multiple of the binlog page "
"size (4096 bytes)",
NULL, NULL, 2*1024*1024, NULL, NULL, 2*1024*1024,
UNIV_PAGE_SIZE_MAX, ULONGLONG_MAX, 0); 8192, ULONGLONG_MAX, 0);
static struct st_mysql_sys_var* innobase_system_variables[]= { static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(autoextend_increment), MYSQL_SYSVAR(autoextend_increment),

View File

@@ -117,12 +117,18 @@ struct binlog_oob_context {
bool binlog_node(uint32_t node, uint64_t new_idx, bool binlog_node(uint32_t node, uint64_t new_idx,
uint32_t left_node, uint32_t right_node, uint32_t left_node, uint32_t right_node,
chunk_data_oob *oob_data); chunk_data_oob *oob_data, LF_PINS *pins);
uint64_t first_node_file_no; uint64_t first_node_file_no;
uint64_t first_node_offset; uint64_t first_node_offset;
LF_PINS *lf_pins;
uint32_t node_list_len; uint32_t node_list_len;
uint32_t node_list_alloc_len; uint32_t node_list_alloc_len;
/*
Set if we incremented refcount in first_node_file_no, so we need to
decrement again at commit record write or reset/rollback.
*/
bool pending_refcount;
/* /*
The node_list contains the root of each tree in the forest of perfect The node_list contains the root of each tree in the forest of perfect
binary trees. binary trees.
@@ -249,6 +255,7 @@ public:
struct chunk_data_cache : public chunk_data_base { struct chunk_data_cache : public chunk_data_base {
IO_CACHE *cache; IO_CACHE *cache;
binlog_oob_context *oob_ctx;
size_t main_remain; size_t main_remain;
size_t gtid_remain; size_t gtid_remain;
uint32_t header_remain; uint32_t header_remain;
@@ -270,6 +277,8 @@ struct chunk_data_cache : public chunk_data_base {
binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr;
unsigned char *p; unsigned char *p;
ut_ad(c);
oob_ctx= c;
if (c && c->node_list_len) if (c && c->node_list_len)
{ {
/* /*
@@ -321,6 +330,13 @@ struct chunk_data_cache : public chunk_data_base {
virtual std::pair<uint32_t, bool> copy_data(byte *p, uint32_t max_len) final virtual std::pair<uint32_t, bool> copy_data(byte *p, uint32_t max_len) final
{ {
uint32_t size= 0; uint32_t size= 0;
if (UNIV_LIKELY(oob_ctx != nullptr) && oob_ctx->pending_refcount)
{
ibb_file_hash.oob_ref_dec(oob_ctx->first_node_file_no, oob_ctx->lf_pins);
oob_ctx->pending_refcount= false;
}
/* Write header data, if any still available. */ /* Write header data, if any still available. */
if (header_remain > 0) if (header_remain > 0)
{ {
@@ -505,7 +521,7 @@ static int scan_for_binlogs(const char *binlog_dir, found_binlogs *binlog_files,
bool error_if_missing) noexcept; bool error_if_missing) noexcept;
static int innodb_binlog_discover(); static int innodb_binlog_discover();
static bool binlog_state_recover(); static bool binlog_state_recover();
static void innodb_binlog_autopurge(uint64_t first_open_file_no); static void innodb_binlog_autopurge(uint64_t first_open_file_no, LF_PINS *pins);
static int read_gtid_state_from_page(rpl_binlog_state_base *state, static int read_gtid_state_from_page(rpl_binlog_state_base *state,
const byte *page, uint32_t page_no) const byte *page, uint32_t page_no)
noexcept; noexcept;
@@ -1144,6 +1160,7 @@ innodb_binlog_init_state()
earliest_binlog_file_no= ~(uint64_t)0; earliest_binlog_file_no= ~(uint64_t)0;
total_binlog_used_size= 0; total_binlog_used_size= 0;
active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release); active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed);
binlog_cur_page_no= 0; binlog_cur_page_no= 0;
binlog_cur_page_offset= BINLOG_PAGE_DATA; binlog_cur_page_offset= BINLOG_PAGE_DATA;
current_binlog_state_interval= current_binlog_state_interval=
@@ -1184,9 +1201,12 @@ binlog_sync_initial()
{ {
chunk_data_flush dummy_data; chunk_data_flush dummy_data;
mtr_t mtr; mtr_t mtr;
LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash);
ut_a(lf_pins);
mtr.start(); mtr.start();
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER); fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins);
mtr.commit(); mtr.commit();
lf_hash_put_pins(lf_pins);
log_buffer_flush_to_disk(true); log_buffer_flush_to_disk(true);
binlog_page_fifo->flush_up_to(0, 0); binlog_page_fifo->flush_up_to(0, 0);
binlog_page_fifo->do_fdatasync(0); binlog_page_fifo->do_fdatasync(0);
@@ -1342,7 +1362,7 @@ binlog_page_empty(const byte *page)
static int static int
find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
uint32_t *out_page_no, uint32_t *out_pos_in_page, uint32_t *out_page_no, uint32_t *out_pos_in_page,
uint64_t *out_state_interval) binlog_header_data *out_header_data)
{ {
const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size= (uint32_t)ibb_page_size;
const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift;
@@ -1351,11 +1371,11 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
uint32_t p_0, p_1, p_2, last_nonempty; uint32_t p_0, p_1, p_2, last_nonempty;
byte *p, *page_end; byte *p, *page_end;
bool ret; bool ret;
binlog_header_data header_data;
*out_page_no= 0; *out_page_no= 0;
*out_pos_in_page= BINLOG_PAGE_DATA; *out_pos_in_page= BINLOG_PAGE_DATA;
*out_state_interval= 0; out_header_data->diff_state_interval= 0;
out_header_data->is_invalid= true;
binlog_name_make(file_name, file_no); binlog_name_make(file_name, file_no);
pfs_os_file_t fh= os_file_create(innodb_data_file_key, file_name, pfs_os_file_t fh= os_file_create(innodb_data_file_key, file_name,
@@ -1371,27 +1391,27 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
os_file_close(fh); os_file_close(fh);
return -1; return -1;
} }
fsp_binlog_extract_header_page(page_buf, &header_data); fsp_binlog_extract_header_page(page_buf, out_header_data);
if (header_data.is_invalid) if (out_header_data->is_invalid)
{ {
sql_print_error("InnoDB: Invalid or corrupt file header in file " sql_print_error("InnoDB: Invalid or corrupt file header in file "
"'%s'", file_name); "'%s'", file_name);
return -1; return -1;
} }
if (header_data.is_empty) { if (out_header_data->is_empty) {
ret= ret=
fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr); fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr);
binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed);
return (ret ? -1 : 0); return (ret ? -1 : 0);
} }
if (header_data.file_no != file_no) if (out_header_data->file_no != file_no)
{ {
sql_print_error("InnoDB: Inconsistent file header in file '%s', " sql_print_error("InnoDB: Inconsistent file header in file '%s', "
"wrong file_no %" PRIu64, file_name, header_data.file_no); "wrong file_no %" PRIu64, file_name,
out_header_data->file_no);
return -1; return -1;
} }
*out_state_interval= header_data.diff_state_interval;
last_nonempty= 0; last_nonempty= 0;
/* /*
@@ -1471,7 +1491,7 @@ innodb_binlog_discover()
const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size= (uint32_t)ibb_page_size;
const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift;
struct found_binlogs binlog_files; struct found_binlogs binlog_files;
uint64_t diff_state_interval; binlog_header_data header;
int res= scan_for_binlogs(innodb_binlog_directory, &binlog_files, false); int res= scan_for_binlogs(innodb_binlog_directory, &binlog_files, false);
if (res <= 0) if (res <= 0)
@@ -1494,10 +1514,13 @@ innodb_binlog_discover()
res= find_pos_in_binlog(binlog_files.last_file_no, res= find_pos_in_binlog(binlog_files.last_file_no,
binlog_files.last_size, binlog_files.last_size,
page_buf.get(), &page_no, &pos_in_page, page_buf.get(), &page_no, &pos_in_page,
&diff_state_interval); &header);
if (res < 0) { if (res < 0) {
file_no= binlog_files.last_file_no; file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release); active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= innodb_binlog_state_interval; current_binlog_state_interval= innodb_binlog_state_interval;
sql_print_warning("Binlog number %llu could no be opened. Starting a new " sql_print_warning("Binlog number %llu could no be opened. Starting a new "
"binlog file from number %llu", "binlog file from number %llu",
@@ -1508,8 +1531,12 @@ innodb_binlog_discover()
if (res > 0) { if (res > 0) {
/* Found start position in the last binlog file. */ /* Found start position in the last binlog file. */
file_no= binlog_files.last_file_no; file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no,
header.xa_ref_file_no))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release); active_binlog_file_no.store(file_no, std::memory_order_release);
current_binlog_state_interval= diff_state_interval; ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= header.diff_state_interval;
binlog_cur_page_no= page_no; binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page; binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position " ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -1525,10 +1552,13 @@ innodb_binlog_discover()
binlog_files.prev_size, binlog_files.prev_size,
page_buf.get(), page_buf.get(),
&prev_page_no, &prev_pos_in_page, &prev_page_no, &prev_pos_in_page,
&diff_state_interval); &header);
if (res < 0) { if (res < 0) {
file_no= binlog_files.last_file_no; file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release); active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= innodb_binlog_state_interval; current_binlog_state_interval= innodb_binlog_state_interval;
binlog_cur_page_no= page_no; binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page; binlog_cur_page_offset= pos_in_page;
@@ -1538,8 +1568,12 @@ innodb_binlog_discover()
return 1; return 1;
} }
file_no= binlog_files.prev_file_no; file_no= binlog_files.prev_file_no;
if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no,
header.xa_ref_file_no))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release); active_binlog_file_no.store(file_no, std::memory_order_release);
current_binlog_state_interval= diff_state_interval; ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= header.diff_state_interval;
binlog_cur_page_no= prev_page_no; binlog_cur_page_no= prev_page_no;
binlog_cur_page_offset= prev_pos_in_page; binlog_cur_page_offset= prev_pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position " ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -1551,7 +1585,10 @@ innodb_binlog_discover()
/* Just one empty binlog file found. */ /* Just one empty binlog file found. */
file_no= binlog_files.last_file_no; file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release); active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= innodb_binlog_state_interval; current_binlog_state_interval= innodb_binlog_state_interval;
binlog_cur_page_no= page_no; binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page; binlog_cur_page_offset= pos_in_page;
@@ -1563,6 +1600,7 @@ innodb_binlog_discover()
/* No binlog files found, start from scratch. */ /* No binlog files found, start from scratch. */
file_no= 0; file_no= 0;
earliest_binlog_file_no= 0; earliest_binlog_file_no= 0;
ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed);
total_binlog_used_size= 0; total_binlog_used_size= 0;
current_binlog_state_interval= innodb_binlog_state_interval; current_binlog_state_interval= innodb_binlog_state_interval;
ib::info() << "Starting a new binlog from file number " << file_no << "."; ib::info() << "Starting a new binlog from file number " << file_no << ".";
@@ -1612,6 +1650,8 @@ innodb_binlog_prealloc_thread()
#ifdef UNIV_PFS_THREAD #ifdef UNIV_PFS_THREAD
pfs_register_thread(binlog_prealloc_thread_key); pfs_register_thread(binlog_prealloc_thread_key);
#endif #endif
LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash);
ut_a(lf_pins);
mysql_mutex_lock(&active_binlog_mutex); mysql_mutex_lock(&active_binlog_mutex);
while (1) while (1)
@@ -1633,12 +1673,13 @@ innodb_binlog_prealloc_thread()
mysql_mutex_lock(&purge_binlog_mutex); mysql_mutex_lock(&purge_binlog_mutex);
uint32_t size_in_pages= innodb_binlog_size_in_pages; uint32_t size_in_pages= innodb_binlog_size_in_pages;
dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages); dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages,
lf_pins);
if (earliest_binlog_file_no == ~(uint64_t)0) if (earliest_binlog_file_no == ~(uint64_t)0)
earliest_binlog_file_no= last_created; earliest_binlog_file_no= last_created;
total_binlog_used_size+= (size_in_pages << ibb_page_size_shift); total_binlog_used_size+= (size_in_pages << ibb_page_size_shift);
innodb_binlog_autopurge(first_open); innodb_binlog_autopurge(first_open, lf_pins);
mysql_mutex_unlock(&purge_binlog_mutex); mysql_mutex_unlock(&purge_binlog_mutex);
mysql_mutex_lock(&active_binlog_mutex); mysql_mutex_lock(&active_binlog_mutex);
@@ -1649,6 +1690,8 @@ innodb_binlog_prealloc_thread()
ut_ad(active < ~(uint64_t)0 || last_created == 0); ut_ad(active < ~(uint64_t)0 || last_created == 0);
if (active == ~(uint64_t)0) { if (active == ~(uint64_t)0) {
active_binlog_file_no.store(last_created, std::memory_order_relaxed); active_binlog_file_no.store(last_created, std::memory_order_relaxed);
ibb_file_hash.earliest_oob_ref.store(last_created,
std::memory_order_relaxed);
} }
if (first_open == ~(uint64_t)0) if (first_open == ~(uint64_t)0)
first_open_binlog_file_no= first_open= last_created; first_open_binlog_file_no= first_open= last_created;
@@ -1680,6 +1723,7 @@ innodb_binlog_prealloc_thread()
} }
mysql_mutex_unlock(&active_binlog_mutex); mysql_mutex_unlock(&active_binlog_mutex);
lf_hash_put_pins(lf_pins);
my_thread_end(); my_thread_end();
#ifdef UNIV_PFS_THREAD #ifdef UNIV_PFS_THREAD
@@ -1690,7 +1734,8 @@ innodb_binlog_prealloc_thread()
bool bool
ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages, ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages,
lsn_t start_lsn, uint64_t gtid_state_interval_in_pages) lsn_t start_lsn, uint64_t gtid_state_interval_in_pages,
LF_PINS *pins)
{ {
fsp_binlog_page_entry *block; fsp_binlog_page_entry *block;
uint32_t used_bytes; uint32_t used_bytes;
@@ -1698,6 +1743,11 @@ ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages,
block= binlog_page_fifo->create_page(file_no, 0); block= binlog_page_fifo->create_page(file_no, 0);
ut_a(block /* ToDo: error handling? */); ut_a(block /* ToDo: error handling? */);
byte *ptr= &block->page_buf[0]; byte *ptr= &block->page_buf[0];
uint64_t oob_ref_file_no=
ibb_file_hash.earliest_oob_ref.load(std::memory_order_relaxed);
uint64_t xa_ref_file_no=
ibb_file_hash.earliest_xa_ref.load(std::memory_order_relaxed);
ibb_file_hash.update_refs(file_no, pins, oob_ref_file_no, xa_ref_file_no);
int4store(ptr, IBB_MAGIC); int4store(ptr, IBB_MAGIC);
int4store(ptr + 4, ibb_page_size_shift); int4store(ptr + 4, ibb_page_size_shift);
@@ -1707,7 +1757,9 @@ ibb_write_header_page(mtr_t *mtr, uint64_t file_no, uint64_t file_size_in_pages,
int8store(ptr + 24, file_size_in_pages); int8store(ptr + 24, file_size_in_pages);
int8store(ptr + 32, start_lsn); int8store(ptr + 32, start_lsn);
int8store(ptr + 40, gtid_state_interval_in_pages); int8store(ptr + 40, gtid_state_interval_in_pages);
used_bytes= 48; int8store(ptr + 48, oob_ref_file_no);
int8store(ptr + 56, xa_ref_file_no);
used_bytes= IBB_BINLOG_HEADER_SIZE;
ut_ad(ibb_page_size >= IBB_HEADER_PAGE_SIZE); ut_ad(ibb_page_size >= IBB_HEADER_PAGE_SIZE);
memset(ptr + used_bytes, 0, ibb_page_size - (used_bytes + BINLOG_PAGE_CHECKSUM)); memset(ptr + used_bytes, 0, ibb_page_size - (used_bytes + BINLOG_PAGE_CHECKSUM));
/* /*
@@ -2009,18 +2061,9 @@ binlog_state_recover()
} }
static void
innodb_binlog_write_cache(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info, mtr_t *mtr)
{
chunk_data_cache chunk_data(cache, binlog_info);
fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT);
}
/* Allocate a context for out-of-band binlogging. */ /* Allocate a context for out-of-band binlogging. */
static binlog_oob_context * static binlog_oob_context *
alloc_oob_context(uint32 list_length) alloc_oob_context(uint32 list_length= 10)
{ {
size_t needed= sizeof(binlog_oob_context) + size_t needed= sizeof(binlog_oob_context) +
list_length * sizeof(binlog_oob_context::node_info); list_length * sizeof(binlog_oob_context::node_info);
@@ -2028,8 +2071,15 @@ alloc_oob_context(uint32 list_length)
(binlog_oob_context *) ut_malloc(needed, mem_key_binlog); (binlog_oob_context *) ut_malloc(needed, mem_key_binlog);
if (c) if (c)
{ {
if (!(c->lf_pins= lf_hash_get_pins(&ibb_file_hash.hash)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
ut_free(c);
return nullptr;
}
c->node_list_alloc_len= list_length; c->node_list_alloc_len= list_length;
c->node_list_len= 0; c->node_list_len= 0;
c->pending_refcount= false;
} }
else else
my_error(ER_OUTOFMEMORY, MYF(0), needed); my_error(ER_OUTOFMEMORY, MYF(0), needed);
@@ -2038,9 +2088,38 @@ alloc_oob_context(uint32 list_length)
} }
static void
innodb_binlog_write_cache(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info, mtr_t *mtr)
{
binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr;
if (!c)
binlog_info->engine_ptr= c= alloc_oob_context();
ut_a(c);
chunk_data_cache chunk_data(cache, binlog_info);
fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins);
}
static inline void
reset_oob_context(binlog_oob_context *c)
{
if (c->pending_refcount)
{
ibb_file_hash.oob_ref_dec(c->first_node_file_no, c->lf_pins);
c->pending_refcount= false;
}
c->node_list_len= 0;
}
static inline void static inline void
free_oob_context(binlog_oob_context *c) free_oob_context(binlog_oob_context *c)
{ {
ut_ad(!c->pending_refcount /* Should not have pending until free */);
reset_oob_context(c); /* Defensive programming, should be redundant */
lf_hash_put_pins(c->lf_pins);
ut_free(c); ut_free(c);
} }
@@ -2060,7 +2139,7 @@ ensure_oob_context(void **engine_data, uint32_t needed_len)
needed_len*sizeof(binlog_oob_context::node_info)); needed_len*sizeof(binlog_oob_context::node_info));
new_c->node_list_alloc_len= needed_len; new_c->node_list_alloc_len= needed_len;
*engine_data= new_c; *engine_data= new_c;
free_oob_context(c); ut_free(c);
return new_c; return new_c;
} }
@@ -2130,7 +2209,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
{ {
binlog_oob_context *c= (binlog_oob_context *)*engine_data; binlog_oob_context *c= (binlog_oob_context *)*engine_data;
if (!c) if (!c)
*engine_data= c= alloc_oob_context(10); *engine_data= c= alloc_oob_context();
if (UNIV_UNLIKELY(!c)) if (UNIV_UNLIKELY(!c))
return true; return true;
@@ -2144,7 +2223,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
c->node_list[i-2].file_no, c->node_list[i-2].offset, c->node_list[i-2].file_no, c->node_list[i-2].offset,
c->node_list[i-1].file_no, c->node_list[i-1].offset, c->node_list[i-1].file_no, c->node_list[i-1].offset,
(byte *)data, data_len); (byte *)data, data_len);
if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data)) if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins))
return true; return true;
c->node_list_len= i - 1; c->node_list_len= i - 1;
} }
@@ -2159,7 +2238,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
0, 0, /* NULL left child signifies a leaf */ 0, 0, /* NULL left child signifies a leaf */
c->node_list[i-1].file_no, c->node_list[i-1].offset, c->node_list[i-1].file_no, c->node_list[i-1].offset,
(byte *)data, data_len); (byte *)data, data_len);
if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data)) if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins))
return true; return true;
c->node_list_len= i + 1; c->node_list_len= i + 1;
} }
@@ -2168,11 +2247,14 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
/* Special case i==0, like case 2 but no prior node to link to. */ /* Special case i==0, like case 2 but no prior node to link to. */
binlog_oob_context::chunk_data_oob oob_data binlog_oob_context::chunk_data_oob oob_data
(new_idx, 0, 0, 0, 0, (byte *)data, data_len); (new_idx, 0, 0, 0, 0, (byte *)data, data_len);
if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data)) if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data,
c->lf_pins))
return true; return true;
c->first_node_file_no= c->node_list[i].file_no; c->first_node_file_no= c->node_list[i].file_no;
c->first_node_offset= c->node_list[i].offset; c->first_node_offset= c->node_list[i].offset;
c->node_list_len= 1; c->node_list_len= 1;
c->pending_refcount=
ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins);
} }
return false; return false;
@@ -2187,14 +2269,14 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
bool bool
binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx, binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx,
uint32_t left_node, uint32_t right_node, uint32_t left_node, uint32_t right_node,
chunk_data_oob *oob_data) chunk_data_oob *oob_data, LF_PINS *pins)
{ {
uint32_t new_height= uint32_t new_height=
left_node == right_node ? 1 : 1 + node_list[left_node].height; left_node == right_node ? 1 : 1 + node_list[left_node].height;
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
std::pair<uint64_t, uint64_t> new_file_no_offset= std::pair<uint64_t, uint64_t> new_file_no_offset=
fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA); fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA, pins);
mtr.commit(); mtr.commit();
node_list[node].file_no= new_file_no_offset.first; node_list[node].file_no= new_file_no_offset.first;
node_list[node].offset= new_file_no_offset.second; node_list[node].offset= new_file_no_offset.second;
@@ -2249,6 +2331,15 @@ binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len)
} }
void
innodb_reset_oob(THD *thd, void **engine_data)
{
binlog_oob_context *c= (binlog_oob_context *)*engine_data;
if (c)
reset_oob_context(c);
}
void void
innodb_free_oob(THD *thd, void *engine_data) innodb_free_oob(THD *thd, void *engine_data)
{ {
@@ -2700,6 +2791,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
if (page_no > (end_offset >> ibb_page_size_shift)) if (page_no > (end_offset >> ibb_page_size_shift))
{ {
ut_ad(!block); ut_ad(!block);
if (file_no == active)
return READ_NOT_FOUND; return READ_NOT_FOUND;
} }
} }
@@ -3055,7 +3147,8 @@ bool
innodb_reset_binlogs() innodb_reset_binlogs()
{ {
bool err= false; bool err= false;
LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash);
ut_a(lf_pins);
ut_a(innodb_binlog_inited >= 2); ut_a(innodb_binlog_inited >= 2);
/* Close existing binlog tablespaces and stop the pre-alloc thread. */ /* Close existing binlog tablespaces and stop the pre-alloc thread. */
@@ -3074,6 +3167,8 @@ innodb_reset_binlogs()
binlog_page_fifo->lock_wait_for_idle(); binlog_page_fifo->lock_wait_for_idle();
binlog_page_fifo->reset(); binlog_page_fifo->reset();
ibb_file_hash.remove_up_to(last_created_binlog_file_no, lf_pins);
/* Delete all binlog files in the directory. */ /* Delete all binlog files in the directory. */
MY_DIR *dir= my_dir(innodb_binlog_directory, MYF(MY_WME)); MY_DIR *dir= my_dir(innodb_binlog_directory, MYF(MY_WME));
if (!dir) if (!dir)
@@ -3095,6 +3190,12 @@ innodb_reset_binlogs()
binlog_name_make(full_path, file_no); binlog_name_make(full_path, file_no);
if (my_delete(full_path, MYF(MY_WME))) if (my_delete(full_path, MYF(MY_WME)))
err= true; err= true;
/*
Just as defensive coding, also remove any entry from the file hash
with this file_no. We would expect to have already deleted everything
in remove_up_to() above.
*/
ibb_file_hash.remove(file_no, lf_pins);
} }
my_dirend(dir); my_dirend(dir);
} }
@@ -3110,10 +3211,82 @@ innodb_reset_binlogs()
start_binlog_prealloc_thread(); start_binlog_prealloc_thread();
binlog_sync_initial(); binlog_sync_initial();
lf_hash_put_pins(lf_pins);
return err; return err;
} }
/*
Given a limit_file_no that is still needed by a slave (dump thread).
The dump thread will need to read any oob records references from event
groups in that file_no, so it will then also need to read from any earlier
file_no referenced from limit_file_no.
This function handles this dependency, by reading the header page (or
getting from the ibb_file_hash if available) to get any earlier file_no
containing such references.
*/
static bool
purge_adjust_limit_file_no(handler_binlog_purge_info *purge_info, LF_PINS *pins)
{
uint64_t limit_file_no= purge_info->limit_file_no;
if (limit_file_no == ~(uint64_t)0)
return false;
uint64_t referenced_file_no;
if (ibb_file_hash.get_oob_ref_file_no(limit_file_no, pins,
&referenced_file_no))
{
if (referenced_file_no < limit_file_no)
purge_info->limit_file_no= referenced_file_no;
else
ut_ad(referenced_file_no == limit_file_no ||
referenced_file_no == ~(uint64_t)0);
return false;
}
byte *page_buf= (byte *)ut_malloc(ibb_page_size, mem_key_binlog);
if (!page_buf)
{
my_error(ER_OUTOFMEMORY, MYF(0), ibb_page_size);
return true;
}
char filename[OS_FILE_MAX_PATH];
binlog_name_make(filename, limit_file_no);
File fh= my_open(filename, O_RDONLY | O_BINARY, MYF(0));
if (fh < (File)0)
{
my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno);
ut_free(page_buf);
return true;
}
int res= crc32_pread_page(fh, page_buf, 0, MYF(0));
my_close(fh, MYF(0));
if (res <= 0)
{
ut_free(page_buf);
my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno);
return true;
}
binlog_header_data header;
fsp_binlog_extract_header_page(page_buf, &header);
ut_free(page_buf);
if (header.is_invalid || header.is_empty)
{
my_error(ER_ERROR_ON_READ, MYF(0), filename, my_errno);
return true;
}
if (header.oob_ref_file_no < limit_file_no)
purge_info->limit_file_no= header.oob_ref_file_no;
else
ut_ad(header.oob_ref_file_no == limit_file_no ||
header.oob_ref_file_no == ~(uint64_t)0);
ibb_record_in_file_hash(limit_file_no, header.oob_ref_file_no,
header.xa_ref_file_no, pins);
return false;
}
/* /*
The low-level function handling binlog purge. The low-level function handling binlog purge.
@@ -3142,7 +3315,8 @@ innodb_reset_binlogs()
*/ */
static int static int
innodb_binlog_purge_low(handler_binlog_purge_info *purge_info, innodb_binlog_purge_low(handler_binlog_purge_info *purge_info,
uint64_t limit_name_file_no, uint64_t *out_file_no) uint64_t limit_name_file_no, LF_PINS *lf_pins,
uint64_t *out_file_no)
noexcept noexcept
{ {
uint64_t limit_file_no= purge_info->limit_file_no; uint64_t limit_file_no= purge_info->limit_file_no;
@@ -3211,6 +3385,7 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info,
need_active_flush= false; need_active_flush= false;
} }
ibb_file_hash.remove(file_no, lf_pins);
if (my_delete(filename, MYF(0))) if (my_delete(filename, MYF(0)))
{ {
if (my_errno == ENOENT) if (my_errno == ENOENT)
@@ -3235,7 +3410,7 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info,
static void static void
innodb_binlog_autopurge(uint64_t first_open_file_no) innodb_binlog_autopurge(uint64_t first_open_file_no, LF_PINS *pins)
{ {
handler_binlog_purge_info purge_info; handler_binlog_purge_info purge_info;
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
@@ -3249,11 +3424,8 @@ innodb_binlog_autopurge(uint64_t first_open_file_no)
!(purge_info.purge_by_size || purge_info.purge_by_date)) !(purge_info.purge_by_size || purge_info.purge_by_date))
return; return;
/* if (purge_adjust_limit_file_no(&purge_info, pins))
ToDo: Here, we need to move back the purge_info.limit_file_no to the return;
earliest file containing any oob data referenced from the supplied
purge_info.limit_file_no.
*/
/* Don't purge any actively open tablespace files. */ /* Don't purge any actively open tablespace files. */
uint64_t orig_limit_file_no= purge_info.limit_file_no; uint64_t orig_limit_file_no= purge_info.limit_file_no;
@@ -3266,7 +3438,7 @@ innodb_binlog_autopurge(uint64_t first_open_file_no)
purge_info.purge_by_name= false; purge_info.purge_by_name= false;
uint64_t file_no; uint64_t file_no;
int res= innodb_binlog_purge_low(&purge_info, 0, &file_no); int res= innodb_binlog_purge_low(&purge_info, 0, pins, &file_no);
if (res) if (res)
{ {
if (!purge_warning_given) if (!purge_warning_given)
@@ -3327,13 +3499,23 @@ innodb_binlog_purge(handler_binlog_purge_info *purge_info)
return LOG_INFO_EOF; return LOG_INFO_EOF;
} }
LF_PINS *lf_pins= lf_hash_get_pins(&ibb_file_hash.hash);
ut_a(lf_pins);
if (purge_adjust_limit_file_no(purge_info, lf_pins))
{
lf_hash_put_pins(lf_pins);
return LOG_INFO_IO;
}
uint64_t orig_limit_file_no= purge_info->limit_file_no; uint64_t orig_limit_file_no= purge_info->limit_file_no;
purge_info->limit_file_no= std::min(orig_limit_file_no, limit_file_no); purge_info->limit_file_no= std::min(orig_limit_file_no, limit_file_no);
mysql_mutex_lock(&purge_binlog_mutex); mysql_mutex_lock(&purge_binlog_mutex);
uint64_t file_no; uint64_t file_no;
int res= innodb_binlog_purge_low(purge_info, to_file_no, &file_no); int res= innodb_binlog_purge_low(purge_info, to_file_no, lf_pins, &file_no);
mysql_mutex_unlock(&purge_binlog_mutex); mysql_mutex_unlock(&purge_binlog_mutex);
lf_hash_put_pins(lf_pins);
if (res == 1) if (res == 1)
{ {
static_assert(sizeof(purge_info->nonpurge_filename) >= BINLOG_NAME_MAX_LEN, static_assert(sizeof(purge_info->nonpurge_filename) >= BINLOG_NAME_MAX_LEN,

View File

@@ -27,6 +27,8 @@ InnoDB implementation of binlog.
#include <utility> #include <utility>
#include <atomic> #include <atomic>
#include "lf.h"
#include "univ.i" #include "univ.i"
#include "mtr0mtr.h" #include "mtr0mtr.h"
@@ -203,6 +205,85 @@ public:
}; };
/* Structure of an entry in the hash of binlog tablespace files. */
struct ibb_tblspc_entry {
uint64_t file_no;
/*
Active transactions/oob-event-groups that start in this binlog tablespace
file (including any user XA).
*/
std::atomic<uint64_t>oob_refs;
/*
Active XA transactions whose oob start in this binlog tablespace file.
ToDo: Note that user XA is not yet implemented.
*/
std::atomic<uint64_t>xa_refs;
/*
The earliest file number that this binlog tablespace file has oob
references into.
(This is a conservative estimate, references may not actually exist in
case their commit record went into a later file, or they ended up rolling
back).
Includes any XA oob records.
*/
std::atomic<uint64_t>oob_ref_file_no;
/*
Earliest file number that we have XA references into.
ToDo: Note that user XA is not yet implemented.
*/
std::atomic<uint64_t>xa_ref_file_no;
ibb_tblspc_entry()= default;
~ibb_tblspc_entry()= default;
};
/*
Class keeping reference counts of oob records starting in different binlog
tablespace files.
Used to keep track of which files should not be purged because they contain
oob (start) records that are still referenced by needed binlog tablespace
files or by active transactions.
*/
class ibb_file_oob_refs {
public:
/* Hash contains struct ibb_tblspc_entry keyed on file_no. */
LF_HASH hash;
/*
Earliest file_no with start oob records that are still referenced by active
transactions / event groups.
*/
std::atomic<uint64_t> earliest_oob_ref;
/*
Same, but restricted to those oob that constitute XA transactions.
Thus, this may be larger than earliest_oob_ref or even ~(uint64_t)0 in
case there are no active XA.
*/
std::atomic<uint64_t> earliest_xa_ref;
public:
/* Init the hash empty. */
void init() noexcept;
void destroy() noexcept;
/* Delete an entry from the hash. */
void remove(uint64_t file_no, LF_PINS *pins);
/* Delete all (consecutive) entries from file_no down. */
void remove_up_to(uint64_t file_no, LF_PINS *pins);
/* Update an entry when an OOB record is started/completed. */
bool oob_ref_inc(uint64_t file_no, LF_PINS *pins);
bool oob_ref_dec(uint64_t file_no, LF_PINS *pins);
/* Update earliest_oob_ref when refcount drops to zero. */
void do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins,
bool active_moving);
/* Update the oob and xa file_no's active at start of this file_no. */
bool update_refs(uint64_t file_no, LF_PINS *pins,
uint64_t oob_ref, uint64_t xa_ref);
/* Lookup the oob-referenced file_no from a file_no. */
bool get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins,
uint64_t *out_oob_ref_file_no);
};
class binlog_chunk_reader { class binlog_chunk_reader {
public: public:
enum chunk_reader_status { enum chunk_reader_status {
@@ -328,6 +409,8 @@ extern std::atomic<uint64_t> binlog_cur_written_offset[2];
extern std::atomic<uint64_t> binlog_cur_end_offset[2]; extern std::atomic<uint64_t> binlog_cur_end_offset[2];
extern fsp_binlog_page_fifo *binlog_page_fifo; extern fsp_binlog_page_fifo *binlog_page_fifo;
extern ibb_file_oob_refs ibb_file_hash;
static inline void static inline void
fsp_binlog_release(fsp_binlog_page_entry *page) fsp_binlog_release(fsp_binlog_page_entry *page)
@@ -341,6 +424,8 @@ extern int crc32_pread_page(File fd, byte *buf, uint32_t page_no,
myf MyFlags) noexcept; myf MyFlags) noexcept;
extern int crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no, extern int crc32_pread_page(pfs_os_file_t fh, byte *buf, uint32_t page_no,
myf MyFlags) noexcept; myf MyFlags) noexcept;
extern bool ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref,
uint64_t xa_ref, LF_PINS *in_pins=nullptr);
extern void binlog_write_up_to_now() noexcept; extern void binlog_write_up_to_now() noexcept;
extern void fsp_binlog_extract_header_page(const byte *page_buf, extern void fsp_binlog_extract_header_page(const byte *page_buf,
binlog_header_data *out_header_data) binlog_header_data *out_header_data)
@@ -356,9 +441,11 @@ extern bool fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
uint64_t file_no, size_t file_size, uint64_t file_no, size_t file_size,
uint32_t init_page, byte *partial_page); uint32_t init_page, byte *partial_page);
extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no, extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no,
uint32_t size_in_pages); uint32_t size_in_pages,
LF_PINS *pins);
extern std::pair<uint64_t, uint64_t> fsp_binlog_write_rec( extern std::pair<uint64_t, uint64_t> fsp_binlog_write_rec(
struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type); struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type,
LF_PINS *pins);
extern bool fsp_binlog_flush(); extern bool fsp_binlog_flush();
#endif /* fsp_binlog_h */ #endif /* fsp_binlog_h */

View File

@@ -45,6 +45,7 @@ struct handler_binlog_purge_info;
Currently used for: Currently used for:
- chunk_data_cache: A binlog trx cache to be binlogged as a commit record. - chunk_data_cache: A binlog trx cache to be binlogged as a commit record.
- chunk_data_oob: An out-of-band piece of event group data. - chunk_data_oob: An out-of-band piece of event group data.
- chunk_data_flush: For dummy filler data.
*/ */
struct chunk_data_base { struct chunk_data_base {
/* /*
@@ -74,6 +75,8 @@ struct chunk_data_flush : public chunk_data_base {
}; };
static constexpr size_t IBB_BINLOG_HEADER_SIZE= 64;
/* /*
Data stored at the start of each binlog file. Data stored at the start of each binlog file.
(The data is stored as little-engian values in the first page of the file; (The data is stored as little-engian values in the first page of the file;
@@ -101,6 +104,13 @@ struct binlog_header_data {
binlog file was created. binlog file was created.
*/ */
uint64_t diff_state_interval; uint64_t diff_state_interval;
/* The earliest file_no that we have oob references into. */
uint64_t oob_ref_file_no;
/*
The earliest file_no that we have XA oob references into.
ToDo: This in preparation for when XA is implemented.
*/
uint64_t xa_ref_file_no;
/* The log_2 of the page size (eg. ibb_page_size_shift). */ /* The log_2 of the page size (eg. ibb_page_size_shift). */
uint32_t page_size_shift; uint32_t page_size_shift;
/* /*
@@ -163,12 +173,14 @@ extern bool innodb_binlog_init(size_t binlog_size, const char *directory);
extern void innodb_binlog_close(bool shutdown); extern void innodb_binlog_close(bool shutdown);
extern bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no, extern bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no,
uint64_t file_size_in_pages, lsn_t start_lsn, uint64_t file_size_in_pages, lsn_t start_lsn,
uint64_t gtid_state_interval_in_pages); uint64_t gtid_state_interval_in_pages,
LF_PINS *pins);
extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
fsp_binlog_page_entry * &block, uint32_t &page_no, fsp_binlog_page_entry * &block, uint32_t &page_no,
uint32_t &page_offset, uint64_t file_no); uint32_t &page_offset, uint64_t file_no);
extern bool innodb_binlog_oob(THD *thd, const unsigned char *data, extern bool innodb_binlog_oob(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data); size_t data_len, void **engine_data);
extern void innodb_reset_oob(THD *thd, void **engine_data);
extern void innodb_free_oob(THD *thd, void *engine_data); extern void innodb_free_oob(THD *thd, void *engine_data);
extern handler_binlog_reader *innodb_get_binlog_reader(); extern handler_binlog_reader *innodb_get_binlog_reader();
extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no); extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no);