1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34705: Binlog-in-engine: Integration with server-layer code

Mostly various fixes to avoid initializing or creating any data or files for
the legacy binlog.

A possible later refinement could be to sub-class the binlog class
differently for legacy and in-engine binlogs, writing separate virtual
functions for behaviour that differ, extracting common functionality into
sub-methods. This could remove some if (opt_binlog_engine_hton)
conditionals.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-04-10 14:54:37 +02:00
parent 0327708ed6
commit d496e5278d
19 changed files with 595 additions and 165 deletions

View File

@@ -0,0 +1,54 @@
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
connect con1,localhost,root,,;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connect con2,localhost,root,,;
*** Connection sees current position by default.
connection default;
INSERT INTO t1 VALUES (2, 0);
FLUSH BINARY LOGS;
INSERT INTO t1 VALUES (3, 0);
INSERT INTO t1 VALUES (4, 0);
SELECT * FROM t1 ORDER BY a;
a b
1 0
2 0
3 0
4 0
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
binlog-000001.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000001.ibb # Query # # use `test`; INSERT INTO t1 VALUES (4, 0)
binlog-000001.ibb # Xid # # COMMIT /* XID */
*** START TRANSACTION WITH CONSISTENT SNAPSHOT sees position consistent with read view.
connection con1;
SELECT * FROM t1 ORDER BY a;
a b
1 0
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Query # # use `test`; INSERT INTO t1 VALUES (2, 0)
binlog-000000.ibb # Xid # # COMMIT /* XID */
*** Connection with no active transaction sees the current position.
connection con2;
connection default;
INSERT INTO t1 VALUES (5, 0);
connection con2;
SELECT * FROM t1 ORDER BY a;
a b
1 0
2 0
3 0
4 0
5 0
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
binlog-000001.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000001.ibb # Query # # use `test`; INSERT INTO t1 VALUES (5, 0)
binlog-000001.ibb # Xid # # COMMIT /* XID */
connection default;
disconnect con1;
disconnect con2;
DROP TABLE t1;

View File

@@ -0,0 +1,45 @@
--source include/have_binlog_format_mixed.inc
--source include/have_innodb_binlog.inc
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
--connect(con1,localhost,root,,)
START TRANSACTION WITH CONSISTENT SNAPSHOT;
--connect(con2,localhost,root,,)
--echo *** Connection sees current position by default.
--connection default
INSERT INTO t1 VALUES (2, 0);
FLUSH BINARY LOGS;
INSERT INTO t1 VALUES (3, 0);
--let $binlog_file= query_get_value(SHOW STATUS LIKE 'binlog_snapshot_file', Value, 1)
--let $binlog_start= query_get_value(SHOW STATUS LIKE 'binlog_snapshot_position', Value, 1)
--let $binlog_limit= 0, 3
INSERT INTO t1 VALUES (4, 0);
SELECT * FROM t1 ORDER BY a;
--source include/show_binlog_events.inc
--echo *** START TRANSACTION WITH CONSISTENT SNAPSHOT sees position consistent with read view.
--connection con1
--let $binlog_file= query_get_value(SHOW STATUS LIKE 'binlog_snapshot_file', Value, 1)
--let $binlog_start= query_get_value(SHOW STATUS LIKE 'binlog_snapshot_position', Value, 1)
SELECT * FROM t1 ORDER BY a;
--source include/show_binlog_events.inc
--echo *** Connection with no active transaction sees the current position.
--connection con2
--let $binlog_file= query_get_value(SHOW STATUS LIKE 'binlog_snapshot_file', Value, 1)
--let $binlog_start= query_get_value(SHOW STATUS LIKE 'binlog_snapshot_position', Value, 1)
--connection default
INSERT INTO t1 VALUES (5, 0);
--connection con2
SELECT * FROM t1 ORDER BY a;
--source include/show_binlog_events.inc
--connection default
--disconnect con1
--disconnect con2
DROP TABLE t1;

View File

@@ -0,0 +1,13 @@
SET GLOBAL rpl_semi_sync_master_enabled= 1;
ERROR HY000: Semi-synchronous replication is not yet supported with --binlog-storage-engine
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
XA START 'a';
ERROR HY000: Explicit XA transactions are not yet supported with --binlog-storage-engine
INSERT INTO t1 VALUES (0, 0);
XA END 'a';
ERROR XAE07: XAER_RMFAIL: The command cannot be executed when global transaction is in the NON-EXISTING state
XA PREPARE 'a';
ERROR XAE07: XAER_RMFAIL: The command cannot be executed when global transaction is in the NON-EXISTING state
XA COMMIT 'a';
ERROR XAE04: XAER_NOTA: Unknown XID
DROP TABLE t1;

View File

@@ -0,0 +1,17 @@
--source include/have_binlog_format_mixed.inc
--source include/have_innodb_binlog.inc
--error ER_ENGINE_BINLOG_NO_SEMISYNC
SET GLOBAL rpl_semi_sync_master_enabled= 1;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
--error ER_ENGINE_BINLOG_NO_USER_XA
XA START 'a';
INSERT INTO t1 VALUES (0, 0);
--error ER_XAER_RMFAIL
XA END 'a';
--error ER_XAER_RMFAIL
XA PREPARE 'a';
--error ER_XAER_NOTA
XA COMMIT 'a';
DROP TABLE t1;

