1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34705: Binlog in Engine: Also binlog standalone (eg. DDL) in the engine

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2024-08-09 18:01:12 +02:00
parent 070662e5ac
commit 094c772213
7 changed files with 203 additions and 90 deletions

View File

@@ -1530,8 +1530,8 @@ struct handlerton
void *optimizer_costs; /* Costs are stored here */ void *optimizer_costs; /* Costs are stored here */
/* Optional implementation of binlog in the engine. */ /* Optional implementation of binlog in the engine. */
bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size);
handler_binlog_reader * (*get_binlog_reader)(); handler_binlog_reader * (*get_binlog_reader)();
int (*binlog_data)(uchar *data, size_t len);
/* /*
Optional clauses in the CREATE/ALTER TABLE Optional clauses in the CREATE/ALTER TABLE

View File

@@ -6893,6 +6893,7 @@ Event_log::prepare_pending_rows_event(THD *thd, TABLE* table,
bool bool
MYSQL_BIN_LOG::write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone, MYSQL_BIN_LOG::write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone,
bool direct_write,
bool is_transactional, uint64 commit_id, bool is_transactional, uint64 commit_id,
bool has_xid, bool is_ro_1pc) bool has_xid, bool is_ro_1pc)
{ {
@@ -6950,6 +6951,10 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone,
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional, LOG_EVENT_SUPPRESS_USE_F, is_transactional,
commit_id, has_xid, is_ro_1pc); commit_id, has_xid, is_ro_1pc);
/* ToDo: Something better than this hack conditional. At least pass direct_write into the Gtid event contructor or something? */
if (!direct_write)
gtid_event.cache_type= is_transactional ?
Log_event::EVENT_TRANSACTIONAL_CACHE : Log_event::EVENT_STMT_CACHE;
/* Write the event to the binary log. */ /* Write the event to the binary log. */
DBUG_ASSERT(this == &mysql_bin_log); DBUG_ASSERT(this == &mysql_bin_log);
@@ -7164,7 +7169,9 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
bool is_trans_cache= FALSE; bool is_trans_cache= FALSE;
bool using_trans= event_info->use_trans_cache(); bool using_trans= event_info->use_trans_cache();
bool direct= event_info->use_direct_logging(); bool direct= event_info->use_direct_logging();
bool events_direct;
ulong UNINIT_VAR(prev_binlog_id); ulong UNINIT_VAR(prev_binlog_id);
uint64 UNINIT_VAR(commit_id);
DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
/* /*
@@ -7251,35 +7258,18 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
DBUG_RETURN(0); DBUG_RETURN(0);
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
binlog_cache_mngr * cache_mngr= NULL;
IO_CACHE *file= NULL; IO_CACHE *file= NULL;
events_direct= direct;
if (direct) if (direct)
{ {
/* We come here only for incident events */ /* Write the event to the binlog immediately. */
int res; int res;
uint64 commit_id= 0;
MDL_request mdl_request;
DBUG_PRINT("info", ("direct is set")); DBUG_PRINT("info", ("direct is set"));
DBUG_ASSERT(!thd->backup_commit_lock); DBUG_ASSERT(!thd->backup_commit_lock);
commit_id= 0;
MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
MDL_EXPLICIT);
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
DBUG_RETURN(1);
thd->backup_commit_lock= &mdl_request;
if ((res= thd->wait_for_prior_commit()))
{
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
DBUG_RETURN(res);
}
file= &log_file;
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
DBUG_EXECUTE_IF("binlog_force_commit_id", DBUG_EXECUTE_IF("binlog_force_commit_id",
{ {
const LEX_CSTRING commit_name= { STRING_WITH_LEN("commit_id") }; const LEX_CSTRING commit_name= { STRING_WITH_LEN("commit_id") };
@@ -7290,17 +7280,55 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
commit_name.length); commit_name.length);
commit_id= entry->val_int(&null_value); commit_id= entry->val_int(&null_value);
}); });
res= write_gtid_event(thd, &log_file, true, using_trans, commit_id);
if (mdl_request.ticket) if (opt_binlog_engine_hton)
thd->mdl_context.release_lock(mdl_request.ticket); {
thd->backup_commit_lock= 0; events_direct= false;
if (res) if (!(cache_mngr= thd->binlog_setup_trx_data()))
goto err; DBUG_RETURN(1);
cache_data= cache_mngr->get_binlog_cache_data(false);
DBUG_ASSERT(cache_data->empty());
file= &cache_data->cache_log;
/* Set cache_type to ensure we don't get checksums for this event */
event_info->cache_type= Log_event::EVENT_STMT_CACHE;
/* ToDo: Can we do some refactoring to this huge MYSQL_LOG_BIN::write() function, to split out common pieces of functionality in sub-functions, and reduce code duplication in this opt_binlog_engine_hton branch? */
}
else
{
MDL_request mdl_request;
MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
MDL_EXPLICIT);
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
DBUG_RETURN(1);
thd->backup_commit_lock= &mdl_request;
if ((res= thd->wait_for_prior_commit()))
{
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
DBUG_RETURN(res);
}
file= &log_file;
/* ToDo: Isn't this a race bug? Seems we can get a stale my_org_b_tell here since we're taking it outside of the mutex, which could leave to double accounting another binlog write. */
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
res= write_gtid_event(thd, &log_file, true, true, using_trans, commit_id);
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
if (res)
goto err;
}
} }
else else
{ {
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); /* Write the event to the stmt or trx cache, and binlog it later. */
if (!cache_mngr) if (!(cache_mngr= thd->binlog_setup_trx_data()))
goto err; goto err;
is_trans_cache= use_trans_cache(thd, using_trans); is_trans_cache= use_trans_cache(thd, using_trans);
@@ -7325,7 +7353,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (with_annotate && *with_annotate) if (with_annotate && *with_annotate)
{ {
DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT); DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
Annotate_rows_log_event anno(thd, using_trans, direct); Annotate_rows_log_event anno(thd, using_trans, events_direct);
/* Annotate event should be written not more than once */ /* Annotate event should be written not more than once */
*with_annotate= 0; *with_annotate= 0;
if (write_event(&anno, cache_data, file)) if (write_event(&anno, cache_data, file))
@@ -7339,7 +7367,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{ {
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog, thd->first_successful_insert_id_in_prev_stmt_for_binlog,
using_trans, direct); using_trans, events_direct);
if (write_event(&e, cache_data, file)) if (write_event(&e, cache_data, file))
goto err; goto err;
} }
@@ -7350,14 +7378,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
nb_elements())); nb_elements()));
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT, Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog. thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum(), using_trans, direct); minimum(), using_trans, events_direct);
if (write_event(&e, cache_data, file)) if (write_event(&e, cache_data, file))
goto err; goto err;
} }
if (thd->used & THD::RAND_USED) if (thd->used & THD::RAND_USED)
{ {
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2, Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
using_trans, direct); using_trans, events_direct);
if (write_event(&e, cache_data, file)) if (write_event(&e, cache_data, file))
goto err; goto err;
} }
@@ -7375,7 +7403,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
user_var_event->th->user_var_log_event_data_type( user_var_event->th->user_var_log_event_data_type(
user_var_event->charset_number), user_var_event->charset_number),
using_trans, using_trans,
direct); events_direct);
if (write_event(&e, cache_data, file)) if (write_event(&e, cache_data, file))
goto err; goto err;
} }
@@ -7394,79 +7422,144 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
err: err:
if (direct) if (direct)
{ {
my_off_t offset= my_b_tell(file);
bool check_purge= false;
DBUG_ASSERT(!is_relay_log); DBUG_ASSERT(!is_relay_log);
if (opt_binlog_engine_hton)
if (likely(!error))
{ {
bool synced; my_off_t binlog_main_bytes= my_b_write_tell(file);
my_off_t binlog_total_bytes;
MDL_request mdl_request;
int res;
MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
MDL_EXPLICIT);
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
goto engine_fail;
thd->backup_commit_lock= &mdl_request;
update_gtid_index((uint32)offset, thd->get_last_commit_gtid()); if (thd->wait_for_prior_commit())
if ((error= flush_and_sync(&synced)))
{ {
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
goto engine_fail;
} }
else mysql_mutex_lock(&LOCK_log);
res= write_gtid_event(thd, file, true, false, using_trans, commit_id);
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
if (res)
goto engine_fail;
binlog_total_bytes= my_b_bytes_in_cache(file);
/*
Engine-in-binlog does not support the after-sync method.
This is for consistency with the binlogging of transactions in the
engine, which commit atomically at the same time in binlog and engine.
In any case, for non-transactional event group (eg. DDL), the
after-sync and after-commit semisync methods are mostly the same; the
change has already become visible to other connections on the master
when it is binlogged.
ToDo: If semi-sync is enabled, obtain the binlog coords from the
engine to be waited for later at after-commit.
ToDo2: Do we still need this chainining of mutexes?
*/
mysql_mutex_lock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_after_binlog_sync);
if ((*opt_binlog_engine_hton->binlog_write_direct)(file, binlog_main_bytes))
goto engine_fail;
/* ToDo: Need to set last_commit_pos_offset here? */
/* ToDo: Maybe binlog_write_direct() could return the coords. */
mysql_mutex_unlock(&LOCK_commit_ordered);
status_var_add(thd->status_var.binlog_bytes_written, binlog_total_bytes);
goto engine_ok;
engine_fail:
error= 1;
engine_ok:
cache_mngr->reset(true, false);
}
else
{
my_off_t offset= my_b_tell(file);
bool check_purge= false;
if (likely(!error))
{ {
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); bool synced;
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); update_gtid_index((uint32)offset, thd->get_last_commit_gtid());
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION if ((error= flush_and_sync(&synced)))
if (repl_semisync_master.report_binlog_update(thd, thd,
log_file_name, offset))
{ {
sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
} }
else else
#endif
{ {
/* mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
update binlog_end_pos so it can be read by dump thread mysql_mutex_assert_owner(&LOCK_log);
note: must be _after_ the RUN_HOOK(after_flush) or else mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
semi-sync might not have put the transaction into mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
it's list before dump-thread tries to send it #ifdef HAVE_REPLICATION
*/ if (repl_semisync_master.report_binlog_update(thd, thd,
update_binlog_end_pos(offset); log_file_name, offset))
if (unlikely((error= rotate(false, &check_purge)))) {
check_purge= false; sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
}
else
#endif
{
/*
update binlog_end_pos so it can be read by dump thread
note: must be _after_ the RUN_HOOK(after_flush) or else
semi-sync might not have put the transaction into
it's list before dump-thread tries to send it
*/
update_binlog_end_pos(offset);
if (unlikely((error= rotate(false, &check_purge))))
check_purge= false;
}
} }
} }
}
status_var_add(thd->status_var.binlog_bytes_written, status_var_add(thd->status_var.binlog_bytes_written,
offset - my_org_b_tell); offset - my_org_b_tell);
mysql_mutex_lock(&LOCK_after_binlog_sync); mysql_mutex_lock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_log); mysql_mutex_unlock(&LOCK_log);
DEBUG_SYNC(thd, "commit_after_release_LOCK_log"); DEBUG_SYNC(thd, "commit_after_release_LOCK_log");
mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
mysql_mutex_assert_not_owner(&LOCK_log); mysql_mutex_assert_not_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered); mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (repl_semisync_master.wait_after_sync(log_file_name, offset)) if (repl_semisync_master.wait_after_sync(log_file_name, offset))
{ {
error=1; error=1;
/* error is already printed inside hook */ /* error is already printed inside hook */
} }
#endif #endif
/* /*
Take mutex to protect against a reader seeing partial writes of 64-bit Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs. offset on 32-bit CPUs.
*/ */
mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_lock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_after_binlog_sync); mysql_mutex_unlock(&LOCK_after_binlog_sync);
last_commit_pos_offset= offset; last_commit_pos_offset= offset;
mysql_mutex_unlock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_commit_ordered);
if (check_purge) if (check_purge)
checkpoint_and_purge(prev_binlog_id); checkpoint_and_purge(prev_binlog_id);
}
} }
if (unlikely(error)) if (unlikely(error))
@@ -9254,6 +9347,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (!opt_binlog_engine_hton) if (!opt_binlog_engine_hton)
{ {
if (write_gtid_event(entry->thd, &log_file, is_prepared_xa(entry->thd), if (write_gtid_event(entry->thd, &log_file, is_prepared_xa(entry->thd),
true,
entry->using_trx_cache, commit_id, entry->using_trx_cache, commit_id,
has_xid, entry->ro_1pc)) has_xid, entry->ro_1pc))
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
@@ -9323,7 +9417,9 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
} }
mngr->gtid_cache_offset= my_b_tell(cache); mngr->gtid_cache_offset= my_b_tell(cache);
/* ToDo: This gets written with a checksum! Which is wrong, need some way to mark that GTID is being written to a cache... */
if (write_gtid_event(entry->thd, cache, is_prepared_xa(entry->thd), if (write_gtid_event(entry->thd, cache, is_prepared_xa(entry->thd),
false,
entry->using_trx_cache, commit_id, entry->using_trx_cache, commit_id,
has_xid, entry->ro_1pc)) has_xid, entry->ro_1pc))
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);

