diff --git a/mysql-test/suite/binlog_in_engine/rpl_oob.result b/mysql-test/suite/binlog_in_engine/rpl_oob.result new file mode 100644 index 00000000000..598d2ff47ec --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/rpl_oob.result @@ -0,0 +1,19 @@ +include/master-slave.inc +[connection master] +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); +*** Generating 25 large transactions in 5 interleaved connections +connection master; +INSERT INTO t1 VALUES (0, 1, 'End'); +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +2552 33150 128776 5000383 +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c)) +2552 33150 128776 5000383 +connection master; +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/binlog_in_engine/rpl_oob.test b/mysql-test/suite/binlog_in_engine/rpl_oob.test new file mode 100644 index 00000000000..0878dd7bf26 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/rpl_oob.test @@ -0,0 +1,69 @@ +--source include/have_innodb_binlog.inc +--source include/have_binlog_format_row.inc +--source include/master-slave.inc + +# Test a number of transactions that are large and get interleaved with each +# other over multiple binlog files. +--let $NUM_CONNECTIONS= 5 +# $NUM_TRANSACTIONS is total, not per connection. +--let $NUM_TRANSACTIONS=25 +--let $NUM_PIECES= 100 +--let $PIECE_SIZE= 2000 + + +CREATE TABLE t1 (a INT NOT NULL, b INT NOT NULL, c TEXT, PRIMARY KEY(a, b)) ENGINE=InnoDB; +INSERT INTO t1 VALUES (0, 0, 'Start'); + +--echo *** Generating $NUM_TRANSACTIONS large transactions in $NUM_CONNECTIONS interleaved connections +--disable_query_log +let $t= 0; +while ($t < $NUM_TRANSACTIONS) { + let $b= $t; + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connect(con$i,localhost,root,,) + START TRANSACTION; + eval INSERT INTO t1 VALUES ($b + $i, 0, 'Initial $i'); + inc $i; + inc $t; + } + + let $p= 1; + while ($p <= $NUM_PIECES) { + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($b + $i, $p, REPEAT(CHR(65 + ($p + $i MOD 26)), $PIECE_SIZE)); + inc $i; + } + inc $p; + } + + let $i= 1; + while ($i <= $NUM_CONNECTIONS) { + --connection con$i + eval INSERT INTO t1 VALUES ($b + $i, $NUM_PIECES+1, 'Last $i'); + COMMIT; + --disconnect con$i + inc $i; + } +} +--enable_query_log + +--connection master +INSERT INTO t1 VALUES (0, 1, 'End'); + +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; +--source include/save_master_gtid.inc +--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 --start-position=0-1-1 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.txt + +--connection slave +--source include/sync_with_master_gtid.inc +SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1; + + +# Cleanup. +--connection master +DROP TABLE t1; +--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 --start-position=0-1-1 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.txt +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/include/rpl_gtid_index.inc b/mysql-test/suite/rpl/include/rpl_gtid_index.inc index f44bcfe81fb..ae8f49e70fc 100644 --- a/mysql-test/suite/rpl/include/rpl_gtid_index.inc +++ b/mysql-test/suite/rpl/include/rpl_gtid_index.inc @@ -109,7 +109,7 @@ while ($i < $NUM_POS) { --disable_result_log eval START SLAVE UNTIL master_gtid_pos='$gtid_pos'; --enable_result_log - --let $res= `SELECT MASTER_GTID_WAIT('$gtid_pos')` + --let $res= `SELECT MASTER_GTID_WAIT('$gtid_pos', 60)` if ($res != 0) { --die "FAIL: MASTER_GTID_WAIT($gtid_pos) returned $res, should have been 0" } @@ -154,7 +154,7 @@ while ($i <= $NUM_DOMAIN) { --disable_result_log eval START SLAVE UNTIL master_gtid_pos='$until_pos'; --enable_result_log - --let $res= `SELECT MASTER_GTID_WAIT('$until_pos')` + --let $res= `SELECT MASTER_GTID_WAIT('$until_pos', 60)` if ($res != 0) { --die "FAIL: MASTER_GTID_WAIT($until_pos) returned $res, should have been 0" } diff --git a/sql/handler.h b/sql/handler.h index adb621abb9e..4de527cfbd1 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -5845,21 +5845,6 @@ struct handler_binlog_event_group_info { Class for reading a binlog implemented in an engine. */ class handler_binlog_reader { -public: - /* ToDo: Should some of this state go to the derived class, in case different engines might want to do something different? */ - - /* The file number of the currently-being-read binlog file. */ - uint64_t cur_file_no; - /* The current offset into the binlog file. */ - uint64_t cur_file_offset; - /* - Open file handle of binlog file. - This may be NULL if the currently-being-read binlog file is "hot" and - is being read from in-memory buffers while the data may not yet be - written out to the file on the OS level. - */ - File cur_file; - private: /* Position and length of any remaining data in buf[]. */ uint32_t buf_data_pos; @@ -5869,8 +5854,7 @@ private: public: handler_binlog_reader() - : cur_file_no(~(uint64_t)0), cur_file_offset(0), cur_file((File)-1), - buf_data_pos(0), buf_data_remain(0) + : buf_data_pos(0), buf_data_remain(0) { } virtual ~handler_binlog_reader() { }; virtual int read_binlog_data(uchar *buf, uint32_t len) = 0; @@ -5879,14 +5863,6 @@ public: rpl_binlog_state_base *state) = 0; int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed); - -/* - cur_file_no -> implicitly gives file/tablespace - cur_file_offset -> implicitly gives page - cur_file_fh -> open fh, if any - cur_chunk_len - cur_chunk_sofar -*/ }; #endif /* HANDLER_INCLUDED */ diff --git a/sql/log.cc b/sql/log.cc index 67a13422f3b..4c135e0e7de 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6312,6 +6312,28 @@ bool stmt_has_updated_non_trans_table(const THD* thd) static int binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len) { + /* + Tricky: The mysys IO_CACHE write function can be called either from + my_b_flush_io_cache(), where it must write everything it was asked to; or + from _my_b_write(), where it needs only write as much as is efficient (eg. + an integer multiple of some block size), and any remainder (which must be + < cache size) will be put in the cache. + + The two cases are distinguished on whether the passed-in data pointer is + equal to cache->write_buffer or not. + + We want each oob record to be the full size, so write only integer + multiples of the cache size in the latter case. + */ + if (data != cache->write_buffer) + { + len-= (len % cache->buffer_length); + if (!len) + return false; + } + + /* ToDo: If len > the cache size (32k default), then split up the write in multiple oob writes to the engine. This can happen if there is a large single write to the IO_CACHE. Maybe the split could happen also in the engine, depending if I want to split the size here to the binlog_cache_size which is known here, or if I want to split it to an engine imposed max size. But since the commit record size is determined by the upper layer here, I think it makes sense to determine the oob record size here also. */ + binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos; void **engine_ptr= &mngr->engine_binlog_info.engine_ptr; bool res= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, len, diff --git a/storage/innobase/fsp/fsp0fsp.cc b/storage/innobase/fsp/fsp0fsp.cc index a9150abcd6c..f0515d9342e 100644 --- a/storage/innobase/fsp/fsp0fsp.cc +++ b/storage/innobase/fsp/fsp0fsp.cc @@ -4292,6 +4292,8 @@ enum fsp_binlog_chunk_types { FSP_BINLOG_TYPE_GTID_STATE= 2, /* Out-of-band event group data. */ FSP_BINLOG_TYPE_OOB_DATA= 3, + /* Must be one more than the last type. */ + FSP_BINLOG_TYPE_END, /* Padding data at end of page. */ FSP_BINLOG_TYPE_FILLER= 0xff @@ -4312,6 +4314,17 @@ static constexpr uint32_t FSP_BINLOG_FLAG_LAST= (1 << FSP_BINLOG_FLAG_BIT_LAST); static constexpr uint32_t FSP_BINLOG_TYPE_MASK= ~(FSP_BINLOG_FLAG_CONT | FSP_BINLOG_FLAG_LAST); +/* + These are the chunk types that are allowed to occur in the middle of + another record. +*/ +static constexpr uint64_t ALLOWED_NESTED_RECORDS= + /* GTID STATE at start of page can occur in the middle of other record. */ + ((uint64_t)1 << FSP_BINLOG_TYPE_GTID_STATE) + ; +/* Ensure that all types fit in the ALLOWED_NESTED_RECORDS bitmask. */ +static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS)); + static uint32_t binlog_size_in_pages; buf_block_t *binlog_cur_block; @@ -4458,27 +4471,244 @@ struct binlog_oob_context { }; -class ha_innodb_binlog_reader : public handler_binlog_reader { - /* Buffer to hold a page read directly from the binlog file. */ - uchar *page_buf; - /* Length of the currently open file (if cur_file != -1). */ +class chunk_reader { +public: + enum chunk_reader_status { + CHUNK_READER_ERROR= -1, + CHUNK_READER_EOF= 0, + CHUNK_READER_FOUND= 1 + }; + + /* + Current state, can be obtained from save_pos() and later passed to + restore_pos(). + */ + struct saved_position { + /* Current position file. */ + uint64_t file_no; + /* Current position page. */ + uint32_t page_no; + /* Start of current chunk inside page. */ + uint32_t in_page_offset; + /* + The length of the current chunk, once the chunk type has been read. + If 0, it means the chunk type (and length) has not yet been read. + */ + uint32_t chunk_len; + /* The read position inside the current chunk. */ + uint32_t chunk_read_offset; + byte chunk_type; + /* When set, read will skip the current chunk, if any. */ + bool skip_current; + /* Set while we are in the middle of reading a record. */ + bool in_record; + } s; + + /* The mtr is started iff cur_block is non-NULL. */ + mtr_t mtr; + /* + After fetch_current_page(), this points into either cur_block or + page_buffer as appropriate. + */ + byte *page_ptr; + /* Valid after fetch_current_page(), if page found in buffer pool. */ + buf_block_t *cur_block; + /* Buffer for reading a page directly from a tablespace file. */ + byte *page_buffer; + /* Amount of data in file, valid after fetch_current_page(). */ + uint64_t cur_end_offset; + /* Length of the currently open file, valid if cur_file_handle != -1. */ uint64_t cur_file_length; - /* Used to keep track of partial chunk returned to reader. */ - uint32_t chunk_pos; - uint32_t chunk_remain; + /* Open file handle to tablespace file_no, or -1. */ + File cur_file_handle; /* Flag used to skip the rest of any partial chunk we might be starting in the middle of. */ bool skipping_partial; + + chunk_reader(); + void set_page_buf(byte *in_page_buf) { page_buffer= in_page_buf; } + ~chunk_reader(); + + /* Current type, or FSP_BINLOG_TYPE_FILLER if between records. */ + byte cur_type() { return (byte)(s.chunk_type & FSP_BINLOG_TYPE_MASK); } + bool cur_is_cont() { return (s.chunk_type & FSP_BINLOG_FLAG_CONT) != 0; } + bool end_of_record() { return !s.in_record; } + static int read_error_corruption(uint64_t file_no, uint64_t page_no, + const char *msg); + int read_error_corruption(const char *msg) + { + return read_error_corruption(s.file_no, s.page_no, msg); + } + enum chunk_reader_status fetch_current_page(); + /* + Try to read max_len bytes from a record into buffer. + + If multipage is true, will move across pages to read following + continuation chunks, if any, to try and read max_len total bytes. Only if + the record ends before max_len bytes is less amount of bytes returned. + + If multipage is false, will read as much is available on one page (up to + max of max_len), and then return. + + Returns number of bytes read, or -1 for error. + Returns 0 if the chunk_reader is pointing to start of a chunk at the end + of the current binlog (ie. end-of-file). + */ + int read_data(byte *buffer, int max_len, bool multipage); + + /* Save current position, and restore it later. */ + void save_pos(saved_position *out_pos) { *out_pos= s; } + void restore_pos(saved_position *pos); + void seek(uint64_t file_no, uint64_t offset); + + /* + Make next read_data() skip any data from the current chunk (if any), and + start reading data only from the beginning of the next chunk. */ + void skip_current() { s.skip_current= true; } + /* + Used initially, after seeking potentially into the middle of a (commit) + record, to skip any continuation chunks until we reach the start of the + first real record. + */ + void skip_partial(bool skip) { skipping_partial= skip; } + /* Release any buffer pool page latch. */ + void release(bool release_file_page= false); + + bool data_available() + { + if (!end_of_record()) + return true; + uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); + if (active != s.file_no) + { + ut_ad(active > s.file_no); + return true; + } + uint64_t end_offset= + binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire); + uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); + if (active2 != active) + return true; // Active moved while we were checking + if (end_offset == ~(uint64_t)0) + return false; // Nothing in this binlog file yet + uint64_t offset= (s.page_no << srv_page_size_shift) | s.in_page_offset; + if (offset < end_offset) + return true; + + ut_ad(s.file_no == active2); + ut_ad(offset == end_offset); + return false; + } +}; + + +/* + A class for doing the post-order traversal of the forest of perfect binary + trees that make up the out-of-band data for a commit record. +*/ +class fsp_binlog_oob_reader { + enum oob_states { + /* The initial state, about to visit the node for the first time. */ + ST_initial, + /* State of leaf node while traversing the prior trees in the forest. */ + ST_traversing_prior_trees, + /* State of non-leaf node while traversing its left sub-tree. */ + ST_traversing_left_child, + /* State of non-leaf node while traversing its right sub-tree. */ + ST_traversing_right_child, + /* State of node while reading out its data. */ + ST_self + }; + + /* + Stack entry for one node currently taking part in post-order traversal. + We maintain a stack of pending nodes during the traversal, as the traversal + happens in a state machine rather than by recursion. + */ + struct stack_entry { + /* Saved position after reading header. */ + chunk_reader::saved_position saved_pos; + /* The location of this node's OOB record. */ + uint64_t file_no; + uint64_t offset; + /* Right child, to be traversed after left child. */ + uint64_t right_file_no; + uint64_t right_offset; + /* Offset of real data in this node, after header. */ + uint32_t header_len; + /* Amount of data read into rd_buf, and amount used to parse header. */ + uint32_t rd_buf_len; + uint32_t rd_buf_sofar; + /* Current state in post-order traversal state machine. */ + enum oob_states state; + /* Buffer for reading header. */ + byte rd_buf[5*COMPR_INT_MAX64]; + /* + True when the node is reached using only left child pointers, false + otherwise. Used to identify the left-most leaf in a tree which points to + a prior tree that must be traversed first. + */ + bool is_leftmost; + } init_stack[8], *stack; + + /* + If the stack_len is == sizeof(init_stack)/sizeof(init_stack[0]), then we + are using the embedded stack memory. Otherwise we are using a dynamically + allocated stack that must be freed. + */ + uint32_t stack_len; + /* Current top of stack. */ + uint32_t stack_top; + /* State machine current state. */ + enum oob_states state; + +public: + fsp_binlog_oob_reader(); + ~fsp_binlog_oob_reader(); + + bool start_traversal(uint64_t file_no, uint64_t offset); + bool oob_traversal_done() { return stack_top == 0; } + int read_data(chunk_reader *chunk_rd, uchar *buf, int max_len); + +private: + bool ensure_stack(uint32_t length); + bool push_state(enum oob_states state, uint64_t file_no, uint64_t offset, + bool is_leftmost); +}; + + +class ha_innodb_binlog_reader : public handler_binlog_reader { + enum reader_states { + ST_read_next_event_group, ST_read_oob_data, ST_read_commit_record + }; + + chunk_reader chunk_rd; + fsp_binlog_oob_reader oob_reader; + chunk_reader::saved_position saved_commit_pos; + + /* Buffer to hold a page read directly from the binlog file. */ + uchar *page_buf; + /* Out-of-band data to read after commit record, if any. */ + uint64_t oob_count; + uint64_t oob_last_file_no; + uint64_t oob_last_offset; + /* Keep track of pending bytes in the rd_buf. */ + uint32_t rd_buf_len; + uint32_t rd_buf_sofar; + /* State for state machine reading chunks one by one. */ + enum reader_states state; + + /* Used to read the header of the commit record. */ + byte rd_buf[5*COMPR_INT_MAX64]; private: - bool ensure_file_open(); - void next_file(); int read_from_buffer_pool_page(buf_block_t *block, uint64_t end_offset, uchar *buf, uint32_t len); int read_from_file(uint64_t end_offset, uchar *buf, uint32_t len); int read_from_page(uchar *page_ptr, uint64_t end_offset, uchar *buf, uint32_t len); + int read_data(uchar *buf, uint32_t len); public: ha_innodb_binlog_reader(uint64_t file_no= 0, uint64_t offset= 0); @@ -5254,8 +5484,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, { size_t buf_size= state->count_nolock() * (2*COMPR_INT_MAX32 + COMPR_INT_MAX64); - /* ToDo: InnoDB alloc function? */ - alloced_buf= (byte *)my_malloc(PSI_INSTRUMENT_ME, buf_size, MYF(MY_WME)); + alloced_buf= (byte *)ut_malloc(buf_size, mem_key_binlog); if (UNIV_UNLIKELY(!alloced_buf)) return true; buf= alloced_buf; @@ -5263,7 +5492,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, if (UNIV_UNLIKELY(used_bytes < 0)) { ut_ad(0 /* Shouldn't happen, as we allocated maximum needed size. */); - my_free(alloced_buf); + ut_free(alloced_buf); return true; } } @@ -5315,7 +5544,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, ++page_no; } } - my_free(alloced_buf); + ut_free(alloced_buf); /* Make sure we return a page for caller to write the main event data into. */ if (UNIV_UNLIKELY(!block)) { @@ -5782,7 +6011,7 @@ struct chunk_data_cache : public chunk_data_base { if (gtid_remain > 0) { uint32_t size2= gtid_remain > max_len ? max_len : (uint32_t)gtid_remain; - int res2= my_b_read(cache, p, size2); + int res2= my_b_read(cache, p + size, size2); ut_a(!res2 /* ToDo: Error handling */); gtid_remain-= size2; if (gtid_remain == 0) @@ -6083,58 +6312,634 @@ fsp_binlog_trx(trx_t *trx, mtr_t *mtr) } +chunk_reader::chunk_reader() + : s { 0, 0,0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }, + page_ptr(0), cur_block(0), page_buffer(nullptr), + cur_file_handle((File)-1), skipping_partial(false) +{ + /* Nothing else. */ +} + + +chunk_reader::~chunk_reader() +{ + release(); + if (cur_file_handle >= (File)0) + my_close(cur_file_handle, MYF(0)); +} + + +int +chunk_reader::read_error_corruption(uint64_t file_no, uint64_t page_no, + const char *msg) +{ + ib::error() << "Corrupt binlog found on page " << page_no << + " in binlog number " << file_no << ": " << msg; + return -1; +} + + +/* + Obtain the data on the page currently pointed to by the chunk reader. The + page is either latched in the buffer pool (starting an mtr), or read from + the file into the page buffer. + + The code does a dirty read of active_binlog_file_no to determine if the page + is known to be available to read from the file, or if it should be looked up + in the buffer pool. After making the decision, another dirty read is done to + protect against the race where the active tablespace changes in the middle, + and if so the operation is re-tried. This is necessary since the binlog files + N and N-2 use the same tablespace id, so we must ensure we do not mistake a + page from N as belonging to N-2. +*/ +enum chunk_reader::chunk_reader_status +chunk_reader::fetch_current_page() +{ + ut_ad(!cur_block /* Must have no active mtr */); + uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); + for (;;) { + buf_block_t *block= nullptr; + bool mtr_started= false; + uint64_t offset= (s.page_no << srv_page_size_shift) | s.in_page_offset; + uint64_t active= active2; + uint64_t end_offset= + binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire); + ut_ad(s.file_no <= active); + + if (s.file_no + 1 >= active) { + /* Check if we should read from the buffer pool or from the file. */ + if (end_offset != ~(uint64_t)0 && offset < end_offset) { + mtr.start(); + mtr_started= true; + /* + ToDo: Should we keep track of the last block read and use it as a + hint? Will be mainly useful when reading the partially written active + page at the current end of the active binlog, which might be a common + case. + */ + buf_block_t *hint_block= nullptr; + uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (s.file_no & 1); + dberr_t err= DB_SUCCESS; + block= buf_page_get_gen(page_id_t{space_id, s.page_no}, 0, + RW_S_LATCH, hint_block, BUF_GET_IF_IN_POOL, + &mtr, &err); + if (err != DB_SUCCESS) { + mtr.commit(); + return CHUNK_READER_ERROR; + } + } + active2= active_binlog_file_no.load(std::memory_order_acquire); + if (UNIV_UNLIKELY(active2 != active)) { + /* + The active binlog file changed while we were processing; we might + have gotten invalid end_offset or a buffer pool page from a wrong + tablespace. So just try again. + */ + if (mtr_started) + mtr.commit(); + continue; + } + cur_end_offset= end_offset; + if (offset >= end_offset) { + ut_ad(!mtr_started); + if (s.file_no == active) { + /* Reached end of the currently active binlog file -> EOF. */ + return CHUNK_READER_EOF; + } + } + if (block) { + cur_block= block; + ut_ad(mtr_started); + page_ptr= block->page.frame; + return CHUNK_READER_FOUND; + } else { + /* Not in buffer pool, just read it from the file. */ + if (mtr_started) + mtr.commit(); + /* Fall through to read from file. */ + } + } + + /* Tablespace is not open, just read from the file. */ + if (cur_file_handle < (File)0) + { + char filename[BINLOG_NAME_LEN]; + MY_STAT stat_buf; + + binlog_name_make(filename, s.file_no); + cur_file_handle= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME)); + if (UNIV_UNLIKELY(cur_file_handle < (File)0)) { + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + return CHUNK_READER_ERROR; + } + if (my_fstat(cur_file_handle, &stat_buf, MYF(0))) { + my_error(ER_CANT_GET_STAT, MYF(0), filename, errno); + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + return CHUNK_READER_ERROR; + } + cur_file_length= stat_buf.st_size; + } + if (s.file_no == active) + cur_end_offset= end_offset; + else + cur_end_offset= cur_file_length; + + if (offset >= cur_file_length) { + /* End of this file, move to the next one. */ + /* + ToDo: Should also obey binlog_cur_written_offset[], once we start + actually maintaining that, to save unnecessary buffer pool + lookup. + */ + if (cur_file_handle >= (File)0) + { + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + } + ++s.file_no; + s.page_no= 0; + continue; + } + + size_t res= my_pread(cur_file_handle, page_buffer, srv_page_size, + s.page_no << srv_page_size_shift, MYF(MY_WME)); + if (res == (size_t)-1) + return CHUNK_READER_ERROR; + page_ptr= page_buffer; + return CHUNK_READER_FOUND; + } + /* NOTREACHED */ +} + + +int +chunk_reader::read_data(byte *buffer, int max_len, bool multipage) +{ + uint32_t size; + int sofar= 0; + +read_more_data: + if (max_len == 0) + return sofar; + + if (!page_ptr) + { + enum chunk_reader_status res= fetch_current_page(); + if (res == CHUNK_READER_EOF) + { + if (s.in_record) + return read_error_corruption(s.file_no, s.page_no, "binlog tablespace " + "truncated in the middle of record"); + else + return 0; + } + else if (res == CHUNK_READER_ERROR) + return -1; + } + + if (s.chunk_len == 0) + { + byte type; + if (s.in_page_offset < FIL_PAGE_DATA) + s.in_page_offset= FIL_PAGE_DATA; + else if (s.in_page_offset >= srv_page_size - (FIL_PAGE_DATA_END + 3) || + page_ptr[s.in_page_offset] == FSP_BINLOG_TYPE_FILLER) + { + ut_ad(s.in_page_offset >= srv_page_size - FIL_PAGE_DATA_END || + page_ptr[s.in_page_offset] == FSP_BINLOG_TYPE_FILLER); + goto go_next_page; + } + + /* Check for end-of-file. */ + if (cur_end_offset == ~(uint64_t)0 || + (s.page_no << srv_page_size_shift) + s.in_page_offset >= cur_end_offset) + return sofar; + + type= page_ptr[s.in_page_offset]; + if (type == 0) + { + ut_ad(0 /* Should have detected end-of-file on cur_end_offset. */); + return 0; + } + + /* + Consistency check on the chunks. A record must consist in a sequence of + chunks of the same type, all but the first must have the + FSP_BINLOG_FLAG_BIT_CONT bit set, and the final one must have the + FSP_BINLOG_FLAG_BIT_LAST bit set. + */ + if (!s.in_record) + { + if (UNIV_UNLIKELY(type & FSP_BINLOG_FLAG_CONT) && !s.skip_current) + { + if (skipping_partial) + { + s.chunk_len= page_ptr[s.in_page_offset + 1] | + ((uint32_t)page_ptr[s.in_page_offset + 2] << 8); + s.skip_current= true; + goto skip_chunk; + } + else + return read_error_corruption(s.file_no, s.page_no, "Binlog record " + "starts with continuation chunk"); + } + } + else + { + if ((type ^ s.chunk_type) & FSP_BINLOG_TYPE_MASK) + { + /* + As a special case, we must allow a GTID state to appear in the + middle of a record. + */ + if (((uint64_t)1 << (type & FSP_BINLOG_TYPE_MASK)) & + ALLOWED_NESTED_RECORDS) + { + s.chunk_len= page_ptr[s.in_page_offset + 1] | + ((uint32_t)page_ptr[s.in_page_offset + 2] << 8); + goto skip_chunk; + } + /* Chunk type changed in the middle. */ + return read_error_corruption(s.file_no, s.page_no, "Binlog record missing " + "end chunk"); + } + if (!(type & FSP_BINLOG_FLAG_CONT)) + { + /* START chunk without END chunk. */ + return read_error_corruption(s.file_no, s.page_no, "Binlog record missing " + "end chunk"); + } + } + + s.skip_current= false; + s.chunk_type= type; + s.in_record= true; + s.chunk_len= page_ptr[s.in_page_offset + 1] | + ((uint32_t)page_ptr[s.in_page_offset + 2] << 8); + s.chunk_read_offset= 0; + } + + /* Now we have a chunk available to read data from. */ + ut_ad(s.chunk_read_offset < s.chunk_len); + if (s.skip_current && + (s.chunk_read_offset > 0 || (s.chunk_type & FSP_BINLOG_FLAG_CONT))) + { + /* + Skip initial continuation chunks. + Used to be able to start reading potentially in the middle of a record, + ie. at a GTID state point. + */ + s.chunk_read_offset= s.chunk_len; + } + else + { + size= std::min((uint32_t)max_len, s.chunk_len - s.chunk_read_offset); + memcpy(buffer, page_ptr + s.in_page_offset + 3 + s.chunk_read_offset, size); + buffer+= size; + s.chunk_read_offset+= size; + max_len-= size; + sofar+= size; + } + + if (s.chunk_len > s.chunk_read_offset) + { + ut_ad(max_len == 0 /* otherwise would have read more */); + return sofar; + } + + /* We have read all of the chunk. Move to next chunk or end of the record. */ +skip_chunk: + s.in_page_offset+= 3 + s.chunk_len; + s.chunk_len= 0; + s.chunk_read_offset= 0; + + if (s.chunk_type & FSP_BINLOG_FLAG_LAST) + { + s.in_record= false; /* End of record. */ + s.skip_current= false; + } + + if (s.in_page_offset >= srv_page_size - (FIL_PAGE_DATA_END + 3)) + { +go_next_page: + /* End of page reached, move to the next page. */ + ++s.page_no; + page_ptr= nullptr; + if (cur_block) + { + mtr.commit(); + cur_block= nullptr; + } + s.in_page_offset= 0; + + if (cur_file_handle >= (File)0 && + (s.page_no << srv_page_size_shift) >= cur_file_length) + { + /* Move to the next file. */ + /* + ToDo: Should we have similar logic for moving to the next file when + we reach the end of the last page using the buffer pool? + + Then we save re-fetching the page later. But then we need to keep + track of the file length also when reading from the buffer pool. + */ + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + ++s.file_no; + s.page_no= 0; + } + } + + if (sofar > 0 && (!multipage || !s.in_record)) + return sofar; + + goto read_more_data; +} + + +void +chunk_reader::restore_pos(chunk_reader::saved_position *pos) +{ + if (page_ptr && + !(pos->file_no == s.file_no && pos->page_no == s.page_no)) + { + /* Seek to a different page, release any current page. */ + if (cur_block) + { + mtr.commit(); + cur_block= nullptr; + } + page_ptr= nullptr; + } + if (cur_file_handle != (File)-1 && pos->file_no != s.file_no) + { + /* Seek to a different file than currently open, close it. */ + my_close(cur_file_handle, MYF(0)); + cur_file_handle= (File)-1; + cur_file_length= ~(uint64_t)0; + } + s= *pos; +} + + +void +chunk_reader::seek(uint64_t file_no, uint64_t offset) +{ + saved_position pos { + file_no, (uint32_t)(offset >> srv_page_size_shift), + (uint32_t)(offset & (srv_page_size - 1)), + 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }; + restore_pos(&pos); +} + + +void chunk_reader::release(bool release_file_page) +{ + if (cur_block) + { + mtr.commit(); + cur_block= nullptr; + page_ptr= nullptr; + } + else if (release_file_page) + { + /* + For when we reach EOF while reading from the file. We need to re-read + the page from the file (or buffer pool) in this case on next read, as + data might be added to the page. + */ + page_ptr= nullptr; + } +} + + +fsp_binlog_oob_reader::fsp_binlog_oob_reader() + : stack(init_stack), stack_len(sizeof(init_stack)/sizeof(init_stack[0])), + stack_top(0) +{ +} + + +fsp_binlog_oob_reader::~fsp_binlog_oob_reader() +{ + if (stack_len > sizeof(init_stack)/sizeof(init_stack[0])) + ut_free(stack); +} + + +bool +fsp_binlog_oob_reader::ensure_stack(uint32_t needed_length) +{ + uint32_t len= stack_len; + if (len >= needed_length) + return false; + do + len*= 2; + while (len < needed_length); + size_t needed= len*sizeof(stack_entry); + stack_entry *new_stack= (stack_entry *) ut_malloc(needed, mem_key_binlog); + if (!new_stack) + { + my_error(ER_OUTOFMEMORY, MYF(0), needed); + return true; + } + + memcpy(new_stack, stack, stack_len*sizeof(stack_entry)); + stack= new_stack; + stack_len= len; + return false; +} + + +bool +fsp_binlog_oob_reader::push_state(enum oob_states state, uint64_t file_no, + uint64_t offset, bool is_leftmost) +{ + uint32_t idx= stack_top; + if (ensure_stack(idx + 1)) + return true; + stack[idx].state= state; + stack[idx].file_no= file_no; + stack[idx].offset= offset; + stack[idx].is_leftmost= is_leftmost; + stack_top= idx + 1; + return false; +} + + +bool +fsp_binlog_oob_reader::start_traversal(uint64_t file_no, uint64_t offset) +{ + stack_top= 0; + return push_state(ST_initial, file_no, offset, true); +} + + +/* + Read from out-of-band event group data. + + Does a state-machine incremental traversal of the forest of perfect binary + trees of oob records in the event group. May read just the data available + on one page, thus returning less than the requested number of bytes (this + is to prefer to inspect each page only once, returning data page-by-page as + long as reader asks for at least a full page worth of data). +*/ +int +fsp_binlog_oob_reader::read_data(chunk_reader *chunk_rd, uchar *buf, int len) +{ + stack_entry *e; + uint64_t chunk_idx; + uint64_t left_file_no; + uint64_t left_offset; + int res; + const uchar *p_end; + const uchar *p; + std::pair v_and_p; + int size; + + if (stack_top <= 0) + { + ut_ad(0 /* Should not call when no more oob data to read. */); + return 0; + } + +again: + e= &(stack[stack_top - 1]); + switch (e->state) + { + case ST_initial: + chunk_rd->seek(e->file_no, e->offset); + static_assert(sizeof(e->rd_buf) == 5*COMPR_INT_MAX64); + res= chunk_rd->read_data(e->rd_buf, 5*COMPR_INT_MAX64, true); + if (res < 0) + return -1; + if (chunk_rd->cur_type() != FSP_BINLOG_TYPE_OOB_DATA) + return chunk_rd->read_error_corruption("Wrong chunk type"); + if (res == 0) + return chunk_rd->read_error_corruption("Unexpected EOF, expected " + "oob chunk"); + e->rd_buf_len= res; + p_end= e->rd_buf + res; + v_and_p= compr_int_read(e->rd_buf); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + chunk_idx= v_and_p.first; + (void)chunk_idx; + + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + left_file_no= v_and_p.first; + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + left_offset= v_and_p.first; + + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + e->right_file_no= v_and_p.first; + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd->read_error_corruption("Short chunk"); + e->right_offset= v_and_p.first; + e->rd_buf_sofar= (uint32_t)(p - e->rd_buf); + if (left_file_no == 0 && left_offset == 0) + { + /* Leaf node. */ + if (e->is_leftmost && !(e->right_file_no == 0 && e->right_offset == 0)) + { + /* Traverse the prior tree(s) in the forst. */ + e->state= ST_traversing_prior_trees; + chunk_rd->save_pos(&e->saved_pos); + push_state(ST_initial, e->right_file_no, e->right_offset, true); + } + else + e->state= ST_self; + } + else + { + e->state= ST_traversing_left_child; + chunk_rd->save_pos(&e->saved_pos); + push_state(ST_initial, left_file_no, left_offset, e->is_leftmost); + } + goto again; + + case ST_traversing_prior_trees: + chunk_rd->restore_pos(&e->saved_pos); + e->state= ST_self; + goto again; + + case ST_traversing_left_child: + e->state= ST_traversing_right_child; + push_state(ST_initial, e->right_file_no, e->right_offset, false); + goto again; + + case ST_traversing_right_child: + chunk_rd->restore_pos(&e->saved_pos); + e->state= ST_self; + goto again; + + case ST_self: + size= 0; + if (e->rd_buf_len > e->rd_buf_sofar) + { + /* Use any excess data from when the header was read. */ + size= std::min((int)(e->rd_buf_len - e->rd_buf_sofar), len); + memcpy(buf, e->rd_buf + e->rd_buf_sofar, size); + e->rd_buf_sofar+= size; + len-= size; + buf+= size; + } + + if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd->end_of_record())) + { + res= chunk_rd->read_data(buf, len, false); + if (res < 0) + return -1; + size+= res; + } + + if (chunk_rd->end_of_record()) + { + /* This oob record done, pop the state. */ + ut_ad(stack_top > 0); + --stack_top; + } + return size; + + default: + ut_ad(0); + return -1; + } +} + + ha_innodb_binlog_reader::ha_innodb_binlog_reader(uint64_t file_no, uint64_t offset) - : chunk_pos(0), chunk_remain(0), skipping_partial(true) + : rd_buf_len(0), rd_buf_sofar(0), state(ST_read_next_event_group) { - page_buf= (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, srv_page_size, MYF(0)); /* ToDo: InnoDB alloc function? */ - // ToDo: Need some mechanism to find where to start reading. This is just "start from 0" for early testing. - cur_file_no= file_no; - cur_file_offset= offset; + page_buf= (uchar *)ut_malloc(srv_page_size, mem_key_binlog); + chunk_rd.set_page_buf(page_buf); + chunk_rd.seek(file_no, offset); + chunk_rd.skip_current(); + chunk_rd.skip_partial(true); } ha_innodb_binlog_reader::~ha_innodb_binlog_reader() { - if (cur_file != (File)-1) - my_close(cur_file, MYF(0)); - my_free(page_buf); /* ToDo: InnoDB alloc function? */ -} - - -bool -ha_innodb_binlog_reader::ensure_file_open() -{ - if (cur_file != (File)-1) - return false; - char filename[BINLOG_NAME_LEN]; - binlog_name_make(filename, cur_file_no); - cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME)); - if (UNIV_UNLIKELY(cur_file < (File)0)) { - cur_file= (File)-1; - return true; - } - MY_STAT stat_buf; - if (my_fstat(cur_file, &stat_buf, MYF(0))) { - my_error(ER_CANT_GET_STAT, MYF(0), filename, errno); - my_close(cur_file, MYF(0)); - cur_file= (File)-1; - return true; - } - cur_file_length= stat_buf.st_size; - return false; -} - - -void -ha_innodb_binlog_reader::next_file() -{ - if (cur_file != (File)-1) { - my_close(cur_file, MYF(0)); - cur_file= (File)-1; - } - ++cur_file_no; - cur_file_offset= 0; + ut_free(page_buf); } @@ -6158,271 +6963,139 @@ ha_innodb_binlog_reader::next_file() then read from the real file. */ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len) +{ + int res= read_data(buf, len); + chunk_rd.release(res == 0); + return res; +} + + +int ha_innodb_binlog_reader::read_data(uchar *buf, uint32_t len) { int res; + const uchar *p_end; + const uchar *p; + std::pair v_and_p; + int size; - /* - Loop repeatedly trying to read some data from a page. - The usual case is that just one iteration of the loop is necessary. But - occasionally more may be needed, for example when moving to the next - binlog file or when a page has no replication event data to read. - */ - uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); - for (;;) { - buf_block_t *block= nullptr; - mtr_t mtr; - bool mtr_started= false; - uint64_t active= active2; - uint64_t end_offset= - binlog_cur_end_offset[cur_file_no&1].load(std::memory_order_acquire); - ut_ad(cur_file_no <= active); +again: + switch (state) + { + case ST_read_next_event_group: + static_assert(sizeof(rd_buf) == 5*COMPR_INT_MAX64); + res= chunk_rd.read_data(rd_buf, 5*COMPR_INT_MAX64, true); + if (res <= 0) + return res; + if (chunk_rd.cur_type() != FSP_BINLOG_TYPE_COMMIT) + { + chunk_rd.skip_current(); + goto again; + } + /* Found the start of a commit record. */ + chunk_rd.skip_partial(false); - if (cur_file_no + 1 >= active) { - /* Check if we should read from the buffer pool or from the file. */ - if (end_offset != ~(uint64_t)0 && cur_file_offset < end_offset) { - mtr.start(); - mtr_started= true; - /* - ToDo: Should we keep track of the last block read and use it as a - hint? Will be mainly useful when reading the partially written active - page at the current end of the active binlog, which might be a common - case. - */ - buf_block_t *hint_block= nullptr; - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (cur_file_no & 1); - uint32_t page_no= (uint32_t)(cur_file_offset >> srv_page_size_shift); - dberr_t err= DB_SUCCESS; - block= buf_page_get_gen(page_id_t{space_id, page_no}, 0, - RW_S_LATCH, hint_block, BUF_GET_IF_IN_POOL, - &mtr, &err); - if (err != DB_SUCCESS) { - mtr.commit(); - res= -1; - break; - } - } - active2= active_binlog_file_no.load(std::memory_order_acquire); - if (UNIV_UNLIKELY(active2 != active)) { - /* - The active binlog file changed while we were processing; we might - have gotten invalid end_offset or a buffer pool page from a wrong - tablespace. So just try again. - */ - if (mtr_started) - mtr.commit(); - continue; - } - if (cur_file_offset >= end_offset) { - ut_ad(!mtr_started); - if (cur_file_no == active) { - /* Reached end of the currently active binlog file -> EOF. */ - res= 0; - break; - } - /* End of file reached, move to next file. */ - /* - ToDo: Should not read data and send to slaves that has not yet been - durably synced to disk, at least optionally, lest a crash leaves the - slaves with transactions that do not (any longer) exist on master - and breaks replication. - */ - /* - ToDo: Should also obey binlog_cur_written_offset[], once we start - actually maintaining that, to save unnecessary buffer pool - lookup. - */ - next_file(); - continue; - } - if (block) { - res= read_from_buffer_pool_page(block, end_offset, buf, len); - ut_ad(mtr_started); - if (mtr_started) - mtr.commit(); - } else { - /* Not in buffer pool, just read it from the file. */ - if (mtr_started) - mtr.commit(); - if (ensure_file_open()) { - res= -1; - break; - } - ut_ad(cur_file_offset < end_offset); - if (cur_file_offset >= cur_file_length) { - /* - This happens when we reach the end of (active-1) and the tablespace - has been closed. - */ - ut_ad(end_offset == ~(uint64_t)0); - ut_ad(!mtr_started); - next_file(); - continue; - } - res= read_from_file(end_offset, buf, len); - } - } else { - /* Tablespace is not open, just read from the file. */ - if (ensure_file_open()) { - res= -1; - break; - } - if (cur_file_offset >= cur_file_length) { - /* End of this file, move to the next one. */ - next_file(); - continue; - } - res= read_from_file(cur_file_length, buf, len); + /* Read the header of the commit record to see if there's any oob data. */ + rd_buf_len= res; + p_end= rd_buf + res; + v_and_p= compr_int_read(rd_buf); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + oob_count= v_and_p.first; + + if (oob_count > 0) + { + /* Skip the pointer to first chunk. */ + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + oob_last_file_no= v_and_p.first; + v_and_p= compr_int_read(p); + p= v_and_p.second; + if (p > p_end) + return chunk_rd.read_error_corruption("Short chunk"); + oob_last_offset= v_and_p.first; } - /* If nothing read, but not eof/error, then loop to try the next page. */ - if (res != 0) - break; - } + rd_buf_sofar= (uint32_t)(p - rd_buf); + state= ST_read_commit_record; + goto again; - return res; + case ST_read_commit_record: + size= 0; + if (rd_buf_len > rd_buf_sofar) + { + /* Use any excess data from when the header was read. */ + size= std::min((int)(rd_buf_len - rd_buf_sofar), (int)len); + memcpy(buf, rd_buf + rd_buf_sofar, size); + rd_buf_sofar+= size; + len-= size; + buf+= size; + } + + if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd.end_of_record())) + { + res= chunk_rd.read_data(buf, len, false); + if (res < 0) + return -1; + size+= res; + } + + if (UNIV_LIKELY(rd_buf_sofar == rd_buf_len) && chunk_rd.end_of_record()) + { + if (oob_count == 0) + state= ST_read_next_event_group; + else + { + oob_reader.start_traversal(oob_last_file_no, oob_last_offset); + chunk_rd.save_pos(&saved_commit_pos); + state= ST_read_oob_data; + } + if (size == 0) + goto again; + } + + return size; + + case ST_read_oob_data: + res= oob_reader.read_data(&chunk_rd, buf, len); + if (res < 0) + return -1; + if (oob_reader.oob_traversal_done()) + { + chunk_rd.restore_pos(&saved_commit_pos); + state= ST_read_next_event_group; + } + if (UNIV_UNLIKELY(res == 0)) + { + ut_ad(0 /* Should have had oob_traversal_done() last time then. */); + goto again; + } + return res; + + default: + ut_ad(0); + return -1; + } } bool ha_innodb_binlog_reader::data_available() { - uint64_t active= active_binlog_file_no.load(std::memory_order_acquire); - if (active != cur_file_no) - { - ut_ad(active > cur_file_no); + if (state != ST_read_next_event_group) return true; - } - uint64_t end_offset= - binlog_cur_end_offset[cur_file_no&1].load(std::memory_order_acquire); - uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); - if (active2 != active || end_offset > cur_file_offset) - return true; - ut_ad(cur_file_no == active2); - ut_ad(cur_file_offset == end_offset); - return false; -} - - -int -ha_innodb_binlog_reader::read_from_buffer_pool_page(buf_block_t *block, - uint64_t end_offset, - uchar *buf, uint32_t len) -{ - return read_from_page(block->page.frame, end_offset, buf, len); -} - - -int -ha_innodb_binlog_reader::read_from_file(uint64_t end_offset, - uchar *buf, uint32_t len) -{ - uint64_t mask= ((uint64_t)1 << srv_page_size_shift) - 1; - uint64_t offset= cur_file_offset; - uint64_t page_start_offset; - - ut_ad(cur_file != (File)-1); - ut_ad(cur_file_offset < cur_file_length); - - page_start_offset= offset & ~mask; - size_t res= my_pread(cur_file, page_buf, srv_page_size, page_start_offset, - MYF(MY_WME)); - if (res == (size_t)-1) - return -1; - - return read_from_page(page_buf, end_offset, buf, len); -} - - -/* - Read out max `len` bytes from the chunks stored in a page. - - page_ptr Points to start of data for current page matching cur_file_offset - end_offset Current end of binlog file, no reads past this point - buf Destination buffer to read into - len Maximum number of bytes to read - - Returns number of bytes actually read. -*/ -int -ha_innodb_binlog_reader::read_from_page(uchar *page_ptr, uint64_t end_offset, - uchar *buf, uint32_t len) -{ - uint32_t page_size= (uint32_t)srv_page_size; - uint64_t mask= ((uint64_t)1 << srv_page_size_shift) - 1; - uint64_t offset= cur_file_offset; - uint64_t page_start_offset= offset & ~mask; - uint32_t page_end= - end_offset > page_start_offset + (page_size - FIL_PAGE_DATA_END) ? - (page_size - FIL_PAGE_DATA_END) : - (uint32_t)(end_offset & mask); - uint32_t in_page_offset= (uint32_t)(offset & mask); - uint32_t sofar= 0; - - ut_ad(in_page_offset < page_size - FIL_PAGE_DATA_END); - if (in_page_offset < FIL_PAGE_DATA) - in_page_offset= FIL_PAGE_DATA; - - /* First return data from any partially-read chunk. */ - if ((sofar= chunk_remain)) { - if (sofar <= len) { - memcpy(buf, page_ptr + in_page_offset + chunk_pos, sofar); - chunk_pos= 0; - chunk_remain= 0; - in_page_offset+= sofar; - } else { - memcpy(buf, page_ptr + in_page_offset + chunk_pos, len); - chunk_pos+= len; - chunk_remain= sofar - len; - cur_file_offset= offset + len; - return len; - } - } - - while (sofar < len && in_page_offset < page_end) - { - uchar type= page_ptr[in_page_offset]; - if (type == 0x00) - break; /* No more data on the page yet */ - if (type == FSP_BINLOG_TYPE_FILLER) { - in_page_offset= page_size; /* Point to start of next page */ - break; /* No more data on page */ - } - uint32_t size= page_ptr[in_page_offset + 1] + - (uint32_t)(page_ptr[in_page_offset + 2] << 8); - if ((type & FSP_BINLOG_TYPE_MASK) != FSP_BINLOG_TYPE_COMMIT || - (UNIV_UNLIKELY(skipping_partial) && (type & FSP_BINLOG_FLAG_CONT))) - { - /* Skip non-binlog-event record, or initial partial record. */ - in_page_offset += 3 + size; - continue; - } - skipping_partial= false; - - /* Now grab the data in the chunk, or however much the caller requested. */ - uint32_t rest = len - sofar; - if (size > rest) { - /* - Chunk contains more data than reader requested. - Return what was requested, and remember the remaining partial data - for the next read. - */ - memcpy(buf + sofar, page_ptr + (in_page_offset + 3), rest); - chunk_pos= rest; - chunk_remain= size - rest; - sofar+= rest; - break; - } - - memcpy(buf + sofar, page_ptr + (in_page_offset + 3), size); - in_page_offset= in_page_offset + 3 + size; - sofar+= size; - } - - if (in_page_offset >= page_size - FIL_PAGE_DATA_END) - cur_file_offset= page_start_offset + page_size; // To start of next page - else - cur_file_offset= page_start_offset | in_page_offset; - return sofar; + return chunk_rd.data_available(); } @@ -6634,7 +7307,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, /* First search backwards for the right file to start from. */ uint64_t file_end= 0; uint64_t diff_state_interval= 0; - rpl_binlog_state_base base_state, diff_state; + rpl_binlog_state_base base_state, page0_diff_state, tmp_diff_state; base_state.init(); for (;;) { @@ -6684,15 +7357,16 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, /* Round to the next diff_state_interval after file_end. */ page2-= page2 % diff_state_page_interval; uint32_t page1= (page0 + page2) / 2; - diff_state.init(); - diff_state.load_nolock(&base_state); - while (page1 >= page0 + diff_state_interval) + page0_diff_state.init(); + page0_diff_state.load_nolock(&base_state); + tmp_diff_state.init(); + while (page1 >= page0 + diff_state_page_interval) { - ut_ad((page1 - page0) % diff_state_interval == 0); - diff_state.reset_nolock(); - diff_state.load_nolock(&base_state); + ut_ad((page1 - page0) % diff_state_page_interval == 0); + tmp_diff_state.reset_nolock(); + tmp_diff_state.load_nolock(&base_state); enum Read_Result res= - read_gtid_state_file_no(&diff_state, file_no, 0, &file_end, + read_gtid_state_file_no(&tmp_diff_state, file_no, page1, &file_end, &diff_state_interval); if (res == READ_ENOENT) return 0; /* File purged while we are reading from it? */ @@ -6708,14 +7382,18 @@ gtid_search::find_gtid_pos(slave_connection_state *pos, page1= page1 - diff_state_page_interval; continue; } - if (diff_state.is_before_pos(pos)) + if (tmp_diff_state.is_before_pos(pos)) + { page0= page1; + page0_diff_state.reset_nolock(); + page0_diff_state.load_nolock(&tmp_diff_state); + } else page2= page1; page1= (page0 + page2) / 2; } ut_ad(page1 >= page0); - out_state->load_nolock(&diff_state); + out_state->load_nolock(&page0_diff_state); *out_file_no= file_no; *out_offset= (uint64_t)page0 << srv_page_size_shift; return 1; @@ -6734,8 +7412,9 @@ ha_innodb_binlog_reader::init_gtid_pos(slave_connection_state *pos, return -1; if (res > 0) { - cur_file_no= file_no; - cur_file_offset= offset; + chunk_rd.seek(file_no, offset); + chunk_rd.skip_current(); + chunk_rd.skip_partial(true); } return res; } diff --git a/storage/innobase/include/ut0compr_int.h b/storage/innobase/include/ut0compr_int.h index 7d5ea31ef69..e4152d9071e 100644 --- a/storage/innobase/include/ut0compr_int.h +++ b/storage/innobase/include/ut0compr_int.h @@ -58,7 +58,7 @@ Created 2024-10-01 Kristian Nielsen #define COMPR_INT_MAX32 5 #define COMPR_INT_MAX64 9 -#define COMPR_INT_MAX COMPR_INT_MAX_64 +#define COMPR_INT_MAX COMPR_INT_MAX64 /* Write compressed unsigned integer */ extern unsigned char *compr_int_write(unsigned char *p, uint64_t v);