1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

MDEV-34705: Binlog-in-engine: Binlog reader to read whole page at a time

Instead of returning only one chunk at a time, make
ha_innodb_binlog_reader::read_data() try to read all chunks on the page.
This reduces the number of times each reader has to latch pages in the page
fifo, which contends for a global mutex also shared with the writer.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-07-03 14:41:53 +02:00
parent 84da20e658
commit 95ea6e15a6
5 changed files with 63 additions and 20 deletions

View File

@@ -5964,7 +5964,12 @@ public:
support legacy SHOW BINLOG EVENTS. support legacy SHOW BINLOG EVENTS.
*/ */
virtual int init_legacy_pos(const char *filename, ulonglong offset) = 0; virtual int init_legacy_pos(const char *filename, ulonglong offset) = 0;
/*
Can be called after init_gtid_pos() or init_legacy_pos() to make the reader
stop (return EOF) at the end of the binlog file. Used for SHOW BINLOG
EVENTS, which has a file-based interface based on legacy file name.
*/
virtual void enable_single_file() = 0;
int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed); int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed);
}; };

View File

@@ -4883,6 +4883,8 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi)
err= true; err= true;
goto end; goto end;
} }
/* The engine reader will stop at the end of the requested file. */
reader->enable_single_file();
{ {
SELECT_LEX_UNIT *unit= &thd->lex->unit; SELECT_LEX_UNIT *unit= &thd->lex->unit;
@@ -4894,9 +4896,7 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi)
char name_buf[FN_REFLEN]; char name_buf[FN_REFLEN];
opt_binlog_engine_hton->get_filename(name_buf, file_no); opt_binlog_engine_hton->get_filename(name_buf, file_no);
for (ha_rows event_count= 0; for (ha_rows event_count= 0; event_count < limit; ++event_count)
reader->cur_file_no == file_no && event_count < limit;
++event_count)
{ {
packet.length(0); packet.length(0);
int reader_error= reader->read_log_event(&packet, 0, int reader_error= reader->read_log_event(&packet, 0,

View File

@@ -1681,7 +1681,7 @@ fsp_binlog_flush()
binlog_chunk_reader::binlog_chunk_reader(std::atomic<uint64_t> *limit_offset_) binlog_chunk_reader::binlog_chunk_reader(std::atomic<uint64_t> *limit_offset_)
: s { 0, 0, 0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }, : s { 0, 0, 0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false },
page_ptr(0), cur_block(0), page_buffer(nullptr), stop_file_no(~(uint64_t)0), page_ptr(0), cur_block(0), page_buffer(nullptr),
limit_offset(limit_offset_), cur_file_handle((File)-1), limit_offset(limit_offset_), cur_file_handle((File)-1),
skipping_partial(false) skipping_partial(false)
{ {
@@ -1731,7 +1731,8 @@ binlog_chunk_reader::fetch_current_page()
uint64_t active= active2; uint64_t active= active2;
uint64_t end_offset= uint64_t end_offset=
limit_offset[s.file_no & 3].load(std::memory_order_acquire); limit_offset[s.file_no & 3].load(std::memory_order_acquire);
if (s.file_no > active || UNIV_UNLIKELY(active == ~(uint64_t)0)) if (s.file_no > active || UNIV_UNLIKELY(active == ~(uint64_t)0)
|| UNIV_UNLIKELY(s.file_no > stop_file_no))
{ {
ut_ad(s.page_no == 1); ut_ad(s.page_no == 1);
ut_ad(s.in_page_offset == 0); ut_ad(s.in_page_offset == 0);
@@ -2088,8 +2089,8 @@ void binlog_chunk_reader::release(bool release_file_page)
{ {
/* /*
For when we reach EOF while reading from the file. We need to re-read For when we reach EOF while reading from the file. We need to re-read
the page from the file (or buffer pool) in this case on next read, as the page from the file in this case on next read, as data might be added
data might be added to the page. to the page.
*/ */
page_ptr= nullptr; page_ptr= nullptr;
} }

View File

@@ -267,6 +267,7 @@ public:
virtual int init_gtid_pos(slave_connection_state *pos, virtual int init_gtid_pos(slave_connection_state *pos,
rpl_binlog_state_base *state) final; rpl_binlog_state_base *state) final;
virtual int init_legacy_pos(const char *filename, ulonglong offset) final; virtual int init_legacy_pos(const char *filename, ulonglong offset) final;
virtual void enable_single_file() final;
void seek_internal(uint64_t file_no, uint64_t offset); void seek_internal(uint64_t file_no, uint64_t offset);
}; };
@@ -2670,7 +2671,7 @@ int ha_innodb_binlog_reader::read_data(uchar *buf, uint32_t len)
const uchar *p_end; const uchar *p_end;
const uchar *p; const uchar *p;
std::pair<uint64_t, const unsigned char *> v_and_p; std::pair<uint64_t, const unsigned char *> v_and_p;
int size; int sofar= 0;
again: again:
switch (state) switch (state)
@@ -2679,8 +2680,10 @@ again:
static_assert(sizeof(rd_buf) == 5*COMPR_INT_MAX64, static_assert(sizeof(rd_buf) == 5*COMPR_INT_MAX64,
"rd_buf size must match code using it"); "rd_buf size must match code using it");
res= chunk_rd.read_data(rd_buf, 5*COMPR_INT_MAX64, true); res= chunk_rd.read_data(rd_buf, 5*COMPR_INT_MAX64, true);
if (res <= 0) if (res < 0)
return res; return res;
if (res == 0)
return sofar;
if (chunk_rd.cur_type() != FSP_BINLOG_TYPE_COMMIT) if (chunk_rd.cur_type() != FSP_BINLOG_TYPE_COMMIT)
{ {
chunk_rd.skip_current(); chunk_rd.skip_current();
@@ -2727,15 +2730,15 @@ again:
goto again; goto again;
case ST_read_commit_record: case ST_read_commit_record:
size= 0;
if (rd_buf_len > rd_buf_sofar) if (rd_buf_len > rd_buf_sofar)
{ {
/* Use any excess data from when the header was read. */ /* Use any excess data from when the header was read. */
size= std::min((int)(rd_buf_len - rd_buf_sofar), (int)len); int size= std::min((int)(rd_buf_len - rd_buf_sofar), (int)len);
memcpy(buf, rd_buf + rd_buf_sofar, size); memcpy(buf, rd_buf + rd_buf_sofar, size);
rd_buf_sofar+= size; rd_buf_sofar+= size;
len-= size; len-= size;
buf+= size; buf+= size;
sofar+= size;
} }
if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd.end_of_record())) if (UNIV_LIKELY(len > 0) && UNIV_LIKELY(!chunk_rd.end_of_record()))
@@ -2743,24 +2746,38 @@ again:
res= chunk_rd.read_data(buf, len, false); res= chunk_rd.read_data(buf, len, false);
if (res < 0) if (res < 0)
return -1; return -1;
size+= res; len-= res;
buf+= res;
sofar+= res;
} }
if (UNIV_LIKELY(rd_buf_sofar == rd_buf_len) && chunk_rd.end_of_record()) if (UNIV_LIKELY(rd_buf_sofar == rd_buf_len) && chunk_rd.end_of_record())
{ {
if (oob_count == 0) if (oob_count == 0)
{
state= ST_read_next_event_group; state= ST_read_next_event_group;
if (len > 0 && !chunk_rd.is_end_of_page())
{
/*
Let us try to read more data from this page. The goal is to read
from each page only once, as long as the caller passes in a buffer at
least as big as our page size. However, a commit record header that
spans a page boundary, or oob records, can break this property.
*/
goto again;
}
}
else else
{ {
oob_reader.start_traversal(oob_last_file_no, oob_last_offset); oob_reader.start_traversal(oob_last_file_no, oob_last_offset);
chunk_rd.save_pos(&saved_commit_pos); chunk_rd.save_pos(&saved_commit_pos);
state= ST_read_oob_data; state= ST_read_oob_data;
} }
if (size == 0) if (sofar == 0)
goto again; goto again;
} }
return size; return sofar;
case ST_read_oob_data: case ST_read_oob_data:
res= oob_reader.read_data(&chunk_rd, buf, len); res= oob_reader.read_data(&chunk_rd, buf, len);
@@ -2774,9 +2791,10 @@ again:
if (UNIV_UNLIKELY(res == 0)) if (UNIV_UNLIKELY(res == 0))
{ {
ut_ad(0 /* Should have had oob_traversal_done() last time then. */); ut_ad(0 /* Should have had oob_traversal_done() last time then. */);
if (sofar == 0)
goto again; goto again;
} }
return res; return sofar + res;
default: default:
ut_ad(0); ut_ad(0);
@@ -3230,6 +3248,13 @@ ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset)
} }
/*
  Restrict this reader to a single binlog file.

  Records the file number the chunk reader is currently positioned in
  (chunk_rd.s.file_no) as chunk_rd.stop_file_no. Because the current position
  is what gets captured, this must be called after the reader has been
  positioned with init_gtid_pos() or init_legacy_pos().

  binlog_chunk_reader::fetch_current_page() compares s.file_no against
  stop_file_no; a stop_file_no different from ~0 makes the reader stop
  (return EOF) at the end of that file. Used by SHOW BINLOG EVENTS, which
  shows the events of one file only.
*/
void
ha_innodb_binlog_reader::enable_single_file()
{
chunk_rd.stop_file_no= chunk_rd.s.file_no;
}
void void
ha_innodb_binlog_reader::seek_internal(uint64_t file_no, uint64_t offset) ha_innodb_binlog_reader::seek_internal(uint64_t file_no, uint64_t offset)
{ {

View File

@@ -112,6 +112,10 @@ static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS),
"in ALLOWED_NESTED_RECORDS bitmask"); "in ALLOWED_NESTED_RECORDS bitmask");
extern uint32_t ibb_page_size_shift;
extern ulong ibb_page_size;
/* /*
The object representing a binlog page that is not yet flushed to disk. The object representing a binlog page that is not yet flushed to disk.
At the end of the object is an additionally allocated byte buffer of At the end of the object is an additionally allocated byte buffer of
@@ -383,6 +387,12 @@ public:
uint64_t cur_end_offset; uint64_t cur_end_offset;
/* Length of the currently open file, valid if cur_file_handle != -1. */ /* Length of the currently open file, valid if cur_file_handle != -1. */
uint64_t cur_file_length; uint64_t cur_file_length;
/*
If different from ~0, stop (return EOF) when reaching the end of this file.
This is used for SHOW BINLOG EVENTS, which has an old file-based interface,
and wants to show the events in a single file.
*/
uint64_t stop_file_no;
/* /*
After fetch_current_page(), this points into either cur_block or After fetch_current_page(), this points into either cur_block or
page_buffer as appropriate. page_buffer as appropriate.
@@ -414,6 +424,10 @@ public:
byte cur_type() { return (byte)(s.chunk_type & FSP_BINLOG_TYPE_MASK); } byte cur_type() { return (byte)(s.chunk_type & FSP_BINLOG_TYPE_MASK); }
bool cur_is_cont() { return (s.chunk_type & FSP_BINLOG_FLAG_CONT) != 0; } bool cur_is_cont() { return (s.chunk_type & FSP_BINLOG_FLAG_CONT) != 0; }
bool end_of_record() { return !s.in_record; } bool end_of_record() { return !s.in_record; }
/*
  Return true when the current in-page offset (s.in_page_offset) is at or
  beyond ibb_page_size - (BINLOG_PAGE_DATA_END + 3).

  ha_innodb_binlog_reader::read_data() checks this after finishing a commit
  record to decide whether to try reading more data from the same page
  before returning to the caller.

  NOTE(review): the "+ 3" presumably accounts for the smallest chunk header
  that could still fit before the page trailer - confirm against the chunk
  format definition.
*/
bool is_end_of_page() noexcept
{
return s.in_page_offset >= ibb_page_size - (BINLOG_PAGE_DATA_END + 3);
}
static int read_error_corruption(uint64_t file_no, uint64_t page_no, static int read_error_corruption(uint64_t file_no, uint64_t page_no,
const char *msg); const char *msg);
int read_error_corruption(const char *msg) int read_error_corruption(const char *msg)
@@ -460,13 +474,11 @@ public:
bool is_before_pos(uint64_t file_no, uint64_t offset); bool is_before_pos(uint64_t file_no, uint64_t offset);
uint64_t current_file_no() { return s.file_no; } uint64_t current_file_no() { return s.file_no; }
uint64_t current_pos() { uint64_t current_pos() {
return (s.page_no << srv_page_size_shift) + s.in_page_offset; return (s.page_no << ibb_page_size_shift) + s.in_page_offset;
} }
}; };
extern uint32_t ibb_page_size_shift;
extern ulong ibb_page_size;
/* The state interval (in pages) used for active_binlog_file_no. */ /* The state interval (in pages) used for active_binlog_file_no. */
extern uint64_t current_binlog_state_interval; extern uint64_t current_binlog_state_interval;
extern mysql_mutex_t active_binlog_mutex; extern mysql_mutex_t active_binlog_mutex;