1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34705: Binlog-in-engine: Implement FLUSH BINARY LOGS

The DELETE_DOMAIN_ID option is not supported yet; it will come in a later
commit, after PURGE is implemented.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-01-13 15:42:08 +01:00
parent 947de2bfaf
commit 6889c8e4cf
10 changed files with 174 additions and 8 deletions

View File

@@ -0,0 +1,21 @@
# Basic test of FLUSH BINARY LOGS.
# The sourced include files presumably skip the test unless row-based binlog
# format and the InnoDB binlog implementation are available - confirm against
# the include files.
--source include/have_binlog_format_row.inc
--source include/have_innodb_binlog.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB;
# Binlog some small changes: a stand-alone INSERT followed by an explicit
# BEGIN ... COMMIT transaction with two INSERTs.
INSERT INTO t1 VALUES (1);
BEGIN;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
COMMIT;
# Fill t2 by repeated doubling (1 -> 2 -> 4 -> 8 -> 16 rows), each row
# carrying a 2048-character string, so that a larger amount of data is
# binlogged before the flush.
INSERT INTO t2 VALUES (0, REPEAT("x", 2048));
INSERT INTO t2 SELECT a+1, b FROM t2;
INSERT INTO t2 SELECT a+2, b FROM t2;
INSERT INTO t2 SELECT a+4, b FROM t2;
INSERT INTO t2 SELECT a+8, b FROM t2;
# List the binlog files before and after FLUSH BINARY LOGS, so that the
# effect of the flush on the file list shows up in the recorded result.
SHOW BINARY LOGS;
FLUSH BINARY LOGS;
SHOW BINARY LOGS;
DROP TABLE t1, t2;

View File

