1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34705: Binlog-in-engine: Crash-safe slave

This patch makes replication crash-safe with the new binlog implementation,
even when --innodb-flush-log-at-trx-commit=0|2. The point is to not send any
binlog events to the slave until they have become durable on master, thus
avoiding that a slave may replicate a transaction that is lost during master
recovery, diverging the slave from the master.

Keep track of which point in the binlog has been durably synced to disk
(meaning the corresponding LSN has been durably synced to disk in the InnoDB
redo log). Each write to the binlog inserts an entry with offset and
corresponding LSN in a FIFO. Dump threads will first read only up to the
durable point in the binlog. A dump thread will then check the LSN fifo, and
do an InnoDB redo log sync if anything is pending. Then the FIFO is emptied
of any LSNs that have now become durable, and the durable point in the
binlog is updated and reading the binlog can continue.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-07-01 15:07:03 +02:00
parent baec2064a1
commit d26851a575
33 changed files with 851 additions and 149 deletions

View File

@@ -0,0 +1,58 @@
# ==== Purpose ====
#
# Execute a RESET MASTER on the current connection, first terminating any
# lingering binlog dump threads that might still be sitting idle and would
# block the RESTE MASTER.
#
# Note that any configured IO threads on other servers must be stopped before
# calling this, as RESET MASTER cannot be run while there is a slave connected.
#
#
# ==== Usage ====
#
# [--let $kill_timeout= NUMBER]
# [--let $reset_master_retries= NUMBER]
# --source include/reset_master.inc
#
# Parameters:
# $kill_timeout
# Maximum number of seconds to wait for dump threads to disappear.
# $reset_master_retries
# Maximum number of times RESET MASTER can get ER_BINLOG_IN_USE before
# giving up.
--let $include_filename= reset_master.inc
--source include/begin_include_file.inc
--disable_query_log
let $_retries= 10;
if ($reset_master_retries)
{
let $_retries= $reset_master_retries;
}
let $_success= 0;
let $_i= 0;
while ($_i < $_retries)
{
inc $_i;
--source include/kill_binlog_dump_threads.inc
--let $errno= 0
--error 0,ER_BINLOG_IN_USE
RESET MASTER;
if (!$errno) {
let $_success= 1;
let $_i = $_retries;
}
}
if (!$_success)
{
SHOW FULL PROCESSLIST;
--die Timeout while trying to remove dump threads and run RESET MASTER.
}
--enable_query_log
--let $include_filename= reset_master.inc
--source include/end_include_file.inc

View File

@@ -1,4 +1,4 @@
RESET MASTER;
include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
@@ -27,7 +27,7 @@ FLUSH BINARY LOGS;
FLUSH BINARY LOGS;
SHOW BINLOG EVENTS IN 'binlog-000000.ibb' LIMIT 1;
ERROR HY000: Error when executing command SHOW BINLOG EVENTS: error reading event data
RESET MASTER;
include/reset_master.inc
SHOW BINARY LOGS;
Log_name File_size
binlog-000000.ibb 262144

View File

@@ -1,7 +1,7 @@
--source include/have_binlog_format_row.inc
--source include/have_innodb_binlog.inc
RESET MASTER;
--source include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB;
@@ -51,7 +51,7 @@ EOF
eval SHOW BINLOG EVENTS IN '$file' LIMIT 1;
RESET MASTER;
--source include/reset_master.inc
--let $binlog_name= binlog-000001.ibb
--let $binlog_size= 262144
--source include/wait_for_engine_binlog.inc

View File

@@ -1,4 +1,4 @@
RESET MASTER;
include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
BEGIN;

View File

@@ -6,7 +6,7 @@
# files and only need to include the one.
--source include/have_innodb_binlog.inc
RESET MASTER;
--source include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
--let $gtid_pos= `SELECT @@last_gtid`

View File

@@ -1,4 +1,4 @@
RESET MASTER;
include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
INSERT INTO t1 SELECT seq+1000000+100000*0, seq FROM seq_1_to_4000;

View File

@@ -6,7 +6,7 @@
# Note: This test also tests the --binlog-directory option by putting it
# in binlog_in_engine_restart.opt .
RESET MASTER;
--source include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);

View File

@@ -1,4 +1,4 @@
RESET MASTER;
include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
connect con1,localhost,root,,;

View File

@@ -1,7 +1,7 @@
--source include/have_binlog_format_mixed.inc
--source include/have_innodb_binlog.inc
RESET MASTER;
--source include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);

View File

@@ -0,0 +1 @@
--innodb-log-file-mmap=OFF

View File

@@ -0,0 +1,25 @@
include/master-slave.inc
[connection master]
*** Test that slave doesn't get ahead of a non-durable master that crashes.
connection master;
SET GLOBAL innodb_flush_log_at_trx_commit= 0;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT, c TEXT) ENGINE=InnoDB;
SELECT COUNT(*) FROM t1;
COUNT(*)
630
SET SESSION debug_dbug="+d,crash_dispatch_command_before";
SELECT 1;
Got one of the listed errors
connection master1;
connection server_1;
connection default;
connection master;
include/rpl_reconnect.inc
SET STATEMENT gtid_domain_id= 1 FOR INSERT INTO t1 VALUES (9, 9, 'extra');
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
# Asserted this: Row count should match on master and slave (no extra rows on slave)
connection master;
DROP TABLE t1;
include/rpl_end.inc

View File

@@ -0,0 +1,90 @@
--source include/not_embedded.inc
--source include/not_valgrind.inc
--source include/have_debug.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--source include/have_innodb_binlog.inc
-- echo *** Test that slave doesn't get ahead of a non-durable master that crashes.
--connection master
--let $old_flatc= `SELECT @@GLOBAL.innodb_flush_log_at_trx_commit`
SET GLOBAL innodb_flush_log_at_trx_commit= 0;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT, c TEXT) ENGINE=InnoDB;
# Create a bunch of transactions, when --innodb-flush-log-at-trx-commit=0
# we should lose some of them in master crash.
# Note though that this is ineffective when running in /dev/shm/ (./mtr --mem).
# Because InnoDB is hard-coded to simulate PMEM in this case and forces
# mmap on the log file even though we have --innodb-log-file-mmap=OFF in our
# .opt file. Then the memory mapping gets updated immediately when the mtr
# commits, and kill -9 cannot lose any transactions. The test will still pass,
# but no transactions are lost on the master so nothing much is tested.
--disable_query_log
--let loop= 0
while ($loop < 10) {
eval INSERT INTO t1 VALUES ($loop*1000, 0, '');
BEGIN;
--let $i= 0
while ($i < 20) {
eval INSERT INTO t1 VALUES ($loop*1000 + $i*10 + 1, 1, REPEAT('a', 1000));
eval INSERT INTO t1 VALUES ($loop*1000 + $i*10 + 2, 2, REPEAT('z', 1000));
eval INSERT INTO t1 VALUES ($loop*1000 + $i*10 + 3, 3, REPEAT('#', 1000));
inc $i;
}
COMMIT;
# CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
eval INSERT INTO t1 VALUES ($loop*1000 + 5, 5, 'zyxxy');
# INSERT INTO t2 VALUES (1, 0);
eval INSERT INTO t1 VALUES ($loop*1000 + 6, 6, 'the quick brown fox');
# DROP TABLE t2;
inc $loop;
}
--enable_query_log
# Crash the master
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait-recovery.test
EOF
SELECT COUNT(*) FROM t1;
SET SESSION debug_dbug="+d,crash_dispatch_command_before";
--error 2006,2013
SELECT 1;
--source include/wait_until_disconnected.inc
--connection master1
--source include/wait_until_disconnected.inc
--connection server_1
--source include/wait_until_disconnected.inc
--connection default
--source include/wait_until_disconnected.inc
--connection master
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart
EOF
--let $rpl_server_number= 1
--source include/rpl_reconnect.inc
# After the crash, transactions can be lost and the table row count
# may be smaller than before the crash.
--let $master_count= `SELECT COUNT(*) FROM t1`
SET STATEMENT gtid_domain_id= 1 FOR INSERT INTO t1 VALUES (9, 9, 'extra');
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
# The slave should have the same amount of rows as the master (ie. not have
# any extra transactions that were lost on the master by the crash).
--let $slave_count= `SELECT COUNT(*) FROM t1`
--let $assert_text= Row count should match on master and slave (no extra rows on slave)
--let $assert_cond= $master_count + 1 = $slave_count
--source include/rpl_assert.inc
--connection master
--disable_query_log
eval SET GLOBAL innodb_flush_log_at_trx_commit= $old_flatc;
--enable_query_log
DROP TABLE t1;
--source include/rpl_end.inc

