diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine.result b/mysql-test/suite/binlog_in_engine/binlog_in_engine.result new file mode 100644 index 00000000000..037af9b92ca --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine.result @@ -0,0 +1,21 @@ +RESET MASTER; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +BEGIN; +INSERT INTO t1 VALUES (2); +INSERT INTO t1 VALUES (3); +COMMIT; +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +DROP TABLE t1; +SELECT @@GLOBAL.binlog_checksum; +@@GLOBAL.binlog_checksum +NONE +CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB; +SET SESSION binlog_format= ROW; +*** Do 1500 transactions ... +SET SESSION binlog_format= MIXED; +DROP TABLE t2; diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine2.result b/mysql-test/suite/binlog_in_engine/binlog_in_engine2.result new file mode 100644 index 00000000000..a0716f4e75a --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine2.result @@ -0,0 +1,13 @@ +CREATE TABLE sbtest1( +id INTEGER NOT NULL AUTO_INCREMENT, +k INTEGER DEFAULT '0' NOT NULL, +c CHAR(120) DEFAULT '' NOT NULL, +pad CHAR(60) DEFAULT '' NOT NULL, +PRIMARY KEY (id) +) ENGINE = innodb; +*** Test bug where a large event in trx cache would fail to flush to disk +*** the last part of the IO_CACHE after reinit_io_cache() +SELECT COUNT(*) FROM sbtest1; +COUNT(*) +1000 +DROP TABLE sbtest1; diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result new file mode 100644 index 00000000000..d7657b333d0 --- /dev/null +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.result @@ -0,0 +1,58 @@ +RESET MASTER; +CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1, 0); +INSERT INTO t1 SELECT seq+1000000+100000*0, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*1, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*2, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*3, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*4, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*5, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*6, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*7, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*8, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+1000000+100000*9, seq FROM seq_1_to_4000; +SELECT @@GLOBAL.gtid_binlog_state; +@@GLOBAL.gtid_binlog_state +0-1-12 +# restart +SELECT @@GLOBAL.gtid_binlog_state; +@@GLOBAL.gtid_binlog_state +0-1-12 +INSERT INTO t1 VALUES (2, 0); +INSERT INTO t1 SELECT seq+2000000+100000*0, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*1, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*2, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*3, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*4, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*5, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*6, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*7, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*8, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+2000000+100000*9, seq FROM seq_1_to_4000; +SELECT @@GLOBAL.gtid_binlog_state; +@@GLOBAL.gtid_binlog_state +0-1-23 +# restart +SELECT @@GLOBAL.gtid_binlog_state; +@@GLOBAL.gtid_binlog_state +0-1-23 +INSERT INTO t1 VALUES (3, 0); +INSERT INTO t1 SELECT seq+3000000+100000*0, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*1, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*2, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*3, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*4, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*5, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*6, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*7, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*8, seq FROM seq_1_to_4000; +INSERT INTO t1 SELECT seq+3000000+100000*9, seq FROM seq_1_to_4000; +SHOW BINARY LOGS; +Log_name File_size +binlog-000000.ibb 262144 +binlog-000001.ibb 262144 +binlog-000002.ibb 262144 +binlog-000003.ibb 262144 +binlog-000004.ibb 262144 +binlog-000005.ibb 262144 +DROP TABLE t1; diff --git a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test index eccb0bb462e..84d7abfd856 100644 --- a/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test +++ b/mysql-test/suite/binlog_in_engine/binlog_in_engine_restart.test @@ -42,8 +42,9 @@ while ($i < 10) { inc $i; } +--let $binlog_name= binlog-000005.ibb +--let $binlog_size= 262144 +--source include/wait_for_engine_binlog.inc SHOW BINARY LOGS; ---exec $MYSQL_BINLOG --start-position=$gtid_pos3 --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog3.txt - DROP TABLE t1; diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 124de6bc3c9..c810f48b0e8 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -2001,6 +2001,49 @@ static bool log_checkpoint_low(lsn_t oldest_lsn, lsn_t end_lsn) noexcept return true; } +static void binlog_write_up_to(lsn_t lsn) noexcept {} + +void mtr_t::write_binlog(bool space_id, uint32_t page_no, + uint16_t offset, + const void *buf, size_t size) noexcept +{ + /* ToDo: this asserts in multiple places currently. */ + return; + ut_ad(!srv_read_only_mode); + ut_ad(m_log_mode == MTR_LOG_ALL); + + bool alloc{size < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5)}; + byte *end= log_write(page_id_t{0xfffffffe | (uint)space_id, page_no}, + nullptr, size, alloc, offset); + if (alloc) + { + ::memcpy(end, buf, size); + m_log.close(end + size); + } + else + { + m_log.close(end); + m_log.push(static_cast(buf), uint32_t(size)); + } +} + +/** Write binlog data +@param space_id binlog tablespace +@param page_no binlog page number +@param offset offset within the page +@param buf data +@param size size of data +@return start LSN of the mini-transaction */ +lsn_t binlog_write_data(bool space_id, uint32_t page_no, uint16_t offset, + const void *buf, size_t size) noexcept +{ + mtr_t mtr; + mtr.start(); + mtr.write_binlog(space_id, page_no, offset, buf, size); + mtr.commit(); + return mtr.commit_lsn(); +} + /** Make a checkpoint. Note that this function does not flush dirty blocks from the buffer pool: it only checks what is lsn of the oldest modification in the pool, and writes information about the lsn in @@ -2021,11 +2064,13 @@ static bool log_checkpoint() noexcept #endif fil_flush_file_spaces(); + binlog_write_up_to(log_sys.get_lsn()); log_sys.latch.wr_lock(SRW_LOCK_CALL); const lsn_t end_lsn= log_sys.get_lsn(); mysql_mutex_lock(&buf_pool.flush_list_mutex); const lsn_t oldest_lsn= buf_pool.get_oldest_modification(end_lsn); + // FIXME: limit oldest_lsn below binlog split write LSN mysql_mutex_unlock(&buf_pool.flush_list_mutex); return log_checkpoint_low(oldest_lsn, end_lsn); } @@ -2209,12 +2254,14 @@ static void buf_flush_sync_for_checkpoint(lsn_t lsn) noexcept os_aio_wait_until_no_pending_writes(false); fil_flush_file_spaces(); + binlog_write_up_to(log_sys.get_lsn()); log_sys.latch.wr_lock(SRW_LOCK_CALL); const lsn_t newest_lsn= log_sys.get_lsn(); mysql_mutex_lock(&buf_pool.flush_list_mutex); lsn_t measure= buf_pool.get_oldest_modification(0); const lsn_t checkpoint_lsn= measure ? measure : newest_lsn; + // FIXME: limit checkpoint_lsn below binlog split write LSN if (!recv_recovery_is_on() && checkpoint_lsn > log_sys.last_checkpoint_lsn + SIZE_OF_FILE_CHECKPOINT) diff --git a/storage/innobase/fsp/fsp_binlog.cc b/storage/innobase/fsp/fsp_binlog.cc index 51bee2f08cb..690875f2cb2 100644 --- a/storage/innobase/fsp/fsp_binlog.cc +++ b/storage/innobase/fsp/fsp_binlog.cc @@ -32,8 +32,6 @@ InnoDB implementation of binlog. #include "log.h" -static buf_block_t *binlog_cur_block; - /* How often (in terms of bytes written) to dump a (differential) binlog state at the start of the page, to speed up finding the initial GTID position for @@ -47,14 +45,13 @@ static buf_block_t *binlog_cur_block; uint64_t current_binlog_state_interval; /* - Mutex protecting active_binlog_file_no and active_binlog_space. + Mutex protecting active_binlog_file_no. */ mysql_mutex_t active_binlog_mutex; pthread_cond_t active_binlog_cond; /* The currently being written binlog tablespace. */ std::atomic active_binlog_file_no; -fil_space_t* active_binlog_space; /* The first binlog tablespace that is still open. @@ -73,7 +70,6 @@ uint64_t first_open_binlog_file_no; next tablespace is still pending. */ uint64_t last_created_binlog_file_no; -fil_space_t *last_created_binlog_space; /* Point at which it is guaranteed that all data has been written out to the @@ -90,6 +86,473 @@ std::atomic binlog_cur_written_offset[2]; */ std::atomic binlog_cur_end_offset[2]; +fsp_binlog_page_fifo *binlog_page_fifo; + + +fsp_binlog_page_entry * +fsp_binlog_page_fifo::create_page(uint64_t file_no, uint32_t page_no) +{ + mysql_mutex_lock(&m_mutex); + ut_ad(first_file_no != ~(uint64_t)0); + ut_a(file_no == first_file_no || file_no == first_file_no + 1); + + page_list *pl= &fifos[file_no & 1]; + fsp_binlog_page_entry **next_ptr_ptr= &pl->first_page; + uint32_t entry_page_no= pl->first_page_no; + /* Can only add a page at the end of the list. */ + while (*next_ptr_ptr) + { + next_ptr_ptr= &((*next_ptr_ptr)->next); + ++entry_page_no; + } + ut_a(page_no == entry_page_no); + fsp_binlog_page_entry *e= (fsp_binlog_page_entry *)ut_malloc(sizeof(*e), mem_key_binlog); + ut_a(e); + e->next= nullptr; + e->page_buf= static_cast(aligned_malloc(srv_page_size, srv_page_size)); + ut_a(e->page_buf); + memset(e->page_buf, 0, srv_page_size); + e->latched= 1; + e->complete= false; + e->flushed_clean= false; + *next_ptr_ptr= e; + + mysql_mutex_unlock(&m_mutex); + return e; +} + + +fsp_binlog_page_entry * +fsp_binlog_page_fifo::get_page(uint64_t file_no, uint32_t page_no) +{ + fsp_binlog_page_entry *res= nullptr; + page_list *pl; + fsp_binlog_page_entry *p; + uint32_t entry_page_no; + + mysql_mutex_lock(&m_mutex); + ut_ad(first_file_no != ~(uint64_t)0); + ut_a(file_no <= first_file_no + 1); + if (file_no < first_file_no) + goto end; + pl= &fifos[file_no & 1]; + p= pl->first_page; + entry_page_no= pl->first_page_no; + if (!p || page_no < entry_page_no) + goto end; + while (p) + { + if (page_no == entry_page_no) + { + /* Found the page. */ + ++p->latched; + res= p; + break; + } + p= p->next; + ++entry_page_no; + } + +end: + mysql_mutex_unlock(&m_mutex); + return res; +} + + +void +fsp_binlog_page_fifo::release_page(uint64_t file_no, uint32_t page_no, + fsp_binlog_page_entry *page) +{ + mysql_mutex_lock(&m_mutex); + ut_a(page->latched > 0); + if (--page->latched == 0) + pthread_cond_signal(&m_cond); + mysql_mutex_unlock(&m_mutex); +} + + +/* + Flush (write to disk) the first unflushed page in a file. + Returns true when the last page has been flushed. + + Must be called with m_mutex held. + + If called with force=true, will flush even any final, incomplete page. + Otherwise such page will not be written out. Any final, incomplete page + is left in the FIFO in any case. +*/ +bool +fsp_binlog_page_fifo::flush_one_page(uint64_t file_no, bool force) +{ + page_list *pl; + fsp_binlog_page_entry *e; + + mysql_mutex_assert_owner(&m_mutex); + /* + Wait for the FIFO to be not flushing from another thread, and for the + first page to not be latched. + */ + for (;;) + { + /* + Let's make page not present not an error, to allow races where someone else + flushed the page ahead of us. + */ + if (file_no < first_file_no) + return true; + ut_a(file_no <= first_file_no + 1); + + if (!flushing) + { + pl= &fifos[file_no & 1]; + e= pl->first_page; + if (!e) + return true; + if (e->latched == 0) + break; + } + my_cond_wait(&m_cond, &m_mutex.m_mutex); + } + flushing= true; + uint32_t page_no= pl->first_page_no; + mysql_mutex_unlock(&m_mutex); + ut_ad(e->complete || !e->next); + if (e->complete || (force && !e->flushed_clean)) + { + File fh= get_fh(file_no); + ut_a(pl->fh >= (File)0); + size_t res= my_pwrite(fh, e->page_buf, srv_page_size, + (uint64_t)page_no << srv_page_size_shift, + MYF(MY_WME)); + ut_a(res == srv_page_size); + e->flushed_clean= true; + } + mysql_mutex_lock(&m_mutex); + /* + We marked the FIFO as flushing, page could not have disappeared despite + releasing the mutex during the I/O. + */ + ut_ad(flushing); + bool done= (e->next == nullptr); + if (e->complete) + { + pl->first_page= e->next; + pl->first_page_no= page_no + 1; + aligned_free(e->page_buf); + ut_free(e); + } + else + done= true; /* Cannot flush past final incomplete page. */ + + flushing= false; + pthread_cond_signal(&m_cond); + return done; +} + + +void +fsp_binlog_page_fifo::flush_up_to(uint64_t file_no, uint32_t page_no) +{ + mysql_mutex_lock(&m_mutex); + for (;;) + { + if (file_no < first_file_no || + (file_no == first_file_no && fifos[file_no & 1].first_page_no > page_no)) + break; + uint64_t file_no_to_flush= file_no; + /* Flush the prior file to completion first. */ + if (file_no == first_file_no + 1 && fifos[(file_no - 1) & 1].first_page) + file_no_to_flush= file_no - 1; + bool done= flush_one_page(file_no_to_flush, true); + if (done && file_no == file_no_to_flush) + break; + } + mysql_mutex_unlock(&m_mutex); +} + + +void +fsp_binlog_page_fifo::do_fdatasync(uint64_t file_no) +{ + mysql_mutex_lock(&m_mutex); + ut_a(file_no == first_file_no || file_no == first_file_no + 1); + File fh= fifos[file_no & 1].fh; + if (fh != (File)-1) + { + while (flushing) + my_cond_wait(&m_cond, &m_mutex.m_mutex); + flushing= true; + mysql_mutex_unlock(&m_mutex); + int res= my_sync(fh, MYF(MY_WME)); + ut_a(!res); + mysql_mutex_lock(&m_mutex); + flushing= false; + pthread_cond_signal(&m_cond); + } + mysql_mutex_unlock(&m_mutex); +} + + +File +fsp_binlog_page_fifo::get_fh(uint64_t file_no) +{ + File fh= fifos[file_no & 1].fh; + if (fh == (File)-1) + { + char filename[OS_FILE_MAX_PATH]; + binlog_name_make(filename, file_no); + fifos[file_no & 1].fh= fh= my_open(filename, O_RDWR | O_BINARY, MYF(MY_WME)); + } + return fh; +} + +/* + If init_page is not ~(uint32_t)0, then it is the page to continue writing + when re-opening existing binlog at server startup. + + If in addition, partial_page is non-NULL, it is an (aligned) page buffer + containing the partial data of page init_page. + + If init_page is set but partial_page is not, then init_page is the first, + empty page in the tablespace to create and start writing to. +*/ +void +fsp_binlog_page_fifo::create_tablespace(uint64_t file_no, + uint32_t size_in_pages, + uint32_t init_page, + byte *partial_page) +{ + mysql_mutex_lock(&m_mutex); + ut_ad(init_page == ~(uint32_t)0 || + first_file_no == ~(uint64_t)0 || + /* At server startup allow opening N empty and (N-1) partial. */ + (init_page != ~(uint32_t)0 && file_no + 1 == first_file_no && + !fifos[first_file_no & 1].first_page)); + ut_a(first_file_no == ~(uint64_t)0 || + file_no == first_file_no + 1 || + file_no == first_file_no + 2 || + (init_page != ~(uint32_t)0 && file_no + 1 == first_file_no && + !fifos[first_file_no & 1].first_page)); + if (first_file_no == ~(uint64_t)0) + { + first_file_no= file_no; + } + else if (UNIV_UNLIKELY(file_no + 1 == first_file_no)) + first_file_no= file_no; + else if (file_no == first_file_no + 2) + { + /* All pages in (N-2) must be flushed before doing (N). */ + ut_a(!fifos[file_no & 1].first_page); + if (fifos[file_no & 1].fh != (File)-1) + my_close(fifos[file_no & 1].fh, MYF(0)); + first_file_no= file_no - 1; + } + + if (init_page != ~(uint32_t)0) + { + if (partial_page) + { + fsp_binlog_page_entry *e= + (fsp_binlog_page_entry *)ut_malloc(sizeof(*e), mem_key_binlog); + ut_a(e); + e->next= nullptr; + e->page_buf= + static_cast(aligned_malloc(srv_page_size, srv_page_size)); + ut_a(e->page_buf); + memcpy(e->page_buf, partial_page, srv_page_size); + e->latched= 0; + e->complete= false; + e->flushed_clean= true; + fifos[file_no & 1].first_page= e; + } + else + fifos[file_no & 1].first_page= nullptr; + fifos[file_no & 1].first_page_no= init_page; + } + else + { + fifos[file_no & 1].first_page= nullptr; + fifos[file_no & 1].first_page_no= 0; + } + fifos[file_no & 1].fh= (File)-1; + fifos[file_no & 1].size_in_pages= size_in_pages; + mysql_mutex_unlock(&m_mutex); +} + + +void +fsp_binlog_page_fifo::release_tablespace(uint64_t file_no) +{ + mysql_mutex_lock(&m_mutex); + ut_a(file_no == first_file_no); + ut_a(!fifos[file_no & 1].first_page || + /* Allow a final, incomplete-but-fully-flushed page in the fifo. */ + (!fifos[file_no & 1].first_page->complete && + fifos[file_no & 1].first_page->flushed_clean && + !fifos[file_no & 1].first_page->next && + !fifos[(file_no + 1) & 1].first_page)); + if (fifos[file_no & 1].fh != (File)-1) + { + while (flushing) + my_cond_wait(&m_cond, &m_mutex.m_mutex); + flushing= true; + File fh= fifos[file_no & 1].fh; + mysql_mutex_unlock(&m_mutex); + int res= my_sync(fh, MYF(MY_WME)); + ut_a(!res); + my_close(fifos[file_no & 1].fh, MYF(0)); + mysql_mutex_lock(&m_mutex); + flushing= false; + pthread_cond_signal(&m_cond); + } + first_file_no= file_no + 1; + + fifos[file_no & 1].first_page= nullptr; + fifos[file_no & 1].first_page_no= 0; + fifos[file_no & 1].size_in_pages= 0; + fifos[file_no & 1].fh= (File)-1; + mysql_mutex_unlock(&m_mutex); +} + + +fsp_binlog_page_fifo::fsp_binlog_page_fifo() + : first_file_no(~(uint64_t)0), flushing(false), + flush_thread_started(false), flush_thread_end(false) +{ + fifos[0]= {nullptr, 0, 0, (File)-1 }; + fifos[1]= {nullptr, 0, 0, (File)-1 }; + mysql_mutex_init(fsp_page_fifo_mutex_key, &m_mutex, nullptr); + pthread_cond_init(&m_cond, nullptr); + + // ToDo I think I need to read the first page here, or somewhere? + // Normally I'd never want to read a page into the page fifo, but at startup, I seem to need to do so for the first page I start writing on. Though I suppose I already read that, so maybe just a way to add that page into the FIFO in the constructor? +} + + +void +fsp_binlog_page_fifo::reset() +{ + ut_ad(!flushing); + for (uint32_t i= 0; i < 2; ++i) + { + if (fifos[i].fh != (File)-1) + my_close(fifos[i].fh, MYF(0)); + fsp_binlog_page_entry *e= fifos[i].first_page; + while (e) + { + fsp_binlog_page_entry *next= e->next; + aligned_free(e->page_buf); + ut_free(e); + e= next; + } + fifos[i]= {nullptr, 0, 0, (File)-1 }; + } + first_file_no= ~(uint64_t)0; +} + + +fsp_binlog_page_fifo::~fsp_binlog_page_fifo() +{ + ut_ad(!flushing); + reset(); + mysql_mutex_destroy(&m_mutex); + pthread_cond_destroy(&m_cond); +} + + +void +fsp_binlog_page_fifo::lock_wait_for_idle() +{ + mysql_mutex_lock(&m_mutex); + while(flushing) + my_cond_wait(&m_cond, &m_mutex.m_mutex); +} + + +void +fsp_binlog_page_fifo::start_flush_thread() +{ + flush_thread_started= false; + flush_thread_end= false; + flush_thread_obj= std::thread{ [this] { flush_thread_run(); } }; + mysql_mutex_lock(&m_mutex); + while (!flush_thread_started) + my_cond_wait(&m_cond, &m_mutex.m_mutex); + mysql_mutex_unlock(&m_mutex); +} + + +void +fsp_binlog_page_fifo::stop_flush_thread() +{ + if (!flush_thread_started) + return; + mysql_mutex_lock(&m_mutex); + flush_thread_end= true; + pthread_cond_signal(&m_cond); + while (flush_thread_started) + my_cond_wait(&m_cond, &m_mutex.m_mutex); + mysql_mutex_unlock(&m_mutex); + flush_thread_obj.join(); +} + + +void +fsp_binlog_page_fifo::flush_thread_run() +{ + mysql_mutex_lock(&m_mutex); + flush_thread_started= true; + pthread_cond_signal(&m_cond); + + while (!flush_thread_end) + { + /* + Flush pages one by one as long as there are more pages pending. + Once all have been flushed, wait for more pages to become pending. + Don't try to force flush a final page that is not yet completely + filled with data. + */ + uint64_t file_no= first_file_no; + bool all_flushed= true; + if (first_file_no != ~(uint64_t)0) + { + all_flushed= flush_one_page(file_no, false); + if (all_flushed) + all_flushed= flush_one_page(file_no + 1, false); + } + if (all_flushed) + my_cond_wait(&m_cond, &m_mutex.m_mutex); + } + + flush_thread_started= false; + pthread_cond_signal(&m_cond); + mysql_mutex_unlock(&m_mutex); +} + + +void +fsp_log_binlog_write(mtr_t *mtr, uint64_t file_no, uint32_t page_no, + fsp_binlog_page_entry *page, uint32_t page_offset, + uint32_t len) +{ + if (page_offset + len >= srv_page_size - FIL_PAGE_DATA_END) + page->complete= true; + if (page->flushed_clean) + { + /* + If the page with partial data has been written to the file system, then + redo log all the data on the page, to be sure we can still recover the + entire page reliably even if the latest checkpoint is after that partial + write. + */ + len= page_offset + len; + page_offset= 0; + page->flushed_clean= false; + } + mtr->write_binlog(LOG_BINLOG_ID_0 + (file_no & 1), page_no, + (uint16_t)page_offset, page_offset + &page->page_buf[0], + len); +} /* Initialize the InnoDB implementation of binlog. @@ -101,12 +564,16 @@ fsp_binlog_init() { mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr); pthread_cond_init(&active_binlog_cond, nullptr); + binlog_page_fifo= new fsp_binlog_page_fifo(); + binlog_page_fifo->start_flush_thread(); } void fsp_binlog_shutdown() { + binlog_page_fifo->stop_flush_thread(); + delete binlog_page_fifo; pthread_cond_destroy(&active_binlog_cond); mysql_mutex_destroy(&active_binlog_mutex); } @@ -118,48 +585,10 @@ fsp_binlog_shutdown() dberr_t fsp_binlog_tablespace_close(uint64_t file_no) { - mtr_t mtr; - dberr_t res; - - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1); - mysql_mutex_lock(&fil_system.mutex); - fil_space_t *space= fil_space_get_by_id(space_id); - mysql_mutex_unlock(&fil_system.mutex); - if (!space) { - res= DB_ERROR; - goto end; - } - - /* - Write out any remaining pages in the buffer pool to the binlog tablespace. - Then flush the file to disk, and close the old tablespace. - - ToDo: Will this turn into a busy-wait if some of the pages are still latched - in this tablespace, maybe because even though the tablespace has been - written full, the mtr that's ending in the next tablespace may still be - active? This will need fixing, no busy-wait should be done here. - */ - - /* - Take and release an exclusive latch on the last page in the tablespace to - be closed. We might be signalled that the tablespace is done while the mtr - completing the tablespace write is still active; the exclusive latch will - ensure we wait for any last mtr to commit before we close the tablespace. - */ - mtr.start(); - buf_page_get_gen(page_id_t{space_id, space->size - 1}, 0, RW_X_LATCH, nullptr, - BUF_GET, &mtr, &res); - mtr.commit(); - - while (buf_flush_list_space(space)) - ; - // ToDo: Also, buf_flush_list_space() seems to use io_capacity, but that's not appropriate here perhaps - os_aio_wait_until_no_pending_writes(false); - space->flush(); - fil_space_free(space_id, false); - res= DB_SUCCESS; -end: - return res; + binlog_page_fifo->flush_up_to(file_no, ~(uint32_t)0); + /* release_tablespace() will fdatasync() the file first. */ + binlog_page_fifo->release_tablespace(file_no); + return DB_SUCCESS; } @@ -167,15 +596,16 @@ end: Open an existing tablespace. The filehandle fh is taken over by the tablespace (or closed in case of error). */ -fil_space_t * +bool fsp_binlog_open(const char *file_name, pfs_os_file_t fh, - uint64_t file_no, size_t file_size, bool open_empty) + uint64_t file_no, size_t file_size, + uint32_t init_page, byte *partial_page) { const uint32_t page_size= (uint32_t)srv_page_size; const uint32_t page_size_shift= srv_page_size_shift; os_offset_t binlog_size= innodb_binlog_size_in_pages << srv_page_size_shift; - if (open_empty && file_size < binlog_size) { + if (init_page == ~(uint32_t)0 && file_size < binlog_size) { /* A crash may have left a partially pre-allocated file. If so, extend it to the required size. @@ -197,81 +627,30 @@ fsp_binlog_open(const char *file_name, pfs_os_file_t fh, "should be at least %u bytes", file_no, file_size, 2*page_size); os_file_close(fh); - return nullptr; + return true; } - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1); - - if (!open_empty) { - page_t *page_buf= static_cast(aligned_malloc(page_size, page_size)); - if (!page_buf) { - os_file_close(fh); - return nullptr; - } - - dberr_t err= os_file_read(IORequestRead, fh, page_buf, 0, page_size, nullptr); - if (err != DB_SUCCESS) { - sql_print_warning("Unable to read first page of file '%s'", file_name); - aligned_free(page_buf); - os_file_close(fh); - return nullptr; - } - - /* ToDo: Maybe use leaner page format for binlog tablespace? */ - uint32_t id1= mach_read_from_4(FIL_PAGE_SPACE_ID + page_buf); - if (id1 != space_id) { - sql_print_warning("Binlog file %s has inconsistent tablespace id %u " - "(expected %u)", file_name, id1, space_id); - aligned_free(page_buf); - os_file_close(fh); - return nullptr; - } - // ToDo: should we here check buf_page_is_corrupted() ? - - aligned_free(page_buf); - } - - uint32_t fsp_flags= - FSP_FLAGS_FCRC32_MASK_MARKER | FSP_FLAGS_FCRC32_PAGE_SSIZE(); - /* ToDo: Enryption. */ - fil_encryption_t mode= FIL_ENCRYPTION_OFF; - fil_space_crypt_t* crypt_data= nullptr; - fil_space_t *space; - - mysql_mutex_lock(&fil_system.mutex); - if (!(space= fil_space_t::create(space_id, fsp_flags, false, crypt_data, - mode, true))) { - mysql_mutex_unlock(&fil_system.mutex); - os_file_close(fh); - return nullptr; - } - - space->add(file_name, fh, (uint32_t)(file_size >> page_size_shift), - false, true); - + binlog_page_fifo->create_tablespace(file_no, + (uint32_t)(file_size >> page_size_shift), + init_page, partial_page); + os_file_close(fh); first_open_binlog_file_no= file_no; if (last_created_binlog_file_no == ~(uint64_t)0 || file_no > last_created_binlog_file_no) { last_created_binlog_file_no= file_no; - last_created_binlog_space= space; } - - mysql_mutex_unlock(&fil_system.mutex); - return space; + return false; } /** Create a binlog tablespace file @param[in] file_no Index of the binlog tablespace -@param[out] new_space The newly created tablespace @return DB_SUCCESS or error code */ -dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages, - fil_space_t **new_space) +dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages) { pfs_os_file_t fh; bool ret; - *new_space= nullptr; if(srv_read_only_mode) return DB_ERROR; @@ -291,8 +670,6 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages, } /* ToDo: Enryption? */ - fil_encryption_t mode= FIL_ENCRYPTION_OFF; - fil_space_crypt_t* crypt_data= nullptr; /* We created the binlog file and now write it full of zeros */ if (!os_file_set_size(name, fh, @@ -304,24 +681,8 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages, return DB_ERROR; } - mysql_mutex_lock(&fil_system.mutex); - /* ToDo: Need to ensure file (N-2) is no longer active before creating (N). */ - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1); - if (!(*new_space= fil_space_t::create(space_id, - ( FSP_FLAGS_FCRC32_MASK_MARKER | - FSP_FLAGS_FCRC32_PAGE_SSIZE()), - false, crypt_data, - mode, true))) { - mysql_mutex_unlock(&fil_system.mutex); - os_file_close(fh); - os_file_delete(innodb_data_file_key, name); - return DB_ERROR; - } - - fil_node_t* node = (*new_space)->add(name, fh, size_in_pages, - false, true); - node->find_metadata(); - mysql_mutex_unlock(&fil_system.mutex); + binlog_page_fifo->create_tablespace(file_no, size_in_pages); + os_file_close(fh); return DB_SUCCESS; } @@ -341,12 +702,11 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) { uint32_t page_size= (uint32_t)srv_page_size; uint32_t page_size_shift= srv_page_size_shift; - fil_space_t *space= active_binlog_space; const uint32_t page_end= page_size - FIL_PAGE_DATA_END; uint32_t page_no= binlog_cur_page_no; uint32_t page_offset= binlog_cur_page_offset; /* ToDo: What is the lifetime of what's pointed to by binlog_cur_block, is there some locking needed around it or something? */ - buf_block_t *block= binlog_cur_block; + fsp_binlog_page_entry *block= nullptr; uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed); uint64_t pending_prev_end_offset= 0; uint64_t start_file_no= 0; @@ -359,7 +719,7 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) byte cont_flag= 0; for (;;) { if (page_offset == FIL_PAGE_DATA) { - if (UNIV_UNLIKELY(page_no >= space->size)) { + if (UNIV_UNLIKELY(page_no >= binlog_page_fifo->size_in_pages(file_no))) { /* Signal to the pre-allocation thread that this tablespace has been written full, so that it can be closed and a new one pre-allocated @@ -392,7 +752,6 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) ++file_no; binlog_cur_written_offset[file_no & 1].store(0, std::memory_order_relaxed); binlog_cur_end_offset[file_no & 1].store(0, std::memory_order_relaxed); - active_binlog_space= space= last_created_binlog_space; pthread_cond_signal(&active_binlog_cond); mysql_mutex_unlock(&active_binlog_mutex); binlog_cur_page_no= page_no= 0; @@ -452,34 +811,35 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) } } err= binlog_gtid_state(&full_state, mtr, block, page_no, - page_offset, space); + page_offset, file_no); ut_a(!err /* ToDo error handling */); ut_ad(block); full_state.free(); binlog_diff_state.reset_nolock(); } else { bool err= binlog_gtid_state(&binlog_diff_state, mtr, block, page_no, - page_offset, space); + page_offset, file_no); ut_a(!err /* ToDo error handling */); } } else - block= fsp_page_create(space, page_no, mtr); + block= binlog_page_fifo->create_page(file_no, page_no); } else { - dberr_t err; - /* ToDo: Is RW_SX_LATCH appropriate here? */ - block= buf_page_get_gen(page_id_t{space->id, page_no}, - 0, RW_SX_LATCH, block, - BUF_GET, mtr, &err); - ut_a(err == DB_SUCCESS); + block= binlog_page_fifo->get_page(file_no, page_no); } ut_ad(page_offset < page_end); uint32_t page_remain= page_end - page_offset; - byte *ptr= page_offset + block->page.frame; + byte *ptr= page_offset + &block->page_buf[0]; /* ToDo: Do this check at the end instead, to save one buf_page_get_gen()? */ if (page_remain < 4) { /* Pad the remaining few bytes, and move to next page. */ - mtr->memset(block, page_offset, page_remain, FSP_BINLOG_TYPE_FILLER); + if (UNIV_LIKELY(page_remain > 0)) + { + memset(ptr, FSP_BINLOG_TYPE_FILLER, page_remain); + fsp_log_binlog_write(mtr, file_no, page_no, block, page_offset, + page_remain); + } + binlog_page_fifo->release_page(file_no, page_no, block); block= nullptr; ++page_no; page_offset= FIL_PAGE_DATA; @@ -513,9 +873,10 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) ptr[2]= (byte)(size >> 8); ut_ad(size <= 0xffff); - mtr->memcpy(*block, page_offset, size+3); + fsp_log_binlog_write(mtr, file_no, page_no, block, page_offset, size + 3); cont_flag= FSP_BINLOG_FLAG_CONT; if (page_remain == 0) { + binlog_page_fifo->release_page(file_no, page_no, block); block= nullptr; page_offset= FIL_PAGE_DATA; ++page_no; @@ -525,7 +886,8 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) if (size_last.second) break; } - binlog_cur_block= block; + if (block) + binlog_page_fifo->release_page(file_no, page_no, block); binlog_cur_page_no= page_no; binlog_cur_page_offset= page_offset; if (UNIV_UNLIKELY(pending_prev_end_offset != 0)) @@ -571,40 +933,66 @@ bool fsp_binlog_flush() { uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed); - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1); uint32_t page_no= binlog_cur_page_no; - fil_space_t *space= active_binlog_space; chunk_data_flush dummy_data; mtr_t mtr; mysql_mutex_lock(&purge_binlog_mutex); - mtr.start(); - mtr.x_lock_space(space); - /* - ToDo: Here, if we are already at precisely the end of a page, we need not - fill up that page with a dummy record, we can just truncate the tablespace - to that point. But then we need to handle an assertion m_modifications!=0 - in mtr_t::commit_shrink(). - */ - fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY); - if (page_no + 1 < space->size) + binlog_page_fifo->lock_wait_for_idle(); + File fh= binlog_page_fifo->get_fh(file_no); + if (fh == (File)-1) { - mtr.trim_pages(page_id_t(space_id, page_no + 1)); - mtr.commit_shrink(*space, page_no + 1); + binlog_page_fifo->unlock(); + mysql_mutex_unlock(&purge_binlog_mutex); + return true; + } - size_t reclaimed= (space->size - (page_no + 1)) << srv_page_size_shift; + if (my_chsize(fh, ((uint64_t)page_no + 1) << srv_page_size_shift, 0, + MYF(MY_WME))) + { + binlog_page_fifo->unlock(); + mysql_mutex_unlock(&purge_binlog_mutex); + return true; + } + /* + Sync the truncate to disk. This way, if we crash after this we are sure the + truncate has been effected so we do not put the filler record in what is + then the middle of the file. If we crash before the truncate is durable, we + just come up as if the flush has never happened. If we crash with the + truncate durable but without the filler record, that is not a problem, the + binlog file will just be shorter. + */ + my_sync(fh, MYF(0)); + binlog_page_fifo->unlock(); + + uint32_t page_offset= binlog_cur_page_offset; + if (page_offset > FIL_PAGE_DATA || + page_offset < srv_page_size - FIL_PAGE_DATA_END) + { + /* + If we are not precisely the end of a page, fill up that page with a dummy + record. Otherwise the zeros at the end of the page would be detected as + end-of-file of the entire binlog. + */ + mtr.start(); + fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY); + mtr.commit(); + } + + if (page_no + 1 < binlog_page_fifo->size_in_pages(file_no)) + { + binlog_page_fifo->truncate_file_size(file_no, page_no + 1); + size_t reclaimed= (binlog_page_fifo->size_in_pages(file_no) - (page_no + 1)) + << srv_page_size_shift; if (UNIV_LIKELY(total_binlog_used_size >= reclaimed)) total_binlog_used_size-= reclaimed; else ut_ad(0); } - else - mtr.commit(); /* Flush out all pages in the (now filled-up) tablespace. */ - while (buf_flush_list_space(space)) - ; + binlog_page_fifo->flush_up_to(file_no, page_no); mysql_mutex_unlock(&purge_binlog_mutex); @@ -650,8 +1038,8 @@ binlog_chunk_reader::read_error_corruption(uint64_t file_no, uint64_t page_no, /* Obtain the data on the page currently pointed to by the chunk reader. The - page is either latched in the buffer pool (starting an mtr), or read from - the file into the page buffer. + page is either latched in the page fifo, or read from the file into the page + buffer. The code does a dirty read of active_binlog_file_no to determine if the page is known to be available to read from the file, or if it should be looked up @@ -664,11 +1052,10 @@ binlog_chunk_reader::read_error_corruption(uint64_t file_no, uint64_t page_no, enum binlog_chunk_reader::chunk_reader_status binlog_chunk_reader::fetch_current_page() { - ut_ad(!cur_block /* Must have no active mtr */); + ut_ad(!cur_block /* Must have no active page latch */); uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); for (;;) { - buf_block_t *block= nullptr; - bool mtr_started= false; + fsp_binlog_page_entry *block= nullptr; uint64_t offset= (s.page_no << srv_page_size_shift) | s.in_page_offset; uint64_t active= active2; uint64_t end_offset= @@ -687,24 +1074,13 @@ binlog_chunk_reader::fetch_current_page() if (s.file_no + 1 >= active) { /* Check if we should read from the buffer pool or from the file. */ if (end_offset != ~(uint64_t)0 && offset < end_offset) { - mtr.start(); - mtr_started= true; /* ToDo: Should we keep track of the last block read and use it as a hint? Will be mainly useful when reading the partially written active page at the current end of the active binlog, which might be a common case. */ - buf_block_t *hint_block= nullptr; - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(s.file_no & 1); - dberr_t err= DB_SUCCESS; - block= buf_page_get_gen(page_id_t{space_id, s.page_no}, 0, - RW_S_LATCH, hint_block, BUF_GET_IF_IN_POOL, - &mtr, &err); - if (err != DB_SUCCESS) { - mtr.commit(); - return CHUNK_READER_ERROR; - } + block= binlog_page_fifo->get_page(s.file_no, s.page_no); } active2= active_binlog_file_no.load(std::memory_order_acquire); if (UNIV_UNLIKELY(active2 != active)) { @@ -713,13 +1089,10 @@ binlog_chunk_reader::fetch_current_page() have gotten invalid end_offset or a buffer pool page from a wrong tablespace. So just try again. */ - if (mtr_started) - mtr.commit(); continue; } cur_end_offset= end_offset; if (offset >= end_offset) { - ut_ad(!mtr_started); if (s.file_no == active) { /* Reached end of the currently active binlog file -> EOF. */ return CHUNK_READER_EOF; @@ -727,13 +1100,10 @@ binlog_chunk_reader::fetch_current_page() } if (block) { cur_block= block; - ut_ad(mtr_started); - page_ptr= block->page.frame; + page_ptr= block->page_buf; return CHUNK_READER_FOUND; } else { /* Not in buffer pool, just read it from the file. */ - if (mtr_started) - mtr.commit(); /* Fall through to read from file. */ } } @@ -948,11 +1318,12 @@ skip_chunk: { go_next_page: /* End of page reached, move to the next page. */ + uint32_t block_page_no= s.page_no; ++s.page_no; page_ptr= nullptr; if (cur_block) { - mtr.commit(); + binlog_page_fifo->release_page(s.file_no, block_page_no, cur_block); cur_block= nullptr; } s.in_page_offset= 0; @@ -992,7 +1363,7 @@ binlog_chunk_reader::restore_pos(binlog_chunk_reader::saved_position *pos) /* Seek to a different page, release any current page. */ if (cur_block) { - mtr.commit(); + binlog_page_fifo->release_page(s.file_no, s.page_no, cur_block); cur_block= nullptr; } page_ptr= nullptr; @@ -1023,7 +1394,7 @@ void binlog_chunk_reader::release(bool release_file_page) { if (cur_block) { - mtr.commit(); + binlog_page_fifo->release_page(s.file_no, s.page_no, cur_block); cur_block= nullptr; page_ptr= nullptr; } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 164ebc9da7f..bcdf712cccc 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -550,6 +550,7 @@ mysql_pfs_key_t srv_threads_mutex_key; mysql_pfs_key_t tpool_cache_mutex_key; mysql_pfs_key_t fsp_active_binlog_mutex_key; mysql_pfs_key_t fsp_purge_binlog_mutex_key; +mysql_pfs_key_t fsp_page_fifo_mutex_key; /* all_innodb_mutexes array contains mutexes that are performance schema instrumented if "UNIV_PFS_MUTEX" diff --git a/storage/innobase/handler/innodb_binlog.cc b/storage/innobase/handler/innodb_binlog.cc index 410d6090209..50dcb6828d2 100644 --- a/storage/innobase/handler/innodb_binlog.cc +++ b/storage/innobase/handler/innodb_binlog.cc @@ -230,8 +230,6 @@ class ha_innodb_binlog_reader : public handler_binlog_reader { /* Used to read the header of the commit record. */ byte rd_buf[5*COMPR_INT_MAX64]; private: - int read_from_buffer_pool_page(buf_block_t *block, uint64_t end_offset, - uchar *buf, uint32_t len); int read_from_file(uint64_t end_offset, uchar *buf, uint32_t len); int read_from_page(uchar *page_ptr, uint64_t end_offset, uchar *buf, uint32_t len); @@ -463,7 +461,6 @@ innodb_binlog_init_state() earliest_binlog_file_no= ~(uint64_t)0; total_binlog_used_size= 0; active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release); - active_binlog_space= nullptr; binlog_cur_page_no= 0; binlog_cur_page_offset= FIL_PAGE_DATA; current_binlog_state_interval= innodb_binlog_state_interval; @@ -597,7 +594,6 @@ binlog_page_empty(const byte *page) static int find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, - fil_space_t **out_space, uint32_t *out_page_no, uint32_t *out_pos_in_page) { const uint32_t page_size= (uint32_t)srv_page_size; @@ -627,10 +623,11 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, return -1; } if (binlog_page_empty(page_buf)) { - *out_space= fsp_binlog_open(file_name, fh, file_no, file_size, true); + ret= + fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr); binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed); - return (*out_space ? 0 : -1); + return (ret ? -1 : 0); } last_nonempty= 0; @@ -688,11 +685,14 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf, *out_page_no= p_0 - 1; *out_pos_in_page= (uint32_t)(p - page_buf); - *out_space= fsp_binlog_open(file_name, fh, file_no, file_size, false); + if (*out_pos_in_page >= page_size - FIL_PAGE_DATA_END) + ret= fsp_binlog_open(file_name, fh, file_no, file_size, p_0, nullptr); + else + ret= fsp_binlog_open(file_name, fh, file_no, file_size, p_0 - 1, page_buf); uint64_t pos= (*out_page_no << page_size_shift) | *out_pos_in_page; binlog_cur_written_offset[idx].store(pos, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(pos, std::memory_order_relaxed); - return (*out_space ? 1 : -1); + return ret ? -1 : 1; } @@ -736,7 +736,6 @@ innodb_binlog_discover() Now, if we found any binlog files, locate the point in one of them where binlogging stopped, and where we should continue writing new binlog data. */ - fil_space_t *space, *prev_space; uint32_t page_no, prev_page_no, pos_in_page, prev_pos_in_page; std::unique_ptr page_buf(static_cast(aligned_malloc(page_size, page_size)), @@ -749,8 +748,7 @@ innodb_binlog_discover() int res= find_pos_in_binlog(binlog_files.last_file_no, binlog_files.last_size, - page_buf.get(), - &space, &page_no, &pos_in_page); + page_buf.get(), &page_no, &pos_in_page); if (res < 0) { file_no= binlog_files.last_file_no; active_binlog_file_no.store(file_no, std::memory_order_release); @@ -764,7 +762,6 @@ innodb_binlog_discover() /* Found start position in the last binlog file. */ file_no= binlog_files.last_file_no; active_binlog_file_no.store(file_no, std::memory_order_release); - active_binlog_space= space; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -779,11 +776,10 @@ innodb_binlog_discover() res= find_pos_in_binlog(binlog_files.prev_file_no, binlog_files.prev_size, page_buf.get(), - &prev_space, &prev_page_no, &prev_pos_in_page); + &prev_page_no, &prev_pos_in_page); if (res < 0) { file_no= binlog_files.last_file_no; active_binlog_file_no.store(file_no, std::memory_order_release); - active_binlog_space= space; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; sql_print_warning("Binlog number %llu could not be opened, starting " @@ -793,7 +789,6 @@ innodb_binlog_discover() } file_no= binlog_files.prev_file_no; active_binlog_file_no.store(file_no, std::memory_order_release); - active_binlog_space= prev_space; binlog_cur_page_no= prev_page_no; binlog_cur_page_offset= prev_pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -806,7 +801,6 @@ innodb_binlog_discover() /* Just one empty binlog file found. */ file_no= binlog_files.last_file_no; active_binlog_file_no.store(file_no, std::memory_order_release); - active_binlog_space= space; binlog_cur_page_no= page_no; binlog_cur_page_offset= pos_in_page; ib::info() << "Continuing binlog number " << file_no << " from position " @@ -875,7 +869,6 @@ innodb_binlog_prealloc_thread() /* Pre-allocate the next tablespace (if not done already). */ uint64_t last_created= last_created_binlog_file_no; if (last_created <= active && last_created <= first_open) { - fil_space_t *new_space; ut_ad(last_created == active); ut_ad(last_created == first_open || first_open == ~(uint64_t)0); /* @@ -887,8 +880,7 @@ innodb_binlog_prealloc_thread() mysql_mutex_lock(&purge_binlog_mutex); uint32_t size_in_pages= innodb_binlog_size_in_pages; - dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages, - &new_space); + dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages); if (earliest_binlog_file_no == ~(uint64_t)0) earliest_binlog_file_no= last_created; total_binlog_used_size+= (size_in_pages << srv_page_size_shift); @@ -898,15 +890,12 @@ innodb_binlog_prealloc_thread() mysql_mutex_lock(&active_binlog_mutex); ut_a(res2 == DB_SUCCESS /* ToDo: Error handling. */); - ut_a(new_space); last_created_binlog_file_no= last_created; - last_created_binlog_space= new_space; /* If we created the initial tablespace file, make it the active one. */ ut_ad(active < ~(uint64_t)0 || last_created == 0); if (active == ~(uint64_t)0) { active_binlog_file_no.store(last_created, std::memory_order_relaxed); - active_binlog_space= last_created_binlog_space; } if (first_open == ~(uint64_t)0) first_open_binlog_file_no= first_open= last_created; @@ -988,8 +977,8 @@ serialize_gtid_state(rpl_binlog_state_base *state, byte *buf, size_t buf_size, bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, - buf_block_t * &block, uint32_t &page_no, - uint32_t &page_offset, fil_space_t *space) + fsp_binlog_page_entry * &block, uint32_t &page_no, + uint32_t &page_offset, uint64_t file_no) { /* Use a small, efficient stack-allocated buffer by default, falling back to @@ -997,6 +986,8 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, */ byte small_buf[192]; byte *buf, *alloced_buf; + uint32_t block_page_no= ~(uint32_t)0; + block= nullptr; ssize_t used_bytes= serialize_gtid_state(state, small_buf, sizeof(small_buf), page_no==0); @@ -1034,16 +1025,19 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, afterwards. There is no point in using space to allow fast search to a point if there is no data to search for after that point. */ - if (page_no + needed_pages < space->size) + if (page_no + needed_pages < binlog_page_fifo->size_in_pages(file_no)) { byte cont_flag= 0; while (used_bytes > 0) { - ut_ad(page_no < space->size); - block= fsp_page_create(space, page_no, mtr); + ut_ad(page_no < binlog_page_fifo->size_in_pages(file_no)); + if (block) + binlog_page_fifo->release_page(file_no, block_page_no, block); + block_page_no= page_no; + block= binlog_page_fifo->create_page(file_no, block_page_no); ut_a(block /* ToDo: error handling? */); page_offset= FIL_PAGE_DATA; - byte *ptr= page_offset + block->page.frame; + byte *ptr= page_offset + &block->page_buf[0]; ssize_t chunk= used_bytes; byte last_flag= FSP_BINLOG_FLAG_LAST; if (chunk > page_room - 3) { @@ -1056,15 +1050,19 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, ptr[2] = (byte)(chunk >> 8); ut_ad(chunk <= 0xffff); memcpy(ptr+3, buf, chunk); - mtr->memcpy(*block, page_offset, chunk+3); + fsp_log_binlog_write(mtr, file_no, block_page_no, block, page_offset, + (uint32)(chunk+3)); page_offset+= (uint32_t)(chunk+3); buf+= chunk; used_bytes-= chunk; cont_flag= FSP_BINLOG_FLAG_CONT; } - if (page_offset == FIL_PAGE_DATA_END) { + if (page_offset == page_size - FIL_PAGE_DATA_END) { + if (block) + binlog_page_fifo->release_page(file_no, block_page_no, block); block= nullptr; + block_page_no= ~(uint32_t)0; page_offset= FIL_PAGE_DATA; ++page_no; } @@ -1073,7 +1071,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, /* Make sure we return a page for caller to write the main event data into. */ if (UNIV_UNLIKELY(!block)) { - block= fsp_page_create(space, page_no, mtr); + block= binlog_page_fifo->create_page(file_no, page_no); ut_a(block /* ToDo: error handling? */); } @@ -1895,8 +1893,6 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, uint64_t *out_file_end, uint64_t *out_diff_state_interval) { - buf_block_t *block; - *out_file_end= 0; uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire); if (file_no > active2) @@ -1904,11 +1900,11 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, for (;;) { - mtr_t mtr; - bool mtr_started= false; uint64_t active= active2; uint64_t end_offset= binlog_cur_end_offset[file_no&1].load(std::memory_order_acquire); + fsp_binlog_page_entry *block; + if (file_no + 1 >= active && end_offset != ~(uint64_t)0 && page_no <= (end_offset >> srv_page_size_shift)) @@ -1920,16 +1916,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, not change while getting the page (otherwise it might belong to a later tablespace file). */ - mtr.start(); - mtr_started= true; - uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1); - dberr_t err= DB_SUCCESS; - block= buf_page_get_gen(page_id_t{space_id, page_no}, 0, RW_S_LATCH, - nullptr, BUF_GET_IF_IN_POOL, &mtr, &err); - if (err != DB_SUCCESS) { - mtr.commit(); - return READ_ERROR; - } + block= binlog_page_fifo->get_page(file_no, page_no); } else block= nullptr; @@ -1937,8 +1924,8 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, if (UNIV_UNLIKELY(active2 != active)) { /* Active moved ahead while we were reading, try again. */ - if (mtr_started) - mtr.commit(); + if (block) + binlog_page_fifo->release_page(file_no, page_no, block); continue; } if (file_no + 1 >= active) @@ -1952,7 +1939,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, */ if (page_no > (end_offset >> srv_page_size_shift)) { - ut_ad(!mtr_started); + ut_ad(!block); return READ_NOT_FOUND; } } @@ -1960,17 +1947,13 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state, if (block) { ut_ad(end_offset != ~(uint64_t)0); - int res= read_gtid_state_from_page(state, block->page.frame, page_no, + int res= read_gtid_state_from_page(state, block->page_buf, page_no, out_diff_state_interval); - ut_ad(mtr_started); - if (mtr_started) - mtr.commit(); + binlog_page_fifo->release_page(file_no, page_no, block); return (Read_Result)res; } else { - if (mtr_started) - mtr.commit(); if (cur_open_file_no != file_no) { if (cur_open_file >= (File)0) @@ -2234,9 +2217,14 @@ innodb_reset_binlogs() bool err= false; ut_a(innodb_binlog_inited >= 2); + /* Close existing binlog tablespaces and stop the pre-alloc thread. */ innodb_binlog_close(false); + /* Prevent any flushing activity while resetting. */ + binlog_page_fifo->lock_wait_for_idle(); + binlog_page_fifo->reset(); + /* Delete all binlog files in the directory. */ MY_DIR *dir= my_dir(innodb_binlog_directory, MYF(MY_WME)); if (!dir) @@ -2269,6 +2257,7 @@ innodb_reset_binlogs() /* Re-initialize empty binlog state and start the pre-alloc thread. */ innodb_binlog_init_state(); + binlog_page_fifo->unlock(); start_binlog_prealloc_thread(); return err; diff --git a/storage/innobase/include/fsp_binlog.h b/storage/innobase/include/fsp_binlog.h index 0d91e2dcb44..d0773849c62 100644 --- a/storage/innobase/include/fsp_binlog.h +++ b/storage/innobase/include/fsp_binlog.h @@ -31,7 +31,6 @@ InnoDB implementation of binlog. #include "mtr0mtr.h" -struct fil_space_t; struct chunk_data_base; @@ -84,6 +83,94 @@ static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS), "in ALLOWED_NESTED_RECORDS bitmask"); +struct fsp_binlog_page_entry { + fsp_binlog_page_entry *next; + byte *page_buf; + uint32_t latched; + /* + Flag set when the page has been filled, no more data will be added and + it is safe to write out to disk and remove from the FIFO. + */ + bool complete; + /* + Flag set when the page is not yet complete, but all data added so far + have been written out to the file. So the page should not be written + again (until more data is added), but nor can it be removed from the + FIFO yet. + */ + bool flushed_clean; +}; + + +/* + A page FIFO, as a lower-level alternative to the buffer pool used for full + tablespaces. + + Since binlog files are written strictly append-only, we can simply add new + pages at the end and flush them from the beginning. + + ToDo: This is deliberately a naive implementation with single global mutex + and repeated malloc()/free(), as a starting point. Should be improved later + for efficiency and scalability. +*/ +class fsp_binlog_page_fifo { +public: + struct page_list { + fsp_binlog_page_entry *first_page; + uint32_t first_page_no; + uint32_t size_in_pages; + File fh; + }; +private: + mysql_mutex_t m_mutex; + pthread_cond_t m_cond; + std::thread flush_thread_obj; + + /* + The first_file_no is the first valid file in the fifo. The other entry in + the fifo holds (first_file_no+1) if it is not empty. + If first_file_no==~0, then there are no files in the fifo (initial state + just after construction). + */ + uint64_t first_file_no; + page_list fifos[2]; + + bool flushing; + bool flush_thread_started; + bool flush_thread_end; + + +public: + fsp_binlog_page_fifo(); + ~fsp_binlog_page_fifo(); + void reset(); + void start_flush_thread(); + void stop_flush_thread(); + void flush_thread_run(); + void lock_wait_for_idle(); + void unlock() { mysql_mutex_unlock(&m_mutex); } + void create_tablespace(uint64_t file_no, uint32_t size_in_pages, + uint32_t init_page= ~(uint32_t)0, + byte *partial_page= nullptr); + void release_tablespace(uint64_t file_no); + fsp_binlog_page_entry *create_page(uint64_t file_no, uint32_t page_no); + fsp_binlog_page_entry *get_page(uint64_t file_no, uint32_t page_no); + void release_page(uint64_t file_no, uint32_t page_no, + fsp_binlog_page_entry *page); + bool flush_one_page(uint64_t file_no, bool force); + void flush_up_to(uint64_t file_no, uint32_t page_no); + void do_fdatasync(uint64_t file_no); + File get_fh(uint64_t file_no); + uint32_t size_in_pages(uint64_t file_no) { + return fifos[file_no & 1].size_in_pages; + } + void truncate_file_size(uint64_t file_no, uint32_t size_in_pages) + { + fifos[file_no & 1].size_in_pages= size_in_pages; + } +}; + + class binlog_chunk_reader { public: enum chunk_reader_status { @@ -117,15 +204,13 @@ public: bool in_record; } s; - /* The mtr is started iff cur_block is non-NULL. */ - mtr_t mtr; /* After fetch_current_page(), this points into either cur_block or page_buffer as appropriate. */ byte *page_ptr; /* Valid after fetch_current_page(), if page found in buffer pool. */ - buf_block_t *cur_block; + fsp_binlog_page_entry *cur_block; /* Buffer for reading a page directly from a tablespace file. */ byte *page_buffer; /* Amount of data in file, valid after fetch_current_page(). */ @@ -200,22 +285,23 @@ extern uint64_t current_binlog_state_interval; extern mysql_mutex_t active_binlog_mutex; extern pthread_cond_t active_binlog_cond; extern std::atomic active_binlog_file_no; -extern fil_space_t* active_binlog_space; extern uint64_t first_open_binlog_file_no; extern uint64_t last_created_binlog_file_no; -extern fil_space_t *last_created_binlog_space; extern std::atomic binlog_cur_written_offset[2]; extern std::atomic binlog_cur_end_offset[2]; +extern fsp_binlog_page_fifo *binlog_page_fifo; +extern void fsp_log_binlog_write(mtr_t *mtr, uint64_t file_no, uint32_t page_no, + fsp_binlog_page_entry *page, + uint32_t page_offset, uint32_t len); extern void fsp_binlog_init(); extern void fsp_binlog_shutdown(); extern dberr_t fsp_binlog_tablespace_close(uint64_t file_no); -extern fil_space_t *fsp_binlog_open(const char *file_name, pfs_os_file_t fh, - uint64_t file_no, size_t file_size, - bool open_empty); +extern bool fsp_binlog_open(const char *file_name, pfs_os_file_t fh, + uint64_t file_no, size_t file_size, + uint32_t init_page, byte *partial_page); extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no, - uint32_t size_in_pages, - fil_space_t **new_space); + uint32_t size_in_pages); extern std::pair fsp_binlog_write_rec( struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type); extern bool fsp_binlog_flush(); diff --git a/storage/innobase/include/innodb_binlog.h b/storage/innobase/include/innodb_binlog.h index 85f17e5d3ae..540e8dbcdaa 100644 --- a/storage/innobase/include/innodb_binlog.h +++ b/storage/innobase/include/innodb_binlog.h @@ -28,8 +28,6 @@ InnoDB implementation of binlog. #include "fsp_binlog.h" -struct fil_space_t; -struct buf_block_t; struct mtr_t; struct rpl_binlog_state_base; struct rpl_gtid; @@ -97,8 +95,8 @@ extern void innodb_binlog_startup_init(); extern bool innodb_binlog_init(size_t binlog_size, const char *directory); extern void innodb_binlog_close(bool shutdown); extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr, - buf_block_t * &block, uint32_t &page_no, - uint32_t &page_offset, fil_space_t *space); + fsp_binlog_page_entry * &block, uint32_t &page_no, + uint32_t &page_offset, uint64_t file_no); extern bool innodb_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); extern void innodb_free_oob(THD *thd, void *engine_data); diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index 6a6d722f135..a9273b6d3b4 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -103,6 +103,10 @@ or the MySQL version that created the redo log file. */ #define LOG_HEADER_CREATOR_END 48 /* @} */ +/** Fake tablespace id for InnoDB-implemented binlog files. */ +#define LOG_BINLOG_ID_0 0xFFFFFFFEU +#define LOG_BINLOG_ID_1 0xFFFFFFFFU + struct log_t; /** File abstraction */ diff --git a/storage/innobase/include/mtr0log.h b/storage/innobase/include/mtr0log.h index bbe81aba363..baea0b0b67c 100644 --- a/storage/innobase/include/mtr0log.h +++ b/storage/innobase/include/mtr0log.h @@ -390,10 +390,12 @@ inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage, static_assert(!(type & 15) && type != RESERVED && type <= FILE_CHECKPOINT, "invalid type"); ut_ad(type >= FILE_CREATE || is_named_space(id.space()) || - id.space() == SRV_SPACE_ID_BINLOG0 || - id.space() == SRV_SPACE_ID_BINLOG1); + id.space() == LOG_BINLOG_ID_0 || + id.space() == LOG_BINLOG_ID_1); ut_ad(!bpage || bpage->id() == id); - ut_ad(id < end_page_id); + ut_ad(id < end_page_id || + id.space() == LOG_BINLOG_ID_0 || + id.space() == LOG_BINLOG_ID_1); constexpr bool have_len= type != INIT_PAGE && type != FREE_PAGE; constexpr bool have_offset= type == WRITE || type == MEMSET || type == MEMMOVE; @@ -404,7 +406,8 @@ inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage, ut_ad(offset + len <= srv_page_size); static_assert(MIN_4BYTE >= UNIV_PAGE_SIZE_MAX, "consistency"); ut_ad(type == FREE_PAGE || type == OPTION || (type == EXTENDED && !bpage) || - memo_contains_flagged(bpage, MTR_MEMO_MODIFY)); + memo_contains_flagged(bpage, MTR_MEMO_MODIFY) || + (type == WRITE && id.space() >= LOG_BINLOG_ID_0)); size_t max_len; if (!have_len) max_len= 1 + 5 + 5; diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h index 4ca431711e8..3f5a8832836 100644 --- a/storage/innobase/include/mtr0mtr.h +++ b/storage/innobase/include/mtr0mtr.h @@ -719,6 +719,17 @@ public: static unsigned spin_wait_delay; /** Update finisher when spin_wait_delay is changing to or from 0. */ static void finisher_update(); + + /** Write binlog data + @param space_id binlog tablespace + @param page_no binlog page number + @param offset offset within the page + @param buf data + @param size size of data + @return */ + void write_binlog(bool space_id, uint32_t page_no, uint16_t offset, + const void *buf, size_t size) noexcept; + private: /** Release all latches. */ diff --git a/storage/innobase/include/univ.i b/storage/innobase/include/univ.i index 58e9f23ae52..edf32459d07 100644 --- a/storage/innobase/include/univ.i +++ b/storage/innobase/include/univ.i @@ -483,6 +483,7 @@ extern mysql_pfs_key_t lock_wait_mutex_key; extern mysql_pfs_key_t srv_threads_mutex_key; extern mysql_pfs_key_t fsp_active_binlog_mutex_key; extern mysql_pfs_key_t fsp_purge_binlog_mutex_key; +extern mysql_pfs_key_t fsp_page_fifo_mutex_key; # endif /* UNIV_PFS_MUTEX */ # ifdef UNIV_PFS_RWLOCK diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index d6d70a64eb8..6052ab1727a 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -2379,6 +2379,14 @@ void recv_sys_t::rewind(source &l, source &begin) noexcept pages_it= pages.end(); } +static void binlog_recover_write_data(bool space_id, uint32_t page_no, + uint16_t offset, + lsn_t start_lsn, lsn_t lsn, + const byte *buf, size_t size) noexcept +{} +static void binlog_recover_end(lsn_t lsn) noexcept {} + + /** Parse and register one log_t::FORMAT_10_8 mini-transaction. @tparam storing whether to store the records @param l log data source @@ -2868,6 +2876,23 @@ restart: #endif if (storing == YES) { + if (space_id >= 0xfffffffe) + { + if ((b & 0xf0) != WRITE) + goto record_corrupted; + const size_t olen= mlog_decode_varint_length(*cl); + if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3)) + goto record_corrupted; + const uint32_t offset= mlog_decode_varint(cl); + ut_ad(offset != MLOG_DECODE_ERROR); + if (UNIV_UNLIKELY(offset + rlen - olen >= 65535)) + goto record_corrupted; + binlog_recover_write_data(space_id & 1, page_no, uint16_t(offset), + start_lsn, lsn, + l.get_buf(cl, recs, decrypt_buf) + olen, + l - recs + rlen - olen); + continue; + } if (if_exists) { if (fil_space_t *space= fil_space_t::get(space_id)) @@ -4216,6 +4241,7 @@ static bool recv_scan_log(bool last_phase) ut_ad(!rewound_lsn); ut_ad(recv_sys.lsn >= recv_sys.file_checkpoint); log_sys.set_recovered_lsn(recv_sys.lsn); + binlog_recover_end(recv_sys.lsn); } else if (rewound_lsn) {