@@ -1565,6 +1565,11 @@ struct handlerton
handler_binlog_reader * (*get_binlog_reader)(); handler_binlog_reader * (*get_binlog_reader)();
/* Obtain list of binlog files (SHOW BINARY LOGS). */ /* Obtain list of binlog files (SHOW BINARY LOGS). */
binlog_file_entry * (*get_binlog_file_list)(MEM_ROOT *mem_root); binlog_file_entry * (*get_binlog_file_list)(MEM_ROOT *mem_root);
/*
End the current binlog file, and create and switch to a new one.
Used to implement FLUSH BINARY LOGS.
Takes no arguments. A true return value is treated as failure by
MYSQL_BIN_LOG::flush_binlogs_engine(); false means success.
*/
bool (*binlog_flush)();
/* /*
Optional clauses in the CREATE/ALTER TABLE Optional clauses in the CREATE/ALTER TABLE

View File

@@ -8214,6 +8214,36 @@ int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate,
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/*
  Implementation of FLUSH BINARY LOGS for binlog implemented in engine.

  Calls the binlog_flush() hook of the binlog engine (opt_binlog_engine_hton)
  while holding LOCK_log.

  domain_drop_lex is the list passed down for the DELETE_DOMAIN_ID option; it
  is not used yet (see the ToDo in the function body).

  Returns 0 on success, or 1 if the engine's binlog_flush() hook returned true.
*/
int
MYSQL_BIN_LOG::flush_binlogs_engine(DYNAMIC_ARRAY *domain_drop_lex)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::flush_binlogs_engine");
mysql_mutex_lock(&LOCK_log);
/*
  ToDo: Implement DELETE_DOMAIN_ID option. Ask the engine to load the oldest
  GTID state in the binlog, check that it matches the current GTID state in
  the to-be-deleted domains, then update the GTID state so the engine can
  write the state with domains deleted after it does the FLUSH.
  See also do_delete_gtid_domain().
*/
if ((*opt_binlog_engine_hton->binlog_flush)())
error= 1;
/*
  Hand the locks over in the order LOCK_log -> LOCK_after_binlog_sync ->
  LOCK_commit_ordered, always acquiring the next mutex before releasing the
  previous one. This is done regardless of whether the flush succeeded.
  NOTE(review): presumably this mirrors the group commit lock chain so that
  commits already past LOCK_log complete before we return - confirm against
  the group commit code.
*/
mysql_mutex_lock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_commit_ordered);
if (!error)
{
/* ToDo: Do purge, once implemented. */
}
DBUG_RETURN(error);
}
uint MYSQL_BIN_LOG::next_file_id() uint MYSQL_BIN_LOG::next_file_id()
{ {
uint res; uint res;

View File

@@ -1041,6 +1041,14 @@ public:
int rotate(bool force_rotate, bool* check_purge); int rotate(bool force_rotate, bool* check_purge);
void checkpoint_and_purge(ulong binlog_id); void checkpoint_and_purge(ulong binlog_id);
int rotate_and_purge(bool force_rotate, DYNAMIC_ARRAY* drop_gtid_domain= NULL); int rotate_and_purge(bool force_rotate, DYNAMIC_ARRAY* drop_gtid_domain= NULL);
int flush_binlogs_engine(DYNAMIC_ARRAY *domain_drop_lex);
/*
  Entry point for FLUSH BINARY LOGS.
  When a binlog engine is configured (opt_binlog_engine_hton is set), the
  flush is delegated to flush_binlogs_engine(); otherwise the legacy path
  rotate_and_purge() is used with rotation forced.
  The result of whichever function was called is returned unchanged.
*/
int flush_binlog(DYNAMIC_ARRAY* drop_gtid_domain)
{
  return opt_binlog_engine_hton
    ? flush_binlogs_engine(drop_gtid_domain)
    : rotate_and_purge(true, drop_gtid_domain);
}
/** /**
Flush binlog cache and synchronize to disk. Flush binlog cache and synchronize to disk.

View File

@@ -174,7 +174,7 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options,
DYNAMIC_ARRAY *drop_gtid_domain= DYNAMIC_ARRAY *drop_gtid_domain=
(thd && (thd->lex->delete_gtid_domain.elements > 0)) ? (thd && (thd->lex->delete_gtid_domain.elements > 0)) ?
&thd->lex->delete_gtid_domain : NULL; &thd->lex->delete_gtid_domain : NULL;
if (mysql_bin_log.rotate_and_purge(true, drop_gtid_domain)) if (mysql_bin_log.flush_binlog(drop_gtid_domain))
*write_to_binlog= -1; *write_to_binlog= -1;
/* Note that WSREP(thd) might not be true here e.g. during /* Note that WSREP(thd) might not be true here e.g. during

View File

@@ -318,8 +318,17 @@ dberr_t fsp_binlog_tablespace_create(uint64_t file_no, fil_space_t **new_space)
} }
/*
Write out a binlog record.
Split into chunks that each fit on a page.
The data for the record is provided by a class derived from chunk_data_base.
As a special case, a record write of type FSP_BINLOG_TYPE_FILLER does not
write any record, but moves to the next tablespace and writes the initial
GTID state record, used for FLUSH BINARY LOGS.
*/
std::pair<uint64_t, uint64_t> std::pair<uint64_t, uint64_t>
fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type) fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
{ {
uint32_t page_size= (uint32_t)srv_page_size; uint32_t page_size= (uint32_t)srv_page_size;
uint32_t page_size_shift= srv_page_size_shift; uint32_t page_size_shift= srv_page_size_shift;
@@ -467,6 +476,16 @@ fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
page_offset= FIL_PAGE_DATA; page_offset= FIL_PAGE_DATA;
continue; continue;
} }
if (UNIV_UNLIKELY(chunk_type == FSP_BINLOG_TYPE_FILLER))
{
/*
Used for FLUSH BINARY LOGS, to move to the next tablespace and write
the initial GTID state record without writing any actual event data.
*/
break;
}
if (start_offset == 0) if (start_offset == 0)
{ {
start_file_no= file_no; start_file_no= file_no;
@@ -509,6 +528,74 @@ fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
} }
/*
  Payload-less chunk data source.
  fsp_binlog_flush() hands an instance of this to fsp_binlog_write_rec() when
  it needs a dummy record rather than real event data.
*/
struct chunk_data_flush : public chunk_data_base {
  ~chunk_data_flush() { }

  /*
    Fill all of the max_len bytes offered at p with 0xff, and report max_len
    bytes produced together with the flag value true.
  */
  virtual std::pair<uint32_t, bool> copy_data(byte *p, uint32_t max_len) final
  {
    memset(p, 0xff, max_len);
    return std::pair<uint32_t, bool>(max_len, true);
  }
};
/*
  Implementation of FLUSH BINARY LOGS.

  Steps, in order:
   1. Write a dummy record (FSP_BINLOG_TYPE_DUMMY) into the current binlog
      tablespace to fill up its last used page.
   2. If the tablespace has pages beyond that page, truncate it right after
      the page; otherwise just commit the mini-transaction.
   3. Flush out all pages of the tablespace.
   4. Write the current GTID state to the first page in the next tablespace
      file (for DELETE_DOMAIN_ID), by issuing a FSP_BINLOG_TYPE_FILLER write.

  Relies on the server layer to prevent other binlog writes in parallel during
  the operation.

  Currently always returns false; the caller in the server layer treats a true
  return as an error.
*/
bool
fsp_binlog_flush()
{
/*
  Snapshot the state of the active binlog file before anything is written:
  the tablespace id is SRV_SPACE_ID_BINLOG0 plus the low bit of the file
  number, and page_no is the current page prior to the dummy write below.
*/
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
uint32_t space_id= SRV_SPACE_ID_BINLOG0 + (file_no & 1);
uint32_t page_no= binlog_cur_page_no;
fil_space_t *space= active_binlog_space;
chunk_data_flush dummy_data;
mtr_t mtr;
mtr.start();
mtr.x_lock_space(space);
/*
ToDo: Here, if we are already at precisely the end of a page, we need not
fill up that page with a dummy record, we can just truncate the tablespace
to that point. But then we need to handle an assertion m_modifications!=0
in mtr_t::commit_shrink().
*/
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_DUMMY);
/*
  Shrink the tablespace to page_no + 1 pages when it is currently larger;
  otherwise there is nothing to trim and a plain commit suffices.
*/
if (page_no + 1 < space->size)
{
mtr.trim_pages(page_id_t(space_id, page_no + 1));
mtr.commit_shrink(*space, page_no + 1);
}
else
mtr.commit();
/*
  Flush out all pages in the (now filled-up) tablespace.
  The call is repeated for as long as buf_flush_list_space() returns true.
*/
while (buf_flush_list_space(space))
;
/*
Now get a new GTID state record written to the next binlog tablespace.
This ensures that the new state (in case of DELETE_DOMAIN_ID) will be
persisted across a server restart.
*/
mtr.start();
fsp_binlog_write_rec(&dummy_data, &mtr, FSP_BINLOG_TYPE_FILLER);
mtr.commit();
return false;
}
binlog_chunk_reader::binlog_chunk_reader() binlog_chunk_reader::binlog_chunk_reader()
: s { 0, 0,0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false }, : s { 0, 0,0, 0, 0, FSP_BINLOG_TYPE_FILLER, false, false },
page_ptr(0), cur_block(0), page_buffer(nullptr), page_ptr(0), cur_block(0), page_buffer(nullptr),