View File

@@ -1,6 +1,5 @@
include/master-slave.inc
[connection master]
RESET MASTER;
CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (0, 0, 'Start');
*** Test iteration, RESTART=0

View File

@@ -17,8 +17,6 @@
# to test with. The --binlog-cache-size is set to 8k, so more event data than
# that causes OOB binlogging.
RESET MASTER;
CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (0, 0, 'Start');

View File

@@ -1,4 +1,4 @@
RESET MASTER;
include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);

View File

@@ -5,13 +5,13 @@
--source include/have_innodb_binlog.inc
--let $datadir= `SELECT @@datadir`
RESET MASTER;
--source include/reset_master.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
--let $no_checkpoint_flush= 1
--let $no_checkpoint_kill= 1
--let no_checkpoint_kill= 1
--source ../../suite/innodb/include/no_checkpoint_start.inc
--let $file= query_get_value(SHOW MASTER STATUS, File, 1)
--let $pos= query_get_value(SHOW MASTER STATUS, Position, 1)

View File

@@ -4,7 +4,7 @@ connection slave;
include/stop_slave.inc
SET GLOBAL gtid_slave_pos= '';
connection master;
RESET MASTER;
include/reset_master.inc
DROP TABLE IF EXISTS t1, t2;
Warnings:
Note 1051 Unknown table 'test.t1,test.t2'

View File

@@ -14,7 +14,7 @@ SET GLOBAL gtid_slave_pos= '';
--connection master
--let $datadir= `SELECT @@datadir`
RESET MASTER;
--source include/reset_master.inc
DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;

View File

@@ -1548,9 +1548,18 @@ struct handlerton
/* Optional implementation of binlog in the engine. */
bool (*binlog_init)(size_t binlog_size, const char *directory);
/* Binlog an event group that doesn't go through commit_ordered. */
bool (*binlog_write_direct_ordered)(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid);
bool (*binlog_write_direct)(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid);
/*
Called for the last transaction (only) in a binlog group commit, with
no locks being held.
*/
void (*binlog_group_commit_ordered)(THD *thd,
handler_binlog_event_group_info *binlog_info);
/*
Binlog parts of large transactions out-of-band, in different chunks in the
binlog as the transaction executes. This limits the amount of data that
@@ -1558,8 +1567,10 @@ struct handlerton
a pointer location that the engine can set to maintain its own context
for the out-of-band data.
*/
bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data);
bool (*binlog_oob_data_ordered)(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
bool (*binlog_oob_data)(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
/*
Call to reset (for new transactions) the engine_data from
binlog_oob_data(). Can also change the pointer to point to different data
@@ -1568,8 +1579,15 @@ struct handlerton
void (*binlog_oob_reset)(THD *thd, void **engine_data);
/* Call to allow engine to release the engine_data from binlog_oob_data(). */
void (*binlog_oob_free)(THD *thd, void *engine_data);
/* Obtain an object to allow reading from the binlog. */
handler_binlog_reader * (*get_binlog_reader)();
/*
Obtain an object to allow reading from the binlog.
The boolean argument wait_durable is set to true to require that
transactions be durable before they can be read and returned from the
reader. This is used to make replication crash-safe without requiring
durability; this way, if the master crashes, when it comes back up the
slave will not be ahead and replication will not diverge.
*/
handler_binlog_reader * (*get_binlog_reader)(bool wait_durable);
/*
Obtain the current position in the binlog.
Used to support legacy SHOW MASTER STATUS.
@@ -5919,6 +5937,12 @@ public:
virtual ~handler_binlog_reader() { };
virtual int read_binlog_data(uchar *buf, uint32_t len) = 0;
virtual bool data_available()= 0;
/*
Wait for data to be available to read, for kill, or for timeout.
Returns true in case of timeout reached, false otherwise.
Caller should check for kill before calling again (to avoid busy-loop).
*/
virtual bool wait_available(THD *thd, const struct timespec *abstime) = 0;
/*
This initializes the current read position to the point of the slave GTID
position passed in as POS. It is permissible to start at a position a bit

View File

@@ -6469,9 +6469,11 @@ binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len)
void **engine_ptr= &mngr->engine_binlog_info.engine_ptr;
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
mysql_mutex_lock(&LOCK_commit_ordered);
bool res= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, len,
engine_ptr);
bool res= (*opt_binlog_engine_hton->binlog_oob_data_ordered)(mngr->thd, data,
len, engine_ptr);
mysql_mutex_unlock(&LOCK_commit_ordered);
res|= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data,
len, engine_ptr);
mngr->engine_binlog_info.out_of_band_offset+= len;
cache->pos_in_file+= len;
@@ -7511,6 +7513,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
bool events_direct;
ulong UNINIT_VAR(prev_binlog_id);
uint64 UNINIT_VAR(commit_id);
const rpl_gtid *commit_gtid;
DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
/*
@@ -7812,12 +7815,19 @@ err:
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_after_binlog_sync);
if ((*opt_binlog_engine_hton->binlog_write_direct)
(file, engine_context, thd->get_last_commit_gtid()))
/* ToDo: Is this correct? How do we guarantee here correct gtid allocation order? By chained LOCK_log and LOCK_commit_ordered ? */
commit_gtid= thd->get_last_commit_gtid();
if (unlikely((*opt_binlog_engine_hton->binlog_write_direct_ordered)
(file, engine_context, commit_gtid)))
{
mysql_mutex_unlock(&LOCK_commit_ordered);
goto engine_fail;
}
mysql_mutex_unlock(&LOCK_commit_ordered);
update_binlog_end_pos();
if (unlikely((*opt_binlog_engine_hton->binlog_write_direct)
(file, engine_context, commit_gtid)))
goto engine_fail;
status_var_add(thd->status_var.binlog_bytes_written, binlog_total_bytes);
goto engine_ok;
@@ -9351,9 +9361,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
}
else
{
if (opt_binlog_engine_hton)
update_binlog_end_pos();
/*
If we rotated the binlog, and if we are using the unoptimized thread
scheduling where every thread runs its own commit_ordered(), then we
@@ -9789,7 +9796,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DEBUG_SYNC(leader->thd, "commit_after_group_release_commit_ordered");
if (opt_binlog_engine_hton)
update_binlog_end_pos();
(*opt_binlog_engine_hton->binlog_group_commit_ordered)
(last_in_queue->thd, &last_in_queue->cache_mngr->engine_binlog_info);
else if (check_purge)
checkpoint_and_purge(binlog_id);
@@ -10110,6 +10118,7 @@ int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
int ret= 0;
DBUG_ENTER("wait_for_update_binlog_end_pos");
DBUG_ASSERT(!opt_binlog_engine_hton);
thd_wait_begin(thd, THD_WAIT_BINLOG);
mysql_mutex_assert_owner(get_binlog_end_pos_lock());
if (!timeout)

View File

@@ -968,6 +968,7 @@ public:
signal_relay_log_update();
else
{
DBUG_ASSERT(!opt_binlog_engine_hton);
lock_binlog_end_pos();
binlog_end_pos= my_b_safe_tell(&log_file);
signal_bin_log_update();
@@ -976,6 +977,7 @@ public:
}
void update_binlog_end_pos(my_off_t pos)
{
DBUG_ASSERT(!opt_binlog_engine_hton);
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
lock_binlog_end_pos();

View File

@@ -12292,6 +12292,8 @@ ER_VECTOR_FORMAT_INVALID
eng "Invalid vector format at offset: %d for '%-.100s'. Must be a valid JSON array of numbers."
ER_PSEUDO_THREAD_ID_OVERWRITE
eng "Pseudo thread id should not be modified by the client as it will be overwritten"
ER_BINLOG_IN_USE
eng "Cannot execute RESET MASTER as the binlog is in use by a connected slave or other RESET MASTER or binlog reader. Check SHOW PROCESSLIST for "Binlog Dump" commands and use KILL to stop such readers"
ER_CANNOT_INIT_ENGINE_BINLOG_READER
eng "Cannot initialize binlog reader from storage engine %s"
ER_ENGINE_BINLOG_REQUIRES_GTID

View File

@@ -2568,7 +2568,7 @@ static int init_binlog_sender(binlog_send_info *info,
/* ToDo: Need more complex logic here. I want to be able to at least switch from legacy to innodb binlog without having to RESET MASTER. So we need to be able to start reading from legacy and then switch over to the binlog in innodb. Also, we might want to pass the init GTID position in so that the binlog reader can find the place to start by itself? But probably still want to allocate the reader here like this. */
if (opt_binlog_engine_hton &&
!(info->engine_binlog_reader=
(*opt_binlog_engine_hton->get_binlog_reader)()))
(*opt_binlog_engine_hton->get_binlog_reader)(true)))
{
LEX_CSTRING *engine_name= hton_name(opt_binlog_engine_hton);
my_error(ER_CANNOT_INIT_ENGINE_BINLOG_READER, MYF(0), engine_name->str);
@@ -3345,8 +3345,6 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo)
if (error == LOG_READ_EOF)
{
PSI_stage_info old_stage;
/**
* check if we should wait for more data
*/
@@ -3367,23 +3365,15 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo)
return 1;
}
mysql_bin_log.lock_binlog_end_pos();
info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(),
mysql_bin_log.get_binlog_end_pos_lock(),
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
while (!should_stop(info, true) &&
!reader->data_available())
while (!should_stop(info, true) && !reader->data_available())
{
//DBUG_ASSERT(!info->heartbeat_period /* ToDo: Implement support for heartbeat events while waiting for more data. */);
int ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
bool ret= reader->wait_available(info->thd, nullptr /* ToDo hearthbeat period. */);
if (ret)
{
ret= 1; // error
break;
// ToDo Hearthbeat sending
}
}
info->thd->EXIT_COND(&old_stage);
continue;
}
@@ -4853,7 +4843,8 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi)
DBUG_ASSERT(opt_binlog_engine_hton);
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS);
handler_binlog_reader *reader= (*opt_binlog_engine_hton->get_binlog_reader)();
handler_binlog_reader *reader=
(*opt_binlog_engine_hton->get_binlog_reader)(false);
if (!reader)
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));

