diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index c92b0457e07..3136c6b3393 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1710,6 +1710,13 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) if ((length=(size_t) (info->write_pos - info->write_buffer))) { + /* + The write_function() updates info->pos_in_file. So compute the new end + position here before calling it, but update the value only after we + check for error return. + */ + uchar *new_write_end= (info->write_buffer + info->buffer_length - + ((info->pos_in_file + length) & (IO_SIZE - 1))); if (append_cache) { if (mysql_file_write(info->file, info->write_buffer, length, @@ -1730,8 +1737,7 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) set_if_bigger(info->end_of_file, info->pos_in_file); } - info->write_end= (info->write_buffer + info->buffer_length - - ((info->pos_in_file + length) & (IO_SIZE - 1))); + info->write_end= new_write_end; info->write_pos= info->write_buffer; ++info->disk_writes; UNLOCK_APPEND_BUFFER; diff --git a/sql/handler.h b/sql/handler.h index 42e2be42fa8..7b5710f5b0e 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -63,6 +63,7 @@ class handler_binlog_reader; struct rpl_gtid; struct slave_connection_state; struct rpl_binlog_state_base; +struct handler_binlog_event_group_info; // the following is for checking tables @@ -1535,7 +1536,8 @@ struct handlerton /* Optional implementation of binlog in the engine. */ bool (*binlog_init)(size_t binlog_size); /* Binlog an event group that doesn't go through commit_ordered. 
*/ - bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size, + bool (*binlog_write_direct)(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, const rpl_gtid *gtid); /* Binlog parts of large transactions out-of-band, in different chunks in the @@ -5825,6 +5827,20 @@ int get_select_field_pos(Alter_info *alter_info, int select_field_count, const char* dbug_print_row(TABLE *table, const uchar *rec, bool print_names= true); #endif /* DBUG_OFF */ +/* Struct with info about an event group to be binlogged by a storage engine. */ +struct handler_binlog_event_group_info { + /* Opaque pointer for the engine's use. */ + void *engine_ptr; + /* End of data that has already been binlogged out-of-band. */ + my_off_t out_of_band_offset; + /* + Offset of the GTID event, which comes first in the event group, but is put + at the end of the IO_CACHE containing the data to be binlogged. + */ + my_off_t gtid_offset; +}; + + /* Class for reading a binlog implemented in an engine. */ diff --git a/sql/log.cc b/sql/log.cc index eef3ba06bf0..67a13422f3b 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -103,6 +103,8 @@ static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, Log_event *end_ev, bool all, bool using_stmt, bool using_trx, bool is_ro_1pc); +static int binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, + size_t len); static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; @@ -370,10 +372,11 @@ public: ulong *param_ptr_binlog_cache_use, ulong *param_ptr_binlog_cache_disk_use, bool precompute_checksums) - : thd(thd_arg), engine_ptr(0), + : thd(thd_arg), stmt_cache(precompute_checksums), trx_cache(precompute_checksums), - last_commit_pos_offset(0), gtid_cache_offset(0), did_oob_spill(false), + last_commit_pos_offset(0), + engine_binlog_info {0, 0, 0}, using_xa(FALSE), xa_xid(0) { 
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, @@ -387,8 +390,9 @@ public: void reset(bool do_stmt, bool do_trx) { - if (engine_ptr) - (*opt_binlog_engine_hton->binlog_oob_free)(thd, engine_ptr); + if (engine_binlog_info.engine_ptr) + (*opt_binlog_engine_hton->binlog_oob_free)(thd, + engine_binlog_info.engine_ptr); if (do_stmt) stmt_cache.reset(); if (do_trx) @@ -397,8 +401,15 @@ public: using_xa= FALSE; last_commit_pos_file[0]= 0; last_commit_pos_offset= 0; - gtid_cache_offset= 0; } + /* + Use a custom write_function to spill to the engine-implemented binlog. + And re-use the IO_CACHE::append_read_pos as a handle for our + write_function; it is unused when the cache is not SEQ_READ_APPEND. + */ + trx_cache.cache_log.write_function= binlog_spill_to_engine; + trx_cache.cache_log.append_read_pos= (uchar *)this; + engine_binlog_info= {0, 0, 0}; } binlog_cache_data* get_binlog_cache_data(bool is_transactional) @@ -412,8 +423,6 @@ public: } THD *thd; - /* For use by engine when --binlog-storage-engine. */ - void *engine_ptr; binlog_cache_data stmt_cache; @@ -428,14 +437,10 @@ public: */ char last_commit_pos_file[FN_REFLEN]; my_off_t last_commit_pos_offset; - /* Point in trx cache where GTID event sits after end event. */ - my_off_t gtid_cache_offset; - /* - Flag to remember if we spilled any partial transaction data to the binlog - implemented in storage engine. - */ - bool did_oob_spill; + /* Context for engine-implemented binlogging. */ + handler_binlog_event_group_info engine_binlog_info; + /* Flag set true if this transaction is committed with log_xid() as part of XA, false if not. 
@@ -1793,9 +1798,9 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, DBUG_ENTER("binlog_flush_cache"); DBUG_PRINT("enter", ("end_ev: %p", end_ev)); - if ((using_stmt && !cache_mngr->stmt_cache.empty()) || - (using_trx && !cache_mngr->trx_cache.empty()) || - thd->transaction->xid_state.is_explicit_XA()) + bool doing_stmt= using_stmt && !cache_mngr->stmt_cache.empty(); + bool doing_trx= using_trx && !cache_mngr->trx_cache.empty(); + if (doing_stmt || doing_trx || thd->transaction->xid_state.is_explicit_XA()) { if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) DBUG_RETURN(1); @@ -1812,6 +1817,41 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, } #endif /* WITH_WSREP */ + if (opt_binlog_engine_hton) + { + /* + Write the end_event into the cache, in preparation for sending the + cache to the engine to be binlogged as a whole. + */ + binlog_cache_data *cache_data; + if (doing_trx || !doing_stmt) + { + end_ev->cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE; + cache_data= &cache_mngr->trx_cache; + } + else + { + end_ev->cache_type= Log_event::EVENT_STMT_CACHE; + cache_data= &cache_mngr->stmt_cache; + } + if (mysql_bin_log.write_event(end_ev, cache_data, &cache_data->cache_log)) + DBUG_RETURN(1); + + if (cache_mngr->engine_binlog_info.out_of_band_offset) + { + /* + This is a "large" transaction, where parts of the transaction were + already binlogged out-of-band to the engine binlog. + + Binlog the remaining bits of event data as well, so all the event + group is consecutive out-of-band data and the commit record will + only contain the GTID event (depending on engine implementation). 
+ */ + if (my_b_flush_io_cache(&cache_mngr->trx_cache.cache_log, 0)) + DBUG_RETURN(1); + } + } + /* Doing a commit or a rollback including non-transactional tables, i.e., ending a transaction where we might write the transaction @@ -1851,24 +1891,25 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, extern "C" void -binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size, +binlog_get_cache(THD *thd, IO_CACHE **out_cache, + handler_binlog_event_group_info **out_context, const rpl_gtid **out_gtid) { IO_CACHE *cache= nullptr; - size_t main_size= 0; + handler_binlog_event_group_info *context= nullptr; binlog_cache_mngr *cache_mngr; const rpl_gtid *gtid= nullptr; if (likely(!opt_bootstrap /* ToDo needed? */) && opt_binlog_engine_hton && (cache_mngr= thd->binlog_get_cache_mngr())) { - main_size= cache_mngr->gtid_cache_offset; + context= &cache_mngr->engine_binlog_info; cache= !cache_mngr->trx_cache.empty() ? &cache_mngr->trx_cache.cache_log : &cache_mngr->stmt_cache.cache_log; gtid= thd->get_last_commit_gtid(); } *out_cache= cache; - *out_main_size= main_size; + *out_context= context; *out_gtid= gtid; } @@ -6272,9 +6313,12 @@ static int binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len) { binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos; + void **engine_ptr= &mngr->engine_binlog_info.engine_ptr; bool res= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, len, - &mngr->engine_ptr); - mngr->did_oob_spill= true; + engine_ptr); + mngr->engine_binlog_info.out_of_band_offset+= len; + cache->pos_in_file+= len; + return res; } @@ -7538,7 +7582,9 @@ err: DBUG_ASSERT(!is_relay_log); if (opt_binlog_engine_hton) { - my_off_t binlog_main_bytes= my_b_write_tell(file); + handler_binlog_event_group_info *engine_context= + &cache_mngr->engine_binlog_info; + engine_context->gtid_offset= my_b_tell(file); my_off_t binlog_total_bytes; MDL_request mdl_request; int res; @@ -7585,7 +7631,7 @@ err: 
mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_after_binlog_sync); if ((*opt_binlog_engine_hton->binlog_write_direct) - (file, binlog_main_bytes, thd->get_last_commit_gtid())) + (file, engine_context, thd->get_last_commit_gtid())) goto engine_fail; /* ToDo: Need to set last_commit_pos_offset here? */ /* ToDo: Maybe binlog_write_direct() could return the coords. */ @@ -9502,6 +9548,12 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_RETURN(ER_ERROR_ON_WRITE); }); + /* + Write the end event (XID_EVENT, commit QUERY_LOG_EVENT) directly to the + legacy binary log. This is required to get the correct end position in the + event as currently needed by non-GTID slaves (since write_cache() does a + direct write of the cache, leaving end positions at zero). + */ if (write_event(entry->end_event)) { entry->error_cache= NULL; @@ -9521,17 +9573,11 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, IO_CACHE *cache= (entry->using_trx_cache && !mngr->trx_cache.empty()) ? &mngr->trx_cache.cache_log : &mngr->stmt_cache.cache_log; /* - Prepare the full event data in the trx cache for the engine to binlog. The GTID event cannot go first since we only allocate the GTID at binlog time. So write the GTID at the very end, and record its offset so that the engine can pick it out and binlog it at the start. */ - if (write_event(entry->end_event, NULL, cache)) - { - entry->error_cache= NULL; - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - mngr->gtid_cache_offset= my_b_tell(cache); + mngr->engine_binlog_info.gtid_offset= my_b_tell(cache); /* ToDo: This gets written with a checksum! Which is wrong, need some way to mark that GTID is being written to a cache... 
*/ if (write_gtid_event(entry->thd, cache, is_prepared_xa(entry->thd), false, diff --git a/storage/innobase/fsp/fsp0fsp.cc b/storage/innobase/fsp/fsp0fsp.cc index cc8d35626ef..1bceb7fd5ca 100644 --- a/storage/innobase/fsp/fsp0fsp.cc +++ b/storage/innobase/fsp/fsp0fsp.cc @@ -4405,23 +4405,6 @@ struct chunk_data_base { /* Structure holding context for out-of-band chunks of binlogged event group. */ struct binlog_oob_context { -#ifdef _MSC_VER -/* Flexible array member is not standard C++, disable compiler warning. */ -#pragma warning(disable : 4200) -#endif - uint32_t node_list_len; - uint32_t node_list_alloc_len; - /* - The node_list contains the root of each tree in the forest of perfect - binary trees. - */ - struct node_info { - uint64_t file_no; - uint64_t offset; - uint64_t node_index; - uint32_t height; - } node_list []; - /* Structure used to encapsulate the data to be binlogged in an out-of-band chunk, for use by fsp_binlog_write_chunk(). @@ -4453,6 +4436,23 @@ struct binlog_oob_context { bool binlog_node(uint32_t node, uint64_t new_idx, uint32_t left_node, uint32_t right_node, chunk_data_oob *oob_data); + + uint32_t node_list_len; + uint32_t node_list_alloc_len; + /* + The node_list contains the root of each tree in the forest of perfect + binary trees. + */ +#ifdef _MSC_VER +/* Flexible array member is not standard C++, disable compiler warning. 
*/ +#pragma warning(disable : 4200) +#endif + struct node_info { + uint64_t file_no; + uint64_t offset; + uint64_t node_index; + uint32_t height; + } node_list []; }; @@ -5692,15 +5692,17 @@ struct chunk_data_cache : public chunk_data_base { size_t main_remain; size_t gtid_remain; - chunk_data_cache(IO_CACHE *cache_arg, size_t main_size_arg) - : cache(cache_arg), main_remain(main_size_arg) + chunk_data_cache(IO_CACHE *cache_arg, my_off_t main_offset, + my_off_t gtid_offset) + : cache(cache_arg), main_remain(gtid_offset - main_offset) { - size_t remain= my_b_tell(cache); - ut_ad(remain > 0); - ut_ad(remain >= main_size_arg); - gtid_remain= remain - main_size_arg; + size_t end_offset= my_b_tell(cache); + ut_ad(end_offset > main_offset); + ut_ad(gtid_offset >= main_offset); + ut_ad(end_offset >= gtid_offset); + gtid_remain= end_offset - gtid_offset; - if (cache->pos_in_file > 0) { + if (cache->pos_in_file > main_offset) { /* ToDo: A limitation in mysys IO_CACHE. If I change (reinit_io_cache()) the cache from WRITE_CACHE to READ_CACHE without seeking out of the @@ -5718,10 +5720,8 @@ struct chunk_data_cache : public chunk_data_base { } /* Start with the GTID event, which is put at the end of the IO_CACHE. */ - my_bool res= reinit_io_cache(cache, READ_CACHE, main_size_arg, 0, 0); + my_bool res= reinit_io_cache(cache, READ_CACHE, gtid_offset, 0, 0); ut_a(!res /* ToDo: Error handling. */); - - gtid_remain= remain - main_size_arg; } ~chunk_data_cache() { } @@ -5744,11 +5744,11 @@ struct chunk_data_cache : public chunk_data_base { /* Write remaining data. */ ut_ad(gtid_remain == 0); - if (UNIV_UNLIKELY(main_remain == 0)) + if (main_remain == 0) { /* - This would imply that only GTID data is present, which probably does - not happen currently, but let's not leave an unhandled case. + This means that only GTID data is present, eg. when the main data was + already binlogged out-of-band. 
*/ ut_ad(size > 0); return {size, true}; @@ -5763,10 +5763,12 @@ struct chunk_data_cache : public chunk_data_base { }; -void -fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) +static void +fsp_binlog_write_cache(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, mtr_t *mtr) { - chunk_data_cache chunk_data(cache, main_size); + chunk_data_cache chunk_data(cache, binlog_info->out_of_band_offset, + binlog_info->gtid_offset); fsp_binlog_write_chunk(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT); } @@ -5901,7 +5903,7 @@ fsp_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, return true; c->node_list_len= i - 1; } - else + else if (i > 0) { /* Case 2: Add the new node as a singleton tree. */ c= ensure_oob_context(engine_data, i+1); @@ -5916,6 +5918,15 @@ fsp_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, return true; c->node_list_len= i + 1; } + else + { + /* Special case i==0, like case 2 but no prior node to link to. */ + binlog_oob_context::chunk_data_oob oob_data + (new_idx, 0, 0, 0, 0, (byte *)data, data_len); + if (c->binlog_node(i, new_idx, ~(uint32_t)0, ~(uint32_t)0, &oob_data)) + return true; + c->node_list_len= 1; + } return false; } @@ -5967,7 +5978,7 @@ binlog_oob_context::chunk_data_oob::chunk_data_oob(uint64_t idx, std::pair<uint32_t, bool> binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len) { - uint32_t size; + uint32_t size= 0; /* First write header data, if any left. */ if (sofar < header_len) { @@ -5983,10 +5994,11 @@ binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len) /* Then write the main chunk data. 
*/ ut_ad(sofar >= header_len); ut_ad(main_len > 0); - size= (uint32_t)std::min(header_len + main_len - sofar, (uint64_t)max_len); - memcpy(p, main_data + (sofar - header_len), size); - sofar+= size; - return {size, sofar == header_len + main_len}; + uint32_t size2= + (uint32_t)std::min(header_len + main_len - sofar, (uint64_t)max_len); + memcpy(p, main_data + (sofar - header_len), size2); + sofar+= size2; + return {size + size2, sofar == header_len + main_len}; } @@ -5997,22 +6009,24 @@ fsp_free_oob(THD *thd, void *engine_data) } -extern "C" void binlog_get_cache(THD *, IO_CACHE **, size_t *, +extern "C" void binlog_get_cache(THD *, IO_CACHE **, + handler_binlog_event_group_info **, const rpl_gtid **); void fsp_binlog_trx(trx_t *trx, mtr_t *mtr) { IO_CACHE *cache; - size_t main_size; + handler_binlog_event_group_info *binlog_info; const rpl_gtid *gtid; if (!trx->mysql_thd) return; - binlog_get_cache(trx->mysql_thd, &cache, &main_size, &gtid); - if (main_size) { + binlog_get_cache(trx->mysql_thd, &cache, &binlog_info, &gtid); + if (UNIV_LIKELY(binlog_info != nullptr) && + UNIV_LIKELY(binlog_info->gtid_offset > 0)) { binlog_diff_state.update_nolock(gtid); - fsp_binlog_write_cache(cache, main_size, mtr); + fsp_binlog_write_cache(cache, binlog_info, mtr); } } @@ -6676,14 +6690,15 @@ ha_innodb_binlog_reader::init_gtid_pos(slave_connection_state *pos, bool -innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size, +innobase_binlog_write_direct(IO_CACHE *cache, + handler_binlog_event_group_info *binlog_info, const rpl_gtid *gtid) { mtr_t mtr; if (gtid) binlog_diff_state.update_nolock(gtid); mtr.start(); - fsp_binlog_write_cache(cache, main_size, &mtr); + fsp_binlog_write_cache(cache, binlog_info, &mtr); mtr.commit(); /* ToDo: Should we sync the log here? Maybe depending on an extra bool parameter? */ /* ToDo: Presumably fsp_binlog_write_cache() should be able to fail in some cases? Then return any such error to the caller. 
*/ diff --git a/storage/innobase/include/fsp0fsp.h b/storage/innobase/include/fsp0fsp.h index 7cdca9a18de..4403747f45a 100644 --- a/storage/innobase/include/fsp0fsp.h +++ b/storage/innobase/include/fsp0fsp.h @@ -559,8 +559,10 @@ void fsp_shrink_temp_space(); extern ulonglong innodb_binlog_state_interval; extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr); class handler_binlog_reader; -extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size, - const struct rpl_gtid *gtid); +struct handler_binlog_event_group_info; +extern bool innobase_binlog_write_direct + (IO_CACHE *cache, handler_binlog_event_group_info *binlog_info, + const struct rpl_gtid *gtid); extern bool fsp_binlog_oob(THD *thd, const unsigned char *data, size_t data_len, void **engine_data); extern void fsp_free_oob(THD *thd, void *engine_data);