1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

MDEV-34705: Binlog-in-engine: No use of InnoDB tablespace and bufferpool

In preparation for a simplified, lower-level recovery of binlog files
implemented in InnoDB, remove use of InnoDB tablespaces and buffer pool from
the binlog code. Instead, a custom binlog page fifo replaces the general
buffer pool for binlog pages, and tablespaces are replaced by simple file_no
references.

The new binlog page fifo is deliberately naively written in this commit for
simplicity, until the new recovery is complete and proven with tests; later
it can be improved for better efficiency and scalability. This first version
uses a simple global mutex, linear scans of linked lists, repeated
alloc/free of pages, and simple backgrund flush thread that uses
synchroneous pwrite() one page after another. Error handling is also mostly
omitted in this first version.

The page header/footer is not changed in this commit, nor is the pagesize,
to be done in a later patch.

The call to mtr_t::write_binlog() is currently commented-out in function
fsp_log_binlog_write() as it asserts in numerous places. To be enabled when
those asserts are fixed. For the same reason, the code does not yet
implement binlog_write_up_to(lsn_t lsn), to be done once mtr_t operations
are working.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-02-15 21:43:12 +00:00
parent 68f37e6e58
commit 86fbbbe273
15 changed files with 896 additions and 266 deletions

View File

@@ -0,0 +1,21 @@
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
BEGIN;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
COMMIT;
SELECT * FROM t1 ORDER BY a;
a
1
2
3
DROP TABLE t1;
SELECT @@GLOBAL.binlog_checksum;
@@GLOBAL.binlog_checksum
NONE
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB;
SET SESSION binlog_format= ROW;
*** Do 1500 transactions ...
SET SESSION binlog_format= MIXED;
DROP TABLE t2;

View File

@@ -0,0 +1,13 @@
CREATE TABLE sbtest1(
id INTEGER NOT NULL AUTO_INCREMENT,
k INTEGER DEFAULT '0' NOT NULL,
c CHAR(120) DEFAULT '' NOT NULL,
pad CHAR(60) DEFAULT '' NOT NULL,
PRIMARY KEY (id)
) ENGINE = innodb;
*** Test bug where a large event in trx cache would fail to flush to disk
*** the last part of the IO_CACHE after reinit_io_cache()
SELECT COUNT(*) FROM sbtest1;
COUNT(*)
1000
DROP TABLE sbtest1;

View File

@@ -0,0 +1,58 @@
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
INSERT INTO t1 SELECT seq+1000000+100000*0, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*1, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*2, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*3, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*4, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*5, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*6, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*7, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*8, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+1000000+100000*9, seq FROM seq_1_to_4000;
SELECT @@GLOBAL.gtid_binlog_state;
@@GLOBAL.gtid_binlog_state
0-1-12
# restart
SELECT @@GLOBAL.gtid_binlog_state;
@@GLOBAL.gtid_binlog_state
0-1-12
INSERT INTO t1 VALUES (2, 0);
INSERT INTO t1 SELECT seq+2000000+100000*0, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*1, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*2, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*3, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*4, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*5, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*6, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*7, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*8, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+2000000+100000*9, seq FROM seq_1_to_4000;
SELECT @@GLOBAL.gtid_binlog_state;
@@GLOBAL.gtid_binlog_state
0-1-23
# restart
SELECT @@GLOBAL.gtid_binlog_state;
@@GLOBAL.gtid_binlog_state
0-1-23
INSERT INTO t1 VALUES (3, 0);
INSERT INTO t1 SELECT seq+3000000+100000*0, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*1, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*2, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*3, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*4, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*5, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*6, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*7, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*8, seq FROM seq_1_to_4000;
INSERT INTO t1 SELECT seq+3000000+100000*9, seq FROM seq_1_to_4000;
SHOW BINARY LOGS;
Log_name File_size
binlog-000000.ibb 262144
binlog-000001.ibb 262144
binlog-000002.ibb 262144
binlog-000003.ibb 262144
binlog-000004.ibb 262144
binlog-000005.ibb 262144
DROP TABLE t1;

View File

@@ -42,8 +42,9 @@ while ($i < 10) {
inc $i;
}
--let $binlog_name= binlog-000005.ibb
--let $binlog_size= 262144
--source include/wait_for_engine_binlog.inc
SHOW BINARY LOGS;
--exec $MYSQL_BINLOG --start-position=$gtid_pos3 --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog3.txt
DROP TABLE t1;

View File

