1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

MDEV-34705: Binlog in Engine: Resume from existing binlogs

When (re-)starting the server, check for any existing binlog files.
Open the last two found (if any), and find the position that was last
written before the restart. Continue binlogging from that point rather
than creating new binlog files.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2024-09-22 18:04:17 +02:00
parent 39695c3956
commit 1794fd427a
7 changed files with 455 additions and 9 deletions

View File

@@ -0,0 +1 @@
--source include/have_innodb.inc

View File

@@ -0,0 +1 @@
--binlog-storage-engine=innodb --max-binlog-size=256K

View File

@@ -864,6 +864,7 @@ our @tags=
"binlog_formats", ["row", "statement"]],
["include/have_innodb.inc", "innodb_test", 1],
["include/have_innodb_binlog.inc", "innodb_test", 1],
["include/have_log_bin.inc", "need_binlog", 1],
["include/big_test.inc", "big_test", 1],
["include/have_debug.inc", "need_debug", 1],

View File

@@ -1,4 +1,4 @@
--source include/have_innodb.inc
--source include/have_innodb_binlog.inc
--source include/have_binlog_format_mixed.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,19 @@
--source include/have_innodb_binlog.inc
--source include/have_binlog_format_mixed.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog1.txt
--source include/restart_mysqld.inc
INSERT INTO t1 VALUES (2, 0);
--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog2.txt
--source include/restart_mysqld.inc
INSERT INTO t1 VALUES (3, 0);
--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog3.txt
DROP TABLE t1;

View File