View File

@@ -89,7 +89,9 @@ ENDIF()
IF (CMAKE_CXX_COMPILER_ID MATCHES "Clang" OR
CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wconversion -Wno-sign-conversion")
SET_SOURCE_FILES_PROPERTIES(fts/fts0pars.cc
# Source file innodb_binlog.cc needs to include server layer stuff that is
# not -Wconversion clean.
SET_SOURCE_FILES_PROPERTIES(fts/fts0pars.cc handler/innodb_binlog.cc
PROPERTIES COMPILE_FLAGS -Wno-conversion)
ENDIF()

View File

@@ -2023,6 +2023,7 @@ void mtr_t::write_binlog(bool space_id, uint32_t page_no,
m_log.close(end);
m_log.push(static_cast<const byte*>(buf), uint32_t(size));
}
m_modifications= true;
}
/** Write binlog data

View File

@@ -69,6 +69,9 @@ uint64_t current_binlog_state_interval;
*/
mysql_mutex_t active_binlog_mutex;
pthread_cond_t active_binlog_cond;
/* Mutex protecting binlog_cur_durable_offset[] and ibb_pending_lsn_fifo. */
mysql_mutex_t binlog_durable_mutex;
mysql_cond_t binlog_durable_cond;
/* The currently being written binlog tablespace. */
std::atomic<uint64_t> active_binlog_file_no;
@@ -95,16 +98,21 @@ uint64_t last_created_binlog_file_no;
Point at which it is guaranteed that all data has been written out to the
binlog file (on the OS level; not necessarily fsync()'ed yet).
Stores the most recent two values, each corresponding to active_binlog_file_no&1.
Stores the most recent four values, each corresponding to
active_binlog_file_no&4. This is so that it can be always valid for both
active and active-1 (active-2 is always durable, as we make the entire binlog
file N durable before pre-allocating N+2). Just before active moves to the
next file_no, we can set the value for active+1, leaving active and active-1
still valid. (Only 3 entries are needed, but we use four to be able to use
bit-wise and instead of modulo-3).
*/
/* ToDo: maintain this offset value as up to where data has been written out to the OS. Needs to be binary-searched in current binlog file at server restart; which is also a reason why it might not be a multiple of the page size. */
std::atomic<uint64_t> binlog_cur_written_offset[2];
std::atomic<uint64_t> binlog_cur_durable_offset[4];
/*
Offset of last valid byte of data in most recent 2 binlog files.
Offset of last valid byte of data in most recent 4 binlog files.
A value of ~0 means that file is not opened as a tablespace (and data is
valid until the end of the file).
*/
std::atomic<uint64_t> binlog_cur_end_offset[2];
std::atomic<uint64_t> binlog_cur_end_offset[4];
fsp_binlog_page_fifo *binlog_page_fifo;
@@ -1170,6 +1178,10 @@ fsp_binlog_init()
{
mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr);
pthread_cond_init(&active_binlog_cond, nullptr);
mysql_mutex_init(fsp_binlog_durable_mutex_key, &binlog_durable_mutex, nullptr);
mysql_cond_init(fsp_binlog_durable_cond_key, &binlog_durable_cond, nullptr);
mysql_mutex_record_order(&binlog_durable_mutex, &active_binlog_mutex);
ibb_file_hash.init();
binlog_page_fifo= new fsp_binlog_page_fifo();
binlog_page_fifo->start_flush_thread();
@@ -1182,6 +1194,8 @@ fsp_binlog_shutdown()
binlog_page_fifo->stop_flush_thread();
delete binlog_page_fifo;
ibb_file_hash.destroy();
mysql_cond_destroy(&binlog_durable_cond);
mysql_mutex_destroy(&binlog_durable_mutex);
pthread_cond_destroy(&active_binlog_cond);
mysql_mutex_destroy(&active_binlog_mutex);
}
@@ -1194,6 +1208,8 @@ dberr_t
fsp_binlog_tablespace_close(uint64_t file_no)
{
binlog_page_fifo->flush_up_to(file_no, ~(uint32_t)0);
uint32_t size=
binlog_page_fifo->size_in_pages(file_no) << ibb_page_size_shift;
/* release_tablespace() will fdatasync() the file first. */
binlog_page_fifo->release_tablespace(file_no);
/*
@@ -1202,6 +1218,23 @@ fsp_binlog_tablespace_close(uint64_t file_no)
recovery, at most from the latest two existing files.
*/
log_buffer_flush_to_disk(true);
uint64_t end_offset=
binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 3].store(size, std::memory_order_relaxed);
/*
Wait for the last record in the file to be marked durably synced to the
(redo) log. We already ensured that the record is durable with the above
call to log_buffer_flush_to_disk(); this way, we ensure that the update
of binlog_cur_durable_offset[] happens correctly through the
ibb_pending_lsn_fifo, so that the current durable position will be
consistent with a recorded LSN, and a reader will not see EOF in the
middle of a record.
*/
uint64_t dur_offset=
binlog_cur_durable_offset[file_no & 3].load(std:: memory_order_relaxed);
if (dur_offset < end_offset)
ibb_wait_durable_offset(file_no, end_offset);
return DB_SUCCESS;
}
@@ -1368,8 +1401,8 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type,
// ToDo: assert that a single write doesn't span more than two binlog files.
++file_no;
file_size_in_pages= binlog_page_fifo->size_in_pages(file_no);
binlog_cur_written_offset[file_no & 1].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 1].store(0, std::memory_order_relaxed);
binlog_cur_durable_offset[file_no & 3].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 3].store(0, std::memory_order_relaxed);
pthread_cond_signal(&active_binlog_cond);
mysql_mutex_unlock(&active_binlog_mutex);
binlog_cur_page_no= page_no= 0;
@@ -1533,16 +1566,19 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type,
binlog_page_fifo->release_page_mtr(block, mtr);
binlog_cur_page_no= page_no;
binlog_cur_page_offset= page_offset;
binlog_cur_end_offset[file_no & 1].store(((uint64_t)page_no << page_size_shift) + page_offset,
std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 3].store
(((uint64_t)page_no << page_size_shift) + page_offset,
std::memory_order_relaxed);
if (UNIV_UNLIKELY(pending_prev_end_offset != 0))
{
mysql_mutex_lock(&binlog_durable_mutex);
mysql_mutex_lock(&active_binlog_mutex);
binlog_cur_end_offset[(file_no-1) & 1].store(pending_prev_end_offset,
binlog_cur_end_offset[(file_no-1) & 3].store(pending_prev_end_offset,
std::memory_order_relaxed);
active_binlog_file_no.store(file_no, std::memory_order_release);
pthread_cond_signal(&active_binlog_cond);
mysql_mutex_unlock(&active_binlog_mutex);
mysql_mutex_unlock(&binlog_durable_mutex);
}
return {start_file_no, start_offset};
}
@@ -1636,15 +1672,18 @@ fsp_binlog_flush()
mtr.commit();
lf_hash_put_pins(lf_pins);
log_buffer_flush_to_disk(srv_flush_log_at_trx_commit & 1);
ibb_pending_lsn_fifo.add_to_fifo(mtr.commit_lsn(), file_no+1,
binlog_cur_end_offset[(file_no + 1) & 3].load(std::memory_order_relaxed));
return false;
}
binlog_chunk_reader::binlog_chunk_reader()
binlog_chunk_reader::binlog_chunk_reader(std::atomic<uint64_t> *limit_offset_)
: s { 0, 0, 0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false },
page_ptr(0), cur_block(0), page_buffer(nullptr),
cur_file_handle((File)-1), skipping_partial(false)
limit_offset(limit_offset_), cur_file_handle((File)-1),
skipping_partial(false)
{
/* Nothing else. */
}
@@ -1691,8 +1730,8 @@ binlog_chunk_reader::fetch_current_page()
uint64_t offset= (s.page_no << ibb_page_size_shift) | s.in_page_offset;
uint64_t active= active2;
uint64_t end_offset=
binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire);
if (s.file_no > active)
limit_offset[s.file_no & 3].load(std::memory_order_acquire);
if (s.file_no > active || UNIV_UNLIKELY(active == ~(uint64_t)0))
{
ut_ad(s.page_no == 1);
ut_ad(s.in_page_offset == 0);
@@ -1705,15 +1744,8 @@ binlog_chunk_reader::fetch_current_page()
if (s.file_no + 1 >= active) {
/* Check if we should read from the buffer pool or from the file. */
if (end_offset != ~(uint64_t)0 && offset < end_offset) {
/*
ToDo: Should we keep track of the last block read and use it as a
hint? Will be mainly useful when reading the partially written active
page at the current end of the active binlog, which might be a common
case.
*/
if (end_offset != ~(uint64_t)0 && offset < end_offset)
block= binlog_page_fifo->get_page(s.file_no, s.page_no);
}
active2= active_binlog_file_no.load(std::memory_order_acquire);
if (UNIV_UNLIKELY(active2 != active)) {
/*
@@ -1765,18 +1797,13 @@ binlog_chunk_reader::fetch_current_page()
}
cur_file_length= stat_buf.st_size;
}
if (s.file_no == active)
if (s.file_no + 1 >= active)
cur_end_offset= end_offset;
else
cur_end_offset= cur_file_length;
if (offset >= cur_file_length) {
/* End of this file, move to the next one. */
/*
ToDo: Should also obey binlog_cur_written_offset[], once we start
actually maintaining that, to save unnecessary buffer pool
lookup.
*/
goto_next_file:
if (cur_file_handle >= (File)0)
{
@@ -1845,8 +1872,7 @@ read_more_data:
"Replace static_assert with code from above comment");
/* Check for end-of-file. */
if (cur_end_offset == ~(uint64_t)0 ||
(s.page_no << ibb_page_size_shift) + s.in_page_offset >= cur_end_offset)
if ((s.page_no << ibb_page_size_shift) + s.in_page_offset >= cur_end_offset)
return sofar;
if (s.in_page_offset >= ibb_page_size - (BINLOG_PAGE_DATA_END + 3) ||
@@ -2075,23 +2101,39 @@ bool binlog_chunk_reader::data_available()
if (!end_of_record())
return true;
uint64_t active= active_binlog_file_no.load(std::memory_order_acquire);
if (active != s.file_no)
if (UNIV_UNLIKELY(active == ~(uint64_t)0))
return false;
uint64_t end_offset;
for (;;)
{
ut_ad(active > s.file_no || (s.page_no == 1 && s.in_page_offset == 0));
return active > s.file_no;
if (active > s.file_no + 1)
return true;
end_offset= limit_offset[s.file_no & 3].load(std::memory_order_acquire);
uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire);
if (active2 == active)
break;
/* Active moved while we were checking, try again. */
active= active2;
}
uint64_t end_offset=
binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire);
uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire);
if (active2 != active)
return true; // Active moved while we were checking
if (end_offset == ~(uint64_t)0)
return false; // Nothing in this binlog file yet
uint64_t offset= (s.page_no << ibb_page_size_shift) | s.in_page_offset;
if (offset < end_offset)
return true;
ut_ad(s.file_no == active2);
ut_ad(offset == end_offset);
ut_ad(s.file_no + 1 == active || s.file_no == active);
ut_ad(offset == end_offset || (offset == ibb_page_size && end_offset == 0));
return false;
}
bool
binlog_chunk_reader::is_before_pos(uint64_t file_no, uint64_t offset)
{
if (s.file_no < file_no)
return true;
if (s.file_no > file_no)
return false;
uint64_t own_offset= (s.page_no << ibb_page_size_shift) | s.in_page_offset;
if (own_offset < offset)
return true;
return false;
}

