From 18b9ec637ee3a5140d38c57e310c0b70f7d75daa Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Thu, 3 Oct 2024 18:39:55 +0200 Subject: [PATCH] MDEV-34705: Binlog in Engine: Searchability for GTID position Every N bytes (hardcoded at 64k for now, to become a configurable setting), write the binlog GTID state into the binlog tablespace. This allows to quickly find a given GTID position by binary search to the prior GTID state in the tablespace and then a small linear scan from that point. The full binlog state is dumped at the start of the binlog file; remaining states dumped are differential states containing only the changed (domain_id, server_id) pairs, to save space if binlog space is large. This commit only implements the writing of the binlog state to the tablespace at regular intervals. The binary search to be implemented in a subsequent commit. Signed-off-by: Kristian Nielsen --- sql/handler.h | 4 +- sql/log.cc | 24 ++- sql/log.h | 6 +- sql/sql_class.h | 2 +- sql/sys_vars.cc | 2 +- storage/innobase/fsp/fsp0fsp.cc | 273 +++++++++++++++++++++-------- storage/innobase/include/fsp0fsp.h | 4 +- 7 files changed, 234 insertions(+), 81 deletions(-) diff --git a/sql/handler.h b/sql/handler.h index dda75f937a6..ee6fb5e2fda 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -60,6 +60,7 @@ class Field_blob; class Column_definition; class select_result; class handler_binlog_reader; +struct rpl_gtid; // the following is for checking tables @@ -1531,7 +1532,8 @@ struct handlerton /* Optional implementation of binlog in the engine. */ bool (*binlog_init)(size_t binlog_size); - bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size); + bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size, + const rpl_gtid *gtid); handler_binlog_reader * (*get_binlog_reader)(); /* diff --git a/sql/log.cc b/sql/log.cc index 25677703f16..1b6ad569f03 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1836,11 +1836,13 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, extern "C" void -binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size) +binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size, + const rpl_gtid **out_gtid) { IO_CACHE *cache= nullptr; size_t main_size= 0; binlog_cache_mngr *cache_mngr; + const rpl_gtid *gtid= nullptr; if (likely(!opt_bootstrap /* ToDo needed? */) && opt_binlog_engine_hton && (cache_mngr= thd->binlog_get_cache_mngr())) @@ -1848,9 +1850,11 @@ binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size) main_size= cache_mngr->gtid_cache_offset; cache= !cache_mngr->trx_cache.empty() ? &cache_mngr->trx_cache.cache_log : &cache_mngr->stmt_cache.cache_log; + gtid= thd->get_last_commit_gtid(); } *out_cache= cache; *out_main_size= main_size; + *out_gtid= gtid; } @@ -7087,6 +7091,17 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) } +/* ToDo: Should this be a service? */ +bool +load_global_binlog_state(rpl_binlog_state_base *state) +{ + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + bool err= state->load_nolock(&rpl_global_gtid_binlog_state); + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return err; +} + + bool MYSQL_BIN_LOG::append_state_pos(String *str) { @@ -7471,7 +7486,8 @@ err: mysql_mutex_unlock(&LOCK_log); mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_after_binlog_sync); - if ((*opt_binlog_engine_hton->binlog_write_direct)(file, binlog_main_bytes)) + if ((*opt_binlog_engine_hton->binlog_write_direct) + (file, binlog_main_bytes, thd->get_last_commit_gtid())) goto engine_fail; /* ToDo: Need to set last_commit_pos_offset here? */ /* ToDo: Maybe binlog_write_direct() could return the coords. */ @@ -7576,14 +7592,14 @@ err: void -MYSQL_BIN_LOG::update_gtid_index(uint32 offset, rpl_gtid gtid) +MYSQL_BIN_LOG::update_gtid_index(uint32 offset, const rpl_gtid *gtid) { if (!unlikely(gtid_index)) return; rpl_gtid *gtid_list; uint32 gtid_count; - int err= gtid_index->process_gtid_check_batch(offset, >id, + int err= gtid_index->process_gtid_check_batch(offset, gtid, >id_list, >id_count); if (err) return; diff --git a/sql/log.h b/sql/log.h index fdf2eccb004..f710f4ca597 100644 --- a/sql/log.h +++ b/sql/log.h @@ -600,6 +600,7 @@ class binlog_cache_mngr; class binlog_cache_data; struct rpl_gtid; struct wait_for_commit; +struct rpl_binlog_state_base; class MYSQL_BIN_LOG: public TC_LOG, private Event_log { @@ -763,7 +764,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private Event_log bool write_transaction_to_binlog_events(group_commit_entry *entry); void trx_group_commit_leader(group_commit_entry *leader); bool is_xidlist_idle_nolock(); - void update_gtid_index(uint32 offset, rpl_gtid gtid); + void update_gtid_index(uint32 offset, const rpl_gtid *gtid); public: void purge(bool all); @@ -1191,6 +1192,9 @@ public: char binlog_end_pos_file[FN_REFLEN]; }; +extern bool load_global_binlog_state(rpl_binlog_state_base *state); + + class Log_event_handler { public: diff --git a/sql/sql_class.h b/sql/sql_class.h index 3300a501390..64fac513549 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5512,7 +5512,7 @@ private: rpl_gtid m_last_commit_gtid; public: - rpl_gtid get_last_commit_gtid() { return m_last_commit_gtid; } + const rpl_gtid *get_last_commit_gtid() { return &m_last_commit_gtid; } void set_last_commit_gtid(rpl_gtid >id); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 4733e67830c..027f8ae594f 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2324,7 +2324,7 @@ Sys_var_last_gtid::session_value_ptr(THD *thd, const LEX_CSTRING *base) const bool first= true; str.length(0); - rpl_gtid gtid= thd->get_last_commit_gtid(); + rpl_gtid gtid= *thd->get_last_commit_gtid(); if ((gtid.seq_no > 0 && rpl_slave_state_tostring_helper(&str, >id, &first)) || !(p= thd->strmake(str.ptr(), str.length()))) diff --git a/storage/innobase/fsp/fsp0fsp.cc b/storage/innobase/fsp/fsp0fsp.cc index 526b2af0af0..9254725969c 100644 --- a/storage/innobase/fsp/fsp0fsp.cc +++ b/storage/innobase/fsp/fsp0fsp.cc @@ -51,6 +51,7 @@ Created 11/29/1995 Heikki Tuuri #include #include "trx0undo.h" #include "trx0trx.h" +#include "ut0compr_int.h" #include "rpl_gtid_base.h" /** Returns the first extent descriptor for a segment. @@ -4277,6 +4278,24 @@ func_exit: } +/* + Binlog implementation in InnoDB. + ToDo: Move this somewhere reasonable, its own file(s) etc. +*/ + +/* + How often (in terms of bytes written) to dump a (differential) binlog state + at the start of the page, to speed up finding the initial GTID position for + a connecting slave. + + Must be a power-of-two multiple of the page size. + + ToDo: An InnoDB setting for this - which needs to be persisted in the + tablespace header or maybe initial full binlog state record. + ToDo: Default should be like 1M or so, this 64k is for testing. +*/ +#define INNODB_BINLOG_STATE_INTERVAL (64*1024) + static uint32_t binlog_size_in_pages; buf_block_t *binlog_cur_block; uint32_t binlog_cur_page_no; @@ -4309,6 +4328,12 @@ std::atomic first_open_binlog_file_no; uint64_t last_created_binlog_file_no; fil_space_t *last_created_binlog_space; +/* + Differential binlog state in the currently active binlog tablespace, relative + to the state at the start. +*/ +static rpl_binlog_state_base binlog_diff_state; + /* Point at which it is guaranteed that all data has been written out to the binlog file (on the OS level; not necessarily fsync()'ed yet). @@ -4386,15 +4411,17 @@ fsp_binlog_tablespace_close(uint64_t file_no) /* Write out any remaining pages in the buffer pool to the binlog tablespace. Then flush the file to disk, and close the old tablespace. + + ToDo: Will this turn into a busy-wait if some of the pages are still latched + in this tablespace, maybe because even though the tablespace has been + written full, the mtr that's ending in the next tablespace may still be + active? This will need fixing, no busy-wait should be done here. */ while (buf_flush_list_space(space)) ; os_aio_wait_until_no_pending_writes(false); space->flush(); - mysql_mutex_lock(&fil_system.mutex); - fil_system.detach(space, false); - mysql_mutex_unlock(&fil_system.mutex); - + fil_space_free(space_id, false); res= DB_SUCCESS; end: return res; @@ -4411,6 +4438,7 @@ fsp_binlog_init() { mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr); pthread_cond_init(&active_binlog_cond, nullptr); + binlog_diff_state.init(); } @@ -4849,6 +4877,7 @@ void fsp_binlog_close() ==3464576== by 0x1558445: fsp_binlog_tablespace_create(unsigned long) (fsp0fsp.cc:3900) ==3464576== by 0x1558C70: fsp_binlog_write_cache(st_io_cache*, unsigned long, mtr_t*) (fsp0fsp.cc:4013) */ + binlog_diff_state.free(); pthread_cond_destroy(&active_binlog_cond); mysql_mutex_destroy(&active_binlog_mutex); } @@ -4987,54 +5016,127 @@ fsp_binlog_prealloc_thread() } -void fsp_binlog_write_start(uint32_t page_no, - const uchar *data, uint32_t len, mtr_t *mtr) +__attribute__((noinline)) +static ssize_t +serialize_gtid_state(rpl_binlog_state_base *state, byte *buf, size_t buf_size, + bool is_first_page) { - buf_block_t *block= fsp_page_create(active_binlog_space, page_no, mtr); - mtr->memcpy(*block, FIL_PAGE_DATA + block->page.frame, - data, len); - binlog_cur_block= block; -} - -void fsp_binlog_write_offset(uint32_t page_no, uint32_t offset, - const uchar *data, uint32_t len, mtr_t *mtr) -{ - dberr_t err; - /* ToDo: Is RW_SX_LATCH appropriate here? */ - buf_block_t *block= buf_page_get_gen(page_id_t{active_binlog_space->id, page_no}, - 0, RW_SX_LATCH, binlog_cur_block, - BUF_GET, mtr, &err); - ut_a(err == DB_SUCCESS); - mtr->memcpy(*block, - offset + block->page.frame, - data, len); -} - -void fsp_binlog_append(const uchar *data, uint32_t len, mtr_t *mtr) -{ - ut_ad(binlog_cur_page_offset <= srv_page_size - FIL_PAGE_DATA_END); - uint32_t remain= ((uint32_t)srv_page_size - FIL_PAGE_DATA_END) - - binlog_cur_page_offset; - // ToDo: Some kind of mutex to protect binlog access. - while (len > 0) { - if (remain < 4) { - binlog_cur_page_offset= FIL_PAGE_DATA; - remain= ((uint32_t)srv_page_size - FIL_PAGE_DATA_END) - - binlog_cur_page_offset; - ++binlog_cur_page_no; - } - uint32_t this_len= std::min(len, remain); - if (binlog_cur_page_offset == FIL_PAGE_DATA) - fsp_binlog_write_start(binlog_cur_page_no, data, this_len, mtr); - else - fsp_binlog_write_offset(binlog_cur_page_no, binlog_cur_page_offset, - data, this_len, mtr); - len-= this_len; - data+= this_len; - binlog_cur_page_offset+= this_len; + unsigned char *p= (unsigned char *)buf; + ut_ad(buf_size >= 2*COMPR_INT_MAX32 + 2*COMPR_INT_MAX64); + if (is_first_page) { + /* + In the first page where we put the full state, include the value of the + setting for the interval at which differential states are binlogged, so + we know how to search them independent of how the setting changes. + */ + p= compr_int_write(p, INNODB_BINLOG_STATE_INTERVAL); } + unsigned char * const pmax= + p + (buf_size - (2*COMPR_INT_MAX32 + COMPR_INT_MAX64)); + + if (state->iterate( + [buf, buf_size, pmax, &p] (const rpl_gtid *gtid) { + if (UNIV_UNLIKELY(p > pmax)) + return true; + p= compr_int_write(p, gtid->domain_id); + p= compr_int_write(p, gtid->server_id); + p= compr_int_write(p, gtid->seq_no); + return false; + })) + return -1; + else + return p - (unsigned char *)buf; } + +static bool +binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, + buf_block_t * &block, uint32_t &page_no, + uint32_t &page_offset, fil_space_t *space) +{ + /* + Use a small, efficient stack-allocated buffer by default, falling back to + malloc() if needed for large GTID state. + */ + byte small_buf[192]; + byte *buf, *alloced_buf; + + ssize_t used_bytes= serialize_gtid_state(state, small_buf, sizeof(small_buf), + page_no==0); + if (used_bytes >= 0) + { + buf= small_buf; + alloced_buf= nullptr; + } + else + { + size_t buf_size= + state->count_nolock() * (2*COMPR_INT_MAX32 + COMPR_INT_MAX64); + /* ToDo: InnoDB alloc function? */ + alloced_buf= (byte *)my_malloc(PSI_INSTRUMENT_ME, buf_size, MYF(MY_WME)); + if (UNIV_UNLIKELY(!alloced_buf)) + return true; + buf= alloced_buf; + used_bytes= serialize_gtid_state(state, buf, buf_size, page_no==0); + if (UNIV_UNLIKELY(used_bytes < 0)) + { + ut_ad(0 /* Shouldn't happen, as we allocated maximum needed size. */); + my_free(alloced_buf); + return true; + } + } + + const uint32_t page_size= (uint32_t)srv_page_size; + const uint32_t page_room= page_size - (FIL_PAGE_DATA + FIL_PAGE_DATA_END); + uint32_t needed_pages= (uint32_t)((used_bytes + page_room - 1) / page_room); + + /* For now, GTID state always at the start of a page. */ + ut_ad(page_offset == FIL_PAGE_DATA); + + /* + Only write the GTID state record if there is room for actual event data + afterwards. There is no point in using space to allow fast search to a + point if there is no data to search for after that point. + */ + if (page_no + needed_pages < space->size) + { + while (used_bytes > 0) + { + ut_ad(page_no < space->size); + block= fsp_page_create(space, page_no, mtr); + ut_a(block /* ToDo: error handling? */); + page_offset= FIL_PAGE_DATA; + byte *ptr= page_offset + block->page.frame; + ssize_t chunk= used_bytes; + if (chunk > page_room - 3) { + chunk= page_room - 3; + ++page_no; + } + ptr[0]= 0x02 /* ToDo: FSP_BINLOG_TYPE_GTID_STATE */ | ((chunk < used_bytes) << 7); + ptr[1] = (byte)chunk & 0xff; + ptr[2] = (byte)(chunk >> 8); + ut_ad(chunk <= 0xffff); + memcpy(ptr+3, buf, chunk); + mtr->memcpy(*block, page_offset, chunk+3); + page_offset+= (uint32_t)(chunk+3); + buf+= chunk; + used_bytes-= chunk; + } + + if (page_offset == FIL_PAGE_DATA_END) { + block= nullptr; + page_offset= FIL_PAGE_DATA; + ++page_no; + block= fsp_page_create(space, page_no, mtr); + ut_a(block /* ToDo: error handling? */); + } + } + + my_free(alloced_buf); + return false; // No error +} + + void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) { uint32_t page_size= (uint32_t)srv_page_size; @@ -5114,7 +5216,32 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) mysql_mutex_unlock(&active_binlog_mutex); binlog_cur_page_no= page_no= 0; } - block= fsp_page_create(space, page_no, mtr); + + /* Must be a power of two and larger than page size. */ + ut_ad(INNODB_BINLOG_STATE_INTERVAL > page_size); + ut_ad(INNODB_BINLOG_STATE_INTERVAL == + (uint64_t)1 << (63 - nlz((uint64_t)INNODB_BINLOG_STATE_INTERVAL))); + + if (0 == (page_no & + ((INNODB_BINLOG_STATE_INTERVAL >> page_size_shift) - 1))) { + if (page_no == 0) { + rpl_binlog_state_base full_state; + full_state.init(); + bool err= load_global_binlog_state(&full_state); + ut_a(!err /* ToDo error handling */); + err= binlog_gtid_state(&full_state, mtr, block, page_no, + page_offset, space); + ut_a(!err /* ToDo error handling */); + ut_ad(block); + full_state.free(); + binlog_diff_state.reset_nolock(); + } else { + bool err= binlog_gtid_state(&binlog_diff_state, mtr, block, page_no, + page_offset, space); + ut_a(!err /* ToDo error handling */); + } + } else + block= fsp_page_create(space, page_no, mtr); } else { dberr_t err; /* ToDo: Is RW_SX_LATCH appropriate here? */ @@ -5185,30 +5312,23 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) } -extern "C" void binlog_get_cache(THD *, IO_CACHE **, size_t *); +extern "C" void binlog_get_cache(THD *, IO_CACHE **, size_t *, + const rpl_gtid **); void fsp_binlog_trx(trx_t *trx, mtr_t *mtr) { IO_CACHE *cache; size_t main_size; + const rpl_gtid *gtid; if (!trx->mysql_thd) return; - binlog_get_cache(trx->mysql_thd, &cache, &main_size); - if (main_size) + binlog_get_cache(trx->mysql_thd, &cache, &main_size, >id); + if (main_size) { + binlog_diff_state.update_nolock(gtid); fsp_binlog_write_cache(cache, main_size, mtr); -} - - -void fsp_binlog_test(const uchar *data, uint32_t len) -{ - mtr_t mtr; - mtr.start(); - if (!active_binlog_space) - fsp_binlog_tablespace_create(0, &active_binlog_space); - fsp_binlog_append(data, len, &mtr); - mtr.commit(); + } } @@ -5276,7 +5396,9 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len) uint64_t file_no= cur_file_no; uint64_t offset= cur_file_offset; uint64_t active_file_no= active_binlog_file_no.load(std::memory_order_acquire); - if (first_open_binlog_file_no.load(std::memory_order_relaxed) > file_no + /* Temporary hack to work-around the next line ToDo: comment */ (offset >= cur_file_length)) { + if (first_open_binlog_file_no.load(std::memory_order_relaxed) > file_no + + /* Temporary hack to work-around the next line ToDo: comment */ + (cur_file != (File)-1 && offset >= cur_file_length)) { // ToDo: I think there is a bug here, if we're at the end of active_file_no-2, we will be reading directly from active_file_no-1 without checking properly if buffer pool is needed instead. */ return read_from_file(~(uint64_t)0, buf, len); } @@ -5296,6 +5418,10 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len) } else { /* file_no == active_file_no - 1 */ if (offset >= end_offset) { // ToDo what if this end_pos is stale? Need somehow an extra check afterwards if we are now active-2, and then do a simple file read, not EOF on the stale end_offset. /* Handle moving to the currently active file. */ + if (!(cur_file < (File)0)) { + my_close(cur_file, MYF(0)); + cur_file= (File)-1; + } cur_file_no= ++file_no; cur_file_offset= offset= 0; idx= file_no & 1; @@ -5372,11 +5498,14 @@ ha_innodb_binlog_reader::read_from_file(uint64_t end_offset, uint64_t offset= cur_file_offset; uint64_t page_start_offset; - if (cur_file < (File)0 || cur_file_offset >= cur_file_length) { - if (!(cur_file < (File)0)) { - my_close(cur_file, MYF(0)); - ++cur_file_no; - } + if (cur_file >= (File)0 && cur_file_offset >= cur_file_length) { + /* At the end of currently open file, move to the next file. */ + my_close(cur_file, MYF(0)); + cur_file= (File)-1; + ++cur_file_no; + cur_file_offset= offset= 0; + } + if (cur_file < (File)0) { char filename[BINLOG_NAME_LEN]; binlog_name_make(filename, cur_file_no); if ((cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME))) < (File)0) @@ -5390,7 +5519,6 @@ ha_innodb_binlog_reader::read_from_file(uint64_t end_offset, return -1; } cur_file_length= stat_buf.st_size; - cur_file_offset= offset= 0; } page_start_offset= offset & ~mask; @@ -5501,9 +5629,12 @@ innodb_get_binlog_reader() bool -innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size) +innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size, + const rpl_gtid *gtid) { mtr_t mtr; + if (gtid) + binlog_diff_state.update_nolock(gtid); mtr.start(); fsp_binlog_write_cache(cache, main_size, &mtr); mtr.commit(); diff --git a/storage/innobase/include/fsp0fsp.h b/storage/innobase/include/fsp0fsp.h index 6403f0edc60..8a589f024d8 100644 --- a/storage/innobase/include/fsp0fsp.h +++ b/storage/innobase/include/fsp0fsp.h @@ -556,10 +556,10 @@ void fsp_system_tablespace_truncate(bool shutdown); /** Truncate the temporary tablespace */ void fsp_shrink_temp_space(); -extern void fsp_binlog_test(const uchar *data, uint32_t len); extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr); class handler_binlog_reader; -extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size); +extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size, + const struct rpl_gtid *gtid); extern handler_binlog_reader *innodb_get_binlog_reader(); extern void fsp_binlog_init(); extern bool innodb_binlog_init(size_t binlog_size);