@@ -2001,6 +2001,49 @@ static bool log_checkpoint_low(lsn_t oldest_lsn, lsn_t end_lsn) noexcept
return true;
}
static void binlog_write_up_to(lsn_t lsn) noexcept {}
void mtr_t::write_binlog(bool space_id, uint32_t page_no,
uint16_t offset,
const void *buf, size_t size) noexcept
{
/* ToDo: this asserts in multiple places currently. */
return;
ut_ad(!srv_read_only_mode);
ut_ad(m_log_mode == MTR_LOG_ALL);
bool alloc{size < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5)};
byte *end= log_write<WRITE>(page_id_t{0xfffffffe | (uint)space_id, page_no},
nullptr, size, alloc, offset);
if (alloc)
{
::memcpy(end, buf, size);
m_log.close(end + size);
}
else
{
m_log.close(end);
m_log.push(static_cast<const byte*>(buf), uint32_t(size));
}
}
/** Write binlog data
@param space_id binlog tablespace
@param page_no binlog page number
@param offset offset within the page
@param buf data
@param size size of data
@return start LSN of the mini-transaction */
lsn_t binlog_write_data(bool space_id, uint32_t page_no, uint16_t offset,
const void *buf, size_t size) noexcept
{
mtr_t mtr;
mtr.start();
mtr.write_binlog(space_id, page_no, offset, buf, size);
mtr.commit();
return mtr.commit_lsn();
}
/** Make a checkpoint. Note that this function does not flush dirty
blocks from the buffer pool: it only checks what is lsn of the oldest
modification in the pool, and writes information about the lsn in
@@ -2021,11 +2064,13 @@ static bool log_checkpoint() noexcept
#endif
fil_flush_file_spaces();
binlog_write_up_to(log_sys.get_lsn());
log_sys.latch.wr_lock(SRW_LOCK_CALL);
const lsn_t end_lsn= log_sys.get_lsn();
mysql_mutex_lock(&buf_pool.flush_list_mutex);
const lsn_t oldest_lsn= buf_pool.get_oldest_modification(end_lsn);
// FIXME: limit oldest_lsn below binlog split write LSN
mysql_mutex_unlock(&buf_pool.flush_list_mutex);
return log_checkpoint_low(oldest_lsn, end_lsn);
}
@@ -2209,12 +2254,14 @@ static void buf_flush_sync_for_checkpoint(lsn_t lsn) noexcept
os_aio_wait_until_no_pending_writes(false);
fil_flush_file_spaces();
binlog_write_up_to(log_sys.get_lsn());
log_sys.latch.wr_lock(SRW_LOCK_CALL);
const lsn_t newest_lsn= log_sys.get_lsn();
mysql_mutex_lock(&buf_pool.flush_list_mutex);
lsn_t measure= buf_pool.get_oldest_modification(0);
const lsn_t checkpoint_lsn= measure ? measure : newest_lsn;
// FIXME: limit checkpoint_lsn below binlog split write LSN
if (!recv_recovery_is_on() &&
checkpoint_lsn > log_sys.last_checkpoint_lsn + SIZE_OF_FILE_CHECKPOINT)

View File

@@ -32,8 +32,6 @@ InnoDB implementation of binlog.
#include "log.h"
static buf_block_t *binlog_cur_block;
/*
How often (in terms of bytes written) to dump a (differential) binlog state
at the start of the page, to speed up finding the initial GTID position for
@@ -47,14 +45,13 @@ static buf_block_t *binlog_cur_block;
uint64_t current_binlog_state_interval;
/*
Mutex protecting active_binlog_file_no and active_binlog_space.
Mutex protecting active_binlog_file_no.
*/
mysql_mutex_t active_binlog_mutex;
pthread_cond_t active_binlog_cond;
/* The currently being written binlog tablespace. */
std::atomic<uint64_t> active_binlog_file_no;
fil_space_t* active_binlog_space;
/*
The first binlog tablespace that is still open.
@@ -73,7 +70,6 @@ uint64_t first_open_binlog_file_no;
next tablespace is still pending.
*/
uint64_t last_created_binlog_file_no;
fil_space_t *last_created_binlog_space;
/*
Point at which it is guaranteed that all data has been written out to the
@@ -90,6 +86,473 @@ std::atomic<uint64_t> binlog_cur_written_offset[2];
*/
std::atomic<uint64_t> binlog_cur_end_offset[2];
fsp_binlog_page_fifo *binlog_page_fifo;
fsp_binlog_page_entry *
fsp_binlog_page_fifo::create_page(uint64_t file_no, uint32_t page_no)
{
mysql_mutex_lock(&m_mutex);
ut_ad(first_file_no != ~(uint64_t)0);
ut_a(file_no == first_file_no || file_no == first_file_no + 1);
page_list *pl= &fifos[file_no & 1];
fsp_binlog_page_entry **next_ptr_ptr= &pl->first_page;
uint32_t entry_page_no= pl->first_page_no;
/* Can only add a page at the end of the list. */
while (*next_ptr_ptr)
{
next_ptr_ptr= &((*next_ptr_ptr)->next);
++entry_page_no;
}
ut_a(page_no == entry_page_no);
fsp_binlog_page_entry *e= (fsp_binlog_page_entry *)ut_malloc(sizeof(*e), mem_key_binlog);
ut_a(e);
e->next= nullptr;
e->page_buf= static_cast<byte*>(aligned_malloc(srv_page_size, srv_page_size));
ut_a(e->page_buf);
memset(e->page_buf, 0, srv_page_size);
e->latched= 1;
e->complete= false;
e->flushed_clean= false;
*next_ptr_ptr= e;
mysql_mutex_unlock(&m_mutex);
return e;
}
fsp_binlog_page_entry *
fsp_binlog_page_fifo::get_page(uint64_t file_no, uint32_t page_no)
{
fsp_binlog_page_entry *res= nullptr;
page_list *pl;
fsp_binlog_page_entry *p;
uint32_t entry_page_no;
mysql_mutex_lock(&m_mutex);
ut_ad(first_file_no != ~(uint64_t)0);
ut_a(file_no <= first_file_no + 1);
if (file_no < first_file_no)
goto end;
pl= &fifos[file_no & 1];
p= pl->first_page;
entry_page_no= pl->first_page_no;
if (!p || page_no < entry_page_no)
goto end;
while (p)
{
if (page_no == entry_page_no)
{
/* Found the page. */
++p->latched;
res= p;
break;
}
p= p->next;
++entry_page_no;
}
end:
mysql_mutex_unlock(&m_mutex);
return res;
}
void
fsp_binlog_page_fifo::release_page(uint64_t file_no, uint32_t page_no,
fsp_binlog_page_entry *page)
{
mysql_mutex_lock(&m_mutex);
ut_a(page->latched > 0);
if (--page->latched == 0)
pthread_cond_signal(&m_cond);
mysql_mutex_unlock(&m_mutex);
}
/*
Flush (write to disk) the first unflushed page in a file.
Returns true when the last page has been flushed.
Must be called with m_mutex held.
If called with force=true, will flush even any final, incomplete page.
Otherwise such page will not be written out. Any final, incomplete page
is left in the FIFO in any case.
*/
bool
fsp_binlog_page_fifo::flush_one_page(uint64_t file_no, bool force)
{
page_list *pl;
fsp_binlog_page_entry *e;
mysql_mutex_assert_owner(&m_mutex);
/*
Wait for the FIFO to be not flushing from another thread, and for the
first page to not be latched.
*/
for (;;)
{
/*
Let's make page not present not an error, to allow races where someone else
flushed the page ahead of us.
*/
if (file_no < first_file_no)
return true;
ut_a(file_no <= first_file_no + 1);
if (!flushing)
{
pl= &fifos[file_no & 1];
e= pl->first_page;
if (!e)
return true;
if (e->latched == 0)
break;
}
my_cond_wait(&m_cond, &m_mutex.m_mutex);
}
flushing= true;
uint32_t page_no= pl->first_page_no;
mysql_mutex_unlock(&m_mutex);
ut_ad(e->complete || !e->next);
if (e->complete || (force && !e->flushed_clean))
{
File fh= get_fh(file_no);
ut_a(pl->fh >= (File)0);
size_t res= my_pwrite(fh, e->page_buf, srv_page_size,
(uint64_t)page_no << srv_page_size_shift,
MYF(MY_WME));
ut_a(res == srv_page_size);
e->flushed_clean= true;
}
mysql_mutex_lock(&m_mutex);
/*
We marked the FIFO as flushing, page could not have disappeared despite
releasing the mutex during the I/O.
*/
ut_ad(flushing);
bool done= (e->next == nullptr);
if (e->complete)
{
pl->first_page= e->next;
pl->first_page_no= page_no + 1;
aligned_free(e->page_buf);
ut_free(e);
}
else
done= true; /* Cannot flush past final incomplete page. */
flushing= false;
pthread_cond_signal(&m_cond);
return done;
}
void
fsp_binlog_page_fifo::flush_up_to(uint64_t file_no, uint32_t page_no)
{
mysql_mutex_lock(&m_mutex);
for (;;)
{
if (file_no < first_file_no ||
(file_no == first_file_no && fifos[file_no & 1].first_page_no > page_no))
break;
uint64_t file_no_to_flush= file_no;
/* Flush the prior file to completion first. */
if (file_no == first_file_no + 1 && fifos[(file_no - 1) & 1].first_page)
file_no_to_flush= file_no - 1;
bool done= flush_one_page(file_no_to_flush, true);
if (done && file_no == file_no_to_flush)
break;
}
mysql_mutex_unlock(&m_mutex);
}
void
fsp_binlog_page_fifo::do_fdatasync(uint64_t file_no)
{
mysql_mutex_lock(&m_mutex);
ut_a(file_no == first_file_no || file_no == first_file_no + 1);
File fh= fifos[file_no & 1].fh;
if (fh != (File)-1)
{
while (flushing)
my_cond_wait(&m_cond, &m_mutex.m_mutex);
flushing= true;
mysql_mutex_unlock(&m_mutex);
int res= my_sync(fh, MYF(MY_WME));
ut_a(!res);
mysql_mutex_lock(&m_mutex);
flushing= false;
pthread_cond_signal(&m_cond);
}
mysql_mutex_unlock(&m_mutex);
}
File
fsp_binlog_page_fifo::get_fh(uint64_t file_no)
{
File fh= fifos[file_no & 1].fh;
if (fh == (File)-1)
{
char filename[OS_FILE_MAX_PATH];
binlog_name_make(filename, file_no);
fifos[file_no & 1].fh= fh= my_open(filename, O_RDWR | O_BINARY, MYF(MY_WME));
}
return fh;
}
/*
If init_page is not ~(uint32_t)0, then it is the page to continue writing
when re-opening existing binlog at server startup.
If in addition, partial_page is non-NULL, it is an (aligned) page buffer
containing the partial data of page init_page.
If init_page is set but partial_page is not, then init_page is the first,
empty page in the tablespace to create and start writing to.
*/
void
fsp_binlog_page_fifo::create_tablespace(uint64_t file_no,
uint32_t size_in_pages,
uint32_t init_page,
byte *partial_page)
{
mysql_mutex_lock(&m_mutex);
ut_ad(init_page == ~(uint32_t)0 ||
first_file_no == ~(uint64_t)0 ||
/* At server startup allow opening N empty and (N-1) partial. */
(init_page != ~(uint32_t)0 && file_no + 1 == first_file_no &&
!fifos[first_file_no & 1].first_page));
ut_a(first_file_no == ~(uint64_t)0 ||
file_no == first_file_no + 1 ||
file_no == first_file_no + 2 ||
(init_page != ~(uint32_t)0 && file_no + 1 == first_file_no &&
!fifos[first_file_no & 1].first_page));
if (first_file_no == ~(uint64_t)0)
{
first_file_no= file_no;
}
else if (UNIV_UNLIKELY(file_no + 1 == first_file_no))
first_file_no= file_no;
else if (file_no == first_file_no + 2)
{
/* All pages in (N-2) must be flushed before doing (N). */
ut_a(!fifos[file_no & 1].first_page);
if (fifos[file_no & 1].fh != (File)-1)
my_close(fifos[file_no & 1].fh, MYF(0));
first_file_no= file_no - 1;
}
if (init_page != ~(uint32_t)0)
{
if (partial_page)
{
fsp_binlog_page_entry *e=
(fsp_binlog_page_entry *)ut_malloc(sizeof(*e), mem_key_binlog);
ut_a(e);
e->next= nullptr;
e->page_buf=
static_cast<byte*>(aligned_malloc(srv_page_size, srv_page_size));
ut_a(e->page_buf);
memcpy(e->page_buf, partial_page, srv_page_size);
e->latched= 0;
e->complete= false;
e->flushed_clean= true;
fifos[file_no & 1].first_page= e;
}
else
fifos[file_no & 1].first_page= nullptr;
fifos[file_no & 1].first_page_no= init_page;
}
else
{
fifos[file_no & 1].first_page= nullptr;
fifos[file_no & 1].first_page_no= 0;
}
fifos[file_no & 1].fh= (File)-1;
fifos[file_no & 1].size_in_pages= size_in_pages;
mysql_mutex_unlock(&m_mutex);
}
void
fsp_binlog_page_fifo::release_tablespace(uint64_t file_no)
{
mysql_mutex_lock(&m_mutex);
ut_a(file_no == first_file_no);
ut_a(!fifos[file_no & 1].first_page ||
/* Allow a final, incomplete-but-fully-flushed page in the fifo. */
(!fifos[file_no & 1].first_page->complete &&
fifos[file_no & 1].first_page->flushed_clean &&
!fifos[file_no & 1].first_page->next &&
!fifos[(file_no + 1) & 1].first_page));
if (fifos[file_no & 1].fh != (File)-1)
{
while (flushing)
my_cond_wait(&m_cond, &m_mutex.m_mutex);
flushing= true;
File fh= fifos[file_no & 1].fh;
mysql_mutex_unlock(&m_mutex);
int res= my_sync(fh, MYF(MY_WME));
ut_a(!res);
my_close(fifos[file_no & 1].fh, MYF(0));
mysql_mutex_lock(&m_mutex);
flushing= false;
pthread_cond_signal(&m_cond);
}
first_file_no= file_no + 1;
fifos[file_no & 1].first_page= nullptr;
fifos[file_no & 1].first_page_no= 0;
fifos[file_no & 1].size_in_pages= 0;
fifos[file_no & 1].fh= (File)-1;
mysql_mutex_unlock(&m_mutex);
}
fsp_binlog_page_fifo::fsp_binlog_page_fifo()
: first_file_no(~(uint64_t)0), flushing(false),
flush_thread_started(false), flush_thread_end(false)
{
fifos[0]= {nullptr, 0, 0, (File)-1 };
fifos[1]= {nullptr, 0, 0, (File)-1 };
mysql_mutex_init(fsp_page_fifo_mutex_key, &m_mutex, nullptr);
pthread_cond_init(&m_cond, nullptr);
// ToDo I think I need to read the first page here, or somewhere?
// Normally I'd never want to read a page into the page fifo, but at startup, I seem to need to do so for the first page I start writing on. Though I suppose I already read that, so maybe just a way to add that page into the FIFO in the constructor?
}
void
fsp_binlog_page_fifo::reset()
{
ut_ad(!flushing);
for (uint32_t i= 0; i < 2; ++i)
{
if (fifos[i].fh != (File)-1)
my_close(fifos[i].fh, MYF(0));
fsp_binlog_page_entry *e= fifos[i].first_page;
while (e)
{
fsp_binlog_page_entry *next= e->next;
aligned_free(e->page_buf);
ut_free(e);
e= next;
}
fifos[i]= {nullptr, 0, 0, (File)-1 };
}
first_file_no= ~(uint64_t)0;
}
fsp_binlog_page_fifo::~fsp_binlog_page_fifo()
{
ut_ad(!flushing);
reset();
mysql_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
void
fsp_binlog_page_fifo::lock_wait_for_idle()
{
mysql_mutex_lock(&m_mutex);
while(flushing)
my_cond_wait(&m_cond, &m_mutex.m_mutex);
}
void
fsp_binlog_page_fifo::start_flush_thread()
{
flush_thread_started= false;
flush_thread_end= false;
flush_thread_obj= std::thread{ [this] { flush_thread_run(); } };
mysql_mutex_lock(&m_mutex);
while (!flush_thread_started)
my_cond_wait(&m_cond, &m_mutex.m_mutex);
mysql_mutex_unlock(&m_mutex);
}
void
fsp_binlog_page_fifo::stop_flush_thread()
{
if (!flush_thread_started)
return;
mysql_mutex_lock(&m_mutex);
flush_thread_end= true;
pthread_cond_signal(&m_cond);
while (flush_thread_started)
my_cond_wait(&m_cond, &m_mutex.m_mutex);
mysql_mutex_unlock(&m_mutex);
flush_thread_obj.join();
}
void
fsp_binlog_page_fifo::flush_thread_run()
{
mysql_mutex_lock(&m_mutex);
flush_thread_started= true;
pthread_cond_signal(&m_cond);
while (!flush_thread_end)
{
/*
Flush pages one by one as long as there are more pages pending.
Once all have been flushed, wait for more pages to become pending.
Don't try to force flush a final page that is not yet completely
filled with data.
*/
uint64_t file_no= first_file_no;
bool all_flushed= true;
if (first_file_no != ~(uint64_t)0)
{
all_flushed= flush_one_page(file_no, false);
if (all_flushed)
all_flushed= flush_one_page(file_no + 1, false);
}
if (all_flushed)
my_cond_wait(&m_cond, &m_mutex.m_mutex);
}
flush_thread_started= false;
pthread_cond_signal(&m_cond);
mysql_mutex_unlock(&m_mutex);
}
void
fsp_log_binlog_write(mtr_t *mtr, uint64_t file_no, uint32_t page_no,
fsp_binlog_page_entry *page, uint32_t page_offset,
uint32_t len)
{
if (page_offset + len >= srv_page_size - FIL_PAGE_DATA_END)
page->complete= true;
if (page->flushed_clean)
{
/*
If the page with partial data has been written to the file system, then
redo log all the data on the page, to be sure we can still recover the
entire page reliably even if the latest checkpoint is after that partial
write.
*/
len= page_offset + len;
page_offset= 0;
page->flushed_clean= false;
}
mtr->write_binlog(LOG_BINLOG_ID_0 + (file_no & 1), page_no,
(uint16_t)page_offset, page_offset + &page->page_buf[0],
len);
}
/*
Initialize the InnoDB implementation of binlog.
@@ -101,12 +564,16 @@ fsp_binlog_init()
{
mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr);
pthread_cond_init(&active_binlog_cond, nullptr);
binlog_page_fifo= new fsp_binlog_page_fifo();
binlog_page_fifo->start_flush_thread();
}
void
fsp_binlog_shutdown()
{
binlog_page_fifo->stop_flush_thread();
delete binlog_page_fifo;
pthread_cond_destroy(&active_binlog_cond);
mysql_mutex_destroy(&active_binlog_mutex);
}
@@ -118,48 +585,10 @@ fsp_binlog_shutdown()
dberr_t
fsp_binlog_tablespace_close(uint64_t file_no)
{
mtr_t mtr;
dberr_t res;
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1);
mysql_mutex_lock(&fil_system.mutex);
fil_space_t *space= fil_space_get_by_id(space_id);
mysql_mutex_unlock(&fil_system.mutex);
if (!space) {
res= DB_ERROR;
goto end;
}
/*
Write out any remaining pages in the buffer pool to the binlog tablespace.
Then flush the file to disk, and close the old tablespace.
ToDo: Will this turn into a busy-wait if some of the pages are still latched
in this tablespace, maybe because even though the tablespace has been
written full, the mtr that's ending in the next tablespace may still be
active? This will need fixing, no busy-wait should be done here.
*/
/*
Take and release an exclusive latch on the last page in the tablespace to
be closed. We might be signalled that the tablespace is done while the mtr
completing the tablespace write is still active; the exclusive latch will
ensure we wait for any last mtr to commit before we close the tablespace.
*/
mtr.start();
buf_page_get_gen(page_id_t{space_id, space->size - 1}, 0, RW_X_LATCH, nullptr,
BUF_GET, &mtr, &res);
mtr.commit();
while (buf_flush_list_space(space))
;
// ToDo: Also, buf_flush_list_space() seems to use io_capacity, but that's not appropriate here perhaps
os_aio_wait_until_no_pending_writes(false);
space->flush<false>();
fil_space_free(space_id, false);
res= DB_SUCCESS;
end:
return res;
binlog_page_fifo->flush_up_to(file_no, ~(uint32_t)0);
/* release_tablespace() will fdatasync() the file first. */
binlog_page_fifo->release_tablespace(file_no);
return DB_SUCCESS;
}
@@ -167,15 +596,16 @@ end:
Open an existing tablespace. The filehandle fh is taken over by the tablespace
(or closed in case of error).
*/
fil_space_t *
bool
fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
uint64_t file_no, size_t file_size, bool open_empty)
uint64_t file_no, size_t file_size,
uint32_t init_page, byte *partial_page)
{
const uint32_t page_size= (uint32_t)srv_page_size;
const uint32_t page_size_shift= srv_page_size_shift;
os_offset_t binlog_size= innodb_binlog_size_in_pages << srv_page_size_shift;
if (open_empty && file_size < binlog_size) {
if (init_page == ~(uint32_t)0 && file_size < binlog_size) {
/*
A crash may have left a partially pre-allocated file. If so, extend it
to the required size.
@@ -197,81 +627,30 @@ fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
"should be at least %u bytes",
file_no, file_size, 2*page_size);
os_file_close(fh);
return nullptr;
return true;
}
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1);
if (!open_empty) {
page_t *page_buf= static_cast<byte*>(aligned_malloc(page_size, page_size));
if (!page_buf) {
binlog_page_fifo->create_tablespace(file_no,
(uint32_t)(file_size >> page_size_shift),
init_page, partial_page);
os_file_close(fh);
return nullptr;
}
dberr_t err= os_file_read(IORequestRead, fh, page_buf, 0, page_size, nullptr);
if (err != DB_SUCCESS) {
sql_print_warning("Unable to read first page of file '%s'", file_name);
aligned_free(page_buf);
os_file_close(fh);
return nullptr;
}
/* ToDo: Maybe use leaner page format for binlog tablespace? */
uint32_t id1= mach_read_from_4(FIL_PAGE_SPACE_ID + page_buf);
if (id1 != space_id) {
sql_print_warning("Binlog file %s has inconsistent tablespace id %u "
"(expected %u)", file_name, id1, space_id);
aligned_free(page_buf);
os_file_close(fh);
return nullptr;
}
// ToDo: should we here check buf_page_is_corrupted() ?
aligned_free(page_buf);
}
uint32_t fsp_flags=
FSP_FLAGS_FCRC32_MASK_MARKER | FSP_FLAGS_FCRC32_PAGE_SSIZE();
/* ToDo: Enryption. */
fil_encryption_t mode= FIL_ENCRYPTION_OFF;
fil_space_crypt_t* crypt_data= nullptr;
fil_space_t *space;
mysql_mutex_lock(&fil_system.mutex);
if (!(space= fil_space_t::create(space_id, fsp_flags, false, crypt_data,
mode, true))) {
mysql_mutex_unlock(&fil_system.mutex);
os_file_close(fh);
return nullptr;
}
space->add(file_name, fh, (uint32_t)(file_size >> page_size_shift),
false, true);
first_open_binlog_file_no= file_no;
if (last_created_binlog_file_no == ~(uint64_t)0 ||
file_no > last_created_binlog_file_no) {
last_created_binlog_file_no= file_no;
last_created_binlog_space= space;
}
mysql_mutex_unlock(&fil_system.mutex);
return space;
return false;
}
/** Create a binlog tablespace file
@param[in] file_no Index of the binlog tablespace
@param[out] new_space The newly created tablespace
@return DB_SUCCESS or error code */
dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages,
fil_space_t **new_space)
dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages)
{
pfs_os_file_t fh;
bool ret;
*new_space= nullptr;
if(srv_read_only_mode)
return DB_ERROR;
@@ -291,8 +670,6 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages,
}
/* ToDo: Enryption? */
fil_encryption_t mode= FIL_ENCRYPTION_OFF;
fil_space_crypt_t* crypt_data= nullptr;
/* We created the binlog file and now write it full of zeros */
if (!os_file_set_size(name, fh,
@@ -304,24 +681,8 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, uint32_t size_in_pages,
return DB_ERROR;
}
mysql_mutex_lock(&fil_system.mutex);
/* ToDo: Need to ensure file (N-2) is no longer active before creating (N). */
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1);
if (!(*new_space= fil_space_t::create(space_id,
( FSP_FLAGS_FCRC32_MASK_MARKER |
FSP_FLAGS_FCRC32_PAGE_SSIZE()),
false, crypt_data,
mode, true))) {
mysql_mutex_unlock(&fil_system.mutex);
binlog_page_fifo->create_tablespace(file_no, size_in_pages);
os_file_close(fh);
os_file_delete(innodb_data_file_key, name);
return DB_ERROR;
}
fil_node_t* node = (*new_space)->add(name, fh, size_in_pages,
false, true);
node->find_metadata();
mysql_mutex_unlock(&fil_system.mutex);
return DB_SUCCESS;
}
@@ -341,12 +702,11 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
{
uint32_t page_size= (uint32_t)srv_page_size;
uint32_t page_size_shift= srv_page_size_shift;
fil_space_t *space= active_binlog_space;
const uint32_t page_end= page_size - FIL_PAGE_DATA_END;
uint32_t page_no= binlog_cur_page_no;
uint32_t page_offset= binlog_cur_page_offset;
/* ToDo: What is the lifetime of what's pointed to by binlog_cur_block, is there some locking needed around it or something? */
buf_block_t *block= binlog_cur_block;
fsp_binlog_page_entry *block= nullptr;
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
uint64_t pending_prev_end_offset= 0;
uint64_t start_file_no= 0;
@@ -359,7 +719,7 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
byte cont_flag= 0;
for (;;) {
if (page_offset == FIL_PAGE_DATA) {
if (UNIV_UNLIKELY(page_no >= space->size)) {
if (UNIV_UNLIKELY(page_no >= binlog_page_fifo->size_in_pages(file_no))) {
/*
Signal to the pre-allocation thread that this tablespace has been
written full, so that it can be closed and a new one pre-allocated
@@ -392,7 +752,6 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
++file_no;
binlog_cur_written_offset[file_no & 1].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 1].store(0, std::memory_order_relaxed);
active_binlog_space= space= last_created_binlog_space;
pthread_cond_signal(&active_binlog_cond);
mysql_mutex_unlock(&active_binlog_mutex);
binlog_cur_page_no= page_no= 0;
@@ -452,34 +811,35 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
}
}
err= binlog_gtid_state(&full_state, mtr, block, page_no,
page_offset, space);
page_offset, file_no);
ut_a(!err /* ToDo error handling */);
ut_ad(block);
full_state.free();
binlog_diff_state.reset_nolock();
} else {
bool err= binlog_gtid_state(&binlog_diff_state, mtr, block, page_no,
page_offset, space);
page_offset, file_no);
ut_a(!err /* ToDo error handling */);
}
} else
block= fsp_page_create(space, page_no, mtr);
block= binlog_page_fifo->create_page(file_no, page_no);
} else {
dberr_t err;
/* ToDo: Is RW_SX_LATCH appropriate here? */
block= buf_page_get_gen(page_id_t{space->id, page_no},
0, RW_SX_LATCH, block,
BUF_GET, mtr, &err);
ut_a(err == DB_SUCCESS);
block= binlog_page_fifo->get_page(file_no, page_no);
}
ut_ad(page_offset < page_end);
uint32_t page_remain= page_end - page_offset;
byte *ptr= page_offset + block->page.frame;
byte *ptr= page_offset + &block->page_buf[0];
/* ToDo: Do this check at the end instead, to save one buf_page_get_gen()? */
if (page_remain < 4) {
/* Pad the remaining few bytes, and move to next page. */
mtr->memset(block, page_offset, page_remain, FSP_BINLOG_TYPE_FILLER);
if (UNIV_LIKELY(page_remain > 0))
{
memset(ptr, FSP_BINLOG_TYPE_FILLER, page_remain);
fsp_log_binlog_write(mtr, file_no, page_no, block, page_offset,
page_remain);
}
binlog_page_fifo->release_page(file_no, page_no, block);
block= nullptr;
++page_no;
page_offset= FIL_PAGE_DATA;
@@ -513,9 +873,10 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
ptr[2]= (byte)(size >> 8);
ut_ad(size <= 0xffff);
mtr->memcpy(*block, page_offset, size+3);
fsp_log_binlog_write(mtr, file_no, page_no, block, page_offset, size + 3);
cont_flag= FSP_BINLOG_FLAG_CONT;
if (page_remain == 0) {
binlog_page_fifo->release_page(file_no, page_no, block);
block= nullptr;
page_offset= FIL_PAGE_DATA;
++page_no;
@@ -525,7 +886,8 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
if (size_last.second)
break;
}
binlog_cur_block= block;
if (block)
binlog_page_fifo->release_page(file_no, page_no, block);
binlog_cur_page_no= page_no;
binlog_cur_page_offset= page_offset;
if (UNIV_UNLIKELY(pending_prev_end_offset != 0))
@@ -571,40 +933,66 @@ bool
fsp_binlog_flush()
{
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1);
uint32_t page_no= binlog_cur_page_no;
fil_space_t *space= active_binlog_space;
chunk_data_flush dummy_data;
mtr_t mtr;
mysql_mutex_lock(&purge_binlog_mutex);
mtr.start();
mtr.x_lock_space(space);
/*
ToDo: Here, if we are already at precisely the end of a page, we need not
fill up that page with a dummy record, we can just truncate the tablespace
to that point. But then we need to handle an assertion m_modifications!=0
in mtr_t::commit_shrink().
*/
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY);
if (page_no + 1 < space->size)
binlog_page_fifo->lock_wait_for_idle();
File fh= binlog_page_fifo->get_fh(file_no);
if (fh == (File)-1)
{
mtr.trim_pages(page_id_t(space_id, page_no + 1));
mtr.commit_shrink(*space, page_no + 1);
binlog_page_fifo->unlock();
mysql_mutex_unlock(&purge_binlog_mutex);
return true;
}
size_t reclaimed= (space->size - (page_no + 1)) << srv_page_size_shift;
if (my_chsize(fh, ((uint64_t)page_no + 1) << srv_page_size_shift, 0,
MYF(MY_WME)))
{
binlog_page_fifo->unlock();
mysql_mutex_unlock(&purge_binlog_mutex);
return true;
}
/*
Sync the truncate to disk. This way, if we crash after this we are sure the
truncate has been effected so we do not put the filler record in what is
then the middle of the file. If we crash before the truncate is durable, we
just come up as if the flush has never happened. If we crash with the
truncate durable but without the filler record, that is not a problem, the
binlog file will just be shorter.
*/
my_sync(fh, MYF(0));
binlog_page_fifo->unlock();
uint32_t page_offset= binlog_cur_page_offset;
if (page_offset > FIL_PAGE_DATA ||
page_offset < srv_page_size - FIL_PAGE_DATA_END)
{
/*
If we are not precisely the end of a page, fill up that page with a dummy
record. Otherwise the zeros at the end of the page would be detected as
end-of-file of the entire binlog.
*/
mtr.start();
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY);
mtr.commit();
}
if (page_no + 1 < binlog_page_fifo->size_in_pages(file_no))
{
binlog_page_fifo->truncate_file_size(file_no, page_no + 1);
size_t reclaimed= (binlog_page_fifo->size_in_pages(file_no) - (page_no + 1))
<< srv_page_size_shift;
if (UNIV_LIKELY(total_binlog_used_size >= reclaimed))
total_binlog_used_size-= reclaimed;
else
ut_ad(0);
}
else
mtr.commit();
/* Flush out all pages in the (now filled-up) tablespace. */
while (buf_flush_list_space(space))
;
binlog_page_fifo->flush_up_to(file_no, page_no);
mysql_mutex_unlock(&purge_binlog_mutex);
@@ -650,8 +1038,8 @@ binlog_chunk_reader::read_error_corruption(uint64_t file_no, uint64_t page_no,
/*
Obtain the data on the page currently pointed to by the chunk reader. The
page is either latched in the buffer pool (starting an mtr), or read from
the file into the page buffer.
page is either latched in the page fifo, or read from the file into the page
buffer.
The code does a dirty read of active_binlog_file_no to determine if the page
is known to be available to read from the file, or if it should be looked up
@@ -664,11 +1052,10 @@ binlog_chunk_reader::read_error_corruption(uint64_t file_no, uint64_t page_no,
enum binlog_chunk_reader::chunk_reader_status
binlog_chunk_reader::fetch_current_page()
{
ut_ad(!cur_block /* Must have no active mtr */);
ut_ad(!cur_block /* Must have no active page latch */);
uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire);
for (;;) {
buf_block_t *block= nullptr;
bool mtr_started= false;
fsp_binlog_page_entry *block= nullptr;
uint64_t offset= (s.page_no << srv_page_size_shift) | s.in_page_offset;
uint64_t active= active2;
uint64_t end_offset=
@@ -687,24 +1074,13 @@ binlog_chunk_reader::fetch_current_page()
if (s.file_no + 1 >= active) {
/* Check if we should read from the buffer pool or from the file. */
if (end_offset != ~(uint64_t)0 && offset < end_offset) {
mtr.start();
mtr_started= true;
/*
ToDo: Should we keep track of the last block read and use it as a
hint? Will be mainly useful when reading the partially written active
page at the current end of the active binlog, which might be a common
case.
*/
buf_block_t *hint_block= nullptr;
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(s.file_no & 1);
dberr_t err= DB_SUCCESS;
block= buf_page_get_gen(page_id_t{space_id, s.page_no}, 0,
RW_S_LATCH, hint_block, BUF_GET_IF_IN_POOL,
&mtr, &err);
if (err != DB_SUCCESS) {
mtr.commit();
return CHUNK_READER_ERROR;
}
block= binlog_page_fifo->get_page(s.file_no, s.page_no);
}
active2= active_binlog_file_no.load(std::memory_order_acquire);
if (UNIV_UNLIKELY(active2 != active)) {
@@ -713,13 +1089,10 @@ binlog_chunk_reader::fetch_current_page()
have gotten invalid end_offset or a buffer pool page from a wrong
tablespace. So just try again.
*/
if (mtr_started)
mtr.commit();
continue;
}
cur_end_offset= end_offset;
if (offset >= end_offset) {
ut_ad(!mtr_started);
if (s.file_no == active) {
/* Reached end of the currently active binlog file -> EOF. */
return CHUNK_READER_EOF;
@@ -727,13 +1100,10 @@ binlog_chunk_reader::fetch_current_page()
}
if (block) {
cur_block= block;
ut_ad(mtr_started);
page_ptr= block->page.frame;
page_ptr= block->page_buf;
return CHUNK_READER_FOUND;
} else {
/* Not in buffer pool, just read it from the file. */
if (mtr_started)
mtr.commit();
/* Fall through to read from file. */
}
}
@@ -948,11 +1318,12 @@ skip_chunk:
{
go_next_page:
/* End of page reached, move to the next page. */
uint32_t block_page_no= s.page_no;
++s.page_no;
page_ptr= nullptr;
if (cur_block)
{
mtr.commit();
binlog_page_fifo->release_page(s.file_no, block_page_no, cur_block);
cur_block= nullptr;
}
s.in_page_offset= 0;
@@ -992,7 +1363,7 @@ binlog_chunk_reader::restore_pos(binlog_chunk_reader::saved_position *pos)
/* Seek to a different page, release any current page. */
if (cur_block)
{
mtr.commit();
binlog_page_fifo->release_page(s.file_no, s.page_no, cur_block);
cur_block= nullptr;
}
page_ptr= nullptr;
@@ -1023,7 +1394,7 @@ void binlog_chunk_reader::release(bool release_file_page)
{
if (cur_block)
{
mtr.commit();
binlog_page_fifo->release_page(s.file_no, s.page_no, cur_block);
cur_block= nullptr;
page_ptr= nullptr;
}

View File

@@ -550,6 +550,7 @@ mysql_pfs_key_t srv_threads_mutex_key;
mysql_pfs_key_t tpool_cache_mutex_key;
mysql_pfs_key_t fsp_active_binlog_mutex_key;
mysql_pfs_key_t fsp_purge_binlog_mutex_key;
mysql_pfs_key_t fsp_page_fifo_mutex_key;
/* all_innodb_mutexes array contains mutexes that are
performance schema instrumented if "UNIV_PFS_MUTEX"

View File

@@ -230,8 +230,6 @@ class ha_innodb_binlog_reader : public handler_binlog_reader {
/* Used to read the header of the commit record. */
byte rd_buf[5*COMPR_INT_MAX64];
private:
int read_from_buffer_pool_page(buf_block_t *block, uint64_t end_offset,
uchar *buf, uint32_t len);
int read_from_file(uint64_t end_offset, uchar *buf, uint32_t len);
int read_from_page(uchar *page_ptr, uint64_t end_offset,
uchar *buf, uint32_t len);
@@ -463,7 +461,6 @@ innodb_binlog_init_state()
earliest_binlog_file_no= ~(uint64_t)0;
total_binlog_used_size= 0;
active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release);
active_binlog_space= nullptr;
binlog_cur_page_no= 0;
binlog_cur_page_offset= FIL_PAGE_DATA;
current_binlog_state_interval= innodb_binlog_state_interval;
@@ -597,7 +594,6 @@ binlog_page_empty(const byte *page)
static int
find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
fil_space_t **out_space,
uint32_t *out_page_no, uint32_t *out_pos_in_page)
{
const uint32_t page_size= (uint32_t)srv_page_size;
@@ -627,10 +623,11 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
return -1;
}
if (binlog_page_empty(page_buf)) {
*out_space= fsp_binlog_open(file_name, fh, file_no, file_size, true);
ret=
fsp_binlog_open(file_name, fh, file_no, file_size, ~(uint32_t)0, nullptr);
binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed);
return (*out_space ? 0 : -1);
return (ret ? -1 : 0);
}
last_nonempty= 0;
@@ -688,11 +685,14 @@ find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
*out_page_no= p_0 - 1;
*out_pos_in_page= (uint32_t)(p - page_buf);
*out_space= fsp_binlog_open(file_name, fh, file_no, file_size, false);
if (*out_pos_in_page >= page_size - FIL_PAGE_DATA_END)
ret= fsp_binlog_open(file_name, fh, file_no, file_size, p_0, nullptr);
else
ret= fsp_binlog_open(file_name, fh, file_no, file_size, p_0 - 1, page_buf);
uint64_t pos= (*out_page_no << page_size_shift) | *out_pos_in_page;
binlog_cur_written_offset[idx].store(pos, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(pos, std::memory_order_relaxed);
return (*out_space ? 1 : -1);
return ret ? -1 : 1;
}
@@ -736,7 +736,6 @@ innodb_binlog_discover()
Now, if we found any binlog files, locate the point in one of them where
binlogging stopped, and where we should continue writing new binlog data.
*/
fil_space_t *space, *prev_space;
uint32_t page_no, prev_page_no, pos_in_page, prev_pos_in_page;
std::unique_ptr<byte, void (*)(void *)>
page_buf(static_cast<byte*>(aligned_malloc(page_size, page_size)),
@@ -749,8 +748,7 @@ innodb_binlog_discover()
int res= find_pos_in_binlog(binlog_files.last_file_no,
binlog_files.last_size,
page_buf.get(),
&space, &page_no, &pos_in_page);
page_buf.get(), &page_no, &pos_in_page);
if (res < 0) {
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
@@ -764,7 +762,6 @@ innodb_binlog_discover()
/* Found start position in the last binlog file. */
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space;
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -779,11 +776,10 @@ innodb_binlog_discover()
res= find_pos_in_binlog(binlog_files.prev_file_no,
binlog_files.prev_size,
page_buf.get(),
&prev_space, &prev_page_no, &prev_pos_in_page);
&prev_page_no, &prev_pos_in_page);
if (res < 0) {
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space;
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
sql_print_warning("Binlog number %llu could not be opened, starting "
@@ -793,7 +789,6 @@ innodb_binlog_discover()
}
file_no= binlog_files.prev_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= prev_space;
binlog_cur_page_no= prev_page_no;
binlog_cur_page_offset= prev_pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -806,7 +801,6 @@ innodb_binlog_discover()
/* Just one empty binlog file found. */
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space;
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
@@ -875,7 +869,6 @@ innodb_binlog_prealloc_thread()
/* Pre-allocate the next tablespace (if not done already). */
uint64_t last_created= last_created_binlog_file_no;
if (last_created <= active && last_created <= first_open) {
fil_space_t *new_space;
ut_ad(last_created == active);
ut_ad(last_created == first_open || first_open == ~(uint64_t)0);
/*
@@ -887,8 +880,7 @@ innodb_binlog_prealloc_thread()
mysql_mutex_lock(&purge_binlog_mutex);
uint32_t size_in_pages= innodb_binlog_size_in_pages;
dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages,
&new_space);
dberr_t res2= fsp_binlog_tablespace_create(last_created, size_in_pages);
if (earliest_binlog_file_no == ~(uint64_t)0)
earliest_binlog_file_no= last_created;
total_binlog_used_size+= (size_in_pages << srv_page_size_shift);
@@ -898,15 +890,12 @@ innodb_binlog_prealloc_thread()
mysql_mutex_lock(&active_binlog_mutex);
ut_a(res2 == DB_SUCCESS /* ToDo: Error handling. */);
ut_a(new_space);
last_created_binlog_file_no= last_created;
last_created_binlog_space= new_space;
/* If we created the initial tablespace file, make it the active one. */
ut_ad(active < ~(uint64_t)0 || last_created == 0);
if (active == ~(uint64_t)0) {
active_binlog_file_no.store(last_created, std::memory_order_relaxed);
active_binlog_space= last_created_binlog_space;
}
if (first_open == ~(uint64_t)0)
first_open_binlog_file_no= first_open= last_created;
@@ -988,8 +977,8 @@ serialize_gtid_state(rpl_binlog_state_base *state, byte *buf, size_t buf_size,
bool
binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
buf_block_t * &block, uint32_t &page_no,
uint32_t &page_offset, fil_space_t *space)
fsp_binlog_page_entry * &block, uint32_t &page_no,
uint32_t &page_offset, uint64_t file_no)
{
/*
Use a small, efficient stack-allocated buffer by default, falling back to
@@ -997,6 +986,8 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
*/
byte small_buf[192];
byte *buf, *alloced_buf;
uint32_t block_page_no= ~(uint32_t)0;
block= nullptr;
ssize_t used_bytes= serialize_gtid_state(state, small_buf, sizeof(small_buf),
page_no==0);
@@ -1034,16 +1025,19 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
afterwards. There is no point in using space to allow fast search to a
point if there is no data to search for after that point.
*/
if (page_no + needed_pages < space->size)
if (page_no + needed_pages < binlog_page_fifo->size_in_pages(file_no))
{
byte cont_flag= 0;
while (used_bytes > 0)
{
ut_ad(page_no < space->size);
block= fsp_page_create(space, page_no, mtr);
ut_ad(page_no < binlog_page_fifo->size_in_pages(file_no));
if (block)
binlog_page_fifo->release_page(file_no, block_page_no, block);
block_page_no= page_no;
block= binlog_page_fifo->create_page(file_no, block_page_no);
ut_a(block /* ToDo: error handling? */);
page_offset= FIL_PAGE_DATA;
byte *ptr= page_offset + block->page.frame;
byte *ptr= page_offset + &block->page_buf[0];
ssize_t chunk= used_bytes;
byte last_flag= FSP_BINLOG_FLAG_LAST;
if (chunk > page_room - 3) {
@@ -1056,15 +1050,19 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
ptr[2] = (byte)(chunk >> 8);
ut_ad(chunk <= 0xffff);
memcpy(ptr+3, buf, chunk);
mtr->memcpy(*block, page_offset, chunk+3);
fsp_log_binlog_write(mtr, file_no, block_page_no, block, page_offset,
(uint32)(chunk+3));
page_offset+= (uint32_t)(chunk+3);
buf+= chunk;
used_bytes-= chunk;
cont_flag= FSP_BINLOG_FLAG_CONT;
}
if (page_offset == FIL_PAGE_DATA_END) {
if (page_offset == page_size - FIL_PAGE_DATA_END) {
if (block)
binlog_page_fifo->release_page(file_no, block_page_no, block);
block= nullptr;
block_page_no= ~(uint32_t)0;
page_offset= FIL_PAGE_DATA;
++page_no;
}
@@ -1073,7 +1071,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
/* Make sure we return a page for caller to write the main event data into. */
if (UNIV_UNLIKELY(!block)) {
block= fsp_page_create(space, page_no, mtr);
block= binlog_page_fifo->create_page(file_no, page_no);
ut_a(block /* ToDo: error handling? */);
}
@@ -1895,8 +1893,6 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
uint64_t *out_file_end,
uint64_t *out_diff_state_interval)
{
buf_block_t *block;
*out_file_end= 0;
uint64_t active2= active_binlog_file_no.load(std::memory_order_acquire);
if (file_no > active2)
@@ -1904,11 +1900,11 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
for (;;)
{
mtr_t mtr;
bool mtr_started= false;
uint64_t active= active2;
uint64_t end_offset=
binlog_cur_end_offset[file_no&1].load(std::memory_order_acquire);
fsp_binlog_page_entry *block;
if (file_no + 1 >= active &&
end_offset != ~(uint64_t)0 &&
page_no <= (end_offset >> srv_page_size_shift))
@@ -1920,16 +1916,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
not change while getting the page (otherwise it might belong to a
later tablespace file).
*/
mtr.start();
mtr_started= true;
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (uint32_t)(file_no & 1);
dberr_t err= DB_SUCCESS;
block= buf_page_get_gen(page_id_t{space_id, page_no}, 0, RW_S_LATCH,
nullptr, BUF_GET_IF_IN_POOL, &mtr, &err);
if (err != DB_SUCCESS) {
mtr.commit();
return READ_ERROR;
}
block= binlog_page_fifo->get_page(file_no, page_no);
}
else
block= nullptr;
@@ -1937,8 +1924,8 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
if (UNIV_UNLIKELY(active2 != active))
{
/* Active moved ahead while we were reading, try again. */
if (mtr_started)
mtr.commit();
if (block)
binlog_page_fifo->release_page(file_no, page_no, block);
continue;
}
if (file_no + 1 >= active)
@@ -1952,7 +1939,7 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
*/
if (page_no > (end_offset >> srv_page_size_shift))
{
ut_ad(!mtr_started);
ut_ad(!block);
return READ_NOT_FOUND;
}
}
@@ -1960,17 +1947,13 @@ gtid_search::read_gtid_state_file_no(rpl_binlog_state_base *state,
if (block)
{
ut_ad(end_offset != ~(uint64_t)0);
int res= read_gtid_state_from_page(state, block->page.frame, page_no,
int res= read_gtid_state_from_page(state, block->page_buf, page_no,
out_diff_state_interval);
ut_ad(mtr_started);
if (mtr_started)
mtr.commit();
binlog_page_fifo->release_page(file_no, page_no, block);
return (Read_Result)res;
}
else
{
if (mtr_started)
mtr.commit();
if (cur_open_file_no != file_no)
{
if (cur_open_file >= (File)0)
@@ -2234,9 +2217,14 @@ innodb_reset_binlogs()
bool err= false;
ut_a(innodb_binlog_inited >= 2);
/* Close existing binlog tablespaces and stop the pre-alloc thread. */
innodb_binlog_close(false);
/* Prevent any flushing activity while resetting. */
binlog_page_fifo->lock_wait_for_idle();
binlog_page_fifo->reset();
/* Delete all binlog files in the directory. */
MY_DIR *dir= my_dir(innodb_binlog_directory, MYF(MY_WME));
if (!dir)
@@ -2269,6 +2257,7 @@ innodb_reset_binlogs()
/* Re-initialize empty binlog state and start the pre-alloc thread. */
innodb_binlog_init_state();
binlog_page_fifo->unlock();
start_binlog_prealloc_thread();
return err;

View File

@@ -31,7 +31,6 @@ InnoDB implementation of binlog.
#include "mtr0mtr.h"
struct fil_space_t;
struct chunk_data_base;
@@ -84,6 +83,94 @@ static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS),
"in ALLOWED_NESTED_RECORDS bitmask");
struct fsp_binlog_page_entry {
fsp_binlog_page_entry *next;
byte *page_buf;
uint32_t latched;
/*
Flag set when the page has been filled, no more data will be added and
it is safe to write out to disk and remove from the FIFO.
*/
bool complete;
/*
Flag set when the page is not yet complete, but all data added so far
have been written out to the file. So the page should not be written
again (until more data is added), but nor can it be removed from the
FIFO yet.
*/
bool flushed_clean;
};
/*
A page FIFO, as a lower-level alternative to the buffer pool used for full
tablespaces.
Since binlog files are written strictly append-only, we can simply add new
pages at the end and flush them from the beginning.
ToDo: This is deliberately a naive implementation with single global mutex
and repeated malloc()/free(), as a starting point. Should be improved later
for efficiency and scalability.
*/
class fsp_binlog_page_fifo {
public:
struct page_list {
fsp_binlog_page_entry *first_page;
uint32_t first_page_no;
uint32_t size_in_pages;
File fh;
};
private:
mysql_mutex_t m_mutex;
pthread_cond_t m_cond;
std::thread flush_thread_obj;
/*
The first_file_no is the first valid file in the fifo. The other entry in
the fifo holds (first_file_no+1) if it is not empty.
If first_file_no==~0, then there are no files in the fifo (initial state
just after construction).
*/
uint64_t first_file_no;
page_list fifos[2];
bool flushing;
bool flush_thread_started;
bool flush_thread_end;
public:
fsp_binlog_page_fifo();
~fsp_binlog_page_fifo();
void reset();
void start_flush_thread();
void stop_flush_thread();
void flush_thread_run();
void lock_wait_for_idle();
void unlock() { mysql_mutex_unlock(&m_mutex); }
void create_tablespace(uint64_t file_no, uint32_t size_in_pages,
uint32_t init_page= ~(uint32_t)0,
byte *partial_page= nullptr);
void release_tablespace(uint64_t file_no);
fsp_binlog_page_entry *create_page(uint64_t file_no, uint32_t page_no);
fsp_binlog_page_entry *get_page(uint64_t file_no, uint32_t page_no);
void release_page(uint64_t file_no, uint32_t page_no,
fsp_binlog_page_entry *page);
bool flush_one_page(uint64_t file_no, bool force);
void flush_up_to(uint64_t file_no, uint32_t page_no);
void do_fdatasync(uint64_t file_no);
File get_fh(uint64_t file_no);
uint32_t size_in_pages(uint64_t file_no) {
return fifos[file_no & 1].size_in_pages;
}
void truncate_file_size(uint64_t file_no, uint32_t size_in_pages)
{
fifos[file_no & 1].size_in_pages= size_in_pages;
}
};
class binlog_chunk_reader {
public:
enum chunk_reader_status {
@@ -117,15 +204,13 @@ public:
bool in_record;
} s;
/* The mtr is started iff cur_block is non-NULL. */
mtr_t mtr;
/*
After fetch_current_page(), this points into either cur_block or
page_buffer as appropriate.
*/
byte *page_ptr;
/* Valid after fetch_current_page(), if page found in buffer pool. */
buf_block_t *cur_block;
fsp_binlog_page_entry *cur_block;
/* Buffer for reading a page directly from a tablespace file. */
byte *page_buffer;
/* Amount of data in file, valid after fetch_current_page(). */
@@ -200,22 +285,23 @@ extern uint64_t current_binlog_state_interval;
extern mysql_mutex_t active_binlog_mutex;
extern pthread_cond_t active_binlog_cond;
extern std::atomic<uint64_t> active_binlog_file_no;
extern fil_space_t* active_binlog_space;
extern uint64_t first_open_binlog_file_no;
extern uint64_t last_created_binlog_file_no;
extern fil_space_t *last_created_binlog_space;
extern std::atomic<uint64_t> binlog_cur_written_offset[2];
extern std::atomic<uint64_t> binlog_cur_end_offset[2];
extern fsp_binlog_page_fifo *binlog_page_fifo;
extern void fsp_log_binlog_write(mtr_t *mtr, uint64_t file_no, uint32_t page_no,
fsp_binlog_page_entry *page,
uint32_t page_offset, uint32_t len);
extern void fsp_binlog_init();
extern void fsp_binlog_shutdown();
extern dberr_t fsp_binlog_tablespace_close(uint64_t file_no);
extern fil_space_t *fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
extern bool fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
uint64_t file_no, size_t file_size,
bool open_empty);
uint32_t init_page, byte *partial_page);
extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no,
uint32_t size_in_pages,
fil_space_t **new_space);
uint32_t size_in_pages);
extern std::pair<uint64_t, uint64_t> fsp_binlog_write_rec(
struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type);
extern bool fsp_binlog_flush();

View File

@@ -28,8 +28,6 @@ InnoDB implementation of binlog.
#include "fsp_binlog.h"
struct fil_space_t;
struct buf_block_t;
struct mtr_t;
struct rpl_binlog_state_base;
struct rpl_gtid;
@@ -97,8 +95,8 @@ extern void innodb_binlog_startup_init();
extern bool innodb_binlog_init(size_t binlog_size, const char *directory);
extern void innodb_binlog_close(bool shutdown);
extern bool binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
buf_block_t * &block, uint32_t &page_no,
uint32_t &page_offset, fil_space_t *space);
fsp_binlog_page_entry * &block, uint32_t &page_no,
uint32_t &page_offset, uint64_t file_no);
extern bool innodb_binlog_oob(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
extern void innodb_free_oob(THD *thd, void *engine_data);

View File

@@ -103,6 +103,10 @@ or the MySQL version that created the redo log file. */
#define LOG_HEADER_CREATOR_END 48
/* @} */
/** Fake tablespace id for InnoDB-implemented binlog files. */
#define LOG_BINLOG_ID_0 0xFFFFFFFEU
#define LOG_BINLOG_ID_1 0xFFFFFFFFU
struct log_t;
/** File abstraction */

View File

@@ -390,10 +390,12 @@ inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage,
static_assert(!(type & 15) && type != RESERVED &&
type <= FILE_CHECKPOINT, "invalid type");
ut_ad(type >= FILE_CREATE || is_named_space(id.space()) ||
id.space() == SRV_SPACE_ID_BINLOG0 ||
id.space() == SRV_SPACE_ID_BINLOG1);
id.space() == LOG_BINLOG_ID_0 ||
id.space() == LOG_BINLOG_ID_1);
ut_ad(!bpage || bpage->id() == id);
ut_ad(id < end_page_id);
ut_ad(id < end_page_id ||
id.space() == LOG_BINLOG_ID_0 ||
id.space() == LOG_BINLOG_ID_1);
constexpr bool have_len= type != INIT_PAGE && type != FREE_PAGE;
constexpr bool have_offset= type == WRITE || type == MEMSET ||
type == MEMMOVE;
@@ -404,7 +406,8 @@ inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage,
ut_ad(offset + len <= srv_page_size);
static_assert(MIN_4BYTE >= UNIV_PAGE_SIZE_MAX, "consistency");
ut_ad(type == FREE_PAGE || type == OPTION || (type == EXTENDED && !bpage) ||
memo_contains_flagged(bpage, MTR_MEMO_MODIFY));
memo_contains_flagged(bpage, MTR_MEMO_MODIFY) ||
(type == WRITE && id.space() >= LOG_BINLOG_ID_0));
size_t max_len;
if (!have_len)
max_len= 1 + 5 + 5;