View File

@@ -549,6 +549,8 @@ mysql_pfs_key_t trx_sys_mutex_key;
mysql_pfs_key_t srv_threads_mutex_key;
mysql_pfs_key_t tpool_cache_mutex_key;
mysql_pfs_key_t fsp_active_binlog_mutex_key;
mysql_pfs_key_t fsp_binlog_durable_mutex_key;
mysql_pfs_key_t fsp_binlog_durable_cond_key;
mysql_pfs_key_t fsp_purge_binlog_mutex_key;
mysql_pfs_key_t fsp_page_fifo_mutex_key;
@@ -4130,7 +4132,11 @@ static int innodb_init(void* p)
innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs;
innobase_hton->binlog_init= innodb_binlog_init;
innobase_hton->binlog_write_direct_ordered=
innobase_binlog_write_direct_ordered;
innobase_hton->binlog_write_direct= innobase_binlog_write_direct;
innobase_hton->binlog_group_commit_ordered= ibb_group_commit;
innobase_hton->binlog_oob_data_ordered= innodb_binlog_oob_ordered;
innobase_hton->binlog_oob_data= innodb_binlog_oob;
innobase_hton->binlog_oob_reset= innodb_reset_oob;
innobase_hton->binlog_oob_free= innodb_free_oob;

View File