View File

@@ -1116,6 +1116,7 @@ public:
void set_status_variables(THD *thd); void set_status_variables(THD *thd);
bool is_xidlist_idle(); bool is_xidlist_idle();
bool write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone, bool write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone,
bool direct_write,
bool is_transactional, uint64 commit_id, bool is_transactional, uint64 commit_id,
bool has_xid= false, bool ro_1pc= false); bool has_xid= false, bool ro_1pc= false);
int read_state_from_file(); int read_state_from_file();

View File

@@ -5476,7 +5476,8 @@ static int init_server_components()
opt_binlog_storage_engine); opt_binlog_storage_engine);
unireg_abort(1); unireg_abort(1);
} }
if (!opt_binlog_engine_hton->get_binlog_reader) if (!opt_binlog_engine_hton->binlog_write_direct ||
!opt_binlog_engine_hton->get_binlog_reader)
{ {
sql_print_error("Engine %s does not support --innodb-binlog-engine", sql_print_error("Engine %s does not support --innodb-binlog-engine",
opt_binlog_storage_engine); opt_binlog_storage_engine);

View File

@@ -4909,3 +4909,16 @@ innodb_get_binlog_reader()
{ {
return new ha_innodb_binlog_reader(); return new ha_innodb_binlog_reader();
} }
bool
innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size)
{
mtr_t mtr;
mtr.start();
fsp_binlog_write_cache(cache, main_size, &mtr);
mtr.commit();
/* ToDo: Should we sync the log here? Maybe depending on an extra bool parameter? */
/* ToDo: Presumably fsp_binlog_write_cache() should be able to fail in some cases? Then return any such error to the caller. */
return false;
}

View File

@@ -4145,6 +4145,7 @@ static int innodb_init(void* p)
= innodb_prepare_commit_versioned; = innodb_prepare_commit_versioned;
innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs; innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs;
innobase_hton->binlog_write_direct= innobase_binlog_write_direct;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
innodb_remember_check_sysvar_funcs(); innodb_remember_check_sysvar_funcs();

View File

@@ -559,6 +559,7 @@ void fsp_shrink_temp_space();
extern void fsp_binlog_test(const uchar *data, uint32_t len); extern void fsp_binlog_test(const uchar *data, uint32_t len);
extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr); extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr);
class handler_binlog_reader; class handler_binlog_reader;
extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size);
extern handler_binlog_reader *innodb_get_binlog_reader(); extern handler_binlog_reader *innodb_get_binlog_reader();
extern void fsp_binlog_close(); extern void fsp_binlog_close();