diff --git a/sql/handler.h b/sql/handler.h index ab24f37fde2..e82c40cf238 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -5964,7 +5964,12 @@ public: support legacy SHOW BINLOG EVENTS. */ virtual int init_legacy_pos(const char *filename, ulonglong offset) = 0; - + /* + Can be called after init_gtid_pos() or init_legacy_pos() to make the reader + stop (return EOF) at the end of the binlog file. Used for SHOW BINLOG + EVENTS, which has a file-based interface based on legacy file name. + */ + virtual void enable_single_file() = 0; int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed); }; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index bcf21906596..8da91c0ec12 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -4883,6 +4883,8 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi) err= true; goto end; } + /* The engine reader will stop at the end of the requested file. */ + reader->enable_single_file(); { SELECT_LEX_UNIT *unit= &thd->lex->unit; @@ -4894,9 +4896,7 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi) char name_buf[FN_REFLEN]; opt_binlog_engine_hton->get_filename(name_buf, file_no); - for (ha_rows event_count= 0; - reader->cur_file_no == file_no && event_count < limit; - ++event_count) + for (ha_rows event_count= 0; event_count < limit; ++event_count) { packet.length(0); int reader_error= reader->read_log_event(&packet, 0, diff --git a/storage/innobase/fsp/fsp_binlog.cc b/storage/innobase/fsp/fsp_binlog.cc index b8ec1258cc9..9837729daac 100644 --- a/storage/innobase/fsp/fsp_binlog.cc +++ b/storage/innobase/fsp/fsp_binlog.cc @@ -1681,7 +1681,7 @@ fsp_binlog_flush() binlog_chunk_reader::binlog_chunk_reader(std::atomic<uint64_t> *limit_offset_) : s { 0, 0, 0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }, - page_ptr(0), cur_block(0), page_buffer(nullptr), + stop_file_no(~(uint64_t)0), page_ptr(0), cur_block(0), page_buffer(nullptr), limit_offset(limit_offset_), 
cur_file_handle((File)-1), skipping_partial(false) { @@ -1731,7 +1731,8 @@ binlog_chunk_reader::fetch_current_page() uint64_t active= active2; uint64_t end_offset= limit_offset[s.file_no & 3].load(std::memory_order_acquire); - if (s.file_no > active || UNIV_UNLIKELY(active == ~(uint64_t)0)) + if (s.file_no > active || UNIV_UNLIKELY(active == ~(uint64_t)0) + || UNIV_UNLIKELY(s.file_no > stop_file_no)) { ut_ad(s.page_no == 1); ut_ad(s.in_page_offset == 0); @@ -2088,8 +2089,8 @@ void binlog_chunk_reader::release(bool release_file_page) { /* For when we reach EOF while reading from the file. We need to re-read - the page from the file (or buffer pool) in this case on next read, as - data might be added to the page. + the page from the file in this case on next read, as data might be added + to the page. */ page_ptr= nullptr; } diff --git a/storage/innobase/handler/innodb_binlog.cc b/storage/innobase/handler/innodb_binlog.cc index 9e3d77e7793..553514bb197 100644 --- a/storage/innobase/handler/innodb_binlog.cc +++ b/storage/innobase/handler/innodb_binlog.cc @@ -267,6 +267,7 @@ public: virtual int init_gtid_pos(slave_connection_state *pos, rpl_binlog_state_base *state) final; virtual int init_legacy_pos(const char *filename, ulonglong offset) final; + virtual void enable_single_file() final; void seek_internal(uint64_t file_no, uint64_t offset); }; @@ -2670,7 +2671,7 @@ int ha_innodb_binlog_reader::read_data(uchar *buf, uint32_t len) const uchar *p_end; const uchar *p; std::pair v_and_p; - int size; + int sofar= 0; again: switch (state) @@ -2679,8 +2680,10 @@ again: static_assert(sizeof(rd_buf) == 5*COMPR_INT_MAX64, "rd_buf size must match code using it"); res= chunk_rd.read_data(rd_buf, 5*COMPR_INT_MAX64, true); - if (res <= 0) + if (res < 0) return res; + if (res == 0) + return sofar; if (chunk_rd.cur_type() != FSP_BINLOG_TYPE_COMMIT) { chunk_rd.skip_current(); @@ -2727,15 +2730,15 @@ again: goto again; case ST_read_commit_record: - size= 0; if (rd_buf_len > 
rd_buf_sofar) { /* Use any excess data from when the header was read. */ - size= std::min((int)(rd_buf_len - rd_buf_sofar), (int)len); + int size= std::min((int)(rd_buf_len - rd_buf_sofar), (int)len); memcpy(buf, rd_buf + rd_buf_sofar, size); rd_buf_sofar+= size; len-= size; buf+= size; + sofar+= size; } if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd.end_of_record())) @@ -2743,24 +2746,38 @@ again: res= chunk_rd.read_data(buf, len, false); if (res < 0) return -1; - size+= res; + len-= res; + buf+= res; + sofar+= res; } if (UNIV_LIKELY(rd_buf_sofar == rd_buf_len) && chunk_rd.end_of_record()) { if (oob_count == 0) + { state= ST_read_next_event_group; + if (len > 0 && !chunk_rd.is_end_of_page()) + { + /* + Let us try to read more data from this page. The goal is to read + from each page only once, as long as caller passes in a buffer at + least as big as our page size. However, a commit record header that + spans a page boundary, or oob records, can break this property. + */ + goto again; + } + } else { oob_reader.start_traversal(oob_last_file_no, oob_last_offset); chunk_rd.save_pos(&saved_commit_pos); state= ST_read_oob_data; } - if (size == 0) + if (sofar == 0) goto again; } - return size; + return sofar; case ST_read_oob_data: res= oob_reader.read_data(&chunk_rd, buf, len); @@ -2774,9 +2791,10 @@ again: if (UNIV_UNLIKELY(res == 0)) { ut_ad(0 /* Should have had oob_traversal_done() last time then. 
*/); - goto again; + if (sofar == 0) + goto again; } - return res; + return sofar + res; default: ut_ad(0); @@ -3230,6 +3248,13 @@ ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset) } +void +ha_innodb_binlog_reader::enable_single_file() +{ + chunk_rd.stop_file_no= chunk_rd.s.file_no; +} + + void ha_innodb_binlog_reader::seek_internal(uint64_t file_no, uint64_t offset) { diff --git a/storage/innobase/include/fsp_binlog.h b/storage/innobase/include/fsp_binlog.h index 4d0133b2340..89550ffe9f5 100644 --- a/storage/innobase/include/fsp_binlog.h +++ b/storage/innobase/include/fsp_binlog.h @@ -112,6 +112,10 @@ static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS), "in ALLOWED_NESTED_RECORDS bitmask"); +extern uint32_t ibb_page_size_shift; +extern ulong ibb_page_size; + + /* The object representing a binlog page that is not yet flushed to disk. At the end of the object is an additionally allocated byte buffer of @@ -383,6 +387,12 @@ public: uint64_t cur_end_offset; /* Length of the currently open file, valid if cur_file_handle != -1. */ uint64_t cur_file_length; + /* + If different from ~0, stop (return EOF) when reaching the end of this file. + This is used for SHOW BINLOG EVENTS, which has an old file-based interface, + and wants to show the events in a single file. + */ + uint64_t stop_file_no; /* After fetch_current_page(), this points into either cur_block or page_buffer as appropriate. 
@@ -414,6 +424,10 @@ public: byte cur_type() { return (byte)(s.chunk_type & FSP_BINLOG_TYPE_MASK); } bool cur_is_cont() { return (s.chunk_type & FSP_BINLOG_FLAG_CONT) != 0; } bool end_of_record() { return !s.in_record; } + bool is_end_of_page() noexcept + { + return s.in_page_offset >= ibb_page_size - (BINLOG_PAGE_DATA_END + 3); + } static int read_error_corruption(uint64_t file_no, uint64_t page_no, const char *msg); int read_error_corruption(const char *msg) @@ -460,13 +474,11 @@ public: bool is_before_pos(uint64_t file_no, uint64_t offset); uint64_t current_file_no() { return s.file_no; } uint64_t current_pos() { - return (s.page_no << srv_page_size_shift) + s.in_page_offset; + return (s.page_no << ibb_page_size_shift) + s.in_page_offset; } }; -extern uint32_t ibb_page_size_shift; -extern ulong ibb_page_size; /* The state interval (in pages) used for active_binlog_file_no. */ extern uint64_t current_binlog_state_interval; extern mysql_mutex_t active_binlog_mutex;