@@ -21,6 +21,14 @@ this program; if not, write to the Free Software Foundation, Inc.,
InnoDB implementation of binlog.
*******************************************************/
/*
Need MYSQL_SERVER defined to be able to use THD_ENTER_COND from sql_class.h
to make my_cond_wait() killable.
*/
#define MYSQL_SERVER 1
#include <my_global.h>
#include "sql_class.h"
#include "ut0compr_int.h"
#include "innodb_binlog.h"
#include "mtr0log.h"
@@ -37,6 +45,7 @@ InnoDB implementation of binlog.
static int innodb_binlog_inited= 0;
pending_lsn_fifo ibb_pending_lsn_fifo;
uint32_t innodb_binlog_size_in_pages;
const char *innodb_binlog_directory;
@@ -118,7 +127,15 @@ struct binlog_oob_context {
bool binlog_node(uint32_t node, uint64_t new_idx,
uint32_t left_node, uint32_t right_node,
chunk_data_oob *oob_data, LF_PINS *pins);
chunk_data_oob *oob_data, LF_PINS *pins, mtr_t *mtr);
/*
Pending binlog write for the ibb_pending_lsn_fifo.
pending_file_no is ~0 when no write is pending.
*/
uint64_t pending_file_no;
uint64_t pending_offset;
lsn_t pending_lsn;
uint64_t first_node_file_no;
uint64_t first_node_offset;
@@ -223,12 +240,12 @@ class ha_innodb_binlog_reader : public handler_binlog_reader {
innodb_binlog_oob_reader oob_reader;
binlog_chunk_reader::saved_position saved_commit_pos;
/* Buffer to hold a page read directly from the binlog file. */
uchar *page_buf;
/* Out-of-band data to read after commit record, if any. */
uint64_t oob_count;
uint64_t oob_last_file_no;
uint64_t oob_last_offset;
/* Buffer to hold a page read directly from the binlog file. */
uchar *page_buf;
/* Keep track of pending bytes in the rd_buf. */
uint32_t rd_buf_len;
uint32_t rd_buf_sofar;
@@ -241,13 +258,16 @@ private:
int read_data(uchar *buf, uint32_t len);
public:
ha_innodb_binlog_reader(uint64_t file_no= 0, uint64_t offset= 0);
ha_innodb_binlog_reader(bool wait_durable, uint64_t file_no= 0,
uint64_t offset= 0);
~ha_innodb_binlog_reader();
virtual int read_binlog_data(uchar *buf, uint32_t len) final;
virtual bool data_available() final;
virtual bool wait_available(THD *thd, const struct timespec *abstime) final;
virtual int init_gtid_pos(slave_connection_state *pos,
rpl_binlog_state_base *state) final;
virtual int init_legacy_pos(const char *filename, ulonglong offset) final;
void seek_internal(uint64_t file_no, uint64_t offset);
};
@@ -1160,8 +1180,11 @@ static void
innodb_binlog_init_state()
{
first_open_binlog_file_no= ~(uint64_t)0;
binlog_cur_end_offset[0].store(~(uint64_t)0, std::memory_order_relaxed);
binlog_cur_end_offset[1].store(~(uint64_t)0, std::memory_order_relaxed);
for (uint32_t i= 0; i < 4; ++i)
{
binlog_cur_end_offset[i].store(~(uint64_t)0, std::memory_order_relaxed);
binlog_cur_durable_offset[i].store(~(uint64_t)0, std::memory_order_relaxed);
}
last_created_binlog_file_no= ~(uint64_t)0;
earliest_binlog_file_no= ~(uint64_t)0;
total_binlog_used_size= 0;
@@ -1211,11 +1234,14 @@ binlog_sync_initial()
ut_a(lf_pins);
mtr.start();
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins);
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
mtr.commit();
lf_hash_put_pins(lf_pins);
log_buffer_flush_to_disk(true);
binlog_page_fifo->flush_up_to(0, 0);
binlog_page_fifo->do_fdatasync(0);
ibb_pending_lsn_fifo.add_to_fifo(mtr.commit_lsn(), file_no,
binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed));
}
@@ -1380,7 +1406,7 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
{
const uint32_t page_size= (uint32_t)ibb_page_size;
const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift;
const uint32_t idx= file_no & 1;
const uint32_t idx= file_no & 3;
char file_name[OS_FILE_MAX_PATH];
uint32_t p_0, p_1, p_2, last_nonempty;
byte *p, *page_end;
@@ -1415,7 +1441,7 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
if (out_header_data->is_empty) {
ret=
fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr);
binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_durable_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed);
return (ret ? -1 : 0);
}
@@ -1498,12 +1524,22 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
ret= fsp_binlog_open(file_name, fh, file_no, file_size,
*out_page_no, partial_page);
uint64_t pos= (*out_page_no << page_size_shift) | *out_pos_in_page;
binlog_cur_written_offset[idx].store(pos, std::memory_order_relaxed);
binlog_cur_durable_offset[idx].store(pos, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(pos, std::memory_order_relaxed);
return ret ? -1 : 1;
}
static void
binlog_discover_init(uint64_t file_no, uint64_t interval)
{
active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= interval;
ibb_pending_lsn_fifo.init(file_no);
}
/*
Returns:
-1 error
@@ -1546,9 +1582,7 @@ innodb_binlog_discover()
file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= innodb_binlog_state_interval;
binlog_discover_init(file_no, innodb_binlog_state_interval);
sql_print_warning("Binlog number %llu could no be opened. Starting a new "
"binlog file from number %llu",
binlog_files.last_file_no, (file_no + 1));
@@ -1561,9 +1595,7 @@ innodb_binlog_discover()
if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no,
header.xa_ref_file_no))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= header.diff_state_interval;
binlog_discover_init(file_no, header.diff_state_interval);
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -1584,9 +1616,7 @@ innodb_binlog_discover()
file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= innodb_binlog_state_interval;
binlog_discover_init(file_no, innodb_binlog_state_interval);
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
sql_print_warning("Binlog number %llu could not be opened, starting "
@@ -1598,9 +1628,7 @@ innodb_binlog_discover()
if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no,
header.xa_ref_file_no))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= header.diff_state_interval;
binlog_discover_init(file_no, header.diff_state_interval);
binlog_cur_page_no= prev_page_no;
binlog_cur_page_offset= prev_pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -1614,9 +1642,7 @@ innodb_binlog_discover()
file_no= binlog_files.last_file_no;
if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0))
return -1;
active_binlog_file_no.store(file_no, std::memory_order_release);
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed);
current_binlog_state_interval= innodb_binlog_state_interval;
binlog_discover_init(file_no, innodb_binlog_state_interval);
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -1629,6 +1655,7 @@ innodb_binlog_discover()
earliest_binlog_file_no= 0;
ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed);
total_binlog_used_size= 0;
ibb_pending_lsn_fifo.init(0);
current_binlog_state_interval= innodb_binlog_state_interval;
ib::info() << "Starting a new binlog from file number " << file_no << ".";
return 0;
@@ -1721,6 +1748,10 @@ innodb_binlog_prealloc_thread()
/* If we created the initial tablespace file, make it the active one. */
ut_ad(active < ~(uint64_t)0 || last_created == 0);
if (active == ~(uint64_t)0) {
binlog_cur_end_offset[last_created & 3].
store(0, std::memory_order_release);
binlog_cur_durable_offset[last_created & 3]
.store(0, std::memory_order_release);
active_binlog_file_no.store(last_created, std::memory_order_relaxed);
ibb_file_hash.earliest_oob_ref.store(last_created,
std::memory_order_relaxed);
@@ -1742,8 +1773,6 @@ innodb_binlog_prealloc_thread()
fsp_binlog_tablespace_close(active - 1);
mysql_mutex_lock(&active_binlog_mutex);
first_open_binlog_file_no= first_open + 1;
binlog_cur_end_offset[first_open & 1].store(~(uint64_t)0,
std::memory_order_relaxed);
continue; /* Re-start loop after releasing/reacquiring mutex. */
}
@@ -2089,7 +2118,8 @@ binlog_state_recover()
}
my_close(file, MYF(0));
ha_innodb_binlog_reader reader(active, page_no << ibb_page_size_shift);
ha_innodb_binlog_reader reader(false, active,
page_no << ibb_page_size_shift);
return binlog_recover_gtid_state(&state, &reader);
}
@@ -2110,6 +2140,7 @@ alloc_oob_context(uint32 list_length= 10)
ut_free(c);
return nullptr;
}
c->pending_file_no= ~(uint64_t)0;
c->node_list_alloc_len= list_length;
c->node_list_len= 0;
c->pending_refcount= false;
@@ -2132,12 +2163,17 @@ innodb_binlog_write_cache(IO_CACHE *cache,
chunk_data_cache chunk_data(cache, binlog_info);
fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins);
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
c->pending_file_no= file_no;
c->pending_offset=
binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed);
}
static inline void
reset_oob_context(binlog_oob_context *c)
{
c->pending_file_no= ~(uint64_t)0;
if (c->pending_refcount)
{
ibb_file_hash.oob_ref_dec(c->first_node_file_no, c->lf_pins);
@@ -2237,8 +2273,8 @@ ensure_oob_context(void **engine_data, uint32_t needed_len)
are purged.
*/
bool
innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data)
innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data)
{
binlog_oob_context *c= (binlog_oob_context *)*engine_data;
if (!c)
@@ -2246,6 +2282,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
if (UNIV_UNLIKELY(!c))
return true;
mtr_t mtr;
uint32_t i= c->node_list_len;
uint64_t new_idx= i==0 ? 0 : c->node_list[i-1].node_index + 1;
if (i >= 2 && c->node_list[i-2].height == c->node_list[i-1].height)
@@ -2256,7 +2293,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
c->node_list[i-2].file_no, c->node_list[i-2].offset,
c->node_list[i-1].file_no, c->node_list[i-1].offset,
(byte *)data, data_len);
if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins))
if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins, &mtr))
return true;
c->node_list_len= i - 1;
}
@@ -2271,7 +2308,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
0, 0, /* NULL left child signifies a leaf */
c->node_list[i-1].file_no, c->node_list[i-1].offset,
(byte *)data, data_len);
if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins))
if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins, &mtr))
return true;
c->node_list_len= i + 1;
}
@@ -2281,7 +2318,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
binlog_oob_context::chunk_data_oob oob_data
(new_idx, 0, 0, 0, 0, (byte *)data, data_len);
if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data,
c->lf_pins))
c->lf_pins, &mtr))
return true;
c->first_node_file_no= c->node_list[i].file_no;
c->first_node_offset= c->node_list[i].offset;
@@ -2290,6 +2327,22 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins);
}
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
c->pending_file_no= file_no;
c->pending_offset=
binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed);
innodb_binlog_post_commit(&mtr, c);
return false;
}
bool
innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data)
{
binlog_oob_context *c= (binlog_oob_context *)*engine_data;
if (UNIV_LIKELY(c != nullptr))
ibb_pending_lsn_fifo.record_commit(c);
return false;
}
@@ -2302,15 +2355,15 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
bool
binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx,
uint32_t left_node, uint32_t right_node,
chunk_data_oob *oob_data, LF_PINS *pins)
chunk_data_oob *oob_data, LF_PINS *pins,
mtr_t *mtr)
{
uint32_t new_height=
left_node == right_node ? 1 : 1 + node_list[left_node].height;
mtr_t mtr;
mtr.start();
mtr->start();
std::pair<uint64_t, uint64_t> new_file_no_offset=
fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA, pins);
mtr.commit();
fsp_binlog_write_rec(oob_data, mtr, FSP_BINLOG_TYPE_OOB_DATA, pins);
mtr->commit();
node_list[node].file_no= new_file_no_offset.first;
node_list[node].offset= new_file_no_offset.second;
node_list[node].node_index= new_idx;
@@ -2560,9 +2613,12 @@ again:
}
ha_innodb_binlog_reader::ha_innodb_binlog_reader(uint64_t file_no,
ha_innodb_binlog_reader::ha_innodb_binlog_reader(bool wait_durable,
uint64_t file_no,
uint64_t offset)
: rd_buf_len(0), rd_buf_sofar(0), state(ST_read_next_event_group)
: chunk_rd(wait_durable ?
binlog_cur_durable_offset : binlog_cur_end_offset),
rd_buf_len(0), rd_buf_sofar(0), state(ST_read_next_event_group)
{
page_buf= (uchar *)ut_malloc(ibb_page_size, mem_key_binlog);
chunk_rd.set_page_buf(page_buf);
@@ -2738,10 +2794,108 @@ ha_innodb_binlog_reader::data_available()
}
handler_binlog_reader *
innodb_get_binlog_reader()
bool
ha_innodb_binlog_reader::wait_available(THD *thd,
const struct timespec *abstime)
{
return new ha_innodb_binlog_reader();
bool is_timeout= false;
bool pending_sync_lsn= 0;
bool did_enter_cond= false;
PSI_stage_info old_stage;
if (data_available())
return false;
mysql_mutex_lock(&binlog_durable_mutex);
for (;;)
{
/* Process anything that has become durable since we last looked. */
lsn_t durable_lsn= log_sys.get_flushed_lsn(std::memory_order_relaxed);
ibb_pending_lsn_fifo.process_durable_lsn(durable_lsn);
/* Check if there is anything more pending to be made durable. */
if (!ibb_pending_lsn_fifo.is_empty())
{
pending_lsn_fifo::entry &e= ibb_pending_lsn_fifo.cur_head();
if (durable_lsn < e.lsn)
pending_sync_lsn= e.lsn;
}
/*
Check if there is data available for us now.
As we are holding binlog_durable_mutex, active_binlog_file_no cannot
move during this check.
*/
uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed);
uint64_t durable_offset=
binlog_cur_durable_offset[active & 3].load(std::memory_order_relaxed);
if (chunk_rd.is_before_pos(active, durable_offset))
break;
if (pending_sync_lsn != 0 && ibb_pending_lsn_fifo.flushing_lsn == 0)
{
/*
There is no data available for us now, but there is data that will be
available when the InnoDB redo log has been durably flushed to disk.
So now we will do such a sync (unless another thread is already doing
it), so we can proceed getting more data out.
*/
ibb_pending_lsn_fifo.flushing_lsn= pending_sync_lsn;
mysql_mutex_unlock(&binlog_durable_mutex);
log_write_up_to(pending_sync_lsn, true);
mysql_mutex_lock(&binlog_durable_mutex);
ibb_pending_lsn_fifo.flushing_lsn= pending_sync_lsn= 0;
/* Need to loop back to repeat all checks, after releasing the mutex. */
continue;
}
if (thd && thd_kill_level(thd))
break;
if (thd && !did_enter_cond)
{
THD_ENTER_COND(thd, &binlog_durable_cond, &binlog_durable_mutex,
&stage_master_has_sent_all_binlog_to_slave, &old_stage);
did_enter_cond= true;
}
if (abstime)
{
int res= mysql_cond_timedwait(&binlog_durable_cond,
&binlog_durable_mutex,
abstime);
if (res == ETIMEDOUT)
{
is_timeout= true;
break;
}
}
else
mysql_cond_wait(&binlog_durable_cond, &binlog_durable_mutex);
}
/*
If there is pending binlog data to durably sync to the redo log, but we
did not do this sync ourselves, then signal another thread (if any) to
wakeup and sync. This is necessary to not lose the sync wakeup signal.
(We use wake-one rather than wake-all for signalling a pending redo log
sync to avoid wakeup-storm).
*/
if (pending_sync_lsn != 0)
mysql_cond_signal(&binlog_durable_cond);
if (did_enter_cond)
THD_EXIT_COND(thd, &old_stage);
else
mysql_mutex_unlock(&binlog_durable_mutex);
return is_timeout;
}
handler_binlog_reader *
innodb_get_binlog_reader(bool wait_durable)
{
return new ha_innodb_binlog_reader(wait_durable);
}
@@ -2786,7 +2940,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
{
uint64_t active= active2;
uint64_t end_offset=
binlog_cur_end_offset[file_no&1].load(std::memory_order_acquire);
binlog_cur_end_offset[file_no & 3].load(std::memory_order_acquire);
fsp_binlog_page_entry *block;
if (file_no + 1 >= active &&
@@ -2924,7 +3078,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
return -1;
}
{
binlog_chunk_reader chunk_reader;
binlog_chunk_reader chunk_reader(binlog_cur_end_offset);
chunk_reader.set_page_buf(page_buffer);
chunk_reader.seek(file_no, 0);
err= chunk_reader.get_file_header(&header);
@@ -3076,6 +3230,178 @@ ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset)
}
void
ha_innodb_binlog_reader::seek_internal(uint64_t file_no, uint64_t offset)
{
chunk_rd.seek(file_no, offset);
chunk_rd.skip_partial(true);
cur_file_no= chunk_rd.current_file_no();
cur_file_pos= chunk_rd.current_pos();
}
void
ibb_wait_durable_offset(uint64_t file_no, uint64_t wait_offset)
{
uint64_t dur_offset=
binlog_cur_durable_offset[file_no & 3].load(std:: memory_order_relaxed);
ha_innodb_binlog_reader reader(true, file_no, dur_offset);
for (;;)
{
reader.wait_available(nullptr, nullptr);
dur_offset=
binlog_cur_durable_offset[file_no & 3].load(std:: memory_order_relaxed);
if (dur_offset >= wait_offset)
break;
reader.seek_internal(file_no, dur_offset);
}
}
pending_lsn_fifo::pending_lsn_fifo()
: flushing_lsn(0), cur_file_no(~(uint64_t)0), head(0), tail(0)
{
}
void
pending_lsn_fifo::init(uint64_t start_file_no)
{
mysql_mutex_lock(&binlog_durable_mutex);
ut_ad(cur_file_no == ~(uint64_t)0);
cur_file_no= start_file_no;
mysql_mutex_unlock(&binlog_durable_mutex);
}
void
pending_lsn_fifo::reset()
{
mysql_mutex_lock(&binlog_durable_mutex);
cur_file_no= ~(uint64_t)0;
mysql_mutex_unlock(&binlog_durable_mutex);
}
bool
pending_lsn_fifo::process_durable_lsn(lsn_t lsn)
{
mysql_mutex_assert_owner(&binlog_durable_mutex);
ut_ad(cur_file_no != ~(uint64_t)0);
entry *got= nullptr;
for (;;)
{
if (is_empty())
break;
entry &e= cur_tail();
if (lsn < e.lsn)
break;
got= &e;
drop_tail();
}
if (got)
{
uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed);
if (got->file_no + 1 >= active)
binlog_cur_durable_offset[got->file_no & 3].store
(got->offset, std::memory_order_relaxed);
/*
If we moved the durable point to the next file_no, mark the prior
file_no as now fully durable.
Since we only ever have at most two binlog tablespaces open, and since
we make file_no=N fully durable (by calling into this function) before
pre-allocating N+2, we can only ever move ahead one file_no at a time
here.
*/
if (cur_file_no != got->file_no)
{
ut_ad(got->file_no == cur_file_no + 1);
binlog_cur_durable_offset[cur_file_no & 3].store(
binlog_cur_end_offset[cur_file_no & 3].load(std::memory_order_relaxed),
std::memory_order_relaxed);
cur_file_no= got->file_no;
}
mysql_cond_broadcast(&binlog_durable_cond);
return true;
}
return false;
}
/*
After a binlog commit, put the LSN and the corresponding binlog position
into the ibb_pending_lsn_fifo. We do this here (rather than immediately in
innodb_binlog_post_commit()), so that we can delay it until we are no longer
holding more critical locks that could block other writers. As we will be
contending with readers here on binlog_durable_mutex.
*/
void
pending_lsn_fifo::record_commit(binlog_oob_context *c)
{
uint64_t pending_file_no= c->pending_file_no;
if (pending_file_no == ~(uint64_t)0)
return;
c->pending_file_no= ~(uint64_t)0;
lsn_t pending_lsn= c->pending_lsn;
uint64_t pending_offset= c->pending_offset;
add_to_fifo(pending_lsn, pending_file_no, pending_offset);
}
void
pending_lsn_fifo::add_to_fifo(uint64_t lsn, uint64_t file_no, uint64_t offset)
{
mysql_mutex_lock(&binlog_durable_mutex);
/*
The record_commit() operation is done outside of critical locks for
scalabitily, so can occur out-of-order. So only insert the new entry if
it is newer than any previously inserted.
*/
if (is_empty() || lsn > cur_tail().lsn)
{
if (is_full())
{
/*
When the fifo is full, we just overwrite the head with a newer LSN.
This way, whenever _some_ LSN gets synced durably to disk, we will
always be able to make some progress and clear some fifo entries. And
when this latest LSN gets eventually synced, any overwritten entry
will progress as well.
*/
}
else
{
/*
Insert a new head.
Note that we make the fifo size a power-of-two (2 <<fixed_size_log2).
So if we wrap around uint32_t here, the outcome is still valid.
*/
new_head();
}
entry &h= cur_head();
h.file_no= file_no;
h.offset= offset;
h.lsn= lsn;
/* Make an immediate check in case the LSN is already durable. */
bool signalled=
process_durable_lsn(log_sys.get_flushed_lsn(std::memory_order_relaxed));
if (!signalled && flushing_lsn == 0)
{
/*
If process_durable_lsn() did not find any new data become durable, it
does not broadcast a wakeup signal. But since we inserted a new entry
in the fifo, we still want to signal _one_ other thread to potentially
wake up and start a redo log sync to make the new entry durable, unless
a thread is already doing such redo log sync.
*/
mysql_cond_signal(&binlog_durable_cond);
}
}
mysql_mutex_unlock(&binlog_durable_mutex);
}
void
ibb_get_filename(char name[FN_REFLEN], uint64_t file_no)
{
@@ -3089,7 +3415,7 @@ extern "C" void binlog_get_cache(THD *, uint64_t, uint64_t, IO_CACHE **,
handler_binlog_event_group_info **,
const rpl_gtid **);
void
binlog_oob_context *
innodb_binlog_trx(trx_t *trx, mtr_t *mtr)
{
IO_CACHE *cache;
@@ -3098,19 +3424,32 @@ innodb_binlog_trx(trx_t *trx, mtr_t *mtr)
uint64_t file_no, pos;
if (!trx->mysql_thd)
return;
return nullptr;
innodb_binlog_status(&file_no, &pos);
binlog_get_cache(trx->mysql_thd, file_no, pos, &cache, &binlog_info, &gtid);
if (UNIV_LIKELY(binlog_info != nullptr) &&
UNIV_LIKELY(binlog_info->gtid_offset > 0)) {
binlog_diff_state.update_nolock(gtid);
innodb_binlog_write_cache(cache, binlog_info, mtr);
return (binlog_oob_context *)binlog_info->engine_ptr;
}
return nullptr;
}
void
innodb_binlog_post_commit(mtr_t *mtr, binlog_oob_context *c)
{
if (c)
{
c->pending_lsn= mtr->commit_lsn();
ut_ad(c->pending_lsn != 0);
}
}
bool
innobase_binlog_write_direct(IO_CACHE *cache,
innobase_binlog_write_direct_ordered(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid)
{
@@ -3120,12 +3459,48 @@ innobase_binlog_write_direct(IO_CACHE *cache,
mtr.start();
innodb_binlog_write_cache(cache, binlog_info, &mtr);
mtr.commit();
/* ToDo: Should we sync the log here? Maybe depending on an extra bool parameter? */
innodb_binlog_post_commit(&mtr,
(binlog_oob_context *)binlog_info->engine_ptr);
/* ToDo: Presumably innodb_binlog_write_cache() should be able to fail in some cases? Then return any such error to the caller. */
return false;
}
bool
innobase_binlog_write_direct(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid)
{
binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr;
if (UNIV_LIKELY(c != nullptr))
{
if (srv_flush_log_at_trx_commit & 1)
log_write_up_to(c->pending_lsn, true);
ibb_pending_lsn_fifo.record_commit(c);
}
return false;
}
void
ibb_group_commit(THD *thd, handler_binlog_event_group_info *binlog_info)
{
binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr;
if (UNIV_LIKELY(c != nullptr))
{
if (srv_flush_log_at_trx_commit & 1 && c->pending_lsn)
{
/*
Sync the InnoDB redo log durably to disk here for the entire group
commit, so that it will be available for all binlog readers.
*/
log_write_up_to(c->pending_lsn, true);
}
ibb_pending_lsn_fifo.record_commit(c);
}
}
bool
innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last)
{
@@ -3199,6 +3574,7 @@ innodb_reset_binlogs()
/* Prevent any flushing activity while resetting. */
binlog_page_fifo->lock_wait_for_idle();
binlog_page_fifo->reset();
ibb_pending_lsn_fifo.reset();
ibb_file_hash.remove_up_to(last_created_binlog_file_no, lf_pins);
@@ -3240,6 +3616,7 @@ innodb_reset_binlogs()
/* Re-initialize empty binlog state and start the pre-alloc thread. */
innodb_binlog_init_state();
ibb_pending_lsn_fifo.init(0);
binlog_page_fifo->unlock_with_delayed_free();
start_binlog_prealloc_thread();
binlog_sync_initial();

View File

@@ -1582,7 +1582,7 @@ public:
size_t flush_list_requests;
TPOOL_SUPPRESS_TSAN void add_flush_list_requests(size_t size)
{ ut_ad(size); flush_list_requests+= size; }
{ flush_list_requests+= size; }
private:
static constexpr unsigned PAGE_CLEANER_IDLE= 1;
static constexpr unsigned FLUSH_LIST_ACTIVE= 2;

View File

@@ -379,6 +379,10 @@ public:
bool in_record;
} s;
/* Amount of data in file, valid after fetch_current_page(). */
uint64_t cur_end_offset;
/* Length of the currently open file, valid if cur_file_handle != -1. */
uint64_t cur_file_length;
/*
After fetch_current_page(), this points into either cur_block or
page_buffer as appropriate.
@@ -388,10 +392,12 @@ public:
fsp_binlog_page_entry *cur_block;
/* Buffer for reading a page directly from a tablespace file. */
byte *page_buffer;
/* Amount of data in file, valid after fetch_current_page(). */
uint64_t cur_end_offset;
/* Length of the currently open file, valid if cur_file_handle != -1. */
uint64_t cur_file_length;
/*
Points to either binlog_cur_durable_offset, for readers that should not
see binlog data until it has become durable on disk; or
binlog_cur_end_offset otherwise.
*/
std::atomic<uint64_t> * const limit_offset;
/* Open file handle to tablespace file_no, or -1. */
File cur_file_handle;
/*
@@ -400,7 +406,7 @@ public:
*/
bool skipping_partial;
binlog_chunk_reader();
binlog_chunk_reader(std::atomic<uint64_t> *limit_offset_);
void set_page_buf(byte *in_page_buf) { page_buffer= in_page_buf; }
~binlog_chunk_reader();
@@ -451,6 +457,7 @@ public:
/* Release any buffer pool page latch. */
void release(bool release_file_page= false);
bool data_available();
bool is_before_pos(uint64_t file_no, uint64_t offset);
uint64_t current_file_no() { return s.file_no; }
uint64_t current_pos() {
return (s.page_no << srv_page_size_shift) + s.in_page_offset;
@@ -464,11 +471,13 @@ extern ulong ibb_page_size;
extern uint64_t current_binlog_state_interval;
extern mysql_mutex_t active_binlog_mutex;
extern pthread_cond_t active_binlog_cond;
extern mysql_mutex_t binlog_durable_mutex;
extern mysql_cond_t binlog_durable_cond;
extern std::atomic<uint64_t> active_binlog_file_no;
extern uint64_t first_open_binlog_file_no;
extern uint64_t last_created_binlog_file_no;
extern std::atomic<uint64_t> binlog_cur_written_offset[2];
extern std::atomic<uint64_t> binlog_cur_end_offset[2];
extern std::atomic<uint64_t> binlog_cur_durable_offset[4];
extern std::atomic<uint64_t> binlog_cur_end_offset[4];
extern fsp_binlog_page_fifo *binlog_page_fifo;
extern ibb_file_oob_refs ibb_file_hash;

View File

@@ -34,6 +34,7 @@ struct rpl_gtid;
struct handler_binlog_event_group_info;
class handler_binlog_reader;
struct handler_binlog_purge_info;
struct binlog_oob_context;
/*
@@ -128,12 +129,64 @@ struct binlog_header_data {
};
/*
The class pending_lsn_fifo keeps track of pending LSNs - and their
corresponding binlog file_no/offset - that have been mtr-committed, but have
not yet become durable.
Used to delay sending to slaves any data that might be lost in case the
master crashes just after sending.
*/
class pending_lsn_fifo {
static constexpr uint32_t fixed_size_log2= 10;
static constexpr uint32_t fixed_size= (2 << fixed_size_log2);
static constexpr uint32_t mask= (2 << fixed_size_log2) - 1;
public:
struct entry {
lsn_t lsn;
uint64_t file_no;
uint64_t offset;
} fifo[fixed_size];
/*
Set while we are duing a durable sync of the redo log to the LSN that we
are requesting to become durable. Used to avoid multiple threads
needlessly trying to sync the redo log on top of one another.
*/
lsn_t flushing_lsn;
/*
The current file_no that has any durable data. Used to detect when an LSN
moves the current durable end point to the next file, so that the previous
file can then be marked as fully durable.
The value ~0 is used as a marker for "not yet initialized".
*/
uint64_t cur_file_no;
/* The `head' points one past the most recent element. */
uint32_t head;
/* The `tail' points to the earliest element. */
uint32_t tail;
pending_lsn_fifo();
void init(uint64_t start_file_no);
void reset();
bool is_empty() { return head == tail; }
bool is_full() { return head == tail + fixed_size; }
entry &cur_head() { ut_ad(!is_empty()); return fifo[(head - 1) & mask]; }
entry &cur_tail() { ut_ad(!is_empty()); return fifo[tail & mask]; }
void drop_tail() { ut_ad(!is_empty()); ++tail; }
void new_head() { ut_ad(!is_full()); ++head; }
void record_commit(binlog_oob_context *c);
void add_to_fifo(uint64_t lsn, uint64_t file_no, uint64_t offset);
bool process_durable_lsn(lsn_t lsn);
};
#define BINLOG_NAME_BASE "binlog-"
#define BINLOG_NAME_EXT ".ibb"
/* '/' + "binlog-" + (<=20 digits) + '.' + "ibb" + '\0'. */
#define BINLOG_NAME_MAX_LEN 1 + 1 + 7 + 20 + 1 + 3 + 1
extern pending_lsn_fifo ibb_pending_lsn_fifo;
extern uint32_t innodb_binlog_size_in_pages;
extern const char *innodb_binlog_directory;
extern uint32_t binlog_cur_page_no;
@@ -181,16 +234,25 @@ extern bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no,
extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
fsp_binlog_page_entry * &block, uint32_t &page_no,
uint32_t &page_offset, uint64_t file_no);
extern bool innodb_binlog_oob_ordered(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
extern bool innodb_binlog_oob(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
extern void innodb_reset_oob(THD *thd, void **engine_data);
extern void innodb_free_oob(THD *thd, void *engine_data);
extern handler_binlog_reader *innodb_get_binlog_reader();
extern handler_binlog_reader *innodb_get_binlog_reader(bool wait_durable);
extern void ibb_wait_durable_offset(uint64_t file_no, uint64_t wait_offset);
extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no);
extern void innodb_binlog_trx(trx_t *trx, mtr_t *mtr);
extern binlog_oob_context *innodb_binlog_trx(trx_t *trx, mtr_t *mtr);
extern void innodb_binlog_post_commit(mtr_t *mtr, binlog_oob_context *c);
extern bool innobase_binlog_write_direct_ordered
(IO_CACHE *cache, handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid);
extern bool innobase_binlog_write_direct
(IO_CACHE *cache, handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid);
extern void ibb_group_commit(THD *thd,
handler_binlog_event_group_info *binlog_info);
extern bool innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last);
extern void innodb_binlog_status(uint64_t *out_file_no, uint64_t *out_pos);
extern bool innodb_binlog_get_init_state(rpl_binlog_state_base *out_state);

View File

@@ -482,6 +482,8 @@ extern mysql_pfs_key_t trx_pool_manager_mutex_key;
extern mysql_pfs_key_t lock_wait_mutex_key;
extern mysql_pfs_key_t srv_threads_mutex_key;
extern mysql_pfs_key_t fsp_active_binlog_mutex_key;
extern mysql_pfs_key_t fsp_binlog_durable_mutex_key;
extern mysql_pfs_key_t fsp_binlog_durable_cond_key;
extern mysql_pfs_key_t fsp_purge_binlog_mutex_key;
extern mysql_pfs_key_t fsp_page_fifo_mutex_key;
# endif /* UNIV_PFS_MUTEX */

View File

@@ -1120,6 +1120,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
ut_ad(!read_only);
trx_rseg_t *rseg= rsegs.m_redo.rseg;
trx_undo_t *&undo= rsegs.m_redo.undo;
binlog_oob_context *binlog_ctx= nullptr;
if (UNIV_LIKELY(undo != nullptr))
{
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
@@ -1161,7 +1162,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
trx_sys.assign_new_trx_no(this);
/* Include binlog data in the commit record, if any. */
innodb_binlog_trx(this, mtr);
binlog_ctx= innodb_binlog_trx(this, mtr);
UT_LIST_REMOVE(rseg->undo_list, undo);
/* Change the undo log segment state from TRX_UNDO_ACTIVE, to
@@ -1175,6 +1176,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
else
rseg->release();
mtr->commit();
innodb_binlog_post_commit(mtr, binlog_ctx);
}
/********************************************************************