View File

@@ -0,0 +1,46 @@
include/master-slave.inc
[connection master]
*** Test the --binlog-directory variable with legacy binlog.
connection slave;
include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=Slave_pos;
connection master;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT);
INSERT INTO t1 VALUES (1, 10);
include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1 parameters: --binlog-directory=binlog_dir]
INSERT INTO t1 VALUES (2, 11);
include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1]
INSERT INTO t1 VALUES (3, 12);
show binary logs;
Log_name File_size
master-bin.000001 #
master-bin.000002 #
master-bin.000002 #
*** Contents of master-bin.index (including directory path):
./master-bin.000001
binlog_dir/master-bin.000002
./master-bin.000002
SELECT * FROM t1 ORDER BY a;
a b
1 10
2 11
3 12
connection slave;
include/start_slave.inc
SELECT * FROM t1 ORDER BY a;
a b
1 10
2 11
3 12
connection slave;
include/stop_slave.inc
SET GLOBAL gtid_slave_pos= '';
connection master;
RESET MASTER;
connection slave;
include/start_slave.inc
connection master;
DROP TABLE t1;
include/rpl_end.inc

View File

@@ -0,0 +1,63 @@
--source include/have_binlog_format_mixed.inc
--source include/master-slave.inc
--echo *** Test the --binlog-directory variable with legacy binlog.
--connection slave
# Stop the slave while restarting master, just to not have to worry about
# any connect/re-connect tries. We are testing the location of the binlog,
# not the slave re-connect abilities.
--source include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=Slave_pos;
--connection master
--let $master_datadir= `SELECT @@datadir`
CREATE TABLE t1 (a INT PRIMARY KEY, b INT);
INSERT INTO t1 VALUES (1, 10);
# Test that the master can be restarted with binlogs in a separate
# directory specificed by --binlog-directory.
--let $rpl_server_number= 1
--source include/rpl_stop_server.inc
--mkdir $master_datadir/binlog_dir
--copy_file $master_datadir/master-bin.000001 $master_datadir/binlog_dir/master-bin.000001
--copy_file $master_datadir/master-bin.000001.idx $master_datadir/binlog_dir/master-bin.000001.idx
--let $rpl_server_parameters= --binlog-directory=binlog_dir
--source include/rpl_start_server.inc
INSERT INTO t1 VALUES (2, 11);
# Move master back to using the standard binlog directory.
--source include/rpl_stop_server.inc
--let $rpl_server_parameters=
--source include/rpl_start_server.inc
INSERT INTO t1 VALUES (3, 12);
--save_master_pos
--source include/show_binary_logs.inc
--echo *** Contents of master-bin.index (including directory path):
--cat_file $master_datadir/master-bin.index
SELECT * FROM t1 ORDER BY a;
--connection slave
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t1 ORDER BY a;
# Clean up.
--connection slave
--source include/stop_slave.inc
SET GLOBAL gtid_slave_pos= '';
--connection master
RESET MASTER;
--rmdir $master_datadir/binlog_dir
--connection slave
--source include/start_slave.inc
--connection master
DROP TABLE t1;
--source include/rpl_end.inc

View File

@@ -1568,7 +1568,9 @@ struct handlerton
Obtain the current position in the binlog.
Used to support legacy SHOW MASTER STATUS.
*/
void (*binlog_status)(char out_filename[FN_REFLEN], ulonglong *out_pos);
void (*binlog_status)(uint64_t * out_fileno, uint64_t *out_pos);
/* Get a binlog name from a file_no. */
void (*get_filename)(char name[FN_REFLEN], uint64_t file_no);
/* Obtain list of binlog files (SHOW BINARY LOGS). */
binlog_file_entry * (*get_binlog_file_list)(MEM_ROOT *mem_root);
/*
@@ -5932,8 +5934,6 @@ public:
support legacy SHOW BINLOG EVENTS.
*/
virtual int init_legacy_pos(const char *filename, ulonglong offset) = 0;
/* Get a binlog name from a file_no. */
virtual void get_filename(char name[FN_REFLEN], uint64_t file_no) = 0;
int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed);
};

View File