View File

@@ -719,6 +719,17 @@ public:
static unsigned spin_wait_delay;
/** Update finisher when spin_wait_delay is changing to or from 0. */
static void finisher_update();
/** Write binlog data
@param space_id binlog tablespace
@param page_no binlog page number
@param offset offset within the page
@param buf data
@param size size of data
@return */
void write_binlog(bool space_id, uint32_t page_no, uint16_t offset,
const void *buf, size_t size) noexcept;
private:
/** Release all latches. */

View File

@@ -483,6 +483,7 @@ extern mysql_pfs_key_t lock_wait_mutex_key;
extern mysql_pfs_key_t srv_threads_mutex_key;
extern mysql_pfs_key_t fsp_active_binlog_mutex_key;
extern mysql_pfs_key_t fsp_purge_binlog_mutex_key;
extern mysql_pfs_key_t fsp_page_fifo_mutex_key;
# endif /* UNIV_PFS_MUTEX */
# ifdef UNIV_PFS_RWLOCK

View File

@@ -2379,6 +2379,14 @@ void recv_sys_t::rewind(source &l, source &begin) noexcept
pages_it= pages.end();
}
static void binlog_recover_write_data(bool space_id, uint32_t page_no,
uint16_t offset,
lsn_t start_lsn, lsn_t lsn,
const byte *buf, size_t size) noexcept
{}
static void binlog_recover_end(lsn_t lsn) noexcept {}
/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
@tparam storing whether to store the records
@param l log data source
@@ -2868,6 +2876,23 @@ restart:
#endif
if (storing == YES)
{
if (space_id >= 0xfffffffe)
{
if ((b & 0xf0) != WRITE)
goto record_corrupted;
const size_t olen= mlog_decode_varint_length(*cl);
if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3))
goto record_corrupted;
const uint32_t offset= mlog_decode_varint(cl);
ut_ad(offset != MLOG_DECODE_ERROR);
if (UNIV_UNLIKELY(offset + rlen - olen >= 65535))
goto record_corrupted;
binlog_recover_write_data(space_id & 1, page_no, uint16_t(offset),
start_lsn, lsn,
l.get_buf(cl, recs, decrypt_buf) + olen,
l - recs + rlen - olen);
continue;
}
if (if_exists)
{
if (fil_space_t *space= fil_space_t::get(space_id))
@@ -4216,6 +4241,7 @@ static bool recv_scan_log(bool last_phase)
ut_ad(!rewound_lsn);
ut_ad(recv_sys.lsn >= recv_sys.file_checkpoint);
log_sys.set_recovered_lsn(recv_sys.lsn);
binlog_recover_end(recv_sys.lsn);
}
else if (rewound_lsn)
{