diff --git a/sql/log_event.cc b/sql/log_event.cc index cd6da8baa22..e7c0506a50a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3843,17 +3843,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, thd->variables.auto_increment_increment= auto_increment_increment; thd->variables.auto_increment_offset= auto_increment_offset; - /* - InnoDB internally stores the master log position it has executed so far, - i.e. the position just after the COMMIT event. - When InnoDB will want to store, the positions in rli won't have - been updated yet, so group_master_log_* will point to old BEGIN - and event_master_log* will point to the beginning of current COMMIT. - But log_pos of the COMMIT Query event is what we want, i.e. the pos of the - END of the current log event (COMMIT). We save it in rli so that InnoDB can - access it. - */ - const_cast(rli)->future_group_master_log_pos= log_pos; DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); clear_all_errors(thd, const_cast(rli)); @@ -3882,7 +3871,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, invariants like IN_STMT flag must be off at committing the transaction. */ rgi->inc_event_relay_log_pos(); - const_cast(rli)->clear_flag(Relay_log_info::IN_STMT); } else { @@ -5535,16 +5523,6 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi, thd->lex->local_file= local_fname; mysql_reset_thd_for_next_command(thd, 0); - if (!use_rli_only_for_errors) - { - /* - Saved for InnoDB, see comment in - Query_log_event::do_apply_event() - */ - const_cast(rli)->future_group_master_log_pos= log_pos; - DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); - } - /* We test replicate_*_db rules. Note that we have already prepared the file to load, even if we are going to ignore and delete it @@ -5940,11 +5918,16 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) correspond to the beginning of the transaction. Starting from 5.0.0, there also are some rotates from the slave itself, in the relay log, which shall not change the group positions. + + In parallel replication, rotate event is executed out-of-band with normal + events, so we cannot update group_master_log_name or _pos here, it will + be updated with the next normal event instead. */ if ((server_id != global_system_variables.server_id || rli->replicate_same_server_id) && !is_relay_log_event() && - !rli->is_in_group()) + !rli->is_in_group() && + !rgi->is_parallel_exec) { mysql_mutex_lock(&rli->data_lock); DBUG_PRINT("info", ("old group_master_log_name: '%s' " @@ -7712,7 +7695,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi) */ if (rli->get_flag(Relay_log_info::IN_TRANSACTION)) rgi->inc_event_relay_log_pos(); - else + else if (!rgi->is_parallel_exec) { rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi); rli->inc_group_relay_log_pos(0, rgi); @@ -8408,7 +8391,6 @@ int Execute_load_log_event::do_apply_event(rpl_group_info *rgi) calls mysql_load()). */ - const_cast(rli)->future_group_master_log_pos= log_pos; if (lev->do_apply_event(0,rgi,1)) { /* diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index fbf135c0bb6..8942b1d0a33 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -64,7 +64,11 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, /* ToDo: Access to thd, and what about rli, split out a parallel part? */ mysql_mutex_lock(&rli->data_lock); qev->ev->thd= thd; + strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); + rgi->event_relay_log_name= rgi->event_relay_log_name_buf; + rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; + strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); thd->rgi_slave= NULL; @@ -660,7 +664,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) } qev->ev= ev; qev->next= NULL; + strcpy(qev->event_relay_log_name, rli->event_relay_log_name); + qev->event_relay_log_pos= rli->event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; + strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); if (typ == GTID_EVENT) { @@ -674,6 +681,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) delete rgi; return true; } + rgi->is_parallel_exec = true; if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) rgi->deferred_events= new Deferred_log_events(rli); @@ -783,6 +791,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) have GTID, like a MariaDB 5.5 or MySQL master. */ qev->rgi= serial_rgi; + /* Handle master log name change, seen in Rotate_log_event. */ + if (typ == ROTATE_EVENT) + { + Rotate_log_event *rev= static_cast(qev->ev); + memcpy(rli->future_event_master_log_name, + rev->new_log_ident, rev->ident_len+1); + } + rpt_handle_event(qev, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); my_free(qev); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 7830470a929..7057ec66de2 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -24,6 +24,9 @@ struct rpl_parallel_thread { Log_event *ev; rpl_group_info *rgi; ulonglong future_event_relay_log_pos; + char event_relay_log_name[FN_REFLEN]; + char future_event_master_log_name[FN_REFLEN]; + ulonglong event_relay_log_pos; } *event_queue, *last_in_queue; }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 0ea6b1e5d13..f3a6863d217 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -877,7 +877,9 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, if (!skip_lock) mysql_mutex_lock(&data_lock); rgi->inc_event_relay_log_pos(); - if (opt_slave_parallel_threads > 0) + DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", + (long) log_pos, (long) group_master_log_pos)); + if (rgi->is_parallel_exec) { /* In case of parallel replication, do not update the position backwards. */ int cmp= strcmp(group_relay_log_name, event_relay_log_name); @@ -888,6 +890,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, notify_group_relay_log_name_update(); } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos) group_relay_log_pos= event_relay_log_pos; + + cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); + if (cmp <= 0) + { + if (cmp < 0) + { + strcpy(group_master_log_name, rgi->future_event_master_log_name); + notify_group_master_log_name_update(); + } + if (group_master_log_pos < log_pos) + group_master_log_pos= log_pos; + } } else { @@ -895,6 +909,8 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, group_relay_log_pos= event_relay_log_pos; strmake_buf(group_relay_log_name, event_relay_log_name); notify_group_relay_log_name_update(); + if (log_pos) // 3.23 binlogs don't have log_posx + group_master_log_pos= log_pos; } /* @@ -927,12 +943,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, the relay log is not "val". With the end_log_pos solution, we avoid computations involving lengthes. */ - DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", - (long) log_pos, (long) group_master_log_pos)); - if (log_pos) // 3.23 binlogs don't have log_posx - { - group_master_log_pos= log_pos; - } mysql_cond_broadcast(&data_cond); if (!skip_lock) mysql_mutex_unlock(&data_lock); @@ -1436,6 +1446,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_) wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), + is_parallel_exec(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false) { bzero(¤t_gtid, sizeof(current_gtid)); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 92f65a1397d..38268ee85c5 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -185,6 +185,10 @@ public: char event_relay_log_name[FN_REFLEN]; ulonglong event_relay_log_pos; ulonglong future_event_relay_log_pos; + /* + The master log name for current event. Only used in parallel replication. + */ + char future_event_master_log_name[FN_REFLEN]; #ifdef HAVE_valgrind bool is_fake; /* Mark that this is a fake relay log info structure */ @@ -216,18 +220,6 @@ public: */ bool sql_force_rotate_relay; - /* - When it commits, InnoDB internally stores the master log position it has - processed so far; the position to store is the one of the end of the - committing event (the COMMIT query event, or the event if in autocommit - mode). - */ -#if MYSQL_VERSION_ID < 40100 - ulonglong future_master_log_pos; -#else - ulonglong future_group_master_log_pos; -#endif - time_t last_master_timestamp; void clear_until_condition(); @@ -557,7 +549,15 @@ struct rpl_group_info */ time_t last_event_start_time; + char *event_relay_log_name; + char event_relay_log_name_buf[FN_REFLEN]; + ulonglong event_relay_log_pos; ulonglong future_event_relay_log_pos; + /* + The master log name for current event. Only used in parallel replication. + */ + char future_event_master_log_name[FN_REFLEN]; + bool is_parallel_exec; private: /* @@ -685,7 +685,7 @@ public: inline void inc_event_relay_log_pos() { - if (opt_slave_parallel_threads == 0 || + if (!is_parallel_exec || rli->event_relay_log_pos < future_event_relay_log_pos) rli->event_relay_log_pos= future_event_relay_log_pos; } diff --git a/sql/slave.cc b/sql/slave.cc index 50960991faf..e73e3e14c10 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3361,6 +3361,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, } serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; + serial_rgi->event_relay_log_name= rli->event_relay_log_name; + serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, ev); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 91f13bebd12..d509a614b6e 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4228,6 +4228,8 @@ static bool check_pseudo_slave_mode(sys_var *self, THD *thd, set_var *var) #ifndef EMBEDDED_LIBRARY delete thd->rli_fake; thd->rli_fake= NULL; + delete thd->rgi_fake; + thd->rgi_fake= NULL; #endif } else if (previous_val && val)