@@ -385,7 +385,10 @@ public:
trx_cache.set_binlog_cache_info(param_max_binlog_cache_size,
param_ptr_binlog_cache_use,
param_ptr_binlog_cache_disk_use);
last_commit_pos_file[0]= 0;
if (opt_binlog_engine_hton)
last_commit_pos_file.engine_file_no= ~(uint64_t)0;
else
last_commit_pos_file.legacy_name[0]= 0;
}
void reset(bool do_stmt, bool do_trx)
@@ -399,7 +402,10 @@ public:
{
trx_cache.reset();
using_xa= FALSE;
last_commit_pos_file[0]= 0;
if (opt_binlog_engine_hton)
last_commit_pos_file.engine_file_no= ~(uint64_t)0;
else
last_commit_pos_file.legacy_name[0]= 0;
last_commit_pos_offset= 0;
}
if (likely(opt_binlog_engine_hton) &&
@@ -438,9 +444,15 @@ public:
position corresponding to the snapshot taken. During (and after) commit,
this is set to the binlog position corresponding to just after the
commit (so storage engines can store it in their transaction log).
For the legacy binlog, we have to use the full filename, for binlog
implemented in engine we can just keep track of the file number.
*/
char last_commit_pos_file[FN_REFLEN];
my_off_t last_commit_pos_offset;
union {
uint64_t engine_file_no;
char legacy_name[FN_REFLEN];
} last_commit_pos_file;
uint64_t last_commit_pos_offset;
/* Context for engine-implemented binlogging. */
handler_binlog_event_group_info engine_binlog_info;
@@ -1895,7 +1907,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
extern "C"
void
binlog_get_cache(THD *thd, IO_CACHE **out_cache,
binlog_get_cache(THD *thd, uint64_t file_no, uint64_t offset,
IO_CACHE **out_cache,
handler_binlog_event_group_info **out_context,
const rpl_gtid **out_gtid)
{
@@ -1903,10 +1916,11 @@ binlog_get_cache(THD *thd, IO_CACHE **out_cache,
handler_binlog_event_group_info *context= nullptr;
binlog_cache_mngr *cache_mngr;
const rpl_gtid *gtid= nullptr;
if (likely(!opt_bootstrap /* ToDo needed? */) &&
opt_binlog_engine_hton &&
(cache_mngr= thd->binlog_get_cache_mngr()))
/* opt_binlog_engine_hton can be unset during bootstrap. */
if (opt_binlog_engine_hton && (cache_mngr= thd->binlog_get_cache_mngr()))
{
cache_mngr->last_commit_pos_file.engine_file_no= file_no;
cache_mngr->last_commit_pos_offset= offset;
context= &cache_mngr->engine_binlog_info;
cache= !cache_mngr->trx_cache.empty() ?
&cache_mngr->trx_cache.cache_log : &cache_mngr->stmt_cache.cache_log;
@@ -3655,6 +3669,12 @@ void MYSQL_BIN_LOG::cleanup()
inited= 0;
mysql_mutex_lock(&LOCK_log);
if (opt_binlog_engine_hton)
{
if (!is_relay_log)
close_engine();
}
else
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
mysql_mutex_unlock(&LOCK_log);
delete description_event_for_queue;
@@ -4258,6 +4278,18 @@ err:
}
/*
Open the binlog implemented in a storage engine (--binlog-storage-engine). */
bool
MYSQL_BIN_LOG::open_engine(handlerton *hton, ulong max_size, const char *dir)
{
bool err= (*hton->binlog_init)((size_t)max_size, dir);
if (!err)
log_state= LOG_OPENED;
return err;
}
int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo)
{
mysql_mutex_lock(&LOCK_log);
@@ -6688,8 +6720,18 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
/* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
mysql_mutex_assert_owner(&LOCK_commit_ordered);
strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file);
if (opt_binlog_engine_hton)
{
(*opt_binlog_engine_hton->binlog_status)
(&cache_mngr->last_commit_pos_file.engine_file_no,
&cache_mngr->last_commit_pos_offset);
}
else
{
strmake_buf(cache_mngr->last_commit_pos_file.legacy_name,
mysql_bin_log.last_commit_pos_file);
cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
}
trans_register_ha(thd, TRUE, binlog_hton, 0);
@@ -7745,8 +7787,6 @@ err:
if ((*opt_binlog_engine_hton->binlog_write_direct)
(file, engine_context, thd->get_last_commit_gtid()))
goto engine_fail;
/* ToDo: Need to set last_commit_pos_offset here? */
/* ToDo: Maybe binlog_write_direct() could return the coords. */
mysql_mutex_unlock(&LOCK_commit_ordered);
update_binlog_end_pos();
@@ -8620,6 +8660,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
Incident incident= INCIDENT_LOST_EVENTS;
Incident_log_event ev(thd, incident, &write_error_msg);
DBUG_ASSERT(!opt_binlog_engine_hton);
if (likely(is_open()))
{
error= write_event(&ev);
@@ -8638,6 +8679,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
ulong prev_binlog_id;
DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
if (opt_binlog_engine_hton)
DBUG_RETURN(0);
mysql_mutex_lock(&LOCK_log);
if (likely(is_open()))
{
@@ -8700,6 +8744,7 @@ write_binlog_checkpoint_event_already_locked(const char *name_arg, uint len)
bool err;
Binlog_checkpoint_log_event ev(name_arg, len);
DBUG_ASSERT(!opt_binlog_engine_hton);
/*
Note that we must sync the binlog checkpoint to disk.
Otherwise a subsequent log purge could delete binlogs that XA recovery
@@ -9437,11 +9482,18 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
commit_id))))
current->commit_errno= errno;
strmake_buf(cache_mngr->last_commit_pos_file, log_file_name);
if (!opt_binlog_engine_hton)
{
strmake_buf(cache_mngr->last_commit_pos_file.legacy_name, log_file_name);
commit_offset= my_b_write_tell(&log_file);
cache_mngr->last_commit_pos_offset= commit_offset;
/*
When --binlog-storage-engine, the last_commit_pos is updated in
binlog_get_cache().
*/
update_gtid_index((uint32)commit_offset,
current->thd->get_last_commit_gtid());
cache_mngr->last_commit_pos_offset= commit_offset;
}
if ((cache_mngr->using_xa && cache_mngr->xa_xid) || current->need_unlog)
{
/*
@@ -9479,6 +9531,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
}
else
{
#ifdef HAVE_REPLICATION
if (unlikely(repl_semisync_master.get_master_enabled()))
{
DEBUG_SYNC(leader->thd, "commit_before_update_binlog_end_pos");
bool any_error= false;
@@ -9489,7 +9544,6 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
for (current= queue; current != NULL; current= current->next)
{
#ifdef HAVE_REPLICATION
/*
The thread which will await the ACK from the replica can change
depending on the wait-point. If AFTER_COMMIT, then the user thread
@@ -9500,12 +9554,24 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
? current->thd
: leader->thd;
char buf[FN_REFLEN];
const char *filename= buf;
/*
ToDo: When we want to support semi-sync in the --binlog-in-engine
case, we need to do this report_binlog_update() later, after
commit_ordered() has been called, as that is where each
transaction gets written to the binlog and where the binlog
position gets put into the cache_mngr.
*/
if (opt_binlog_engine_hton)
(*opt_binlog_engine_hton->get_filename)
(buf, current->cache_mngr->last_commit_pos_file.engine_file_no);
else
filename= current->cache_mngr->last_commit_pos_file.legacy_name;
if (likely(!current->error) &&
unlikely(repl_semisync_master.
report_binlog_update(current->thd, waiter_thd,
current->cache_mngr->
last_commit_pos_file,
current->cache_mngr->
report_binlog_update(current->thd, waiter_thd, filename,
(my_off_t)current->cache_mngr->
last_commit_pos_offset)))
{
current->error= ER_ERROR_ON_WRITE;
@@ -9513,9 +9579,13 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
current->error_cache= NULL;
any_error= true;
}
#endif
}
if (unlikely(any_error))
sql_print_error("Failed to run 'after_flush' hooks");
}
#endif
/*
update binlog_end_pos so it can be read by dump thread
Note: must be _after_ the RUN_HOOK(after_flush) or else
@@ -9527,11 +9597,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
if (!opt_binlog_engine_hton)
update_binlog_end_pos(commit_offset);
if (unlikely(any_error))
sql_print_error("Failed to run 'after_flush' hooks");
}
if (!opt_binlog_engine_hton)
{
/*
If any commit_events are Xid_log_event, increase the number of pending
XIDs in current binlog (it's decreased in ::unlog()). When the count in
@@ -9565,6 +9634,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
/* In case of binlog rotate, update the correct current binlog offset. */
commit_offset= my_b_write_tell(&log_file);
}
}
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_after_binlog_sync");
mysql_mutex_lock(&LOCK_after_binlog_sync);
@@ -9578,9 +9648,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
#ifdef HAVE_REPLICATION
/*
Loop through threads and run the binlog_sync hook
AFTER_SYNC is not available for --binlog-in-engine, as there we avoid the
costly two-phase commit between binlog and engine.
*/
if (!opt_binlog_engine_hton &&
unlikely(repl_semisync_master.get_master_enabled()))
{
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
mysql_mutex_assert_not_owner(&LOCK_log);
@@ -9592,17 +9667,16 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
for (current= queue; current != NULL; current= current->next)
{
last= current->next == NULL;
#ifdef HAVE_REPLICATION
if (likely(!current->error))
current->error=
repl_semisync_master.wait_after_sync(current->cache_mngr->
last_commit_pos_file,
current->cache_mngr->
last_commit_pos_file.legacy_name,
(my_off_t)current->cache_mngr->
last_commit_pos_offset);
#endif
first= false;
}
}
#endif
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
@@ -9611,6 +9685,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
DBUG_SUICIDE();
});
if (!opt_binlog_engine_hton)
last_commit_pos_offset= commit_offset;
/*
@@ -9687,8 +9762,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
if (opt_binlog_engine_hton)
update_binlog_end_pos();
if (check_purge)
else if (check_purge)
checkpoint_and_purge(binlog_id);
DBUG_VOID_RETURN;
@@ -10152,6 +10226,13 @@ void MYSQL_BIN_LOG::close(uint exiting)
}
void
MYSQL_BIN_LOG::close_engine()
{
log_state= LOG_CLOSED;
}
/*
Clear the LOG_EVENT_BINLOG_IN_USE_F; this marks the binlog file as cleanly
closed and not needing crash recovery.
@@ -11439,13 +11520,25 @@ int TC_LOG_BINLOG::open(const char *opt_name)
DBUG_ASSERT(opt_name);
DBUG_ASSERT(opt_name[0]);
if (!my_b_inited(&index_file))
if (!opt_binlog_engine_hton && !my_b_inited(&index_file))
{
/* There was a failure to open the index file, can't open the binlog */
cleanup();
DBUG_RETURN(1);
}
if (opt_binlog_engine_hton)
{
/*
ToDo: Eventually, we will want to recover from the in-engine binlog,
for cross-engine transactions (or just transactions in a different
XA-capable engine that the engine used in --binlog-storage-engine.
For now, just skip recovery when --binlog-storage-engine.
*/
binlog_state_recover_done= true;
DBUG_RETURN(0);
}
if (using_heuristic_recover())
{
mysql_mutex_lock(&LOCK_log);
@@ -13131,10 +13224,11 @@ void
mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
{
binlog_cache_mngr *cache_mngr;
if (opt_bin_log &&
//DBUG_ASSERT(!opt_binlog_engine_hton);
if (likely(opt_bin_log) && likely(!opt_binlog_engine_hton) &&
(cache_mngr= thd->binlog_get_cache_mngr()))
{
*out_file= cache_mngr->last_commit_pos_file;
*out_file= cache_mngr->last_commit_pos_file.legacy_name;
*out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset);
}
else
@@ -13256,12 +13350,31 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
auto cache_mngr= thd->binlog_get_cache_mngr();
have_snapshot= cache_mngr && cache_mngr->last_commit_pos_file[0];
if (cache_mngr)
{
if (opt_binlog_engine_hton)
{
have_snapshot=
cache_mngr->last_commit_pos_file.engine_file_no != ~(uint64_t)0;
if (have_snapshot)
{
set_binlog_snapshot_file(cache_mngr->last_commit_pos_file);
char buf[FN_REFLEN];
uint64_t file_no= cache_mngr->last_commit_pos_file.engine_file_no;
(*opt_binlog_engine_hton->get_filename)(buf, file_no);
set_binlog_snapshot_file(buf);
binlog_snapshot_position= cache_mngr->last_commit_pos_offset;
}
}
else
{
have_snapshot= cache_mngr->last_commit_pos_file.legacy_name[0] != '\0';
if (have_snapshot)
{
set_binlog_snapshot_file(cache_mngr->last_commit_pos_file.legacy_name);
binlog_snapshot_position= cache_mngr->last_commit_pos_offset;
}
}
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
@@ -13269,10 +13382,23 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
binlog_status_var_num_commits= this->num_commits;
binlog_status_var_num_group_commits= this->num_group_commits;
if (!have_snapshot)
{
if (opt_binlog_engine_hton)
{
char buf[FN_REFLEN];
uint64_t file_no;
uint64_t offset;
(*opt_binlog_engine_hton->binlog_status)(&file_no, &offset);
(*opt_binlog_engine_hton->get_filename)(buf, file_no);
set_binlog_snapshot_file(buf);
binlog_snapshot_position= (ulonglong)offset;
}
else
{
set_binlog_snapshot_file(last_commit_pos_file);
binlog_snapshot_position= last_commit_pos_offset;
}
}
mysql_mutex_unlock(&LOCK_commit_ordered);
mysql_mutex_lock(&LOCK_prepare_ordered);
binlog_status_group_commit_trigger_count= this->group_commit_trigger_count;

View File

@@ -1003,6 +1003,7 @@ public:
ulong max_size,
bool null_created,
bool need_mutex);
bool open_engine(handlerton *hton, ulong max_size, const char *dir);
bool open_index_file(const char *index_file_name_arg,
const char *log_name, bool need_mutex);
/* Use this to start writing a new log file */
@@ -1109,6 +1110,7 @@ public:
uint32 init_state_len);
void wait_for_last_checkpoint_event();
void close(uint exiting);
void close_engine();
void clear_inuse_flag_when_closing(File file);
// iterating through the log index file

View File

@@ -770,6 +770,7 @@ mysql_rwlock_t LOCK_all_status_vars;
mysql_prlock_t LOCK_system_variables_hash;
mysql_cond_t COND_start_thread;
pthread_t signal_thread;
bool signal_thread_needs_join= false;
pthread_attr_t connection_attrib;
mysql_mutex_t LOCK_server_started;
mysql_cond_t COND_server_started;
@@ -2121,7 +2122,11 @@ static void wait_for_signal_thread_to_end()
{
sql_print_warning("Signal handler thread did not exit in a timely manner. "
"Continuing to wait for it to stop..");
}
if (signal_thread_needs_join)
{
pthread_join(signal_thread, NULL);
signal_thread_needs_join= false;
}
#endif
}
@@ -3220,6 +3225,7 @@ static void start_signal_handler(void)
error,errno);
exit(1);
}
signal_thread_needs_join= true;
mysql_cond_wait(&COND_start_thread, &LOCK_start_thread);
mysql_mutex_unlock(&LOCK_start_thread);
@@ -4868,6 +4874,8 @@ static int adjust_optimizer_costs(const LEX_CSTRING *, OPTIMIZER_COSTS *oc, TABL
static int init_server_components()
{
DBUG_ENTER("init_server_components");
bool binlog_engine_used= false;
/*
We need to call each of these following functions to ensure that
all things are initialized so that unireg_abort() doesn't fail
@@ -5040,16 +5048,22 @@ static int init_server_components()
if (opt_bin_log)
{
if (opt_binlog_storage_engine && *opt_binlog_storage_engine)
binlog_engine_used= true;
/* Reports an error and aborts, if the --log-bin's path
is a directory.*/
if (opt_bin_logname[0] &&
opt_bin_logname[strlen(opt_bin_logname) - 1] == FN_LIBCHAR)
{
sql_print_error("Path '%s' is a directory name, please specify "
"a file name for --log-bin option", opt_bin_logname);
"a file name for --log-bin option, or use "
"--binlog-directory", opt_bin_logname);
unireg_abort(1);
}
if (!binlog_engine_used)
{
/* Reports an error and aborts, if the --log-bin-index's path
is a directory.*/
if (opt_binlog_index_name &&
@@ -5061,12 +5075,28 @@ static int init_server_components()
opt_binlog_index_name);
unireg_abort(1);
}
}
char buf[FN_REFLEN];
char buf[FN_REFLEN], buf2[FN_REFLEN];
const char *ln;
/* ToDo: Here we also need to add in opt_binlog_directory, if given. */
ln= mysql_bin_log.generate_name(opt_bin_logname, "-bin", 1, buf);
if (!opt_bin_logname[0] && !opt_binlog_index_name)
/* Add in opt_binlog_directory, if given. */
if (opt_binlog_directory && opt_binlog_directory[0])
{
if (strlen(opt_binlog_directory) + 1 + strlen(ln) + 1 > FN_REFLEN)
{
sql_print_error("The combination of --binlog-directory path '%s' with "
"filename '%s' from --log-bin results in a too long "
"path", opt_binlog_directory, ln);
unireg_abort(1);
}
const char *end= &buf2[FN_REFLEN-1];
char *p= strmake(buf2, opt_binlog_directory, FN_REFLEN - 2);
*p++= FN_LIBCHAR;
strmake(p, ln, end - p - 1);
ln= buf2;
}
if (!binlog_engine_used && !opt_bin_logname[0] && !opt_binlog_index_name)
{
/*
User didn't give us info to name the binlog index file.
@@ -5085,6 +5115,8 @@ static int init_server_components()
}
if (ln == buf)
opt_bin_logname= my_once_strdup(buf, MYF(MY_WME));
else if (ln == buf2)
opt_bin_logname= my_once_strdup(buf2, MYF(MY_WME));
}
/*
@@ -5149,7 +5181,7 @@ static int init_server_components()
}
#endif /* WITH_WSREP */
if (!opt_help && opt_bin_log)
if (!opt_help && !binlog_engine_used && opt_bin_log)
{
if (mysql_bin_log.open_index_file(opt_binlog_index_name, opt_bin_logname,
TRUE))
@@ -5471,7 +5503,7 @@ static int init_server_components()
unireg_abort(1);
}
if (opt_binlog_storage_engine && *opt_binlog_storage_engine && !opt_bootstrap)
if (binlog_engine_used)
{
LEX_CSTRING name= { opt_binlog_storage_engine, strlen(opt_binlog_storage_engine) };
opt_binlog_engine_plugin= ha_resolve_by_name(0, &name, false);
@@ -5503,6 +5535,14 @@ static int init_server_components()
"to specify a separate directory for binlogs");
unireg_abort(1);
}
#ifdef HAVE_REPLICATION
if (rpl_semi_sync_master_enabled)
{
sql_print_error("Semi-synchronous replication is not yet supported "
"with --binlog-storage-engine");
unireg_abort(1);
}
#endif
}
#ifdef USE_ARIA_FOR_TMP_TABLES
@@ -5559,25 +5599,21 @@ static int init_server_components()
{
mysql_mutex_t *log_lock= mysql_bin_log.get_log_lock();
bool error;
mysql_mutex_lock(log_lock);
if (opt_binlog_engine_hton)
{
mysql_mutex_lock(log_lock);
error= (*opt_binlog_engine_hton->binlog_init)((size_t)max_binlog_size,
error= mysql_bin_log.open_engine(opt_binlog_engine_hton, max_binlog_size,
opt_binlog_directory);
mysql_mutex_unlock(log_lock);
if (unlikely(error))
unireg_abort(1);
}
if (true) /* ToDo: `else` branch (don't open legacy binlog if using engine implementation). */
else
{
mysql_mutex_lock(log_lock);
error= mysql_bin_log.open(opt_bin_logname, 0, 0,
WRITE_CACHE, max_binlog_size, 0, TRUE);
}
mysql_mutex_unlock(log_lock);
if (unlikely(error))
unireg_abort(1);
}
}
#ifdef HAVE_REPLICATION
binlog_space_limit= internal_binlog_space_limit;
@@ -5585,11 +5621,14 @@ static int init_server_components()
internal_slave_connections_needed_for_purge;
if (opt_bin_log)
{
if (!opt_binlog_engine_hton)
{
if (binlog_space_limit)
mysql_bin_log.count_binlog_space_with_mutex();
mysql_bin_log.purge(1);
}
}
else
{
if ((binlog_expire_logs_seconds || binlog_space_limit) &&

View File

@@ -12304,3 +12304,7 @@ ER_BINLOG_CANNOT_READ_STATE
eng "Error reading GTID state from the binlog"
ER_BINLOG_POS_INVALID
eng "The binlog offset %llu is invalid"
ER_ENGINE_BINLOG_NO_SEMISYNC
eng "Semi-synchronous replication is not yet supported with --binlog-storage-engine"
ER_ENGINE_BINLOG_NO_USER_XA
eng "Explicit XA transactions are not yet supported with --binlog-storage-engine"

View File

@@ -4865,7 +4865,7 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi)
String packet;
Format_description_log_event fd(4);
char name_buf[FN_REFLEN];
reader->get_filename(name_buf, file_no);
opt_binlog_engine_hton->get_filename(name_buf, file_no);
for (ha_rows event_count= 0;
reader->cur_file_no == file_no && event_count < limit;
@@ -5256,12 +5256,14 @@ bool show_binlog_info(THD* thd)
if (mysql_bin_log.is_open())
{
const char *base;
ulonglong pos;
uint64_t pos;
if (opt_binlog_engine_hton)
{
char buf[FN_REFLEN];
uint64_t file_no;
mysql_mutex_lock(mysql_bin_log.get_log_lock());
(*opt_binlog_engine_hton->binlog_status)(buf, &pos);
(*opt_binlog_engine_hton->binlog_status)(&file_no, &pos);
(*opt_binlog_engine_hton->get_filename)(buf, file_no);
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
base= buf;
}
@@ -5269,13 +5271,13 @@ bool show_binlog_info(THD* thd)
{
LOG_INFO li;
mysql_bin_log.get_current_log(&li);
pos= (ulonglong) li.pos;
pos= (uint64_t) li.pos;
size_t dir_len = dirname_length(li.log_file_name);
base= li.log_file_name + dir_len;
}
protocol->store(base, strlen(base), &my_charset_bin);
protocol->store(pos);
protocol->store((ulonglong)pos);
protocol->store(binlog_filter->get_do_db());
protocol->store(binlog_filter->get_ignore_db());
if (protocol->write())

View File

@@ -3704,6 +3704,17 @@ Replicate_events_marked_for_skip
/* new options for semisync */
static bool
check_rpl_semi_sync_master_enabled(sys_var *self, THD *thd, set_var *var)
{
if (opt_binlog_engine_hton && var->save_result.ulonglong_value)
{
my_error(ER_ENGINE_BINLOG_NO_SEMISYNC, MYF(0));
return true;
}
return false;
}
static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
enum_var_type type)
{
@@ -3758,7 +3769,8 @@ Sys_semisync_master_enabled(
"Enable semi-synchronous replication master (disabled by default).",
GLOBAL_VAR(rpl_semi_sync_master_enabled),
CMD_LINE(OPT_ARG), DEFAULT(FALSE),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
NO_MUTEX_GUARD, NOT_IN_BINLOG,
ON_CHECK(check_rpl_semi_sync_master_enabled),
ON_UPDATE(fix_rpl_semi_sync_master_enabled));
static Sys_var_on_access_global<Sys_var_ulong,

View File

@@ -442,6 +442,12 @@ bool trans_xa_start(THD *thd)
{
DBUG_ENTER("trans_xa_start");
if (opt_binlog_engine_hton)
{
my_error(ER_ENGINE_BINLOG_NO_USER_XA, MYF(0));
DBUG_RETURN(TRUE);
}
if (thd->transaction->xid_state.is_explicit_XA() &&
thd->transaction->xid_state.xid_cache_element->xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_RESUME)

View File

@@ -523,17 +523,12 @@ fsp_binlog_page_fifo::release_tablespace(uint64_t file_no)
mysql_mutex_unlock(&m_mutex);
int res= my_sync(fh, MYF(MY_WME));
ut_a(!res);
my_close(fifos[file_no & 1].fh, MYF(0));
mysql_mutex_lock(&m_mutex);
free_page_list(&fifos[file_no & 1]);
flushing= false;
pthread_cond_broadcast(&m_cond);
}
first_file_no= file_no + 1;
fifos[file_no & 1].first_page= nullptr;
fifos[file_no & 1].first_page_no= 0;
fifos[file_no & 1].size_in_pages= 0;
fifos[file_no & 1].fh= (File)-1;
mysql_mutex_unlock(&m_mutex);
}
@@ -546,9 +541,24 @@ fsp_binlog_page_fifo::fsp_binlog_page_fifo()
fifos[1]= {nullptr, 0, 0, (File)-1 };
mysql_mutex_init(fsp_page_fifo_mutex_key, &m_mutex, nullptr);
pthread_cond_init(&m_cond, nullptr);
}
void
fsp_binlog_page_fifo::free_page_list(page_list *pl)
{
if (pl->fh != (File)-1)
my_close(pl->fh, MYF(0));
fsp_binlog_page_entry *e= pl->first_page;
while (e)
{
fsp_binlog_page_entry *next= e->next;
aligned_free(e->page_buf);
ut_free(e);
e= next;
}
*pl= {nullptr, 0, 0, (File)-1 };
// ToDo I think I need to read the first page here, or somewhere?
// Normally I'd never want to read a page into the page fifo, but at startup, I seem to need to do so for the first page I start writing on. Though I suppose I already read that, so maybe just a way to add that page into the FIFO in the constructor?
}
@@ -557,19 +567,7 @@ fsp_binlog_page_fifo::reset()
{
ut_ad(!flushing);
for (uint32_t i= 0; i < 2; ++i)
{
if (fifos[i].fh != (File)-1)
my_close(fifos[i].fh, MYF(0));
fsp_binlog_page_entry *e= fifos[i].first_page;
while (e)
{
fsp_binlog_page_entry *next= e->next;
aligned_free(e->page_buf);
ut_free(e);
e= next;
}
fifos[i]= {nullptr, 0, 0, (File)-1 };
}
free_page_list(&fifos[i]);
first_file_no= ~(uint64_t)0;
}

View File

@@ -4135,6 +4135,7 @@ static int innodb_init(void* p)
innobase_hton->binlog_oob_free= innodb_free_oob;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list;
innobase_hton->get_filename= ibb_get_filename;
innobase_hton->binlog_status= innodb_binlog_status;
innobase_hton->binlog_flush= innodb_binlog_flush;
innobase_hton->binlog_get_init_state= innodb_binlog_get_init_state;

View File

@@ -244,7 +244,6 @@ public:
virtual int init_gtid_pos(slave_connection_state *pos,
rpl_binlog_state_base *state) final;
virtual int init_legacy_pos(const char *filename, ulonglong offset) final;
virtual void get_filename(char name[FN_REFLEN], uint64_t file_no) final;
};
@@ -2953,7 +2952,7 @@ ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset)
void
ha_innodb_binlog_reader::get_filename(char name[FN_REFLEN], uint64_t file_no)
ibb_get_filename(char name[FN_REFLEN], uint64_t file_no)
{
static_assert(BINLOG_NAME_MAX_LEN <= FN_REFLEN,
"FN_REFLEN too shot to hold InnoDB binlog name");
@@ -2961,7 +2960,7 @@ ha_innodb_binlog_reader::get_filename(char name[FN_REFLEN], uint64_t file_no)
}
extern "C" void binlog_get_cache(THD *, IO_CACHE **,
extern "C" void binlog_get_cache(THD *, uint64_t, uint64_t, IO_CACHE **,
handler_binlog_event_group_info **,
const rpl_gtid **);
@@ -2971,10 +2970,12 @@ innodb_binlog_trx(trx_t *trx, mtr_t *mtr)
IO_CACHE *cache;
handler_binlog_event_group_info *binlog_info;
const rpl_gtid *gtid;
uint64_t file_no, pos;
if (!trx->mysql_thd)
return;
binlog_get_cache(trx->mysql_thd, &cache, &binlog_info, &gtid);
innodb_binlog_status(&file_no, &pos);
binlog_get_cache(trx->mysql_thd, file_no, pos, &cache, &binlog_info, &gtid);
if (UNIV_LIKELY(binlog_info != nullptr) &&
UNIV_LIKELY(binlog_info->gtid_offset > 0)) {
binlog_diff_state.update_nolock(gtid);
@@ -3019,15 +3020,15 @@ innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last)
void
innodb_binlog_status(char out_filename[FN_REFLEN], ulonglong *out_pos)
innodb_binlog_status(uint64_t *out_file_no, uint64_t *out_pos)
{
static_assert(BINLOG_NAME_MAX_LEN <= FN_REFLEN,
"FN_REFLEN too shot to hold InnoDB binlog name");
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
uint32_t page_no= binlog_cur_page_no;
uint32_t in_page_offset= binlog_cur_page_offset;
binlog_name_make_short(out_filename, file_no);
*out_pos= ((ulonglong)page_no << ibb_page_size_shift) | in_page_offset;
*out_file_no= file_no;
*out_pos= ((uint64_t)page_no << ibb_page_size_shift) | in_page_offset;
}

View File

@@ -184,6 +184,7 @@ public:
uint32_t init_page= ~(uint32_t)0,
byte *partial_page= nullptr);
void release_tablespace(uint64_t file_no);
void free_page_list(page_list *pl);
fsp_binlog_page_entry *create_page(uint64_t file_no, uint32_t page_no);
fsp_binlog_page_entry *get_page(uint64_t file_no, uint32_t page_no);
void release_page(fsp_binlog_page_entry *page);

View File

@@ -171,13 +171,13 @@ extern bool innodb_binlog_oob(THD *thd, const unsigned char *data,
size_t data_len, void **engine_data);
extern void innodb_free_oob(THD *thd, void *engine_data);
extern handler_binlog_reader *innodb_get_binlog_reader();
extern void ibb_get_filename(char name[FN_REFLEN], uint64_t file_no);
extern void innodb_binlog_trx(trx_t *trx, mtr_t *mtr);
extern bool innobase_binlog_write_direct
(IO_CACHE *cache, handler_binlog_event_group_info *binlog_info,
const rpl_gtid *gtid);
extern bool innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last);
extern void innodb_binlog_status(char out_filename[FN_REFLEN],
ulonglong *out_pos);
extern void innodb_binlog_status(uint64_t *out_file_no, uint64_t *out_pos);
extern bool innodb_binlog_get_init_state(rpl_binlog_state_base *out_state);
extern bool innodb_reset_binlogs();
extern int innodb_binlog_purge(handler_binlog_purge_info *purge_info);