From 094c772213084bb7589c44af8b547cfa92ccebaa Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Fri, 9 Aug 2024 18:01:12 +0200 Subject: [PATCH] MDEV-34705: Binlog in Engine: Also binlog standalone (eg. DDL) in the engine Signed-off-by: Kristian Nielsen --- sql/handler.h | 2 +- sql/log.cc | 272 +++++++++++++++++--------- sql/log.h | 1 + sql/mysqld.cc | 3 +- storage/innobase/fsp/fsp0fsp.cc | 13 ++ storage/innobase/handler/ha_innodb.cc | 1 + storage/innobase/include/fsp0fsp.h | 1 + 7 files changed, 203 insertions(+), 90 deletions(-) diff --git a/sql/handler.h b/sql/handler.h index 05fba74896f..63c0757730f 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1530,8 +1530,8 @@ struct handlerton void *optimizer_costs; /* Costs are stored here */ /* Optional implementation of binlog in the engine. */ + bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size); handler_binlog_reader * (*get_binlog_reader)(); - int (*binlog_data)(uchar *data, size_t len); /* Optional clauses in the CREATE/ALTER TABLE diff --git a/sql/log.cc b/sql/log.cc index 18d3c24de3b..25677703f16 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6893,6 +6893,7 @@ Event_log::prepare_pending_rows_event(THD *thd, TABLE* table, bool MYSQL_BIN_LOG::write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone, + bool direct_write, bool is_transactional, uint64 commit_id, bool has_xid, bool is_ro_1pc) { @@ -6950,6 +6951,10 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone, Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, LOG_EVENT_SUPPRESS_USE_F, is_transactional, commit_id, has_xid, is_ro_1pc); + /* ToDo: Something better than this hack conditional. At least pass direct_write into the Gtid event contructor or something? */ + if (!direct_write) + gtid_event.cache_type= is_transactional ? + Log_event::EVENT_TRANSACTIONAL_CACHE : Log_event::EVENT_STMT_CACHE; /* Write the event to the binary log. */ DBUG_ASSERT(this == &mysql_bin_log); @@ -7164,7 +7169,9 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) bool is_trans_cache= FALSE; bool using_trans= event_info->use_trans_cache(); bool direct= event_info->use_direct_logging(); + bool events_direct; ulong UNINIT_VAR(prev_binlog_id); + uint64 UNINIT_VAR(commit_id); DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); /* @@ -7251,35 +7258,18 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) DBUG_RETURN(0); #endif /* HAVE_REPLICATION */ + binlog_cache_mngr * cache_mngr= NULL; IO_CACHE *file= NULL; + events_direct= direct; if (direct) { - /* We come here only for incident events */ + /* Write the event to the binlog immediately. */ int res; - uint64 commit_id= 0; - MDL_request mdl_request; + DBUG_PRINT("info", ("direct is set")); DBUG_ASSERT(!thd->backup_commit_lock); - - MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, - MDL_EXPLICIT); - if (thd->mdl_context.acquire_lock(&mdl_request, - thd->variables.lock_wait_timeout)) - DBUG_RETURN(1); - thd->backup_commit_lock= &mdl_request; - - if ((res= thd->wait_for_prior_commit())) - { - if (mdl_request.ticket) - thd->mdl_context.release_lock(mdl_request.ticket); - thd->backup_commit_lock= 0; - DBUG_RETURN(res); - } - file= &log_file; - my_org_b_tell= my_b_tell(file); - mysql_mutex_lock(&LOCK_log); - prev_binlog_id= current_binlog_id; + commit_id= 0; DBUG_EXECUTE_IF("binlog_force_commit_id", { const LEX_CSTRING commit_name= { STRING_WITH_LEN("commit_id") }; @@ -7290,17 +7280,55 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) commit_name.length); commit_id= entry->val_int(&null_value); }); - res= write_gtid_event(thd, &log_file, true, using_trans, commit_id); - if (mdl_request.ticket) - thd->mdl_context.release_lock(mdl_request.ticket); - thd->backup_commit_lock= 0; - if (res) - goto err; + + if (opt_binlog_engine_hton) + { + events_direct= false; + if (!(cache_mngr= thd->binlog_setup_trx_data())) + DBUG_RETURN(1); + cache_data= cache_mngr->get_binlog_cache_data(false); + DBUG_ASSERT(cache_data->empty()); + file= &cache_data->cache_log; + /* Set cache_type to ensure we don't get checksums for this event */ + event_info->cache_type= Log_event::EVENT_STMT_CACHE; + + /* ToDo: Can we do some refactoring to this huge MYSQL_LOG_BIN::write() function, to split out common pieces of functionality in sub-functions, and reduce code duplication in this opt_binlog_engine_hton branch? */ + } + else + { + MDL_request mdl_request; + + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_EXPLICIT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + DBUG_RETURN(1); + thd->backup_commit_lock= &mdl_request; + + if ((res= thd->wait_for_prior_commit())) + { + if (mdl_request.ticket) + thd->mdl_context.release_lock(mdl_request.ticket); + thd->backup_commit_lock= 0; + DBUG_RETURN(res); + } + file= &log_file; + /* ToDo: Isn't this a race bug? Seems we can get a stale my_org_b_tell here since we're taking it outside of the mutex, which could leave to double accounting another binlog write. */ + my_org_b_tell= my_b_tell(file); + mysql_mutex_lock(&LOCK_log); + prev_binlog_id= current_binlog_id; + res= write_gtid_event(thd, &log_file, true, true, using_trans, commit_id); + if (mdl_request.ticket) + thd->mdl_context.release_lock(mdl_request.ticket); + thd->backup_commit_lock= 0; + if (res) + goto err; + } } else { - binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); - if (!cache_mngr) + /* Write the event to the stmt or trx cache, and binlog it later. */ + if (!(cache_mngr= thd->binlog_setup_trx_data())) goto err; is_trans_cache= use_trans_cache(thd, using_trans); @@ -7325,7 +7353,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) if (with_annotate && *with_annotate) { DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT); - Annotate_rows_log_event anno(thd, using_trans, direct); + Annotate_rows_log_event anno(thd, using_trans, events_direct); /* Annotate event should be written not more than once */ *with_annotate= 0; if (write_event(&anno, cache_data, file)) @@ -7339,7 +7367,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog, - using_trans, direct); + using_trans, events_direct); if (write_event(&e, cache_data, file)) goto err; } @@ -7350,14 +7378,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) nb_elements())); Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT, thd->auto_inc_intervals_in_cur_stmt_for_binlog. - minimum(), using_trans, direct); + minimum(), using_trans, events_direct); if (write_event(&e, cache_data, file)) goto err; } if (thd->used & THD::RAND_USED) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2, - using_trans, direct); + using_trans, events_direct); if (write_event(&e, cache_data, file)) goto err; } @@ -7375,7 +7403,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) user_var_event->th->user_var_log_event_data_type( user_var_event->charset_number), using_trans, - direct); + events_direct); if (write_event(&e, cache_data, file)) goto err; } @@ -7394,79 +7422,144 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) err: if (direct) { - my_off_t offset= my_b_tell(file); - bool check_purge= false; DBUG_ASSERT(!is_relay_log); - - if (likely(!error)) + if (opt_binlog_engine_hton) { - bool synced; + my_off_t binlog_main_bytes= my_b_write_tell(file); + my_off_t binlog_total_bytes; + MDL_request mdl_request; + int res; + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_EXPLICIT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + goto engine_fail; + thd->backup_commit_lock= &mdl_request; - update_gtid_index((uint32)offset, thd->get_last_commit_gtid()); - - if ((error= flush_and_sync(&synced))) + if (thd->wait_for_prior_commit()) { + if (mdl_request.ticket) + thd->mdl_context.release_lock(mdl_request.ticket); + thd->backup_commit_lock= 0; + goto engine_fail; } - else + mysql_mutex_lock(&LOCK_log); + res= write_gtid_event(thd, file, true, false, using_trans, commit_id); + if (mdl_request.ticket) + thd->mdl_context.release_lock(mdl_request.ticket); + thd->backup_commit_lock= 0; + if (res) + goto engine_fail; + + binlog_total_bytes= my_b_bytes_in_cache(file); + /* + Engine-in-binlog does not support the after-sync method. + This is for consistency with the binlogging of transactions in the + engine, which commit atomically at the same time in binlog and engine. + + In any case, for non-transactional event group (eg. DDL), the + after-sync and after-commit semisync methods are mostly the same; the + change has already become visible to other connections on the master + when it is binlogged. + + ToDo: If semi-sync is enabled, obtain the binlog coords from the + engine to be waited for later at after-commit. + + ToDo2: Do we still need this chainining of mutexes? + */ + mysql_mutex_lock(&LOCK_after_binlog_sync); + mysql_mutex_unlock(&LOCK_log); + mysql_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_after_binlog_sync); + if ((*opt_binlog_engine_hton->binlog_write_direct)(file, binlog_main_bytes)) + goto engine_fail; + /* ToDo: Need to set last_commit_pos_offset here? */ + /* ToDo: Maybe binlog_write_direct() could return the coords. */ + mysql_mutex_unlock(&LOCK_commit_ordered); + + status_var_add(thd->status_var.binlog_bytes_written, binlog_total_bytes); + + goto engine_ok; + engine_fail: + error= 1; + engine_ok: + cache_mngr->reset(true, false); + } + else + { + my_off_t offset= my_b_tell(file); + bool check_purge= false; + + if (likely(!error)) { - mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); - mysql_mutex_assert_owner(&LOCK_log); - mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); - mysql_mutex_assert_not_owner(&LOCK_commit_ordered); -#ifdef HAVE_REPLICATION - if (repl_semisync_master.report_binlog_update(thd, thd, - log_file_name, offset)) + bool synced; + + update_gtid_index((uint32)offset, thd->get_last_commit_gtid()); + + if ((error= flush_and_sync(&synced))) { - sql_print_error("Failed to run 'after_flush' hooks"); - error= 1; } else -#endif { - /* - update binlog_end_pos so it can be read by dump thread - note: must be _after_ the RUN_HOOK(after_flush) or else - semi-sync might not have put the transaction into - it's list before dump-thread tries to send it - */ - update_binlog_end_pos(offset); - if (unlikely((error= rotate(false, &check_purge)))) - check_purge= false; + mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); + mysql_mutex_assert_not_owner(&LOCK_commit_ordered); +#ifdef HAVE_REPLICATION + if (repl_semisync_master.report_binlog_update(thd, thd, + log_file_name, offset)) + { + sql_print_error("Failed to run 'after_flush' hooks"); + error= 1; + } + else +#endif + { + /* + update binlog_end_pos so it can be read by dump thread + note: must be _after_ the RUN_HOOK(after_flush) or else + semi-sync might not have put the transaction into + it's list before dump-thread tries to send it + */ + update_binlog_end_pos(offset); + if (unlikely((error= rotate(false, &check_purge)))) + check_purge= false; + } } } - } - status_var_add(thd->status_var.binlog_bytes_written, - offset - my_org_b_tell); + status_var_add(thd->status_var.binlog_bytes_written, + offset - my_org_b_tell); - mysql_mutex_lock(&LOCK_after_binlog_sync); - mysql_mutex_unlock(&LOCK_log); + mysql_mutex_lock(&LOCK_after_binlog_sync); + mysql_mutex_unlock(&LOCK_log); - DEBUG_SYNC(thd, "commit_after_release_LOCK_log"); + DEBUG_SYNC(thd, "commit_after_release_LOCK_log"); - mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); - mysql_mutex_assert_not_owner(&LOCK_log); - mysql_mutex_assert_owner(&LOCK_after_binlog_sync); - mysql_mutex_assert_not_owner(&LOCK_commit_ordered); + mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_not_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_after_binlog_sync); + mysql_mutex_assert_not_owner(&LOCK_commit_ordered); #ifdef HAVE_REPLICATION - if (repl_semisync_master.wait_after_sync(log_file_name, offset)) - { - error=1; - /* error is already printed inside hook */ - } + if (repl_semisync_master.wait_after_sync(log_file_name, offset)) + { + error=1; + /* error is already printed inside hook */ + } #endif - /* - Take mutex to protect against a reader seeing partial writes of 64-bit - offset on 32-bit CPUs. - */ - mysql_mutex_lock(&LOCK_commit_ordered); - mysql_mutex_unlock(&LOCK_after_binlog_sync); - last_commit_pos_offset= offset; - mysql_mutex_unlock(&LOCK_commit_ordered); + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. + */ + mysql_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_after_binlog_sync); + last_commit_pos_offset= offset; + mysql_mutex_unlock(&LOCK_commit_ordered); - if (check_purge) - checkpoint_and_purge(prev_binlog_id); + if (check_purge) + checkpoint_and_purge(prev_binlog_id); + } } if (unlikely(error)) @@ -9254,6 +9347,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, if (!opt_binlog_engine_hton) { if (write_gtid_event(entry->thd, &log_file, is_prepared_xa(entry->thd), + true, entry->using_trx_cache, commit_id, has_xid, entry->ro_1pc)) DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -9323,7 +9417,9 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_RETURN(ER_ERROR_ON_WRITE); } mngr->gtid_cache_offset= my_b_tell(cache); + /* ToDo: This gets written with a checksum! Which is wrong, need some way to mark that GTID is being written to a cache... */ if (write_gtid_event(entry->thd, cache, is_prepared_xa(entry->thd), + false, entry->using_trx_cache, commit_id, has_xid, entry->ro_1pc)) DBUG_RETURN(ER_ERROR_ON_WRITE); diff --git a/sql/log.h b/sql/log.h index b61428912ef..fdf2eccb004 100644 --- a/sql/log.h +++ b/sql/log.h @@ -1116,6 +1116,7 @@ public: void set_status_variables(THD *thd); bool is_xidlist_idle(); bool write_gtid_event(THD *thd, IO_CACHE *dest, bool standalone, + bool direct_write, bool is_transactional, uint64 commit_id, bool has_xid= false, bool ro_1pc= false); int read_state_from_file(); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index abd4d6d19e2..1830ff4074f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -5476,7 +5476,8 @@ static int init_server_components() opt_binlog_storage_engine); unireg_abort(1); } - if (!opt_binlog_engine_hton->get_binlog_reader) + if (!opt_binlog_engine_hton->binlog_write_direct || + !opt_binlog_engine_hton->get_binlog_reader) { sql_print_error("Engine %s does not support --innodb-binlog-engine", opt_binlog_storage_engine); diff --git a/storage/innobase/fsp/fsp0fsp.cc b/storage/innobase/fsp/fsp0fsp.cc index 8665d785890..39c5ee09e53 100644 --- a/storage/innobase/fsp/fsp0fsp.cc +++ b/storage/innobase/fsp/fsp0fsp.cc @@ -4909,3 +4909,16 @@ innodb_get_binlog_reader() { return new ha_innodb_binlog_reader(); } + + +bool +innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size) +{ + mtr_t mtr; + mtr.start(); + fsp_binlog_write_cache(cache, main_size, &mtr); + mtr.commit(); + /* ToDo: Should we sync the log here? Maybe depending on an extra bool parameter? */ + /* ToDo: Presumably fsp_binlog_write_cache() should be able to fail in some cases? Then return any such error to the caller. */ + return false; +} diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index e16517d80d2..4114cbc7158 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -4145,6 +4145,7 @@ static int innodb_init(void* p) = innodb_prepare_commit_versioned; innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs; + innobase_hton->binlog_write_direct= innobase_binlog_write_direct; innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innodb_remember_check_sysvar_funcs(); diff --git a/storage/innobase/include/fsp0fsp.h b/storage/innobase/include/fsp0fsp.h index dfe222ab3e0..4ddcfa5a4d8 100644 --- a/storage/innobase/include/fsp0fsp.h +++ b/storage/innobase/include/fsp0fsp.h @@ -559,6 +559,7 @@ void fsp_shrink_temp_space(); extern void fsp_binlog_test(const uchar *data, uint32_t len); extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr); class handler_binlog_reader; +extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size); extern handler_binlog_reader *innodb_get_binlog_reader(); extern void fsp_binlog_close();