View File

@@ -4052,6 +4052,13 @@ static binlog_file_entry *innodb_get_binlog_file_list(MEM_ROOT *mem_root)
} }
/*
  InnoDB implementation of the handlerton binlog_flush hook (installed in
  innodb_init()). Simply forwards to fsp_binlog_flush() and hands back its
  return value unchanged.
*/
static bool
innodb_binlog_flush()
{
  const bool res= fsp_binlog_flush();
  return res;
}
/** Initialize the InnoDB storage engine plugin. /** Initialize the InnoDB storage engine plugin.
@param[in,out] p InnoDB handlerton @param[in,out] p InnoDB handlerton
@return error code @return error code
@@ -4126,6 +4133,7 @@ static int innodb_init(void* p)
innobase_hton->binlog_oob_free= innodb_free_oob; innobase_hton->binlog_oob_free= innodb_free_oob;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list; innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list;
innobase_hton->binlog_flush= innodb_binlog_flush;
innodb_remember_check_sysvar_funcs(); innodb_remember_check_sysvar_funcs();

View File

@@ -59,7 +59,7 @@ static bool prealloc_thread_end= false;
struct binlog_oob_context { struct binlog_oob_context {
/* /*
Structure used to encapsulate the data to be binlogged in an out-of-band Structure used to encapsulate the data to be binlogged in an out-of-band
chunk, for use by fsp_binlog_write_chunk(). chunk, for use by fsp_binlog_write_rec().
*/ */
struct chunk_data_oob : public chunk_data_base { struct chunk_data_oob : public chunk_data_base {
/* /*
@@ -1176,7 +1176,7 @@ innodb_binlog_write_cache(IO_CACHE *cache,
handler_binlog_event_group_info *binlog_info, mtr_t *mtr) handler_binlog_event_group_info *binlog_info, mtr_t *mtr)
{ {
chunk_data_cache chunk_data(cache, binlog_info); chunk_data_cache chunk_data(cache, binlog_info);
fsp_binlog_write_chunk(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT); fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT);
} }
@@ -1356,7 +1356,7 @@ binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx,
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
std::pair<uint64_t, uint64_t> new_file_no_offset= std::pair<uint64_t, uint64_t> new_file_no_offset=
fsp_binlog_write_chunk(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA); fsp_binlog_write_rec(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA);
mtr.commit(); mtr.commit();
node_list[node].file_no= new_file_no_offset.first; node_list[node].file_no= new_file_no_offset.first;
node_list[node].offset= new_file_no_offset.second; node_list[node].offset= new_file_no_offset.second;

View File

@@ -44,6 +44,8 @@ enum fsp_binlog_chunk_types {
FSP_BINLOG_TYPE_GTID_STATE= 2, FSP_BINLOG_TYPE_GTID_STATE= 2,
/* Out-of-band event group data. */ /* Out-of-band event group data. */
FSP_BINLOG_TYPE_OOB_DATA= 3, FSP_BINLOG_TYPE_OOB_DATA= 3,
/* Dummy record, used to fill remainder of page (e.g. FLUSH BINARY LOGS). */
FSP_BINLOG_TYPE_DUMMY= 4,
/* Must be one more than the last type. */ /* Must be one more than the last type. */
FSP_BINLOG_TYPE_END, FSP_BINLOG_TYPE_END,
@@ -72,7 +74,9 @@ static constexpr uint32_t FSP_BINLOG_TYPE_MASK=
*/ */
static constexpr uint64_t ALLOWED_NESTED_RECORDS= static constexpr uint64_t ALLOWED_NESTED_RECORDS=
/* GTID STATE at start of page can occur in the middle of other record. */ /* GTID STATE at start of page can occur in the middle of other record. */
((uint64_t)1 << FSP_BINLOG_TYPE_GTID_STATE) ((uint64_t)1 << FSP_BINLOG_TYPE_GTID_STATE) |
/* DUMMY data at tablespace end can occur in the middle of other record. */
((uint64_t)1 << FSP_BINLOG_TYPE_DUMMY)
; ;
/* Ensure that all types fit in the ALLOWED_NESTED_RECORDS bitmask. */ /* Ensure that all types fit in the ALLOWED_NESTED_RECORDS bitmask. */
static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS)); static_assert(FSP_BINLOG_TYPE_END <= 8*sizeof(ALLOWED_NESTED_RECORDS));
@@ -204,7 +208,8 @@ extern fil_space_t *fsp_binlog_open(const char *file_name, pfs_os_file_t fh,
bool open_empty); bool open_empty);
extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no, extern dberr_t fsp_binlog_tablespace_create(uint64_t file_no,
fil_space_t **new_space); fil_space_t **new_space);
extern std::pair<uint64_t, uint64_t> fsp_binlog_write_chunk( extern std::pair<uint64_t, uint64_t> fsp_binlog_write_rec(
struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type); struct chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type);
extern bool fsp_binlog_flush();
#endif /* fsp_binlog_h */ #endif /* fsp_binlog_h */

View File

@@ -582,7 +582,9 @@ void mtr_t::commit_shrink(fil_space_t &space, uint32_t size)
file->size-= space.size - size; file->size-= space.size - size;
space.size= space.size_in_header= size; space.size= space.size_in_header= size;
if (space.id == TRX_SYS_SPACE) if (space.id == TRX_SYS_SPACE ||
space.id == SRV_SPACE_ID_BINLOG0 ||
space.id == SRV_SPACE_ID_BINLOG1)
srv_sys_space.set_last_file_size(file->size); srv_sys_space.set_last_file_size(file->size);
else else
space.set_create_lsn(m_commit_lsn); space.set_create_lsn(m_commit_lsn);