@@ -24,6 +24,9 @@ File space management
Created 11/29/1995 Heikki Tuuri
***********************************************************************/
#include <memory>
#include <cctype>
#include <cstdlib>
#include <thread>
#include "fsp0fsp.h"
@@ -4285,7 +4288,6 @@ pthread_cond_t active_binlog_cond;
static std::thread binlog_prealloc_thr_obj;
static bool prealloc_thread_end= false;
/* The currently being written binlog tablespace. */
/* ToDo: This needs to discover existing binlogs and start from the next free index. */
std::atomic<uint64_t> active_binlog_file_no;
fil_space_t* active_binlog_space;
/*
@@ -4318,14 +4320,48 @@ std::atomic<uint64_t> binlog_cur_written_offset[2];
std::atomic<uint64_t> binlog_cur_end_offset[2];
static void fsp_binlog_prealloc_thread();
static int fsp_binlog_discover();
/* '.' + '/' + "binlog" + '-' + (<=20 digits) + '.' + "ibb" + '\0'. */
#define BINLOG_NAME_LEN 1 + 1 + 6 + 1 + 20 + 1 + 3 + 1
#define BINLOG_NAME_BASE "binlog-"
#define BINLOG_NAME_EXT ".ibb"
/* '.' + '/' + "binlog-" + (<=20 digits) + '.' + "ibb" + '\0'. */
#define BINLOG_NAME_LEN 1 + 1 + 7 + 20 + 1 + 3 + 1
static inline void
binlog_name_make(char name_buf[BINLOG_NAME_LEN], uint64_t file_no)
{
sprintf(name_buf, "./binlog-%06" PRIu64 ".ibb", file_no);
sprintf(name_buf, "./" BINLOG_NAME_BASE "%06" PRIu64 BINLOG_NAME_EXT,
file_no);
}
/*
Check if this is an InnoDB binlog file name.
Return the index/file_no if so.
*/
static bool
is_binlog_name(const char *name, uint64_t *out_idx)
{
const size_t base_len= sizeof(BINLOG_NAME_BASE) - 1; // Length without '\0' terminator
const size_t ext_len= sizeof(BINLOG_NAME_EXT) - 1;
if (0 != strncmp(name, BINLOG_NAME_BASE, base_len))
return false;
size_t name_len= strlen(name);
if (name_len < base_len + 1 + ext_len)
return false;
const char *ext_start= name + (name_len - ext_len);
if (0 != strcmp(ext_start, BINLOG_NAME_EXT))
return false;
if (!std::isdigit((unsigned char)(name[base_len])))
return false;
char *conv_end= nullptr;
unsigned long long idx= std::strtoull(name + base_len, &conv_end, 10);
if (idx == ULLONG_MAX || conv_end != ext_start)
return false;
*out_idx= (uint64_t)idx;
return true;
}
@@ -4367,7 +4403,7 @@ end:
/*
Initialize the InnoDB implementation of binlog.
Note that we do not create or open any binlog tablespaces here.
This is only done if InnoDB binlog is enabled on the server leve.
This is only done if InnoDB binlog is enabled on the server level.
*/
void
fsp_binlog_init()
@@ -4403,11 +4439,12 @@ innodb_binlog_init(size_t binlog_size)
last_created_binlog_file_no= ~(uint64_t)0;
active_binlog_file_no.store(~(uint64_t)0, std::memory_order_release);
active_binlog_space= nullptr;
/* ToDo: This needs to be initialized to the current point in any existing binlog. */
binlog_cur_page_no= 0;
binlog_cur_page_offset= FIL_PAGE_DATA;
/* Find any existing binlog files and continue writing in them. */
fsp_binlog_discover();
/* Start pre-allocating new binlog files. */
// ToDo: Eventually, open existing binlog files and find where to continue writing.
binlog_prealloc_thr_obj= std::thread{fsp_binlog_prealloc_thread};
mysql_mutex_lock(&active_binlog_mutex);
@@ -4416,11 +4453,374 @@ innodb_binlog_init(size_t binlog_size)
my_cond_wait(&active_binlog_cond, &active_binlog_mutex.m_mutex);
}
mysql_mutex_unlock(&active_binlog_mutex);
ut_ad(active_binlog_file_no == 0);
return false;
}
struct found_binlogs {
uint64_t last_file_no, prev_file_no;
size_t last_size, prev_size;
int found_binlogs;
};
/* Compute the (so far) last and last-but-one binlog files found. */
static void
process_binlog_name(found_binlogs *bls, uint64_t idx, size_t size)
{
if (bls->found_binlogs == 0 ||
idx > bls->last_file_no) {
if (bls->found_binlogs >= 1 && idx == bls->last_file_no + 1) {
bls->prev_file_no= bls->last_file_no;
bls->prev_size= bls->last_size;
bls->found_binlogs= 2;
} else {
bls->found_binlogs= 1;
}
bls->last_file_no= idx;
bls->last_size= size;
} else if (bls->found_binlogs == 1 && idx + 1 == bls->last_file_no) {
bls->found_binlogs= 2;
bls->prev_file_no= idx;
bls->prev_size= size;
}
}
/*
Open an existing tablespace. The filehandle fh is taken over by the tablespace
(or closed in case of error).
*/
static fil_space_t *
fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
uint64_t file_no, size_t file_size, bool open_empty)
{
const uint32_t page_size= (uint32_t)srv_page_size;
const uint32_t page_size_shift= srv_page_size_shift;
os_offset_t binlog_size= max_binlog_size;
if (open_empty && file_size < binlog_size) {
/*
A crash may have left a partially pre-allocated file. If so, extend it
to the required size.
Note that this may also extend a previously pre-allocated file to the new
binlog configured size, if the configuration changed during server
restart.
*/
if (!os_file_set_size(file_name, fh, binlog_size, false)) {
ib::warn() << "Failed to change the size of InnoDB binlog file " <<
file_name << " from " << file_size << " to " << binlog_size <<
" bytes (error code: " << errno << ").";
} else {
file_size= (size_t)binlog_size;
}
}
if (file_size < 2*page_size)
{
ib::warn() << "InnoDB binlog file number " << file_no << " is too short"
" (" << file_size << " bytes), should be at least " << 2*page_size <<
" bytes.";
os_file_close(fh);
return nullptr;
}
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (file_no & 1);
if (!open_empty) {
page_t *page_buf= static_cast<byte*>(aligned_malloc(page_size, page_size));
if (!page_buf) {
os_file_close(fh);
return nullptr;
}
dberr_t err= os_file_read(IORequestRead, fh, page_buf, 0, page_size, nullptr);
if (err != DB_SUCCESS) {
ib::warn() << "Unable to read first page of file " << file_name;
aligned_free(page_buf);
os_file_close(fh);
return nullptr;
}
/* ToDo: Maybe use leaner page format for binlog tablespace? */
uint32_t id1= mach_read_from_4(FIL_PAGE_SPACE_ID + page_buf);
if (id1 != space_id) {
ib::warn() << "Binlog file " << file_name <<
" has inconsistent tablespace id " << id1 <<
" (expected " << space_id << ")";
aligned_free(page_buf);
os_file_close(fh);
return nullptr;
}
// ToDo: should we here check buf_page_is_corrupted() ?
aligned_free(page_buf);
}
uint32_t fsp_flags=
FSP_FLAGS_FCRC32_MASK_MARKER | FSP_FLAGS_FCRC32_PAGE_SSIZE();
/* ToDo: Enryption. */
fil_encryption_t mode= FIL_ENCRYPTION_OFF;
fil_space_crypt_t* crypt_data= nullptr;
fil_space_t *space;
mysql_mutex_lock(&fil_system.mutex);
if (!(space= fil_space_t::create(space_id, fsp_flags, false, crypt_data,
mode, true))) {
mysql_mutex_unlock(&fil_system.mutex);
os_file_close(fh);
return nullptr;
}
space->add(file_name, fh, (uint32_t)(file_size >> page_size_shift),
false, true);
first_open_binlog_file_no.store(file_no, std::memory_order_release);
if (last_created_binlog_file_no == ~(uint64_t)0 ||
file_no > last_created_binlog_file_no) {
last_created_binlog_file_no= file_no;
last_created_binlog_space= space;
}
mysql_mutex_unlock(&fil_system.mutex);
return space;
}
static bool
binlog_page_empty(const byte *page)
{
return page[FIL_PAGE_DATA] == 0;
}
/*
Find the last written position in the binlog file.
Do a binary search through the pages to find the last non-empty page, then
scan the page to find the place to start writing new binlog data.
Returns:
1 position found, output in *out_space, *out_page_no and *out_pos_in_page.
0 binlog file is empty.
-1 error.
*/
static int
find_pos_in_binlog(uint64_t file_no, size_t file_size, byte *page_buf,
fil_space_t **out_space,
uint32_t *out_page_no, uint32_t *out_pos_in_page)
{
const uint32_t page_size= (uint32_t)srv_page_size;
const uint32_t page_size_shift= (uint32_t)srv_page_size_shift;
const uint32_t idx= file_no & 1;
char file_name[BINLOG_NAME_LEN];
uint32_t p_0, p_1, p_2, last_nonempty;
dberr_t err;
byte *p, *page_end;
bool ret;
*out_page_no= 0;
*out_pos_in_page= FIL_PAGE_DATA;
binlog_name_make(file_name, file_no);
pfs_os_file_t fh= os_file_create(innodb_data_file_key, file_name,
OS_FILE_OPEN, OS_DATA_FILE,
srv_read_only_mode, &ret);
if (!ret) {
ib::warn() << "Unable to open file " << file_name;
return -1;
}
err= os_file_read(IORequestRead, fh, page_buf, 0, page_size, nullptr);
if (err != DB_SUCCESS) {
os_file_close(fh);
return -1;
}
if (binlog_page_empty(page_buf)) {
*out_space= fsp_binlog_open(file_name, fh, file_no, file_size, true);
binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed);
return (*out_space ? 0 : -1);
}
last_nonempty= 0;
/*
During the binary search, p_0-1 is the largest page number that is know to
be non-empty. And p_2 is the first page that is known to be empty.
*/
p_0= 1;
p_2= (uint32_t)(file_size / page_size);
for (;;) {
if (p_0 == p_2)
break;
ut_ad(p_0 < p_2);
p_1= (p_0 + p_2) / 2;
err= os_file_read(IORequestRead, fh, page_buf, p_1 << page_size_shift,
page_size, nullptr);
if (err != DB_SUCCESS) {
os_file_close(fh);
return -1;
}
if (binlog_page_empty(page_buf)) {
p_2= p_1;
} else {
p_0= p_1 + 1;
last_nonempty= p_1;
}
}
/* At this point, p_0 == p_2 is the first empty page. */
ut_ad(p_0 >= 1);
/*
This sometimes does an extra read, but as this is only during startup it
does not matter.
*/
err= os_file_read(IORequestRead, fh, page_buf,
last_nonempty << page_size_shift, page_size, nullptr);
if (err != DB_SUCCESS) {
os_file_close(fh);
return -1;
}
/* Now scan the last page to find the position in it to continue. */
p= &page_buf[FIL_PAGE_DATA];
page_end= &page_buf[page_size - FIL_PAGE_DATA_END];
while (*p && p < page_end) {
if (*p == 0xff) {
p= page_end;
break;
}
p += 3 + (((uint32_t)p[2] << 8) | ((uint32_t)p[1] & 0xff));
// ToDo: How to handle page corruption?
ut_a(p <= page_end);
}
*out_page_no= p_0 - 1;
*out_pos_in_page= (uint32_t)(p - page_buf);
*out_space= fsp_binlog_open(file_name, fh, file_no, file_size, false);
uint64_t pos= (*out_page_no << page_size_shift) | *out_pos_in_page;
binlog_cur_written_offset[idx].store(pos, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(pos, std::memory_order_relaxed);
return (*out_space ? 1 : -1);
}
/*
Returns:
-1 error
0 No binlogs found
1 Just one binlog file found
2 Found two (or more) existing binlog files
*/
static int
fsp_binlog_discover()
{
uint64_t file_no;
const uint32_t page_size= (uint32_t)srv_page_size;
const uint32_t page_size_shift= (uint32_t)srv_page_size_shift;
MY_DIR *dir= my_dir(".", MYF(MY_WME|MY_WANT_STAT)); // ToDo: configurable binlog directory, and don't ask my_dir to stat every file found
if (!dir)
return -1;
struct found_binlogs UNINIT_VAR(binlog_files);
binlog_files.found_binlogs= 0;
size_t num_entries= dir->number_of_files;
fileinfo *entries= dir-> dir_entry;
for (size_t i= 0; i < num_entries; ++i) {
const char *name= entries[i].name;
uint64_t idx;
if (!is_binlog_name(name, &idx))
continue;
process_binlog_name(&binlog_files, idx, entries[i].mystat->st_size);
}
my_dirend(dir);
/*
Now, if we found any binlog files, locate the point in one of them where
binlogging stopped, and where we should continue writing new binlog data.
*/
fil_space_t *space, *prev_space;
uint32_t page_no, prev_page_no, pos_in_page, prev_pos_in_page;
// ToDo: Do we need aligned_malloc() for page_buf, to be able to read a page into it (like IO_DIRECT maybe) ?
std::unique_ptr<byte[]> page_buf(new byte[page_size]);
if (!page_buf)
return -1;
if (binlog_files.found_binlogs >= 1) {
int res= find_pos_in_binlog(binlog_files.last_file_no,
binlog_files.last_size,
page_buf.get(),
&space, &page_no, &pos_in_page);
if (res < 0) {
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
ib::warn() << "Binlog number " << binlog_files.last_file_no <<
" could no be opened. Starting a new binlog file from number " <<
(file_no + 1) << ".";
return 0;
}
if (res > 0) {
/* Found start position in the last binlog file. */
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space;
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
<< (((uint64_t)page_no << page_size_shift) | pos_in_page)
<< ".";
return binlog_files.found_binlogs;
}
/* res == 0, the last binlog is empty. */
if (binlog_files.found_binlogs >= 2) {
/* The last binlog is empty, try the previous one. */
res= find_pos_in_binlog(binlog_files.prev_file_no,
binlog_files.prev_size,
page_buf.get(),
&prev_space, &prev_page_no, &prev_pos_in_page);
if (res < 0) {
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space;
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::warn() << "Binlog number " << binlog_files.prev_file_no
<< " could not be opened, starting from binlog number "
<< file_no << " instead." ;
return 1;
}
file_no= binlog_files.prev_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= prev_space;
binlog_cur_page_no= prev_page_no;
binlog_cur_page_offset= prev_pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
<< (((uint64_t)prev_page_no << page_size_shift) |
prev_pos_in_page)
<< ".";
return binlog_files.found_binlogs;
}
/* Just one empty binlog file found. */
file_no= binlog_files.last_file_no;
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space;
binlog_cur_page_no= page_no;
binlog_cur_page_offset= pos_in_page;
ib::info() << "Continuing binlog number " << file_no << " from position "
<< FIL_PAGE_DATA << ".";
return binlog_files.found_binlogs;
}
/* No binlog files found, start from scratch. */
file_no= 0;
ib::info() << "Starting a new binlog from file number " << file_no << ".";
return 0;
}
void fsp_binlog_close()
{
if (binlog_prealloc_thr_obj.joinable()) {
@@ -4504,6 +4904,8 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, fil_space_t **new_space)
false, crypt_data,
mode, true))) {
mysql_mutex_unlock(&fil_system.mutex);
os_file_close(fh);
os_file_delete(innodb_data_file_key, name);
return DB_ERROR;
}