diff --git a/mysql-test/include/reset_master.inc b/mysql-test/include/reset_master.inc new file mode 100644 index 00000000000..b4036ec31be --- /dev/null +++ b/mysql-test/include/reset_master.inc @@ -0,0 +1,58 @@ +# ==== Purpose ==== +# +# Execute a RESET MASTER on the current connection, first terminating any +# lingering binlog dump threads that might still be sitting idle and would +# block the RESET MASTER. +# +# Note that any configured IO threads on other servers must be stopped before +# calling this, as RESET MASTER cannot be run while there is a slave connected. +# +# +# ==== Usage ==== +# +# [--let $kill_timeout= NUMBER] +# [--let $reset_master_retries= NUMBER] +# --source include/reset_master.inc +# +# Parameters: +# $kill_timeout +# Maximum number of seconds to wait for dump threads to disappear. +# $reset_master_retries +# Maximum number of times RESET MASTER can get ER_BINLOG_IN_USE before +# giving up. + +--let $include_filename= reset_master.inc +--source include/begin_include_file.inc + +--disable_query_log + +let $_retries= 10; +if ($reset_master_retries) +{ + let $_retries= $reset_master_retries; +} + +let $_success= 0; +let $_i= 0; +while ($_i < $_retries) +{ + inc $_i; + --source include/kill_binlog_dump_threads.inc + # mysqltest sets $mysql_errno to the result of the statement below (0 = ok). + --error 0,ER_BINLOG_IN_USE + RESET MASTER; + if (!$mysql_errno) { + let $_success= 1; + let $_i = $_retries; + } +} +if (!$_success) +{ + SHOW FULL PROCESSLIST; + --die Timeout while trying to remove dump threads and run RESET MASTER. 
+} + +--enable_query_log + +--let $include_filename= reset_master.inc +--source include/end_include_file.inc diff --git a/mysql-test/suite/binlog_in_engine/binlog_flush_purge.result b/mysql-test/suite/binlog_in_engine/binlog_flush_purge.result index c95d3612698..59ebd0c2714 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_flush_purge.result +++ b/mysql-test/suite/binlog_in_engine/binlog_flush_purge.result @@ -1,4 +1,4 @@ -RESET MASTER; +include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB; INSERT INTO t1 VALUES (1); @@ -27,7 +27,7 @@ FLUSH BINARY LOGS; FLUSH BINARY LOGS; SHOW BINLOG EVENTS IN 'binlog-000000.ibb' LIMIT 1; ERROR HY000: Error when executing command SHOW BINLOG EVENTS: error reading event data -RESET MASTER; +include/reset_master.inc SHOW BINARY LOGS; Log_name File_size binlog-000000.ibb 262144 diff --git a/mysql-test/suite/binlog_in_engine/binlog_flush_purge.test b/mysql-test/suite/binlog_in_engine/binlog_flush_purge.test index 16bacce0f66..d91485c738a 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_flush_purge.test +++ b/mysql-test/suite/binlog_in_engine/binlog_flush_purge.test @@ -1,7 +1,7 @@ --source include/have_binlog_format_row.inc --source include/have_innodb_binlog.inc -RESET MASTER; +--source include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB; @@ -51,7 +51,7 @@ EOF eval SHOW BINLOG EVENTS IN '$file' LIMIT 1; -RESET MASTER; +--source include/reset_master.inc --let $binlog_name= binlog-000001.ibb --let $binlog_size= 262144 --source include/wait_for_engine_binlog.inc diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine.result b/mysql-test/suite/binlog_in_engine/binlog_in_engine.result index cacf627cee7..0ad8e66d6d5 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_in_engine.result +++ 
b/mysql-test/suite/binlog_in_engine/binlog_in_engine.result @@ -1,4 +1,4 @@ -RESET MASTER; +include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1); BEGIN; diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine.test b/mysql-test/suite/binlog_in_engine/binlog_in_engine.test index 8785c3c8202..4adc64d1ede 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_in_engine.test +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine.test @@ -6,7 +6,7 @@ # files and only need to include the one. --source include/have_innodb_binlog.inc -RESET MASTER; +--source include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; --let $gtid_pos= `SELECT @@last_gtid` diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result index d7657b333d0..e66ef9e3636 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result @@ -1,4 +1,4 @@ -RESET MASTER; +include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t1 VALUES (1, 0); INSERT INTO t1 SELECT seq+1000000+100000*0, seq FROM seq_1_to_4000; diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test index 84d7abfd856..d6330c2fb66 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test @@ -6,7 +6,7 @@ # Note: This test also tests the --binlog-directory option by putting it # in binlog_in_engine_restart.opt . 
-RESET MASTER; +--source include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t1 VALUES (1, 0); diff --git a/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.result b/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.result index 8db516ba18b..291dbe4b817 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.result +++ b/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.result @@ -1,4 +1,4 @@ -RESET MASTER; +include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t1 VALUES (1, 0); connect con1,localhost,root,,; diff --git a/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.test b/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.test index 9d691400b55..2cee8abcf60 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.test +++ b/mysql-test/suite/binlog_in_engine/binlog_legacy_pos.test @@ -1,7 +1,7 @@ --source include/have_binlog_format_mixed.inc --source include/have_innodb_binlog.inc -RESET MASTER; +--source include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t1 VALUES (1, 0); diff --git a/mysql-test/suite/binlog_in_engine/crash_safe_slave-master.opt b/mysql-test/suite/binlog_in_engine/crash_safe_slave-master.opt new file mode 100644 index 00000000000..19ee75da547 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/crash_safe_slave-master.opt @@ -0,0 +1 @@ +--innodb-log-file-mmap=OFF diff --git a/mysql-test/suite/binlog_in_engine/crash_safe_slave.result b/mysql-test/suite/binlog_in_engine/crash_safe_slave.result new file mode 100644 index 00000000000..4288b7b2485 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/crash_safe_slave.result @@ -0,0 +1,25 @@ +include/master-slave.inc +[connection master] +*** Test that slave doesn't get ahead of a non-durable master that crashes. 
+connection master; +SET GLOBAL innodb_flush_log_at_trx_commit= 0; +CREATE TABLE t1 (a INT PRIMARY KEY, b INT, c TEXT) ENGINE=InnoDB; +SELECT COUNT(*) FROM t1; +COUNT(*) +630 +SET SESSION debug_dbug="+d,crash_dispatch_command_before"; +SELECT 1; +Got one of the listed errors +connection master1; +connection server_1; +connection default; +connection master; +include/rpl_reconnect.inc +SET STATEMENT gtid_domain_id= 1 FOR INSERT INTO t1 VALUES (9, 9, 'extra'); +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +# Asserted this: Row count should match on master and slave (no extra rows on slave) +connection master; +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/binlog_in_engine/crash_safe_slave.test b/mysql-test/suite/binlog_in_engine/crash_safe_slave.test new file mode 100644 index 00000000000..ad768bbfb67 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/crash_safe_slave.test @@ -0,0 +1,90 @@ +--source include/not_embedded.inc +--source include/not_valgrind.inc +--source include/have_debug.inc +--source include/have_binlog_format_row.inc +--source include/master-slave.inc +--source include/have_innodb_binlog.inc + +-- echo *** Test that slave doesn't get ahead of a non-durable master that crashes. +--connection master +--let $old_flatc= `SELECT @@GLOBAL.innodb_flush_log_at_trx_commit` +SET GLOBAL innodb_flush_log_at_trx_commit= 0; + +CREATE TABLE t1 (a INT PRIMARY KEY, b INT, c TEXT) ENGINE=InnoDB; + +# Create a bunch of transactions, when --innodb-flush-log-at-trx-commit=0 +# we should lose some of them in master crash. +# Note though that this is ineffective when running in /dev/shm/ (./mtr --mem). +# Because InnoDB is hard-coded to simulate PMEM in this case and forces +# mmap on the log file even though we have --innodb-log-file-mmap=OFF in our +# .opt file. Then the memory mapping gets updated immediately when the mtr +# commits, and kill -9 cannot lose any transactions. 
The test will still pass, +# but no transactions are lost on the master so nothing much is tested. + +--disable_query_log +--let loop= 0 +while ($loop < 10) { + eval INSERT INTO t1 VALUES ($loop*1000, 0, ''); + BEGIN; + --let $i= 0 + while ($i < 20) { + eval INSERT INTO t1 VALUES ($loop*1000 + $i*10 + 1, 1, REPEAT('a', 1000)); + eval INSERT INTO t1 VALUES ($loop*1000 + $i*10 + 2, 2, REPEAT('z', 1000)); + eval INSERT INTO t1 VALUES ($loop*1000 + $i*10 + 3, 3, REPEAT('#', 1000)); + inc $i; + } + COMMIT; +# CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; + eval INSERT INTO t1 VALUES ($loop*1000 + 5, 5, 'zyxxy'); +# INSERT INTO t2 VALUES (1, 0); + eval INSERT INTO t1 VALUES ($loop*1000 + 6, 6, 'the quick brown fox'); +# DROP TABLE t2; + inc $loop; +} +--enable_query_log + +# Crash the master +--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +wait-recovery.test +EOF + +SELECT COUNT(*) FROM t1; +SET SESSION debug_dbug="+d,crash_dispatch_command_before"; +--error 2006,2013 +SELECT 1; +--source include/wait_until_disconnected.inc +--connection master1 +--source include/wait_until_disconnected.inc +--connection server_1 +--source include/wait_until_disconnected.inc +--connection default +--source include/wait_until_disconnected.inc + +--connection master +--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +restart +EOF +--let $rpl_server_number= 1 +--source include/rpl_reconnect.inc + +# After the crash, transactions can be lost and the table row count +# may be smaller than before the crash. +--let $master_count= `SELECT COUNT(*) FROM t1` +SET STATEMENT gtid_domain_id= 1 FOR INSERT INTO t1 VALUES (9, 9, 'extra'); +--source include/save_master_gtid.inc + +--connection slave +--source include/sync_with_master_gtid.inc +# The slave should have the same amount of rows as the master (ie. not have +# any extra transactions that were lost on the master by the crash). 
+--let $slave_count= `SELECT COUNT(*) FROM t1` +--let $assert_text= Row count should match on master and slave (no extra rows on slave) +--let $assert_cond= $master_count + 1 = $slave_count +--source include/rpl_assert.inc + +--connection master +--disable_query_log +eval SET GLOBAL innodb_flush_log_at_trx_commit= $old_flatc; +--enable_query_log +DROP TABLE t1; +--source include/rpl_end.inc diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread.result b/mysql-test/suite/binlog_in_engine/purge_dump_thread.result index 7fd49437319..93fc516af36 100644 --- a/mysql-test/suite/binlog_in_engine/purge_dump_thread.result +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread.result @@ -1,6 +1,5 @@ include/master-slave.inc [connection master] -RESET MASTER; CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; INSERT INTO t1 VALUES (0, 0, 'Start'); *** Test iteration, RESTART=0 diff --git a/mysql-test/suite/binlog_in_engine/purge_dump_thread.test b/mysql-test/suite/binlog_in_engine/purge_dump_thread.test index 94e95440e04..132b0357748 100644 --- a/mysql-test/suite/binlog_in_engine/purge_dump_thread.test +++ b/mysql-test/suite/binlog_in_engine/purge_dump_thread.test @@ -17,8 +17,6 @@ # to test with. The --binlog-cache-size is set to 8k, so more event data than # that causes OOB binlogging. 
-RESET MASTER; - CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; INSERT INTO t1 VALUES (0, 0, 'Start'); diff --git a/mysql-test/suite/binlog_in_engine/recovery.result b/mysql-test/suite/binlog_in_engine/recovery.result index d703d7540bf..4bb6910ebcf 100644 --- a/mysql-test/suite/binlog_in_engine/recovery.result +++ b/mysql-test/suite/binlog_in_engine/recovery.result @@ -1,4 +1,4 @@ -RESET MASTER; +include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1); diff --git a/mysql-test/suite/binlog_in_engine/recovery.test b/mysql-test/suite/binlog_in_engine/recovery.test index bf81777e3b6..5d9a2865013 100644 --- a/mysql-test/suite/binlog_in_engine/recovery.test +++ b/mysql-test/suite/binlog_in_engine/recovery.test @@ -5,13 +5,13 @@ --source include/have_innodb_binlog.inc --let $datadir= `SELECT @@datadir` -RESET MASTER; +--source include/reset_master.inc CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1); --let $no_checkpoint_flush= 1 ---let $no_checkpoint_kill= 1 +--let no_checkpoint_kill= 1 --source ../../suite/innodb/include/no_checkpoint_start.inc --let $file= query_get_value(SHOW MASTER STATUS, File, 1) --let $pos= query_get_value(SHOW MASTER STATUS, Position, 1) diff --git a/mysql-test/suite/binlog_in_engine/recovery_large.result b/mysql-test/suite/binlog_in_engine/recovery_large.result index 2b4dc5d205a..3f19651c9cd 100644 --- a/mysql-test/suite/binlog_in_engine/recovery_large.result +++ b/mysql-test/suite/binlog_in_engine/recovery_large.result @@ -4,7 +4,7 @@ connection slave; include/stop_slave.inc SET GLOBAL gtid_slave_pos= ''; connection master; -RESET MASTER; +include/reset_master.inc DROP TABLE IF EXISTS t1, t2; Warnings: Note 1051 Unknown table 'test.t1,test.t2' diff --git a/mysql-test/suite/binlog_in_engine/recovery_large.test b/mysql-test/suite/binlog_in_engine/recovery_large.test index 21b9432de1f..33ba8f65af8 100644 --- 
a/mysql-test/suite/binlog_in_engine/recovery_large.test +++ b/mysql-test/suite/binlog_in_engine/recovery_large.test @@ -14,7 +14,7 @@ SET GLOBAL gtid_slave_pos= ''; --connection master --let $datadir= `SELECT @@datadir` -RESET MASTER; +--source include/reset_master.inc DROP TABLE IF EXISTS t1, t2; CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; diff --git a/sql/handler.h b/sql/handler.h index 43bcbaa6df8..ab24f37fde2 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1548,9 +1548,18 @@ struct handlerton /* Optional implementation of binlog in the engine. */ bool (*binlog_init)(size_t binlog_size, const char *directory); /* Binlog an event group that doesn't go through commit_ordered. */ + bool (*binlog_write_direct_ordered)(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, + const rpl_gtid *gtid); bool (*binlog_write_direct)(IO_CACHE *cache, handler_binlog_event_group_info *binlog_info, const rpl_gtid *gtid); + /* + Called for the last transaction (only) in a binlog group commit, with + no locks being held. + */ + void (*binlog_group_commit_ordered)(THD *thd, + handler_binlog_event_group_info *binlog_info); /* Binlog parts of large transactions out-of-band, in different chunks in the binlog as the transaction executes. This limits the amount of data that @@ -1558,8 +1567,10 @@ struct handlerton a pointer location that the engine can set to maintain its own context for the out-of-band data. */ - bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len, - void **engine_data); + bool (*binlog_oob_data_ordered)(THD *thd, const unsigned char *data, + size_t data_len, void **engine_data); + bool (*binlog_oob_data)(THD *thd, const unsigned char *data, + size_t data_len, void **engine_data); /* Call to reset (for new transactions) the engine_data from binlog_oob_data(). 
Can also change the pointer to point to different data @@ -1568,8 +1579,15 @@ struct handlerton void (*binlog_oob_reset)(THD *thd, void **engine_data); /* Call to allow engine to release the engine_data from binlog_oob_data(). */ void (*binlog_oob_free)(THD *thd, void *engine_data); - /* Obtain an object to allow reading from the binlog. */ - handler_binlog_reader * (*get_binlog_reader)(); + /* + Obtain an object to allow reading from the binlog. + The boolean argument wait_durable is set to true to require that + transactions be durable before they can be read and returned from the + reader. This is used to make replication crash-safe without requiring + durability; this way, if the master crashes, when it comes back up the + slave will not be ahead and replication will not diverge. + */ + handler_binlog_reader * (*get_binlog_reader)(bool wait_durable); /* Obtain the current position in the binlog. Used to support legacy SHOW MASTER STATUS. @@ -5919,6 +5937,12 @@ public: virtual ~handler_binlog_reader() { }; virtual int read_binlog_data(uchar *buf, uint32_t len) = 0; virtual bool data_available()= 0; + /* + Wait for data to be available to read, for kill, or for timeout. + Returns true in case of timeout reached, false otherwise. + Caller should check for kill before calling again (to avoid busy-loop). + */ + virtual bool wait_available(THD *thd, const struct timespec *abstime) = 0; /* This initializes the current read position to the point of the slave GTID position passed in as POS. 
It is permissible to start at a position a bit diff --git a/sql/log.cc b/sql/log.cc index 0175ace4d85..1a5a74d64da 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6469,9 +6469,11 @@ binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len) void **engine_ptr= &mngr->engine_binlog_info.engine_ptr; mysql_mutex_assert_not_owner(&LOCK_commit_ordered); mysql_mutex_lock(&LOCK_commit_ordered); - bool res= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, len, - engine_ptr); + bool res= (*opt_binlog_engine_hton->binlog_oob_data_ordered)(mngr->thd, data, + len, engine_ptr); mysql_mutex_unlock(&LOCK_commit_ordered); + res|= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, + len, engine_ptr); mngr->engine_binlog_info.out_of_band_offset+= len; cache->pos_in_file+= len; @@ -7511,6 +7513,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) bool events_direct; ulong UNINIT_VAR(prev_binlog_id); uint64 UNINIT_VAR(commit_id); + const rpl_gtid *commit_gtid; DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); /* @@ -7812,12 +7815,19 @@ err: mysql_mutex_unlock(&LOCK_log); mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_after_binlog_sync); - if ((*opt_binlog_engine_hton->binlog_write_direct) - (file, engine_context, thd->get_last_commit_gtid())) + /* ToDo: Is this correct? How do we guarantee here correct gtid allocation order? By chained LOCK_log and LOCK_commit_ordered ? 
*/ + commit_gtid= thd->get_last_commit_gtid(); + if (unlikely((*opt_binlog_engine_hton->binlog_write_direct_ordered) + (file, engine_context, commit_gtid))) + { + mysql_mutex_unlock(&LOCK_commit_ordered); goto engine_fail; + } mysql_mutex_unlock(&LOCK_commit_ordered); - update_binlog_end_pos(); + if (unlikely((*opt_binlog_engine_hton->binlog_write_direct) + (file, engine_context, commit_gtid))) + goto engine_fail; status_var_add(thd->status_var.binlog_bytes_written, binlog_total_bytes); goto engine_ok; @@ -9351,9 +9361,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) } else { - if (opt_binlog_engine_hton) - update_binlog_end_pos(); - /* If we rotated the binlog, and if we are using the unoptimized thread scheduling where every thread runs its own commit_ordered(), then we @@ -9789,7 +9796,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DEBUG_SYNC(leader->thd, "commit_after_group_release_commit_ordered"); if (opt_binlog_engine_hton) - update_binlog_end_pos(); + (*opt_binlog_engine_hton->binlog_group_commit_ordered) + (last_in_queue->thd, &last_in_queue->cache_mngr->engine_binlog_info); else if (check_purge) checkpoint_and_purge(binlog_id); @@ -10110,6 +10118,7 @@ int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd, int ret= 0; DBUG_ENTER("wait_for_update_binlog_end_pos"); + DBUG_ASSERT(!opt_binlog_engine_hton); thd_wait_begin(thd, THD_WAIT_BINLOG); mysql_mutex_assert_owner(get_binlog_end_pos_lock()); if (!timeout) diff --git a/sql/log.h b/sql/log.h index 54d4aa691ed..ba45da38254 100644 --- a/sql/log.h +++ b/sql/log.h @@ -968,6 +968,7 @@ public: signal_relay_log_update(); else { + DBUG_ASSERT(!opt_binlog_engine_hton); lock_binlog_end_pos(); binlog_end_pos= my_b_safe_tell(&log_file); signal_bin_log_update(); @@ -976,6 +977,7 @@ public: } void update_binlog_end_pos(my_off_t pos) { + DBUG_ASSERT(!opt_binlog_engine_hton); mysql_mutex_assert_owner(&LOCK_log); 
mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos); lock_binlog_end_pos(); diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 2bcd66517c6..38458924051 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -12292,6 +12292,8 @@ ER_VECTOR_FORMAT_INVALID eng "Invalid vector format at offset: %d for '%-.100s'. Must be a valid JSON array of numbers." ER_PSEUDO_THREAD_ID_OVERWRITE eng "Pseudo thread id should not be modified by the client as it will be overwritten" +ER_BINLOG_IN_USE + eng "Cannot execute RESET MASTER as the binlog is in use by a connected slave or other RESET MASTER or binlog reader. Check SHOW PROCESSLIST for "Binlog Dump" commands and use KILL to stop such readers" ER_CANNOT_INIT_ENGINE_BINLOG_READER eng "Cannot initialize binlog reader from storage engine %s" ER_ENGINE_BINLOG_REQUIRES_GTID diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index c4611c5e80e..c3cd3750556 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -2568,7 +2568,7 @@ static int init_binlog_sender(binlog_send_info *info, /* ToDo: Need more complex logic here. I want to be able to at least switch from legacy to innodb binlog without having to RESET MASTER. So we need to be able to start reading from legacy and then switch over to the binlog in innodb. Also, we might want to pass the init GTID position in so that the binlog reader can find the place to start by itself? But probably still want to allocate the reader here like this. 
*/ if (opt_binlog_engine_hton && !(info->engine_binlog_reader= - (*opt_binlog_engine_hton->get_binlog_reader)())) + (*opt_binlog_engine_hton->get_binlog_reader)(true))) { LEX_CSTRING *engine_name= hton_name(opt_binlog_engine_hton); my_error(ER_CANNOT_INIT_ENGINE_BINLOG_READER, MYF(0), engine_name->str); @@ -3345,8 +3345,6 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo) if (error == LOG_READ_EOF) { - PSI_stage_info old_stage; - /** * check if we should wait for more data */ @@ -3367,23 +3365,15 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo) return 1; } - mysql_bin_log.lock_binlog_end_pos(); - info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(), - mysql_bin_log.get_binlog_end_pos_lock(), - &stage_master_has_sent_all_binlog_to_slave, - &old_stage); - while (!should_stop(info, true) && - !reader->data_available()) + while (!should_stop(info, true) && !reader->data_available()) { //DBUG_ASSERT(!info->heartbeat_period /* ToDo: Implement support for heartbeat events while waiting for more data. */); - int ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL); - if (ret != 0 && ret != ETIMEDOUT && ret != ETIME) + bool ret= reader->wait_available(info->thd, nullptr /* ToDo heartbeat period. 
*/); + if (ret) { - ret= 1; // error - break; + // ToDo Heartbeat sending } } - info->thd->EXIT_COND(&old_stage); continue; } @@ -4853,7 +4843,8 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi) DBUG_ASSERT(opt_binlog_engine_hton); DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS); - handler_binlog_reader *reader= (*opt_binlog_engine_hton->get_binlog_reader)(); + handler_binlog_reader *reader= + (*opt_binlog_engine_hton->get_binlog_reader)(false); if (!reader) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt index 131bc98728d..78b2da8c864 100644 --- a/storage/innobase/CMakeLists.txt +++ b/storage/innobase/CMakeLists.txt @@ -89,7 +89,9 @@ ENDIF() IF (CMAKE_CXX_COMPILER_ID MATCHES "Clang" OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wconversion -Wno-sign-conversion") - SET_SOURCE_FILES_PROPERTIES(fts/fts0pars.cc + # Source file innodb_binlog.cc needs to include server layer stuff that is + # not -Wconversion clean. 
+ SET_SOURCE_FILES_PROPERTIES(fts/fts0pars.cc handler/innodb_binlog.cc PROPERTIES COMPILE_FLAGS -Wno-conversion) ENDIF() diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 955ba3de5a0..0d10fcb9c85 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -2023,6 +2023,7 @@ void mtr_t::write_binlog(bool space_id, uint32_t page_no, m_log.close(end); m_log.push(static_cast(buf), uint32_t(size)); } + m_modifications= true; } /** Write binlog data diff --git a/storage/innobase/fsp/fsp_binlog.cc b/storage/innobase/fsp/fsp_binlog.cc index 2dd4e22b7ec..b8ec1258cc9 100644 --- a/storage/innobase/fsp/fsp_binlog.cc +++ b/storage/innobase/fsp/fsp_binlog.cc @@ -69,6 +69,9 @@ uint64_t current_binlog_state_interval; */ mysql_mutex_t active_binlog_mutex; pthread_cond_t active_binlog_cond; +/* Mutex protecting binlog_cur_durable_offset[] and ibb_pending_lsn_fifo. */ +mysql_mutex_t binlog_durable_mutex; +mysql_cond_t binlog_durable_cond; /* The currently being written binlog tablespace. */ std::atomic active_binlog_file_no; @@ -95,16 +98,21 @@ uint64_t last_created_binlog_file_no; Point at which it is guaranteed that all data has been written out to the binlog file (on the OS level; not necessarily fsync()'ed yet). - Stores the most recent two values, each corresponding to active_binlog_file_no&1. + Stores the most recent four values, each corresponding to + active_binlog_file_no&3. This is so that it can be always valid for both + active and active-1 (active-2 is always durable, as we make the entire binlog + file N durable before pre-allocating N+2). Just before active moves to the + next file_no, we can set the value for active+1, leaving active and active-1 + still valid. (Only 3 entries are needed, but we use four to be able to use + bit-wise and instead of modulo-3). */ -/* ToDo: maintain this offset value as up to where data has been written out to the OS. 
Needs to be binary-searched in current binlog file at server restart; which is also a reason why it might not be a multiple of the page size. */ -std::atomic binlog_cur_written_offset[2]; +std::atomic binlog_cur_durable_offset[4]; /* - Offset of last valid byte of data in most recent 2 binlog files. + Offset of last valid byte of data in most recent 4 binlog files. A value of ~0 means that file is not opened as a tablespace (and data is valid until the end of the file). */ -std::atomic binlog_cur_end_offset[2]; +std::atomic binlog_cur_end_offset[4]; fsp_binlog_page_fifo *binlog_page_fifo; @@ -1170,6 +1178,10 @@ fsp_binlog_init() { mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr); pthread_cond_init(&active_binlog_cond, nullptr); + mysql_mutex_init(fsp_binlog_durable_mutex_key, &binlog_durable_mutex, nullptr); + mysql_cond_init(fsp_binlog_durable_cond_key, &binlog_durable_cond, nullptr); + mysql_mutex_record_order(&binlog_durable_mutex, &active_binlog_mutex); + ibb_file_hash.init(); binlog_page_fifo= new fsp_binlog_page_fifo(); binlog_page_fifo->start_flush_thread(); @@ -1182,6 +1194,8 @@ fsp_binlog_shutdown() binlog_page_fifo->stop_flush_thread(); delete binlog_page_fifo; ibb_file_hash.destroy(); + mysql_cond_destroy(&binlog_durable_cond); + mysql_mutex_destroy(&binlog_durable_mutex); pthread_cond_destroy(&active_binlog_cond); mysql_mutex_destroy(&active_binlog_mutex); } @@ -1194,6 +1208,8 @@ dberr_t fsp_binlog_tablespace_close(uint64_t file_no) { binlog_page_fifo->flush_up_to(file_no, ~(uint32_t)0); + uint32_t size= + binlog_page_fifo->size_in_pages(file_no) << ibb_page_size_shift; /* release_tablespace() will fdatasync() the file first. */ binlog_page_fifo->release_tablespace(file_no); /* @@ -1202,6 +1218,23 @@ fsp_binlog_tablespace_close(uint64_t file_no) recovery, at most from the latest two existing files. 
*/ log_buffer_flush_to_disk(true); + uint64_t end_offset= + binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed); + binlog_cur_end_offset[file_no & 3].store(size, std::memory_order_relaxed); + /* + Wait for the last record in the file to be marked durably synced to the + (redo) log. We already ensured that the record is durable with the above + call to log_buffer_flush_to_disk(); this way, we ensure that the update + of binlog_cur_durable_offset[] happens correctly through the + ibb_pending_lsn_fifo, so that the current durable position will be + consistent with a recorded LSN, and a reader will not see EOF in the + middle of a record. + */ + uint64_t dur_offset= + binlog_cur_durable_offset[file_no & 3].load(std:: memory_order_relaxed); + if (dur_offset < end_offset) + ibb_wait_durable_offset(file_no, end_offset); + return DB_SUCCESS; } @@ -1368,8 +1401,8 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type, // ToDo: assert that a single write doesn't span more than two binlog files. 
++file_no; file_size_in_pages= binlog_page_fifo->size_in_pages(file_no); - binlog_cur_written_offset[file_no & 1].store(0, std::memory_order_relaxed); - binlog_cur_end_offset[file_no & 1].store(0, std::memory_order_relaxed); + binlog_cur_durable_offset[file_no & 3].store(0, std::memory_order_relaxed); + binlog_cur_end_offset[file_no & 3].store(0, std::memory_order_relaxed); pthread_cond_signal(&active_binlog_cond); mysql_mutex_unlock(&active_binlog_mutex); binlog_cur_page_no= page_no= 0; @@ -1533,16 +1566,19 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type, binlog_page_fifo->release_page_mtr(block, mtr); binlog_cur_page_no= page_no; binlog_cur_page_offset= page_offset; - binlog_cur_end_offset[file_no & 1].store(((uint64_t)page_no << page_size_shift) + page_offset, - std::memory_order_relaxed); + binlog_cur_end_offset[file_no & 3].store + (((uint64_t)page_no << page_size_shift) + page_offset, + std::memory_order_relaxed); if (UNIV_UNLIKELY(pending_prev_end_offset != 0)) { + mysql_mutex_lock(&binlog_durable_mutex); mysql_mutex_lock(&active_binlog_mutex); - binlog_cur_end_offset[(file_no-1) & 1].store(pending_prev_end_offset, + binlog_cur_end_offset[(file_no-1) & 3].store(pending_prev_end_offset, std::memory_order_relaxed); active_binlog_file_no.store(file_no, std::memory_order_release); pthread_cond_signal(&active_binlog_cond); mysql_mutex_unlock(&active_binlog_mutex); + mysql_mutex_unlock(&binlog_durable_mutex); } return {start_file_no, start_offset}; } @@ -1636,15 +1672,18 @@ fsp_binlog_flush() mtr.commit(); lf_hash_put_pins(lf_pins); log_buffer_flush_to_disk(srv_flush_log_at_trx_commit & 1); + ibb_pending_lsn_fifo.add_to_fifo(mtr.commit_lsn(), file_no+1, + binlog_cur_end_offset[(file_no + 1) & 3].load(std::memory_order_relaxed)); return false; } -binlog_chunk_reader::binlog_chunk_reader() +binlog_chunk_reader::binlog_chunk_reader(std::atomic *limit_offset_) : s { 0, 0, 0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }, page_ptr(0), 
cur_block(0), page_buffer(nullptr), - cur_file_handle((File)-1), skipping_partial(false) + limit_offset(limit_offset_), cur_file_handle((File)-1), + skipping_partial(false) { /* Nothing else. */ } @@ -1691,8 +1730,8 @@ binlog_chunk_reader::fetch_current_page() uint64_t offset= (s.page_no << ibb_page_size_shift) | s.in_page_offset; uint64_t active= active2; uint64_t end_offset= - binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire); - if (s.file_no > active) + limit_offset[s.file_no & 3].load(std::memory_order_acquire); + if (s.file_no > active || UNIV_UNLIKELY(active == ~(uint64_t)0)) { ut_ad(s.page_no == 1); ut_ad(s.in_page_offset == 0); @@ -1705,15 +1744,8 @@ binlog_chunk_reader::fetch_current_page() if (s.file_no + 1 >= active) { /* Check if we should read from the buffer pool or from the file. */ - if (end_offset != ~(uint64_t)0 && offset < end_offset) { - /* - ToDo: Should we keep track of the last block read and use it as a - hint? Will be mainly useful when reading the partially written active - page at the current end of the active binlog, which might be a common - case. - */ + if (end_offset != ~(uint64_t)0 && offset < end_offset) block= binlog_page_fifo->get_page(s.file_no, s.page_no); - } active2= active_binlog_file_no.load(std::memory_order_acquire); if (UNIV_UNLIKELY(active2 != active)) { /* @@ -1765,18 +1797,13 @@ binlog_chunk_reader::fetch_current_page() } cur_file_length= stat_buf.st_size; } - if (s.file_no == active) + if (s.file_no + 1 >= active) cur_end_offset= end_offset; else cur_end_offset= cur_file_length; if (offset >= cur_file_length) { /* End of this file, move to the next one. */ - /* - ToDo: Should also obey binlog_cur_written_offset[], once we start - actually maintaining that, to save unnecessary buffer pool - lookup. - */ goto_next_file: if (cur_file_handle >= (File)0) { @@ -1845,8 +1872,7 @@ read_more_data: "Replace static_assert with code from above comment"); /* Check for end-of-file. 
*/ - if (cur_end_offset == ~(uint64_t)0 || - (s.page_no << ibb_page_size_shift) + s.in_page_offset >= cur_end_offset) + if ((s.page_no << ibb_page_size_shift) + s.in_page_offset >= cur_end_offset) return sofar; if (s.in_page_offset >= ibb_page_size - (BINLOG_PAGE_DATA_END + 3) || @@ -2075,23 +2101,39 @@ bool binlog_chunk_reader::data_available() if (!end_of_record()) return true; uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); - if (active != s.file_no) + if (UNIV_UNLIKELY(active == ~(uint64_t)0)) + return false; + uint64_t end_offset; + for (;;) { - ut_ad(active > s.file_no || (s.page_no == 1 && s.in_page_offset == 0)); - return active > s.file_no; + if (active > s.file_no + 1) + return true; + end_offset= limit_offset[s.file_no & 3].load(std::memory_order_acquire); + uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); + if (active2 == active) + break; + /* Active moved while we were checking, try again. */ + active= active2; } - uint64_t end_offset= - binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire); - uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); - if (active2 != active) - return true; // Active moved while we were checking - if (end_offset == ~(uint64_t)0) - return false; // Nothing in this binlog file yet uint64_t offset= (s.page_no << ibb_page_size_shift) | s.in_page_offset; if (offset < end_offset) return true; - ut_ad(s.file_no == active2); - ut_ad(offset == end_offset); + ut_ad(s.file_no + 1 == active || s.file_no == active); + ut_ad(offset == end_offset || (offset == ibb_page_size && end_offset == 0)); + return false; +} + + +bool +binlog_chunk_reader::is_before_pos(uint64_t file_no, uint64_t offset) +{ + if (s.file_no < file_no) + return true; + if (s.file_no > file_no) + return false; + uint64_t own_offset= (s.page_no << ibb_page_size_shift) | s.in_page_offset; + if (own_offset < offset) + return true; return false; } diff --git 
a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 521f2f11b64..eb05b251dad 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -549,6 +549,8 @@ mysql_pfs_key_t trx_sys_mutex_key; mysql_pfs_key_t srv_threads_mutex_key; mysql_pfs_key_t tpool_cache_mutex_key; mysql_pfs_key_t fsp_active_binlog_mutex_key; +mysql_pfs_key_t fsp_binlog_durable_mutex_key; +mysql_pfs_key_t fsp_binlog_durable_cond_key; mysql_pfs_key_t fsp_purge_binlog_mutex_key; mysql_pfs_key_t fsp_page_fifo_mutex_key; @@ -4130,7 +4132,11 @@ static int innodb_init(void* p) innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs; innobase_hton->binlog_init= innodb_binlog_init; + innobase_hton->binlog_write_direct_ordered= + innobase_binlog_write_direct_ordered; innobase_hton->binlog_write_direct= innobase_binlog_write_direct; + innobase_hton->binlog_group_commit_ordered= ibb_group_commit; + innobase_hton->binlog_oob_data_ordered= innodb_binlog_oob_ordered; innobase_hton->binlog_oob_data= innodb_binlog_oob; innobase_hton->binlog_oob_reset= innodb_reset_oob; innobase_hton->binlog_oob_free= innodb_free_oob; diff --git a/storage/innobase/handler/innodb_binlog.cc b/storage/innobase/handler/innodb_binlog.cc index 1f4872cbd9b..9e3d77e7793 100644 --- a/storage/innobase/handler/innodb_binlog.cc +++ b/storage/innobase/handler/innodb_binlog.cc @@ -21,6 +21,14 @@ this program; if not, write to the Free Software Foundation, Inc., InnoDB implementation of binlog. *******************************************************/ +/* + Need MYSQL_SERVER defined to be able to use THD_ENTER_COND from sql_class.h + to make my_cond_wait() killable. +*/ +#define MYSQL_SERVER 1 +#include +#include "sql_class.h" + #include "ut0compr_int.h" #include "innodb_binlog.h" #include "mtr0log.h" @@ -37,6 +45,7 @@ InnoDB implementation of binlog. 
static int innodb_binlog_inited= 0; +pending_lsn_fifo ibb_pending_lsn_fifo; uint32_t innodb_binlog_size_in_pages; const char *innodb_binlog_directory; @@ -118,7 +127,15 @@ struct binlog_oob_context { bool binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, - chunk_data_oob *oob_data, LF_PINS *pins); + chunk_data_oob *oob_data, LF_PINS *pins, mtr_t *mtr); + + /* + Pending binlog write for the ibb_pending_lsn_fifo. + pending_file_no is ~0 when no write is pending. + */ + uint64_t pending_file_no; + uint64_t pending_offset; + lsn_t pending_lsn; uint64_t first_node_file_no; uint64_t first_node_offset; @@ -223,12 +240,12 @@ class ha_innodb_binlog_reader : public handler_binlog_reader { innodb_binlog_oob_reader oob_reader; binlog_chunk_reader::saved_position saved_commit_pos; - /* Buffer to hold a page read directly from the binlog file. */ - uchar *page_buf; /* Out-of-band data to read after commit record, if any. */ uint64_t oob_count; uint64_t oob_last_file_no; uint64_t oob_last_offset; + /* Buffer to hold a page read directly from the binlog file. */ + uchar *page_buf; /* Keep track of pending bytes in the rd_buf. 
*/ uint32_t rd_buf_len; uint32_t rd_buf_sofar; @@ -241,13 +258,16 @@ private: int read_data(uchar *buf, uint32_t len); public: - ha_innodb_binlog_reader(uint64_t file_no= 0, uint64_t offset= 0); + ha_innodb_binlog_reader(bool wait_durable, uint64_t file_no= 0, + uint64_t offset= 0); ~ha_innodb_binlog_reader(); virtual int read_binlog_data(uchar *buf, uint32_t len) final; virtual bool data_available() final; + virtual bool wait_available(THD *thd, const struct timespec *abstime) final; virtual int init_gtid_pos(slave_connection_state *pos, rpl_binlog_state_base *state) final; virtual int init_legacy_pos(const char *filename, ulonglong offset) final; + void seek_internal(uint64_t file_no, uint64_t offset); }; @@ -1160,8 +1180,11 @@ static void innodb_binlog_init_state() { first_open_binlog_file_no= ~(uint64_t)0; - binlog_cur_end_offset[0].store(~(uint64_t)0, std::memory_order_relaxed); - binlog_cur_end_offset[1].store(~(uint64_t)0, std::memory_order_relaxed); + for (uint32_t i= 0; i < 4; ++i) + { + binlog_cur_end_offset[i].store(~(uint64_t)0, std::memory_order_relaxed); + binlog_cur_durable_offset[i].store(~(uint64_t)0, std::memory_order_relaxed); + } last_created_binlog_file_no= ~(uint64_t)0; earliest_binlog_file_no= ~(uint64_t)0; total_binlog_used_size= 0; @@ -1211,11 +1234,14 @@ binlog_sync_initial() ut_a(lf_pins); mtr.start(); fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER, lf_pins); + uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed); mtr.commit(); lf_hash_put_pins(lf_pins); log_buffer_flush_to_disk(true); binlog_page_fifo->flush_up_to(0, 0); binlog_page_fifo->do_fdatasync(0); + ibb_pending_lsn_fifo.add_to_fifo(mtr.commit_lsn(), file_no, + binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed)); } @@ -1380,7 +1406,7 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, { const uint32_t page_size= (uint32_t)ibb_page_size; const uint32_t page_size_shift= (uint32_t)ibb_page_size_shift; - const 
uint32_t idx= file_no & 1; + const uint32_t idx= file_no & 3; char file_name[OS_FILE_MAX_PATH]; uint32_t p_0, p_1, p_2, last_nonempty; byte *p, *page_end; @@ -1415,7 +1441,7 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, if (out_header_data->is_empty) { ret= fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr); - binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed); + binlog_cur_durable_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed); return (ret ? -1 : 0); } @@ -1498,12 +1524,22 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, ret= fsp_binlog_open(file_name, fh, file_no, file_size, *out_page_no, partial_page); uint64_t pos= (*out_page_no << page_size_shift) | *out_pos_in_page; - binlog_cur_written_offset[idx].store(pos, std::memory_order_relaxed); + binlog_cur_durable_offset[idx].store(pos, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(pos, std::memory_order_relaxed); return ret ? -1 : 1; } +static void +binlog_discover_init(uint64_t file_no, uint64_t interval) +{ + active_binlog_file_no.store(file_no, std::memory_order_release); + ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); + current_binlog_state_interval= interval; + ibb_pending_lsn_fifo.init(file_no); +} + + /* Returns: -1 error @@ -1546,9 +1582,7 @@ innodb_binlog_discover() file_no= binlog_files.last_file_no; if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) return -1; - active_binlog_file_no.store(file_no, std::memory_order_release); - ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); - current_binlog_state_interval= innodb_binlog_state_interval; + binlog_discover_init(file_no, innodb_binlog_state_interval); sql_print_warning("Binlog number %llu could no be opened. 
Starting a new " "binlog file from number %llu", binlog_files.last_file_no, (file_no + 1)); @@ -1561,9 +1595,7 @@ innodb_binlog_discover() if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no, header.xa_ref_file_no)) return -1; - active_binlog_file_no.store(file_no, std::memory_order_release); - ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); - current_binlog_state_interval= header.diff_state_interval; + binlog_discover_init(file_no, header.diff_state_interval); binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1584,9 +1616,7 @@ innodb_binlog_discover() file_no= binlog_files.last_file_no; if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) return -1; - active_binlog_file_no.store(file_no, std::memory_order_release); - ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); - current_binlog_state_interval= innodb_binlog_state_interval; + binlog_discover_init(file_no, innodb_binlog_state_interval); binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; sql_print_warning("Binlog number %llu could not be opened, starting " @@ -1598,9 +1628,7 @@ innodb_binlog_discover() if (ibb_record_in_file_hash(file_no, header.oob_ref_file_no, header.xa_ref_file_no)) return -1; - active_binlog_file_no.store(file_no, std::memory_order_release); - ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); - current_binlog_state_interval= header.diff_state_interval; + binlog_discover_init(file_no, header.diff_state_interval); binlog_cur_page_no= prev_page_no; binlog_cur_page_offset= prev_pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1614,9 +1642,7 @@ innodb_binlog_discover() file_no= binlog_files.last_file_no; if (ibb_record_in_file_hash(file_no, ~(uint64_t)0, ~(uint64_t)0)) return -1; - active_binlog_file_no.store(file_no, std::memory_order_release); - 
ibb_file_hash.earliest_oob_ref.store(file_no, std::memory_order_relaxed); - current_binlog_state_interval= innodb_binlog_state_interval; + binlog_discover_init(file_no, innodb_binlog_state_interval); binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -1629,6 +1655,7 @@ innodb_binlog_discover() earliest_binlog_file_no= 0; ibb_file_hash.earliest_oob_ref.store(0, std::memory_order_relaxed); total_binlog_used_size= 0; + ibb_pending_lsn_fifo.init(0); current_binlog_state_interval= innodb_binlog_state_interval; ib::info() << "Starting a new binlog from file number " << file_no << "."; return 0; @@ -1721,6 +1748,10 @@ innodb_binlog_prealloc_thread() /* If we created the initial tablespace file, make it the active one. */ ut_ad(active < ~(uint64_t)0 || last_created == 0); if (active == ~(uint64_t)0) { + binlog_cur_end_offset[last_created & 3]. + store(0, std::memory_order_release); + binlog_cur_durable_offset[last_created & 3] + .store(0, std::memory_order_release); active_binlog_file_no.store(last_created, std::memory_order_relaxed); ibb_file_hash.earliest_oob_ref.store(last_created, std::memory_order_relaxed); @@ -1742,8 +1773,6 @@ innodb_binlog_prealloc_thread() fsp_binlog_tablespace_close(active - 1); mysql_mutex_lock(&active_binlog_mutex); first_open_binlog_file_no= first_open + 1; - binlog_cur_end_offset[first_open & 1].store(~(uint64_t)0, - std::memory_order_relaxed); continue; /* Re-start loop after releasing/reacquiring mutex. 
*/ } @@ -2089,7 +2118,8 @@ binlog_state_recover() } my_close(file, MYF(0)); - ha_innodb_binlog_reader reader(active, page_no << ibb_page_size_shift); + ha_innodb_binlog_reader reader(false, active, + page_no << ibb_page_size_shift); return binlog_recover_gtid_state(&state, &reader); } @@ -2110,6 +2140,7 @@ alloc_oob_context(uint32 list_length= 10) ut_free(c); return nullptr; } + c->pending_file_no= ~(uint64_t)0; c->node_list_alloc_len= list_length; c->node_list_len= 0; c->pending_refcount= false; @@ -2132,12 +2163,17 @@ innodb_binlog_write_cache(IO_CACHE *cache, chunk_data_cache chunk_data(cache, binlog_info); fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins); + uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed); + c->pending_file_no= file_no; + c->pending_offset= + binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed); } static inline void reset_oob_context(binlog_oob_context *c) { + c->pending_file_no= ~(uint64_t)0; if (c->pending_refcount) { ibb_file_hash.oob_ref_dec(c->first_node_file_no, c->lf_pins); @@ -2237,8 +2273,8 @@ ensure_oob_context(void **engine_data, uint32_t needed_len) are purged. */ bool -innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, - void **engine_data) +innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, size_t data_len, + void **engine_data) { binlog_oob_context *c= (binlog_oob_context *)*engine_data; if (!c) @@ -2246,6 +2282,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, if (UNIV_UNLIKELY(!c)) return true; + mtr_t mtr; uint32_t i= c->node_list_len; uint64_t new_idx= i==0 ? 
0 : c->node_list[i-1].node_index + 1; if (i >= 2 && c->node_list[i-2].height == c->node_list[i-1].height) @@ -2256,7 +2293,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, c->node_list[i-2].file_no, c->node_list[i-2].offset, c->node_list[i-1].file_no, c->node_list[i-1].offset, (byte *)data, data_len); - if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins)) + if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data, c->lf_pins, &mtr)) return true; c->node_list_len= i - 1; } @@ -2271,7 +2308,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, 0, 0, /* NULL left child signifies a leaf */ c->node_list[i-1].file_no, c->node_list[i-1].offset, (byte *)data, data_len); - if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins)) + if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data, c->lf_pins, &mtr)) return true; c->node_list_len= i + 1; } @@ -2281,7 +2318,7 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, binlog_oob_context::chunk_data_oob oob_data (new_idx, 0, 0, 0, 0, (byte *)data, data_len); if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data, - c->lf_pins)) + c->lf_pins, &mtr)) return true; c->first_node_file_no= c->node_list[i].file_no; c->first_node_offset= c->node_list[i].offset; @@ -2290,6 +2327,22 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins); } + uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed); + c->pending_file_no= file_no; + c->pending_offset= + binlog_cur_end_offset[file_no & 3].load(std::memory_order_relaxed); + innodb_binlog_post_commit(&mtr, c); + return false; +} + + +bool +innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, + void **engine_data) +{ + binlog_oob_context *c= (binlog_oob_context *)*engine_data; + if (UNIV_LIKELY(c != nullptr)) + ibb_pending_lsn_fifo.record_commit(c); return false; } @@ -2302,15 
+2355,15 @@ innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, bool binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, - chunk_data_oob *oob_data, LF_PINS *pins) + chunk_data_oob *oob_data, LF_PINS *pins, + mtr_t *mtr) { uint32_t new_height= left_node == right_node ? 1 : 1 + node_list[left_node].height; - mtr_t mtr; - mtr.start(); + mtr->start(); std::pair new_file_no_offset= - fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA, pins); - mtr.commit(); + fsp_binlog_write_rec(oob_data, mtr, FSP_BINLOG_TYPE_OOB_DATA, pins); + mtr->commit(); node_list[node].file_no= new_file_no_offset.first; node_list[node].offset= new_file_no_offset.second; node_list[node].node_index= new_idx; @@ -2560,9 +2613,12 @@ again: } -ha_innodb_binlog_reader::ha_innodb_binlog_reader(uint64_t file_no, +ha_innodb_binlog_reader::ha_innodb_binlog_reader(bool wait_durable, + uint64_t file_no, uint64_t offset) - : rd_buf_len(0), rd_buf_sofar(0), state(ST_read_next_event_group) + : chunk_rd(wait_durable ? + binlog_cur_durable_offset : binlog_cur_end_offset), + rd_buf_len(0), rd_buf_sofar(0), state(ST_read_next_event_group) { page_buf= (uchar *)ut_malloc(ibb_page_size, mem_key_binlog); chunk_rd.set_page_buf(page_buf); @@ -2738,10 +2794,108 @@ ha_innodb_binlog_reader::data_available() } -handler_binlog_reader * -innodb_get_binlog_reader() +bool +ha_innodb_binlog_reader::wait_available(THD *thd, + const struct timespec *abstime) { - return new ha_innodb_binlog_reader(); + bool is_timeout= false; + lsn_t pending_sync_lsn= 0; /* LSN still awaiting a durable redo log sync, 0 if none; must be lsn_t, not bool */ + bool did_enter_cond= false; + PSI_stage_info old_stage; + + if (data_available()) + return false; + + mysql_mutex_lock(&binlog_durable_mutex); + for (;;) + { + /* Process anything that has become durable since we last looked. 
*/ + lsn_t durable_lsn= log_sys.get_flushed_lsn(std::memory_order_relaxed); + ibb_pending_lsn_fifo.process_durable_lsn(durable_lsn); + + /* Check if there is anything more pending to be made durable. */ + if (!ibb_pending_lsn_fifo.is_empty()) + { + pending_lsn_fifo::entry &e= ibb_pending_lsn_fifo.cur_head(); + if (durable_lsn < e.lsn) + pending_sync_lsn= e.lsn; + } + + /* + Check if there is data available for us now. + As we are holding binlog_durable_mutex, active_binlog_file_no cannot + move during this check. + */ + uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed); + uint64_t durable_offset= + binlog_cur_durable_offset[active & 3].load(std::memory_order_relaxed); + if (chunk_rd.is_before_pos(active, durable_offset)) + break; + + if (pending_sync_lsn != 0 && ibb_pending_lsn_fifo.flushing_lsn == 0) + { + /* + There is no data available for us now, but there is data that will be + available when the InnoDB redo log has been durably flushed to disk. + So now we will do such a sync (unless another thread is already doing + it), so we can proceed getting more data out. + */ + ibb_pending_lsn_fifo.flushing_lsn= pending_sync_lsn; + mysql_mutex_unlock(&binlog_durable_mutex); + log_write_up_to(pending_sync_lsn, true); + mysql_mutex_lock(&binlog_durable_mutex); + ibb_pending_lsn_fifo.flushing_lsn= pending_sync_lsn= 0; + /* Need to loop back to repeat all checks, after releasing the mutex. 
*/ + continue; + } + + if (thd && thd_kill_level(thd)) + break; + + if (thd && !did_enter_cond) + { + THD_ENTER_COND(thd, &binlog_durable_cond, &binlog_durable_mutex, + &stage_master_has_sent_all_binlog_to_slave, &old_stage); + did_enter_cond= true; + } + if (abstime) + { + int res= mysql_cond_timedwait(&binlog_durable_cond, + &binlog_durable_mutex, + abstime); + if (res == ETIMEDOUT) + { + is_timeout= true; + break; + } + } + else + mysql_cond_wait(&binlog_durable_cond, &binlog_durable_mutex); + } + /* + If there is pending binlog data to durably sync to the redo log, but we + did not do this sync ourselves, then signal another thread (if any) to + wakeup and sync. This is necessary to not lose the sync wakeup signal. + + (We use wake-one rather than wake-all for signalling a pending redo log + sync to avoid wakeup-storm). + */ + if (pending_sync_lsn != 0) + mysql_cond_signal(&binlog_durable_cond); + + if (did_enter_cond) + THD_EXIT_COND(thd, &old_stage); + else + mysql_mutex_unlock(&binlog_durable_mutex); + + return is_timeout; +} + + +handler_binlog_reader * +innodb_get_binlog_reader(bool wait_durable) +{ + return new ha_innodb_binlog_reader(wait_durable); } @@ -2786,7 +2940,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, { uint64_t active= active2; uint64_t end_offset= - binlog_cur_end_offset[file_no&1].load(std::memory_order_acquire); + binlog_cur_end_offset[file_no & 3].load(std::memory_order_acquire); fsp_binlog_page_entry *block; if (file_no + 1 >= active && @@ -2924,7 +3078,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, return -1; } { - binlog_chunk_reader chunk_reader; + binlog_chunk_reader chunk_reader(binlog_cur_end_offset); chunk_reader.set_page_buf(page_buffer); chunk_reader.seek(file_no, 0); err= chunk_reader.get_file_header(&header); @@ -3076,6 +3230,178 @@ ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset) } +void +ha_innodb_binlog_reader::seek_internal(uint64_t file_no, uint64_t 
offset) +{ + chunk_rd.seek(file_no, offset); + chunk_rd.skip_partial(true); + cur_file_no= chunk_rd.current_file_no(); + cur_file_pos= chunk_rd.current_pos(); +} + + +void +ibb_wait_durable_offset(uint64_t file_no, uint64_t wait_offset) +{ + uint64_t dur_offset= + binlog_cur_durable_offset[file_no & 3].load(std:: memory_order_relaxed); + ha_innodb_binlog_reader reader(true, file_no, dur_offset); + for (;;) + { + reader.wait_available(nullptr, nullptr); + dur_offset= + binlog_cur_durable_offset[file_no & 3].load(std:: memory_order_relaxed); + if (dur_offset >= wait_offset) + break; + reader.seek_internal(file_no, dur_offset); + } +} + + +pending_lsn_fifo::pending_lsn_fifo() + : flushing_lsn(0), cur_file_no(~(uint64_t)0), head(0), tail(0) +{ +} + + +void +pending_lsn_fifo::init(uint64_t start_file_no) +{ + mysql_mutex_lock(&binlog_durable_mutex); + ut_ad(cur_file_no == ~(uint64_t)0); + cur_file_no= start_file_no; + mysql_mutex_unlock(&binlog_durable_mutex); +} + + +void +pending_lsn_fifo::reset() +{ + mysql_mutex_lock(&binlog_durable_mutex); + cur_file_no= ~(uint64_t)0; + mysql_mutex_unlock(&binlog_durable_mutex); +} + + +bool +pending_lsn_fifo::process_durable_lsn(lsn_t lsn) +{ + mysql_mutex_assert_owner(&binlog_durable_mutex); + ut_ad(cur_file_no != ~(uint64_t)0); + + entry *got= nullptr; + for (;;) + { + if (is_empty()) + break; + entry &e= cur_tail(); + if (lsn < e.lsn) + break; + got= &e; + drop_tail(); + } + if (got) + { + uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed); + if (got->file_no + 1 >= active) + binlog_cur_durable_offset[got->file_no & 3].store + (got->offset, std::memory_order_relaxed); + /* + If we moved the durable point to the next file_no, mark the prior + file_no as now fully durable. + Since we only ever have at most two binlog tablespaces open, and since + we make file_no=N fully durable (by calling into this function) before + pre-allocating N+2, we can only ever move ahead one file_no at a time + here. 
+ */ + if (cur_file_no != got->file_no) + { + ut_ad(got->file_no == cur_file_no + 1); + binlog_cur_durable_offset[cur_file_no & 3].store( + binlog_cur_end_offset[cur_file_no & 3].load(std::memory_order_relaxed), + std::memory_order_relaxed); + cur_file_no= got->file_no; + } + mysql_cond_broadcast(&binlog_durable_cond); + return true; + } + return false; +} + + +/* + After a binlog commit, put the LSN and the corresponding binlog position + into the ibb_pending_lsn_fifo. We do this here (rather than immediately in + innodb_binlog_post_commit()), so that we can delay it until we are no longer + holding more critical locks that could block other writers. As we will be + contending with readers here on binlog_durable_mutex. +*/ +void +pending_lsn_fifo::record_commit(binlog_oob_context *c) +{ + uint64_t pending_file_no= c->pending_file_no; + if (pending_file_no == ~(uint64_t)0) + return; + c->pending_file_no= ~(uint64_t)0; + lsn_t pending_lsn= c->pending_lsn; + uint64_t pending_offset= c->pending_offset; + add_to_fifo(pending_lsn, pending_file_no, pending_offset); +} + + +void +pending_lsn_fifo::add_to_fifo(uint64_t lsn, uint64_t file_no, uint64_t offset) +{ + mysql_mutex_lock(&binlog_durable_mutex); + /* + The record_commit() operation is done outside of critical locks for + scalability, so can occur out-of-order. So only insert the new entry if + it is newer than any previously inserted, i.e. newer than the current head (the most recent entry), so that LSNs in the fifo stay increasing from tail to head. + */ + if (is_empty() || lsn > cur_head().lsn) + { + if (is_full()) + { + /* + When the fifo is full, we just overwrite the head with a newer LSN. + This way, whenever _some_ LSN gets synced durably to disk, we will + always be able to make some progress and clear some fifo entries. And + when this latest LSN gets eventually synced, any overwritten entry + will progress as well. + */ + } + else + { + /* + Insert a new head. 
+ Note that we make the fifo size a power-of-two (2 <mysql_thd) - return; + return nullptr; innodb_binlog_status(&file_no, &pos); binlog_get_cache(trx->mysql_thd, file_no, pos, &cache, &binlog_info, >id); if (UNIV_LIKELY(binlog_info != nullptr) && UNIV_LIKELY(binlog_info->gtid_offset > 0)) { binlog_diff_state.update_nolock(gtid); innodb_binlog_write_cache(cache, binlog_info, mtr); + return (binlog_oob_context *)binlog_info->engine_ptr; + } + return nullptr; +} + + +void +innodb_binlog_post_commit(mtr_t *mtr, binlog_oob_context *c) +{ + if (c) + { + c->pending_lsn= mtr->commit_lsn(); + ut_ad(c->pending_lsn != 0); } } bool -innobase_binlog_write_direct(IO_CACHE *cache, +innobase_binlog_write_direct_ordered(IO_CACHE *cache, handler_binlog_event_group_info *binlog_info, const rpl_gtid *gtid) { @@ -3120,12 +3459,48 @@ innobase_binlog_write_direct(IO_CACHE *cache, mtr.start(); innodb_binlog_write_cache(cache, binlog_info, &mtr); mtr.commit(); - /* ToDo: Should we sync the log here? Maybe depending on an extra bool parameter? */ + innodb_binlog_post_commit(&mtr, + (binlog_oob_context *)binlog_info->engine_ptr); /* ToDo: Presumably innodb_binlog_write_cache() should be able to fail in some cases? Then return any such error to the caller. 
*/ return false; } +bool +innobase_binlog_write_direct(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, + const rpl_gtid *gtid) +{ + binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; + if (UNIV_LIKELY(c != nullptr)) + { + if (srv_flush_log_at_trx_commit & 1) + log_write_up_to(c->pending_lsn, true); + ibb_pending_lsn_fifo.record_commit(c); + } + return false; +} + + +void +ibb_group_commit(THD *thd, handler_binlog_event_group_info *binlog_info) +{ + binlog_oob_context *c= (binlog_oob_context *)binlog_info->engine_ptr; + if (UNIV_LIKELY(c != nullptr)) + { + if (srv_flush_log_at_trx_commit & 1 && c->pending_lsn) + { + /* + Sync the InnoDB redo log durably to disk here for the entire group + commit, so that it will be available for all binlog readers. + */ + log_write_up_to(c->pending_lsn, true); + } + ibb_pending_lsn_fifo.record_commit(c); + } +} + + bool innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last) { @@ -3199,6 +3574,7 @@ innodb_reset_binlogs() /* Prevent any flushing activity while resetting. */ binlog_page_fifo->lock_wait_for_idle(); binlog_page_fifo->reset(); + ibb_pending_lsn_fifo.reset(); ibb_file_hash.remove_up_to(last_created_binlog_file_no, lf_pins); @@ -3240,6 +3616,7 @@ innodb_reset_binlogs() /* Re-initialize empty binlog state and start the pre-alloc thread. 
*/ innodb_binlog_init_state(); + ibb_pending_lsn_fifo.init(0); binlog_page_fifo->unlock_with_delayed_free(); start_binlog_prealloc_thread(); binlog_sync_initial(); diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index f8b7056e7d2..8a85f38fc9f 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -1582,7 +1582,7 @@ public: size_t flush_list_requests; TPOOL_SUPPRESS_TSAN void add_flush_list_requests(size_t size) - { ut_ad(size); flush_list_requests+= size; } + { flush_list_requests+= size; } private: static constexpr unsigned PAGE_CLEANER_IDLE= 1; static constexpr unsigned FLUSH_LIST_ACTIVE= 2; diff --git a/storage/innobase/include/fsp_binlog.h b/storage/innobase/include/fsp_binlog.h index 0acbf4929ec..4d0133b2340 100644 --- a/storage/innobase/include/fsp_binlog.h +++ b/storage/innobase/include/fsp_binlog.h @@ -379,6 +379,10 @@ public: bool in_record; } s; + /* Amount of data in file, valid after fetch_current_page(). */ + uint64_t cur_end_offset; + /* Length of the currently open file, valid if cur_file_handle != -1. */ + uint64_t cur_file_length; /* After fetch_current_page(), this points into either cur_block or page_buffer as appropriate. @@ -388,10 +392,12 @@ public: fsp_binlog_page_entry *cur_block; /* Buffer for reading a page directly from a tablespace file. */ byte *page_buffer; - /* Amount of data in file, valid after fetch_current_page(). */ - uint64_t cur_end_offset; - /* Length of the currently open file, valid if cur_file_handle != -1. */ - uint64_t cur_file_length; + /* + Points to either binlog_cur_durable_offset, for readers that should not + see binlog data until it has become durable on disk; or + binlog_cur_end_offset otherwise. + */ + std::atomic * const limit_offset; /* Open file handle to tablespace file_no, or -1. 
*/ File cur_file_handle; /* @@ -400,7 +406,7 @@ public: */ bool skipping_partial; - binlog_chunk_reader(); + binlog_chunk_reader(std::atomic *limit_offset_); void set_page_buf(byte *in_page_buf) { page_buffer= in_page_buf; } ~binlog_chunk_reader(); @@ -451,6 +457,7 @@ public: /* Release any buffer pool page latch. */ void release(bool release_file_page= false); bool data_available(); + bool is_before_pos(uint64_t file_no, uint64_t offset); uint64_t current_file_no() { return s.file_no; } uint64_t current_pos() { return (s.page_no << srv_page_size_shift) + s.in_page_offset; @@ -464,11 +471,13 @@ extern ulong ibb_page_size; extern uint64_t current_binlog_state_interval; extern mysql_mutex_t active_binlog_mutex; extern pthread_cond_t active_binlog_cond; +extern mysql_mutex_t binlog_durable_mutex; +extern mysql_cond_t binlog_durable_cond; extern std::atomic active_binlog_file_no; extern uint64_t first_open_binlog_file_no; extern uint64_t last_created_binlog_file_no; -extern std::atomic binlog_cur_written_offset[2]; -extern std::atomic binlog_cur_end_offset[2]; +extern std::atomic binlog_cur_durable_offset[4]; +extern std::atomic binlog_cur_end_offset[4]; extern fsp_binlog_page_fifo *binlog_page_fifo; extern ibb_file_oob_refs ibb_file_hash; diff --git a/storage/innobase/include/innodb_binlog.h b/storage/innobase/include/innodb_binlog.h index b4df00bd0d7..ff8165ed59e 100644 --- a/storage/innobase/include/innodb_binlog.h +++ b/storage/innobase/include/innodb_binlog.h @@ -34,6 +34,7 @@ struct rpl_gtid; struct handler_binlog_event_group_info; class handler_binlog_reader; struct handler_binlog_purge_info; +struct binlog_oob_context; /* @@ -128,12 +129,64 @@ struct binlog_header_data { }; +/* + The class pending_lsn_fifo keeps track of pending LSNs - and their + corresponding binlog file_no/offset - that have been mtr-committed, but have + not yet become durable. + + Used to delay sending to slaves any data that might be lost in case the + master crashes just after sending. 
+*/ +class pending_lsn_fifo { + static constexpr uint32_t fixed_size_log2= 10; + static constexpr uint32_t fixed_size= (2 << fixed_size_log2); + static constexpr uint32_t mask= (2 << fixed_size_log2) - 1; +public: + struct entry { + lsn_t lsn; + uint64_t file_no; + uint64_t offset; + } fifo[fixed_size]; + /* + Set while we are doing a durable sync of the redo log to the LSN that we + are requesting to become durable. Used to avoid multiple threads + needlessly trying to sync the redo log on top of one another. + */ + lsn_t flushing_lsn; + /* + The current file_no that has any durable data. Used to detect when an LSN + moves the current durable end point to the next file, so that the previous + file can then be marked as fully durable. + The value ~0 is used as a marker for "not yet initialized". + */ + uint64_t cur_file_no; + /* The `head' points one past the most recent element. */ + uint32_t head; + /* The `tail' points to the earliest element. */ + uint32_t tail; + + pending_lsn_fifo(); + void init(uint64_t start_file_no); + void reset(); + bool is_empty() { return head == tail; } + bool is_full() { return head == tail + fixed_size; } + entry &cur_head() { ut_ad(!is_empty()); return fifo[(head - 1) & mask]; } + entry &cur_tail() { ut_ad(!is_empty()); return fifo[tail & mask]; } + void drop_tail() { ut_ad(!is_empty()); ++tail; } + void new_head() { ut_ad(!is_full()); ++head; } + void record_commit(binlog_oob_context *c); + void add_to_fifo(uint64_t lsn, uint64_t file_no, uint64_t offset); + bool process_durable_lsn(lsn_t lsn); +}; + + #define BINLOG_NAME_BASE "binlog-" #define BINLOG_NAME_EXT ".ibb" /* '/' + "binlog-" + (<=20 digits) + '.' + "ibb" + '\0'. 
*/ #define BINLOG_NAME_MAX_LEN 1 + 1 + 7 + 20 + 1 + 3 + 1 +extern pending_lsn_fifo ibb_pending_lsn_fifo; extern uint32_t innodb_binlog_size_in_pages; extern const char *innodb_binlog_directory; extern uint32_t binlog_cur_page_no; @@ -181,16 +234,25 @@ extern bool ibb_write_header_page(mtr_t *mtr, uint64_t file_no, extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, fsp_binlog_page_entry * &block, uint32_t &page_no, uint32_t &page_offset, uint64_t file_no); +extern bool innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, + size_t data_len, void **engine_data); extern bool innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); extern void innodb_reset_oob(THD *thd, void **engine_data); extern void innodb_free_oob(THD *thd, void *engine_data); -extern handler_binlog_reader *innodb_get_binlog_reader(); +extern handler_binlog_reader *innodb_get_binlog_reader(bool wait_durable); +extern void ibb_wait_durable_offset(uint64_t file_no, uint64_t wait_offset); extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no); -extern void innodb_binlog_trx(trx_t *trx, mtr_t *mtr); +extern binlog_oob_context *innodb_binlog_trx(trx_t *trx, mtr_t *mtr); +extern void innodb_binlog_post_commit(mtr_t *mtr, binlog_oob_context *c); +extern bool innobase_binlog_write_direct_ordered + (IO_CACHE *cache, handler_binlog_event_group_info *binlog_info, + const rpl_gtid *gtid); extern bool innobase_binlog_write_direct (IO_CACHE *cache, handler_binlog_event_group_info *binlog_info, const rpl_gtid *gtid); +extern void ibb_group_commit(THD *thd, + handler_binlog_event_group_info *binlog_info); extern bool innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last); extern void innodb_binlog_status(uint64_t *out_file_no, uint64_t *out_pos); extern bool innodb_binlog_get_init_state(rpl_binlog_state_base *out_state); diff --git a/storage/innobase/include/univ.i b/storage/innobase/include/univ.i index edf32459d07..58eac5f37e0 
100644 --- a/storage/innobase/include/univ.i +++ b/storage/innobase/include/univ.i @@ -482,6 +482,8 @@ extern mysql_pfs_key_t trx_pool_manager_mutex_key; extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key; extern mysql_pfs_key_t fsp_active_binlog_mutex_key; +extern mysql_pfs_key_t fsp_binlog_durable_mutex_key; +extern mysql_pfs_key_t fsp_binlog_durable_cond_key; extern mysql_pfs_key_t fsp_purge_binlog_mutex_key; extern mysql_pfs_key_t fsp_page_fifo_mutex_key; # endif /* UNIV_PFS_MUTEX */ diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 191412fa897..3572b33ea51 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -1120,6 +1120,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr) ut_ad(!read_only); trx_rseg_t *rseg= rsegs.m_redo.rseg; trx_undo_t *&undo= rsegs.m_redo.undo; + binlog_oob_context *binlog_ctx= nullptr; if (UNIV_LIKELY(undo != nullptr)) { MONITOR_INC(MONITOR_TRX_COMMIT_UNDO); @@ -1161,7 +1162,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr) trx_sys.assign_new_trx_no(this); /* Include binlog data in the commit record, if any. */ - innodb_binlog_trx(this, mtr); + binlog_ctx= innodb_binlog_trx(this, mtr); UT_LIST_REMOVE(rseg->undo_list, undo); /* Change the undo log segment state from TRX_UNDO_ACTIVE, to @@ -1175,6 +1176,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr) else rseg->release(); mtr->commit(); + innodb_binlog_post_commit(mtr, binlog_ctx); } /********************************************************************