From 6f6baf96554b49010ca6e2c2c35b0b4af1992c8e Mon Sep 17 00:00:00 2001
From: Kristian Nielsen
Date: Sat, 21 Dec 2024 09:41:55 +0100
Subject: [PATCH] MDEV-34705: Binlog-in-engine: Read side of out-of-band binlogging

With this commit, the out-of-band binlogging of large event groups as
multiple smaller records interleaved with other event groups is now working.

Instead of flushing the binlog cache to disk when it reaches
@@binlog_cache_size, the cache is binlogged as an out-of-band record. Then,
at transaction commit, a commit record is written containing just the GTID
and a link to the out-of-band data.

To facilitate append-only operation, the binlogged records do not have a
"next" pointer. Instead, they are written out as a forest of perfect binary
trees, with the leftmost leaf of one tree pointing to the root of the
previous tree. The binlog reader uses this structure to read out the event
group data consecutively for the binlog dump thread, maintaining only
O(log(N)) memory while reading.

As part of this commit, the existing binlog reader code is refactored and
greatly improved, with a much cleaner explicit state machine and proper
handling of chunk, page, and file boundaries.

Also fixes some bugs in gtid_search::find_gtid_pos().

Signed-off-by: Kristian Nielsen
---
 .../suite/binlog_in_engine/rpl_oob.result |   19 +
 .../suite/binlog_in_engine/rpl_oob.test   |   69 +
 .../suite/rpl/include/rpl_gtid_index.inc  |    4 +-
 sql/handler.h                             |   26 +-
 sql/log.cc                                |   22 +
 storage/innobase/fsp/fsp0fsp.cc           | 1321 +++++++++++++----
 storage/innobase/include/ut0compr_int.h   |    2 +-
 7 files changed, 1114 insertions(+), 349 deletions(-)
 create mode 100644 mysql-test/suite/binlog_in_engine/rpl_oob.result
 create mode 100644 mysql-test/suite/binlog_in_engine/rpl_oob.test

diff --git a/mysql-test/suite/binlog_in_engine/rpl_oob.result b/mysql-test/suite/binlog_in_engine/rpl_oob.result
new file mode 100644
index 00000000000..598d2ff47ec
--- /dev/null
+++ b/mysql-test/suite/binlog_in_engine/rpl_oob.result
@@ -0,0 +1,19 @@
+include/master-slave.inc
+[connection master]
+CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (0, 0, 'Start');
+*** Generating 25 large transactions in 5 interleaved connections
+connection master;
+INSERT INTO t1 VALUES (0, 1, 'End');
+SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
+COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
+2552 33150 128776 5000383
+include/save_master_gtid.inc
+connection slave;
+include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
+COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
+2552 33150 128776 5000383
+connection master;
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/binlog_in_engine/rpl_oob.test b/mysql-test/suite/binlog_in_engine/rpl_oob.test
new file mode 100644
index 00000000000..0878dd7bf26
--- /dev/null
+++ b/mysql-test/suite/binlog_in_engine/rpl_oob.test
@@ -0,0 +1,69 @@
+--source include/have_innodb_binlog.inc
+--source include/have_binlog_format_row.inc
+--source include/master-slave.inc
+
+# Test a number of transactions that are large and get interleaved with each
+# other over multiple binlog files.
+--let $NUM_CONNECTIONS= 5
+# $NUM_TRANSACTIONS is total, not per connection.
+--let $NUM_TRANSACTIONS=25 +--let $NUM_PIECES= 100 +--let $PIECE_SIZE= 2000 + + +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); + +--echo *** Generating $NUM_TRANSACTIONS large transactions in $NUM_CONNECTIONS interleaved connections +--disable_query_log +let $t= 0; +while ($t < $NUM_TRANSACTIONS) { + let $b= $t; + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connect(con$i,localhost,root,,) + START TRANSACTION; + eval INSERT INTO t1 VALUES ($b + $i, 0, 'Initial $i'); + inc $i; + inc $t; + } + + let $p= 1; + while ($p <= $NUM_PIECES) { + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($b + $i, $p, REPEAT(CHR(65 + ($p + $i MOD 26)), $PIECE_SIZE)); + inc $i; + } + inc $p; + } + + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($b + $i, $NUM_PIECES+1, 'Last $i'); + COMMIT; + --disconnect con$i + inc $i; + } +} +--enable_query_log + +--connection master +INSERT INTO t1 VALUES (0, 1, 'End'); + +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +--source include/save_master_gtid.inc +--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 --start-position=0-1-1 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.txt + +--connection slave +--source include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; + + +# Cleanup. +--connection master +DROP TABLE t1; +--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 --start-position=0-1-1 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.txt +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/include/rpl_gtid_index.inc b/mysql-test/suite/rpl/include/rpl_gtid_index.inc index f44bcfe81fb..ae8f49e70fc 100644 --- a/mysql-test/suite/rpl/include/rpl_gtid_index.inc +++ b/mysql-test/suite/rpl/include/rpl_gtid_index.inc @@ -109,7 +109,7 @@ while ($i < $NUM_POS) { --disable_result_log eval START SLAVE UNTIL master_gtid_pos='$gtid_pos'; --enable_result_log - --let $res= `SELECT MASTER_GTID_WAIT('$gtid_pos')` + --let $res= `SELECT MASTER_GTID_WAIT('$gtid_pos', 60)` if ($res != 0) { --die "FAIL: MASTER_GTID_WAIT($gtid_pos) returned $res, should have been 0" } @@ -154,7 +154,7 @@ while ($i <= $NUM_DOMAIN) { --disable_result_log eval START SLAVE UNTIL master_gtid_pos='$until_pos'; --enable_result_log - --let $res= `SELECT MASTER_GTID_WAIT('$until_pos')` + --let $res= `SELECT MASTER_GTID_WAIT('$until_pos', 60)` if ($res != 0) { --die "FAIL: MASTER_GTID_WAIT($until_pos) returned $res, should have been 0" } diff --git a/sql/handler.h b/sql/handler.h index adb621abb9e..4de527cfbd1 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -5845,21 +5845,6 @@ struct handler_binlog_event_group_info { Class for reading a binlog implemented in an engine. */ class handler_binlog_reader { -public: - /* ToDo: Should some of this state go to the derived class, in case different engines might want to do something different? */ - - /* The file number of the currently-being-read binlog file. */ - uint64_t cur_file_no; - /* The current offset into the binlog file. */ - uint64_t cur_file_offset; - /* - Open file handle of binlog file. - This may be NULL if the currently-being-read binlog file is "hot" and - is being read from in-memory buffers while the data may not yet be - written out to the file on the OS level. 
- */ - File cur_file; - private: /* Position and length of any remaining data in buf[]. */ uint32_t buf_data_pos; @@ -5869,8 +5854,7 @@ private: public: handler_binlog_reader() - : cur_file_no(~(uint64_t)0), cur_file_offset(0), cur_file((File)-1), - buf_data_pos(0), buf_data_remain(0) + : buf_data_pos(0), buf_data_remain(0) { } virtual ~handler_binlog_reader() { }; virtual int read_binlog_data(uchar *buf, uint32_t len) = 0; @@ -5879,14 +5863,6 @@ public: rpl_binlog_state_base *state) = 0; int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed); - -/* - cur_file_no -> implicitly gives file/tablespace - cur_file_offset -> implicitly gives page - cur_file_fh -> open fh, if any - cur_chunk_len - cur_chunk_sofar -*/ }; #endif /* HANDLER_INCLUDED */ diff --git a/sql/log.cc b/sql/log.cc index 67a13422f3b..4c135e0e7de 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6312,6 +6312,28 @@ bool stmt_has_updated_non_trans_table(const THD* thd) static int binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len) { + /* + Tricky: The mysys IO_CACHE write function can be called either from + my_b_flush_io_cache(), where it must write everything it was asked to; or + from _my_b_write(), where it needs only write as much as is efficient (eg. + an integer multiple of some block size), and any remainder (which must be + < cache size) will be put in the cache. + + The two cases are distinguished on whether the passed-in data pointer is + equal to cache->write_buffer or not. + + We want each oob record to be the full size, so write only integer + multiples of the cache size in the latter case. + */ + if (data != cache->write_buffer) + { + len-= (len % cache->buffer_length); + if (!len) + return false; + } + + /* ToDo: If len > the cache size (32k default), then split up the write in multiple oob writes to the engine. This can happen if there is a large single write to the IO_CACHE. Maybe the split could happen also in the engine, depending if I want to split the size here to the binlog_cache_size which is known here, or if I want to split it to an engine imposed max size. But since the commit record size is determined by the upper layer here, I think it makes sense to determine the oob record size here also. */ + binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos; void **engine_ptr= &mngr->engine_binlog_info.engine_ptr; bool res= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, len, diff --git a/storage/innobase/fsp/fsp0fsp.cc b/storage/innobase/fsp/fsp0fsp.cc index a9150abcd6c..f0515d9342e 100644 --- a/storage/innobase/fsp/fsp0fsp.cc +++ b/storage/innobase/fsp/fsp0fsp.cc @@ -4292,6 +4292,8 @@ enum fsp_binlog_chunk_types { FSP_BINLOG_TYPE_GTID_STATE= 2, /* Out-of-band event group data. */ FSP_BINLOG_TYPE_OOB_DATA= 3, + /* Must be one more than the last type. */ + FSP_BINLOG_TYPE_END, /* Padding data at end of page. */ FSP_BINLOG_TYPE_FILLER= 0xff @@ -4312,6 +4314,17 @@ static constexpr uint32_t FSP_BINLOG_FLAG_LAST= (1 << FSP_BINLOG_FLAG_BIT_LAST); static constexpr uint32_t FSP_BINLOG_TYPE_MASK= ~(FSP_BINLOG_FLAG_CONT | FSP_BINLOG_FLAG_LAST); +/* + These are the chunk types that are allowed to occur in the middle of + another record. +*/ +static constexpr uint64_t ALLOWED_NESTED_RECORDS= + /* GTID STATE at start of page can occur in the middle of other record. */ + ((uint64_t)1 << FSP_BINLOG_TYPE_GTID_STATE) + ; +/* Ensure that all types fit in the ALLOWED_NESTED_RECORDS bitmask. 
*/ +static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS)); + static uint32_t binlog_size_in_pages; buf_block_t *binlog_cur_block; @@ -4458,27 +4471,244 @@ struct binlog_oob_context { }; -class ha_innodb_binlog_reader : public handler_binlog_reader { - /* Buffer to hold a page read directly from the binlog file. */ - uchar *page_buf; - /* Length of the currently open file (if cur_file != -1). */ +class chunk_reader { +public: + enum chunk_reader_status { + CHUNK_READER_ERROR= -1, + CHUNK_READER_EOF= 0, + CHUNK_READER_FOUND= 1 + }; + + /* + Current state, can be obtained from save_pos() and later passed to + restore_pos(). + */ + struct saved_position { + /* Current position file. */ + uint64_t file_no; + /* Current position page. */ + uint32_t page_no; + /* Start of current chunk inside page. */ + uint32_t in_page_offset; + /* + The length of the current chunk, once the chunk type has been read. + If 0, it means the chunk type (and length) has not yet been read. + */ + uint32_t chunk_len; + /* The read position inside the current chunk. */ + uint32_t chunk_read_offset; + byte chunk_type; + /* When set, read will skip the current chunk, if any. */ + bool skip_current; + /* Set while we are in the middle of reading a record. */ + bool in_record; + } s; + + /* The mtr is started iff cur_block is non-NULL. */ + mtr_t mtr; + /* + After fetch_current_page(), this points into either cur_block or + page_buffer as appropriate. + */ + byte *page_ptr; + /* Valid after fetch_current_page(), if page found in buffer pool. */ + buf_block_t *cur_block; + /* Buffer for reading a page directly from a tablespace file. */ + byte *page_buffer; + /* Amount of data in file, valid after fetch_current_page(). */ + uint64_t cur_end_offset; + /* Length of the currently open file, valid if cur_file_handle != -1. */ uint64_t cur_file_length; - /* Used to keep track of partial chunk returned to reader. */ - uint32_t chunk_pos; - uint32_t chunk_remain; + /* Open file handle to tablespace file_no, or -1. */ + File cur_file_handle; /* Flag used to skip the rest of any partial chunk we might be starting in the middle of. */ bool skipping_partial; + + chunk_reader(); + void set_page_buf(byte *in_page_buf) { page_buffer= in_page_buf; } + ~chunk_reader(); + + /* Current type, or FSP_BINLOG_TYPE_FILLER if between records. */ + byte cur_type() { return (byte)(s.chunk_type & FSP_BINLOG_TYPE_MASK); } + bool cur_is_cont() { return (s.chunk_type & FSP_BINLOG_FLAG_CONT) != 0; } + bool end_of_record() { return !s.in_record; } + static int read_error_corruption(uint64_t file_no, uint64_t page_no, + const char *msg); + int read_error_corruption(const char *msg) + { + return read_error_corruption(s.file_no, s.page_no, msg); + } + enum chunk_reader_status fetch_current_page(); + /* + Try to read max_len bytes from a record into buffer. + + If multipage is true, will move across pages to read following + continuation chunks, if any, to try and read max_len total bytes. Only if + the record ends before max_len bytes is less amount of bytes returned. + + If multipage is false, will read as much is available on one page (up to + max of max_len), and then return. + + Returns number of bytes read, or -1 for error. + Returns 0 if the chunk_reader is pointing to start of a chunk at the end + of the current binlog (ie. end-of-file). + */ + int read_data(byte *buffer, int max_len, bool multipage); + + /* Save current position, and restore it later. 
*/ + void save_pos(saved_position *out_pos) { *out_pos= s; } + void restore_pos(saved_position *pos); + void seek(uint64_t file_no, uint64_t offset); + + /* + Make next read_data() skip any data from the current chunk (if any), and + start reading data only from the beginning of the next chunk. */ + void skip_current() { s.skip_current= true; } + /* + Used initially, after seeking potentially into the middle of a (commit) + record, to skip any continuation chunks until we reach the start of the + first real record. + */ + void skip_partial(bool skip) { skipping_partial= skip; } + /* Release any buffer pool page latch. */ + void release(bool release_file_page= false); + + bool data_available() + { + if (!end_of_record()) + return true; + uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); + if (active != s.file_no) + { + ut_ad(active > s.file_no); + return true; + } + uint64_t end_offset= + binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire); + uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); + if (active2 != active) + return true; // Active moved while we were checking + if (end_offset == ~(uint64_t)0) + return false; // Nothing in this binlog file yet + uint64_t offset= (s.page_no << srv_page_size_shift) | s.in_page_offset; + if (offset < end_offset) + return true; + + ut_ad(s.file_no == active2); + ut_ad(offset == end_offset); + return false; + } +}; + + +/* + A class for doing the post-order traversal of the forest of perfect binary + trees that make up the out-of-band data for a commit record. +*/ +class fsp_binlog_oob_reader { + enum oob_states { + /* The initial state, about to visit the node for the first time. */ + ST_initial, + /* State of leaf node while traversing the prior trees in the forest. */ + ST_traversing_prior_trees, + /* State of non-leaf node while traversing its left sub-tree. */ + ST_traversing_left_child, + /* State of non-leaf node while traversing its right sub-tree. */ + ST_traversing_right_child, + /* State of node while reading out its data. */ + ST_self + }; + + /* + Stack entry for one node currently taking part in post-order traversal. + We maintain a stack of pending nodes during the traversal, as the traversal + happens in a state machine rather than by recursion. + */ + struct stack_entry { + /* Saved position after reading header. */ + chunk_reader::saved_position saved_pos; + /* The location of this node's OOB record. */ + uint64_t file_no; + uint64_t offset; + /* Right child, to be traversed after left child. */ + uint64_t right_file_no; + uint64_t right_offset; + /* Offset of real data in this node, after header. */ + uint32_t header_len; + /* Amount of data read into rd_buf, and amount used to parse header. */ + uint32_t rd_buf_len; + uint32_t rd_buf_sofar; + /* Current state in post-order traversal state machine. */ + enum oob_states state; + /* Buffer for reading header. */ + byte rd_buf[5*COMPR_INT_MAX64]; + /* + True when the node is reached using only left child pointers, false + otherwise. Used to identify the left-most leaf in a tree which points to + a prior tree that must be traversed first. + */ + bool is_leftmost; + } init_stack[8], *stack; + + /* + If the stack_len is == sizeof(init_stack)/sizeof(init_stack[0]), then we + are using the embedded stack memory. Otherwise we are using a dynamically + allocated stack that must be freed. + */ + uint32_t stack_len; + /* Current top of stack. */ + uint32_t stack_top; + /* State machine current state. 
*/ + enum oob_states state; + +public: + fsp_binlog_oob_reader(); + ~fsp_binlog_oob_reader(); + + bool start_traversal(uint64_t file_no, uint64_t offset); + bool oob_traversal_done() { return stack_top == 0; } + int read_data(chunk_reader *chunk_rd, uchar *buf, int max_len); + +private: + bool ensure_stack(uint32_t length); + bool push_state(enum oob_states state, uint64_t file_no, uint64_t offset, + bool is_leftmost); +}; + + +class ha_innodb_binlog_reader : public handler_binlog_reader { + enum reader_states { + ST_read_next_event_group, ST_read_oob_data, ST_read_commit_record + }; + + chunk_reader chunk_rd; + fsp_binlog_oob_reader oob_reader; + chunk_reader::saved_position saved_commit_pos; + + /* Buffer to hold a page read directly from the binlog file. */ + uchar *page_buf; + /* Out-of-band data to read after commit record, if any. */ + uint64_t oob_count; + uint64_t oob_last_file_no; + uint64_t oob_last_offset; + /* Keep track of pending bytes in the rd_buf. */ + uint32_t rd_buf_len; + uint32_t rd_buf_sofar; + /* State for state machine reading chunks one by one. */ + enum reader_states state; + + /* Used to read the header of the commit record. */ + byte rd_buf[5*COMPR_INT_MAX64]; private: - bool ensure_file_open(); - void next_file(); int read_from_buffer_pool_page(buf_block_t *block, uint64_t end_offset, uchar *buf, uint32_t len); int read_from_file(uint64_t end_offset, uchar *buf, uint32_t len); int read_from_page(uchar *page_ptr, uint64_t end_offset, uchar *buf, uint32_t len); + int read_data(uchar *buf, uint32_t len); public: ha_innodb_binlog_reader(uint64_t file_no= 0, uint64_t offset= 0); @@ -5254,8 +5484,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, { size_t buf_size= state->count_nolock() * (2*COMPR_INT_MAX32 + COMPR_INT_MAX64); - /* ToDo: InnoDB alloc function? */ - alloced_buf= (byte *)my_malloc(PSI_INSTRUMENT_ME, buf_size, MYF(MY_WME)); + alloced_buf= (byte *)ut_malloc(buf_size, mem_key_binlog); if (UNIV_UNLIKELY(!alloced_buf)) return true; buf= alloced_buf; @@ -5263,7 +5492,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, if (UNIV_UNLIKELY(used_bytes < 0)) { ut_ad(0 /* Shouldn't happen, as we allocated maximum needed size. */); - my_free(alloced_buf); + ut_free(alloced_buf); return true; } } @@ -5315,7 +5544,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, ++page_no; } } - my_free(alloced_buf); + ut_free(alloced_buf); /* Make sure we return a page for caller to write the main event data into. */ if (UNIV_UNLIKELY(!block)) { @@ -5782,7 +6011,7 @@ struct chunk_data_cache : public chunk_data_base { if (gtid_remain > 0) { uint32_t size2= gtid_remain > max_len ? max_len : (uint32_t)gtid_remain; - int res2= my_b_read(cache, p, size2); + int res2= my_b_read(cache, p + size, size2); ut_a(!res2 /* ToDo: Error handling */); gtid_remain-= size2; if (gtid_remain == 0) @@ -6083,58 +6312,634 @@ fsp_binlog_trx(trx_t *trx, mtr_t *mtr) } +chunk_reader::chunk_reader() + : s { 0, 0,0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }, + page_ptr(0), cur_block(0), page_buffer(nullptr), + cur_file_handle((File)-1), skipping_partial(false) +{ + /* Nothing else. 
*/ +} + + +chunk_reader::~chunk_reader() +{ + release(); + if (cur_file_handle >= (File)0) + my_close(cur_file_handle, MYF(0)); +} + + +int +chunk_reader::read_error_corruption(uint64_t file_no, uint64_t page_no, + const char *msg) +{ + ib::error() << "Corrupt binlog found on page " << page_no << + " in binlog number " << file_no << ": " << msg; + return -1; +} + + +/* + Obtain the data on the page currently pointed to by the chunk reader. The + page is either latched in the buffer pool (starting an mtr), or read from + the file into the page buffer. + + The code does a dirty read of active_binlog_file_no to determine if the page + is known to be available to read from the file, or if it should be looked up + in the buffer pool. After making the decision, another dirty read is done to + protect against the race where the active tablespace changes in the middle, + and if so the operation is re-tried. This is necessary since the binlog files + N and N-2 use the same tablespace id, so we must ensure we do not mistake a + page from N as belonging to N-2. +*/ +enum chunk_reader::chunk_reader_status +chunk_reader::fetch_current_page() +{ + ut_ad(!cur_block /* Must have no active mtr */); + uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); + for (;;) { + buf_block_t *block= nullptr; + bool mtr_started= false; + uint64_t offset= (s.page_no << srv_page_size_shift) | s.in_page_offset; + uint64_t active= active2; + uint64_t end_offset= + binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire); + ut_ad(s.file_no <= active); + + if (s.file_no + 1 >= active) { + /* Check if we should read from the buffer pool or from the file. */ + if (end_offset != ~(uint64_t)0 && offset < end_offset) { + mtr.start(); + mtr_started= true; + /* + ToDo: Should we keep track of the last block read and use it as a + hint? Will be mainly useful when reading the partially written active + page at the current end of the active binlog, which might be a common + case. + */ + buf_block_t *hint_block= nullptr; + uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (s.file_no & 1); + dberr_t err= DB_SUCCESS; + block= buf_page_get_gen(page_id_t{space_id, s.page_no}, 0, + RW_S_LATCH, hint_block, BUF_GET_IF_IN_POOL, + &mtr, &err); + if (err != DB_SUCCESS) { + mtr.commit(); + return CHUNK_READER_ERROR; + } + } + active2= active_binlog_file_no.load(std::memory_order_acquire); + if (UNIV_UNLIKELY(active2 != active)) { + /* + The active binlog file changed while we were processing; we might + have gotten invalid end_offset or a buffer pool page from a wrong + tablespace. So just try again. + */ + if (mtr_started) + mtr.commit(); + continue; + } + cur_end_offset= end_offset; + if (offset >= end_offset) { + ut_ad(!mtr_started); + if (s.file_no == active) { + /* Reached end of the currently active binlog file -> EOF. */ + return CHUNK_READER_EOF; + } + } + if (block) { + cur_block= block; + ut_ad(mtr_started); + page_ptr= block->page.frame; + return CHUNK_READER_FOUND; + } else { + /* Not in buffer pool, just read it from the file. */ + if (mtr_started) + mtr.commit(); + /* Fall through to read from file. */ + } + } + + /* Tablespace is not open, just read from the file. 
*/ + if (cur_file_handle < (File)0) + { + char filename[BINLOG_NAME_LEN]; + MY_STAT stat_buf; + + binlog_name_make(filename, s.file_no); + cur_file_handle= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME)); + if (UNIV_UNLIKELY(cur_file_handle < (File)0)) { + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + return CHUNK_READER_ERROR; + } + if (my_fstat(cur_file_handle, &stat_buf, MYF(0))) { + my_error(ER_CANT_GET_STAT, MYF(0), filename, errno); + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + return CHUNK_READER_ERROR; + } + cur_file_length= stat_buf.st_size; + } + if (s.file_no == active) + cur_end_offset= end_offset; + else + cur_end_offset= cur_file_length; + + if (offset >= cur_file_length) { + /* End of this file, move to the next one. */ + /* + ToDo: Should also obey binlog_cur_written_offset[], once we start + actually maintaining that, to save unnecessary buffer pool + lookup. + */ + if (cur_file_handle >= (File)0) + { + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + } + ++s.file_no; + s.page_no= 0; + continue; + } + + size_t res= my_pread(cur_file_handle, page_buffer, srv_page_size, + s.page_no << srv_page_size_shift, MYF(MY_WME)); + if (res == (size_t)-1) + return CHUNK_READER_ERROR; + page_ptr= page_buffer; + return CHUNK_READER_FOUND; + } + /* NOTREACHED */ +} + + +int +chunk_reader::read_data(byte *buffer, int max_len, bool multipage) +{ + uint32_t size; + int sofar= 0; + +read_more_data: + if (max_len == 0) + return sofar; + + if (!page_ptr) + { + enum chunk_reader_status res= fetch_current_page(); + if (res == CHUNK_READER_EOF) + { + if (s.in_record) + return read_error_corruption(s.file_no, s.page_no, "binlog tablespace " + "truncated in the middle of record"); + else + return 0; + } + else if (res == CHUNK_READER_ERROR) + return -1; + } + + if (s.chunk_len == 0) + { + byte type; + if (s.in_page_offset < FIL_PAGE_DATA) + s.in_page_offset= FIL_PAGE_DATA; + else if (s.in_page_offset >= srv_page_size - (FIL_PAGE_DATA_END + 3) || + page_ptr[s.in_page_offset] == FSP_BINLOG_TYPE_FILLER) + { + ut_ad(s.in_page_offset >= srv_page_size - FIL_PAGE_DATA_END || + page_ptr[s.in_page_offset] == FSP_BINLOG_TYPE_FILLER); + goto go_next_page; + } + + /* Check for end-of-file. */ + if (cur_end_offset == ~(uint64_t)0 || + (s.page_no << srv_page_size_shift) + s.in_page_offset >= cur_end_offset) + return sofar; + + type= page_ptr[s.in_page_offset]; + if (type == 0) + { + ut_ad(0 /* Should have detected end-of-file on cur_end_offset. */); + return 0; + } + + /* + Consistency check on the chunks. A record must consist in a sequence of + chunks of the same type, all but the first must have the + FSP_BINLOG_FLAG_BIT_CONT bit set, and the final one must have the + FSP_BINLOG_FLAG_BIT_LAST bit set. + */ + if (!s.in_record) + { + if (UNIV_UNLIKELY(type & FSP_BINLOG_FLAG_CONT) && !s.skip_current) + { + if (skipping_partial) + { + s.chunk_len= page_ptr[s.in_page_offset + 1] | + ((uint32_t)page_ptr[s.in_page_offset + 2] << 8); + s.skip_current= true; + goto skip_chunk; + } + else + return read_error_corruption(s.file_no, s.page_no, "Binlog record " + "starts with continuation chunk"); + } + } + else + { + if ((type ^ s.chunk_type) & FSP_BINLOG_TYPE_MASK) + { + /* + As a special case, we must allow a GTID state to appear in the + middle of a record. 
+ */ + if (((uint64_t)1 << (type & FSP_BINLOG_TYPE_MASK)) & + ALLOWED_NESTED_RECORDS) + { + s.chunk_len= page_ptr[s.in_page_offset + 1] | + ((uint32_t)page_ptr[s.in_page_offset + 2] << 8); + goto skip_chunk; + } + /* Chunk type changed in the middle. */ + return read_error_corruption(s.file_no, s.page_no, "Binlog record missing " + "end chunk"); + } + if (!(type & FSP_BINLOG_FLAG_CONT)) + { + /* START chunk without END chunk. */ + return read_error_corruption(s.file_no, s.page_no, "Binlog record missing " + "end chunk"); + } + } + + s.skip_current= false; + s.chunk_type= type; + s.in_record= true; + s.chunk_len= page_ptr[s.in_page_offset + 1] | + ((uint32_t)page_ptr[s.in_page_offset + 2] << 8); + s.chunk_read_offset= 0; + } + + /* Now we have a chunk available to read data from. */ + ut_ad(s.chunk_read_offset < s.chunk_len); + if (s.skip_current && + (s.chunk_read_offset > 0 || (s.chunk_type & FSP_BINLOG_FLAG_CONT))) + { + /* + Skip initial continuation chunks. + Used to be able to start reading potentially in the middle of a record, + ie. at a GTID state point. + */ + s.chunk_read_offset= s.chunk_len; + } + else + { + size= std::min((uint32_t)max_len, s.chunk_len - s.chunk_read_offset); + memcpy(buffer, page_ptr + s.in_page_offset + 3 + s.chunk_read_offset, size); + buffer+= size; + s.chunk_read_offset+= size; + max_len-= size; + sofar+= size; + } + + if (s.chunk_len > s.chunk_read_offset) + { + ut_ad(max_len == 0 /* otherwise would have read more */); + return sofar; + } + + /* We have read all of the chunk. Move to next chunk or end of the record. */ +skip_chunk: + s.in_page_offset+= 3 + s.chunk_len; + s.chunk_len= 0; + s.chunk_read_offset= 0; + + if (s.chunk_type & FSP_BINLOG_FLAG_LAST) + { + s.in_record= false; /* End of record. */ + s.skip_current= false; + } + + if (s.in_page_offset >= srv_page_size - (FIL_PAGE_DATA_END + 3)) + { +go_next_page: + /* End of page reached, move to the next page. */ + ++s.page_no; + page_ptr= nullptr; + if (cur_block) + { + mtr.commit(); + cur_block= nullptr; + } + s.in_page_offset= 0; + + if (cur_file_handle >= (File)0 && + (s.page_no << srv_page_size_shift) >= cur_file_length) + { + /* Move to the next file. */ + /* + ToDo: Should we have similar logic for moving to the next file when + we reach the end of the last page using the buffer pool? + + Then we save re-fetching the page later. But then we need to keep + track of the file length also when reading from the buffer pool. + */ + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + ++s.file_no; + s.page_no= 0; + } + } + + if (sofar > 0 && (!multipage || !s.in_record)) + return sofar; + + goto read_more_data; +} + + +void +chunk_reader::restore_pos(chunk_reader::saved_position *pos) +{ + if (page_ptr && + !(pos->file_no == s.file_no && pos->page_no == s.page_no)) + { + /* Seek to a different page, release any current page. */ + if (cur_block) + { + mtr.commit(); + cur_block= nullptr; + } + page_ptr= nullptr; + } + if (cur_file_handle != (File)-1 && pos->file_no != s.file_no) + { + /* Seek to a different file than currently open, close it. 
*/ + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + } + s= *pos; +} + + +void +chunk_reader::seek(uint64_t file_no, uint64_t offset) +{ + saved_position pos { + file_no, (uint32_t)(offset >> srv_page_size_shift), + (uint32_t)(offset & (srv_page_size - 1)), + 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }; + restore_pos(&pos); +} + + +void chunk_reader::release(bool release_file_page) +{ + if (cur_block) + { + mtr.commit(); + cur_block= nullptr; + page_ptr= nullptr; + } + else if (release_file_page) + { + /* + For when we reach EOF while reading from the file. We need to re-read + the page from the file (or buffer pool) in this case on next read, as + data might be added to the page. + */ + page_ptr= nullptr; + } +} + + +fsp_binlog_oob_reader::fsp_binlog_oob_reader() + : stack(init_stack), stack_len(sizeof(init_stack)/sizeof(init_stack[0])), + stack_top(0) +{ +} + + +fsp_binlog_oob_reader::~fsp_binlog_oob_reader() +{ + if (stack_len > sizeof(init_stack)/sizeof(init_stack[0])) + ut_free(stack); +} + + +bool +fsp_binlog_oob_reader::ensure_stack(uint32_t needed_length) +{ + uint32_t len= stack_len; + if (len >= needed_length) + return false; + do + len*= 2; + while (len < needed_length); + size_t needed= len*sizeof(stack_entry); + stack_entry *new_stack= (stack_entry *) ut_malloc(needed, mem_key_binlog); + if (!new_stack) + { + my_error(ER_OUTOFMEMORY, MYF(0), needed); + return true; + } + + memcpy(new_stack, stack, stack_len*sizeof(stack_entry)); + stack= new_stack; + stack_len= len; + return false; +} + + +bool +fsp_binlog_oob_reader::push_state(enum oob_states state, uint64_t file_no, + uint64_t offset, bool is_leftmost) +{ + uint32_t idx= stack_top; + if (ensure_stack(idx + 1)) + return true; + stack[idx].state= state; + stack[idx].file_no= file_no; + stack[idx].offset= offset; + stack[idx].is_leftmost= is_leftmost; + stack_top= idx + 1; + return false; +} + + +bool +fsp_binlog_oob_reader::start_traversal(uint64_t file_no, uint64_t offset) +{ + stack_top= 0; + return push_state(ST_initial, file_no, offset, true); +} + + +/* + Read from out-of-band event group data. + + Does a state-machine incremental traversal of the forest of perfect binary + trees of oob records in the event group. May read just the data available + on one page, thus returning less than the requested number of bytes (this + is to prefer to inspect each page only once, returning data page-by-page as + long as reader asks for at least a full page worth of data). +*/ +int +fsp_binlog_oob_reader::read_data(chunk_reader *chunk_rd, uchar *buf, int len) +{ + stack_entry *e; + uint64_t chunk_idx; + uint64_t left_file_no; + uint64_t left_offset; + int res; + const uchar *p_end; + const uchar *p; + std::pair v_and_p; + int size; + + if (stack_top <= 0) + { + ut_ad(0 /* Should not call when no more oob data to read. 
*/); + return 0; + } + +again: + e= &(stack[stack_top - 1]); + switch (e->state) + { + case ST_initial: + chunk_rd->seek(e->file_no, e->offset); + static_assert(sizeof(e->rd_buf) == 5*COMPR_INT_MAX64); + res= chunk_rd->read_data(e->rd_buf, 5*COMPR_INT_MAX64, true); + if (res < 0) + return -1; + if (chunk_rd->cur_type() != FSP_BINLOG_TYPE_OOB_DATA) + return chunk_rd->read_error_corruption("Wrong chunk type"); + if (res == 0) + return chunk_rd->read_error_corruption("Unexpected EOF, expected " + "oob chunk"); + e->rd_buf_len= res; + p_end= e->rd_buf + res; + v_and_p= compr_int_read(e->rd_buf); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + chunk_idx= v_and_p.first; + (void)chunk_idx; + + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + left_file_no= v_and_p.first; + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + left_offset= v_and_p.first; + + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + e->right_file_no= v_and_p.first; + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + e->right_offset= v_and_p.first; + e->rd_buf_sofar= (uint32_t)(p - e->rd_buf); + if (left_file_no == 0 && left_offset == 0) + { + /* Leaf node. */ + if (e->is_leftmost && !(e->right_file_no == 0 && e->right_offset == 0)) + { + /* Traverse the prior tree(s) in the forst. */ + e->state= ST_traversing_prior_trees; + chunk_rd->save_pos(&e->saved_pos); + push_state(ST_initial, e->right_file_no, e->right_offset, true); + } + else + e->state= ST_self; + } + else + { + e->state= ST_traversing_left_child; + chunk_rd->save_pos(&e->saved_pos); + push_state(ST_initial, left_file_no, left_offset, e->is_leftmost); + } + goto again; + + case ST_traversing_prior_trees: + chunk_rd->restore_pos(&e->saved_pos); + e->state= ST_self; + goto again; + + case ST_traversing_left_child: + e->state= ST_traversing_right_child; + push_state(ST_initial, e->right_file_no, e->right_offset, false); + goto again; + + case ST_traversing_right_child: + chunk_rd->restore_pos(&e->saved_pos); + e->state= ST_self; + goto again; + + case ST_self: + size= 0; + if (e->rd_buf_len > e->rd_buf_sofar) + { + /* Use any excess data from when the header was read. */ + size= std::min((int)(e->rd_buf_len - e->rd_buf_sofar), len); + memcpy(buf, e->rd_buf + e->rd_buf_sofar, size); + e->rd_buf_sofar+= size; + len-= size; + buf+= size; + } + + if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd->end_of_record())) + { + res= chunk_rd->read_data(buf, len, false); + if (res < 0) + return -1; + size+= res; + } + + if (chunk_rd->end_of_record()) + { + /* This oob record done, pop the state. */ + ut_ad(stack_top > 0); + --stack_top; + } + return size; + + default: + ut_ad(0); + return -1; + } +} + + ha_innodb_binlog_reader::ha_innodb_binlog_reader(uint64_t file_no, uint64_t offset) - : chunk_pos(0), chunk_remain(0), skipping_partial(true) + : rd_buf_len(0), rd_buf_sofar(0), state(ST_read_next_event_group) { - page_buf= (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, srv_page_size, MYF(0)); /* ToDo: InnoDB alloc function? */ - // ToDo: Need some mechanism to find where to start reading. This is just "start from 0" for early testing. 
- cur_file_no= file_no; - cur_file_offset= offset; + page_buf= (uchar *)ut_malloc(srv_page_size, mem_key_binlog); + chunk_rd.set_page_buf(page_buf); + chunk_rd.seek(file_no, offset); + chunk_rd.skip_current(); + chunk_rd.skip_partial(true); } ha_innodb_binlog_reader::~ha_innodb_binlog_reader() { - if (cur_file != (File)-1) - my_close(cur_file, MYF(0)); - my_free(page_buf); /* ToDo: InnoDB alloc function? */ -} - - -bool -ha_innodb_binlog_reader::ensure_file_open() -{ - if (cur_file != (File)-1) - return false; - char filename[BINLOG_NAME_LEN]; - binlog_name_make(filename, cur_file_no); - cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME)); - if (UNIV_UNLIKELY(cur_file < (File)0)) { - cur_file= (File)-1; - return true; - } - MY_STAT stat_buf; - if (my_fstat(cur_file, &stat_buf, MYF(0))) { - my_error(ER_CANT_GET_STAT, MYF(0), filename, errno); - my_close(cur_file, MYF(0)); - cur_file= (File)-1; - return true; - } - cur_file_length= stat_buf.st_size; - return false; -} - - -void -ha_innodb_binlog_reader::next_file() -{ - if (cur_file != (File)-1) { - my_close(cur_file, MYF(0)); - cur_file= (File)-1; - } - ++cur_file_no; - cur_file_offset= 0; + ut_free(page_buf); } @@ -6158,271 +6963,139 @@ ha_innodb_binlog_reader::next_file() then read from the real file. */ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len) +{ + int res= read_data(buf, len); + chunk_rd.release(res == 0); + return res; +} + + +int ha_innodb_binlog_reader::read_data(uchar *buf, uint32_t len) { int res; + const uchar *p_end; + const uchar *p; + std::pair v_and_p; + int size; - /* - Loop repeatedly trying to read some data from a page. - The usual case is that just one iteration of the loop is necessary. But - occasionally more may be needed, for example when moving to the next - binlog file or when a page has no replication event data to read. - */ - uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); - for (;;) { - buf_block_t *block= nullptr; - mtr_t mtr; - bool mtr_started= false; - uint64_t active= active2; - uint64_t end_offset= - binlog_cur_end_offset[cur_file_no&1].load(std::memory_order_acquire); - ut_ad(cur_file_no <= active); +again: + switch (state) + { + case ST_read_next_event_group: + static_assert(sizeof(rd_buf) == 5*COMPR_INT_MAX64); + res= chunk_rd.read_data(rd_buf, 5*COMPR_INT_MAX64, true); + if (res <= 0) + return res; + if (chunk_rd.cur_type() != FSP_BINLOG_TYPE_COMMIT) + { + chunk_rd.skip_current(); + goto again; + } + /* Found the start of a commit record. */ + chunk_rd.skip_partial(false); - if (cur_file_no + 1 >= active) { - /* Check if we should read from the buffer pool or from the file. */ - if (end_offset != ~(uint64_t)0 && cur_file_offset < end_offset) { - mtr.start(); - mtr_started= true; - /* - ToDo: Should we keep track of the last block read and use it as a - hint? Will be mainly useful when reading the partially written active - page at the current end of the active binlog, which might be a common - case. 
- */ - buf_block_t *hint_block= nullptr; - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (cur_file_no & 1); - uint32_t page_no= (uint32_t)(cur_file_offset >> srv_page_size_shift); - dberr_t err= DB_SUCCESS; - block= buf_page_get_gen(page_id_t{space_id, page_no}, 0, - RW_S_LATCH, hint_block, BUF_GET_IF_IN_POOL, - &mtr, &err); - if (err != DB_SUCCESS) { - mtr.commit(); - res= -1; - break; - } - } - active2= active_binlog_file_no.load(std::memory_order_acquire); - if (UNIV_UNLIKELY(active2 != active)) { - /* - The active binlog file changed while we were processing; we might - have gotten invalid end_offset or a buffer pool page from a wrong - tablespace. So just try again. - */ - if (mtr_started) - mtr.commit(); - continue; - } - if (cur_file_offset >= end_offset) { - ut_ad(!mtr_started); - if (cur_file_no == active) { - /* Reached end of the currently active binlog file -> EOF. */ - res= 0; - break; - } - /* End of file reached, move to next file. */ - /* - ToDo: Should not read data and send to slaves that has not yet been - durably synced to disk, at least optionally, lest a crash leaves the - slaves with transactions that do not (any longer) exist on master - and breaks replication. - */ - /* - ToDo: Should also obey binlog_cur_written_offset[], once we start - actually maintaining that, to save unnecessary buffer pool - lookup. - */ - next_file(); - continue; - } - if (block) { - res= read_from_buffer_pool_page(block, end_offset, buf, len); - ut_ad(mtr_started); - if (mtr_started) - mtr.commit(); - } else { - /* Not in buffer pool, just read it from the file. */ - if (mtr_started) - mtr.commit(); - if (ensure_file_open()) { - res= -1; - break; - } - ut_ad(cur_file_offset < end_offset); - if (cur_file_offset >= cur_file_length) { - /* - This happens when we reach the end of (active-1) and the tablespace - has been closed. - */ - ut_ad(end_offset == ~(uint64_t)0); - ut_ad(!mtr_started); - next_file(); - continue; - } - res= read_from_file(end_offset, buf, len); - } - } else { - /* Tablespace is not open, just read from the file. */ - if (ensure_file_open()) { - res= -1; - break; - } - if (cur_file_offset >= cur_file_length) { - /* End of this file, move to the next one. */ - next_file(); - continue; - } - res= read_from_file(cur_file_length, buf, len); + /* Read the header of the commit record to see if there's any oob data. */ + rd_buf_len= res; + p_end= rd_buf + res; + v_and_p= compr_int_read(rd_buf); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + oob_count= v_and_p.first; + + if (oob_count > 0) + { + /* Skip the pointer to first chunk. */ + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + oob_last_file_no= v_and_p.first; + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + oob_last_offset= v_and_p.first; } - /* If nothing read, but not eof/error, then loop to try the next page. */ - if (res != 0) - break; - } + rd_buf_sofar= (uint32_t)(p - rd_buf); + state= ST_read_commit_record; + goto again; - return res; + case ST_read_commit_record: + size= 0; + if (rd_buf_len > rd_buf_sofar) + { + /* Use any excess data from when the header was read. 
*/ + size= std::min((int)(rd_buf_len - rd_buf_sofar), (int)len); + memcpy(buf, rd_buf + rd_buf_sofar, size); + rd_buf_sofar+= size; + len-= size; + buf+= size; + } + + if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd.end_of_record())) + { + res= chunk_rd.read_data(buf, len, false); + if (res < 0) + return -1; + size+= res; + } + + if (UNIV_LIKELY(rd_buf_sofar == rd_buf_len) && chunk_rd.end_of_record()) + { + if (oob_count == 0) + state= ST_read_next_event_group; + else + { + oob_reader.start_traversal(oob_last_file_no, oob_last_offset); + chunk_rd.save_pos(&saved_commit_pos); + state= ST_read_oob_data; + } + if (size == 0) + goto again; + } + + return size; + + case ST_read_oob_data: + res= oob_reader.read_data(&chunk_rd, buf, len); + if (res < 0) + return -1; + if (oob_reader.oob_traversal_done()) + { + chunk_rd.restore_pos(&saved_commit_pos); + state= ST_read_next_event_group; + } + if (UNIV_UNLIKELY(res == 0)) + { + ut_ad(0 /* Should have had oob_traversal_done() last time then. */); + goto again; + } + return res; + + default: + ut_ad(0); + return -1; + } } bool ha_innodb_binlog_reader::data_available() { - uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); - if (active != cur_file_no) - { - ut_ad(active > cur_file_no); + if (state != ST_read_next_event_group) return true; - } - uint64_t end_offset= - binlog_cur_end_offset[cur_file_no&1].load(std::memory_order_acquire); - uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); - if (active2 != active || end_offset > cur_file_offset) - return true; - ut_ad(cur_file_no == active2); - ut_ad(cur_file_offset == end_offset); - return false; -} - - -int -ha_innodb_binlog_reader::read_from_buffer_pool_page(buf_block_t *block, - uint64_t end_offset, - uchar *buf, uint32_t len) -{ - return read_from_page(block->page.frame, end_offset, buf, len); -} - - -int -ha_innodb_binlog_reader::read_from_file(uint64_t end_offset, - uchar *buf, uint32_t len) -{ - uint64_t mask= ((uint64_t)1 << srv_page_size_shift) - 1; - uint64_t offset= cur_file_offset; - uint64_t page_start_offset; - - ut_ad(cur_file != (File)-1); - ut_ad(cur_file_offset < cur_file_length); - - page_start_offset= offset & ~mask; - size_t res= my_pread(cur_file, page_buf, srv_page_size, page_start_offset, - MYF(MY_WME)); - if (res == (size_t)-1) - return -1; - - return read_from_page(page_buf, end_offset, buf, len); -} - - -/* - Read out max `len` bytes from the chunks stored in a page. - - page_ptr Points to start of data for current page matching cur_file_offset - end_offset Current end of binlog file, no reads past this point - buf Destination buffer to read into - len Maximum number of bytes to read - - Returns number of bytes actually read. -*/ -int -ha_innodb_binlog_reader::read_from_page(uchar *page_ptr, uint64_t end_offset, - uchar *buf, uint32_t len) -{ - uint32_t page_size= (uint32_t)srv_page_size; - uint64_t mask= ((uint64_t)1 << srv_page_size_shift) - 1; - uint64_t offset= cur_file_offset; - uint64_t page_start_offset= offset & ~mask; - uint32_t page_end= - end_offset > page_start_offset + (page_size - FIL_PAGE_DATA_END) ? - (page_size - FIL_PAGE_DATA_END) : - (uint32_t)(end_offset & mask); - uint32_t in_page_offset= (uint32_t)(offset & mask); - uint32_t sofar= 0; - - ut_ad(in_page_offset < page_size - FIL_PAGE_DATA_END); - if (in_page_offset < FIL_PAGE_DATA) - in_page_offset= FIL_PAGE_DATA; - - /* First return data from any partially-read chunk. 
*/ - if ((sofar= chunk_remain)) { - if (sofar <= len) { - memcpy(buf, page_ptr + in_page_offset + chunk_pos, sofar); - chunk_pos= 0; - chunk_remain= 0; - in_page_offset+= sofar; - } else { - memcpy(buf, page_ptr + in_page_offset + chunk_pos, len); - chunk_pos+= len; - chunk_remain= sofar - len; - cur_file_offset= offset + len; - return len; - } - } - - while (sofar < len && in_page_offset < page_end) - { - uchar type= page_ptr[in_page_offset]; - if (type == 0x00) - break; /* No more data on the page yet */ - if (type == FSP_BINLOG_TYPE_FILLER) { - in_page_offset= page_size; /* Point to start of next page */ - break; /* No more data on page */ - } - uint32_t size= page_ptr[in_page_offset + 1] + - (uint32_t)(page_ptr[in_page_offset + 2] << 8); - if ((type & FSP_BINLOG_TYPE_MASK) != FSP_BINLOG_TYPE_COMMIT || - (UNIV_UNLIKELY(skipping_partial) && (type & FSP_BINLOG_FLAG_CONT))) - { - /* Skip non-binlog-event record, or initial partial record. */ - in_page_offset += 3 + size; - continue; - } - skipping_partial= false; - - /* Now grab the data in the chunk, or however much the caller requested. */ - uint32_t rest = len - sofar; - if (size > rest) { - /* - Chunk contains more data than reader requested. - Return what was requested, and remember the remaining partial data - for the next read. - */ - memcpy(buf + sofar, page_ptr + (in_page_offset + 3), rest); - chunk_pos= rest; - chunk_remain= size - rest; - sofar+= rest; - break; - } - - memcpy(buf + sofar, page_ptr + (in_page_offset + 3), size); - in_page_offset= in_page_offset + 3 + size; - sofar+= size; - } - - if (in_page_offset >= page_size - FIL_PAGE_DATA_END) - cur_file_offset= page_start_offset + page_size; // To start of next page - else - cur_file_offset= page_start_offset | in_page_offset; - return sofar; + return chunk_rd.data_available(); } @@ -6634,7 +7307,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, /* First search backwards for the right file to start from. */ uint64_t file_end= 0; uint64_t diff_state_interval= 0; - rpl_binlog_state_base base_state, diff_state; + rpl_binlog_state_base base_state, page0_diff_state, tmp_diff_state; base_state.init(); for (;;) { @@ -6684,15 +7357,16 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, /* Round to the next diff_state_interval after file_end. */ page2-= page2 % diff_state_page_interval; uint32_t page1= (page0 + page2) / 2; - diff_state.init(); - diff_state.load_nolock(&base_state); - while (page1 >= page0 + diff_state_interval) + page0_diff_state.init(); + page0_diff_state.load_nolock(&base_state); + tmp_diff_state.init(); + while (page1 >= page0 + diff_state_page_interval) { - ut_ad((page1 - page0) % diff_state_interval == 0); - diff_state.reset_nolock(); - diff_state.load_nolock(&base_state); + ut_ad((page1 - page0) % diff_state_page_interval == 0); + tmp_diff_state.reset_nolock(); + tmp_diff_state.load_nolock(&base_state); enum Read_Result res= - read_gtid_state_file_no(&diff_state, file_no, 0, &file_end, + read_gtid_state_file_no(&tmp_diff_state, file_no, page1, &file_end, &diff_state_interval); if (res == READ_ENOENT) return 0; /* File purged while we are reading from it? 
*/ @@ -6708,14 +7382,18 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, page1= page1 - diff_state_page_interval; continue; } - if (diff_state.is_before_pos(pos)) + if (tmp_diff_state.is_before_pos(pos)) + { page0= page1; + page0_diff_state.reset_nolock(); + page0_diff_state.load_nolock(&tmp_diff_state); + } else page2= page1; page1= (page0 + page2) / 2; } ut_ad(page1 >= page0); - out_state->load_nolock(&diff_state); + out_state->load_nolock(&page0_diff_state); *out_file_no= file_no; *out_offset= (uint64_t)page0 << srv_page_size_shift; return 1; @@ -6734,8 +7412,9 @@ ha_innodb_binlog_reader::init_gtid_pos(slave_connection_state *pos, return -1; if (res > 0) { - cur_file_no= file_no; - cur_file_offset= offset; + chunk_rd.seek(file_no, offset); + chunk_rd.skip_current(); + chunk_rd.skip_partial(true); } return res; } diff --git a/storage/innobase/include/ut0compr_int.h b/storage/innobase/include/ut0compr_int.h index 7d5ea31ef69..e4152d9071e 100644 --- a/storage/innobase/include/ut0compr_int.h +++ b/storage/innobase/include/ut0compr_int.h @@ -58,7 +58,7 @@ Created 2024-10-01 Kristian Nielsen #define COMPR_INT_MAX32 5 #define COMPR_INT_MAX64 9 -#define COMPR_INT_MAX COMPR_INT_MAX_64 +#define COMPR_INT_MAX COMPR_INT_MAX64 /* Write compressed unsigned integer */ extern unsigned char *compr_int_write(unsigned char *p, uint64_t v);
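
Illustration (not part of the patch): the commit message describes the out-of-band records as a forest of perfect binary trees, where the leftmost leaf of each tree points to the root of the previous tree, and fsp_binlog_oob_reader walks this forest post-order with an explicit stack. The following minimal, self-contained C++ sketch shows that traversal order on an in-memory model; the Node struct, the index-based links, and read_forest() are illustrative stand-ins only, not the actual on-disk format or InnoDB API.

// Sketch of the post-order traversal of a forest of binary trees, where the
// leftmost leaf of a tree links to the root of the previous tree. Appending
// node data in this visit order reproduces the original write order of the
// out-of-band records, while memory use is bounded by the stack depth rather
// than by the total amount of data.
#include <cstdint>
#include <iostream>
#include <stack>
#include <string>
#include <vector>

struct Node {
  uint64_t left;   // index of left child; 0 means "no child" (leaf)
  uint64_t right;  // right child; for the leftmost leaf of a tree this
                   // instead points to the root of the *previous* tree
  std::string data;
};

// Read the whole forest starting from the root of the last tree, the way the
// commit record points at the last out-of-band node.
static void read_forest(const std::vector<Node> &nodes, uint64_t last_root,
                        std::string *out)
{
  struct Frame { uint64_t idx; bool leftmost; int stage; };
  std::stack<Frame> st;
  st.push({last_root, true, 0});
  while (!st.empty()) {
    Frame &f = st.top();
    const Node &n = nodes[f.idx];
    bool is_leaf = (n.left == 0);
    if (f.stage == 0) {
      f.stage = 1;
      if (is_leaf) {
        // A leftmost leaf links to the prior tree, which must be read first.
        if (f.leftmost && n.right != 0)
          st.push({n.right, true, 0});
      } else {
        st.push({n.left, f.leftmost, 0});   // left subtree first
      }
    } else if (f.stage == 1) {
      f.stage = 2;
      if (!is_leaf)
        st.push({n.right, false, 0});       // then right subtree
    } else {
      out->append(nodes[f.idx].data);       // finally this node's own data
      st.pop();
    }
  }
}

int main()
{
  // Index 0 is a dummy so that 0 can mean "no link", analogous to
  // file_no == 0 && offset == 0 marking the absence of a child pointer.
  std::vector<Node> nodes(1);
  nodes.push_back({0, 0, "A"});   // 1: first tree (a single leaf)
  nodes.push_back({0, 1, "B"});   // 2: leftmost leaf of tree 2, links to tree 1
  nodes.push_back({0, 0, "C"});   // 3: right leaf of tree 2
  nodes.push_back({2, 3, "D"});   // 4: root of tree 2, written after B and C
  std::string result;
  read_forest(nodes, 4, &result);
  std::cout << result << "\n";    // prints "ABCD", the original write order
  return 0;
}

Because each node is written after its children, a post-order visit of the forest yields the records in the order they were originally produced, which is why the reader can stream an interleaved, multi-record event group back out consecutively without ever needing a "next" pointer in the append-only log.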