diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result index f6f03b42270..82911bde00f 100644 --- a/mysql-test/r/mysqld--help.result +++ b/mysql-test/r/mysqld--help.result @@ -794,6 +794,11 @@ The following options may be given as the first argument: --slave-net-timeout=# Number of seconds to wait for more data from any master/slave connection before aborting the read + --slave-parallel-max-queued=# + Limit on how much memory SQL threads should use per + parallel replication thread when reading ahead in the + relay log looking for opportunities for parallel + replication. Only used when --slave-parallel-threads > 0. --slave-parallel-threads=# If non-zero, number of threads to spawn to apply in parallel events on the slave that were group-committed on @@ -1148,6 +1153,7 @@ slave-compressed-protocol FALSE slave-exec-mode STRICT slave-max-allowed-packet 1073741824 slave-net-timeout 3600 +slave-parallel-max-queued 131072 slave-parallel-threads 0 slave-skip-errors (No default value) slave-sql-verify-checksum TRUE diff --git a/mysql-test/suite/sys_vars/r/slave_parallel_max_queued_basic.result b/mysql-test/suite/sys_vars/r/slave_parallel_max_queued_basic.result new file mode 100644 index 00000000000..568ecac6de6 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/slave_parallel_max_queued_basic.result @@ -0,0 +1,13 @@ +SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued; +SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default'; +Check default +131072 +SELECT @@SESSION.slave_parallel_max_queued as 'no session var'; +ERROR HY000: Variable 'slave_parallel_max_queued' is a GLOBAL variable +SET GLOBAL slave_parallel_max_queued= 0; +SET GLOBAL slave_parallel_max_queued= DEFAULT; +SET GLOBAL slave_parallel_max_queued= 65536; +SELECT @@GLOBAL.slave_parallel_max_queued; +@@GLOBAL.slave_parallel_max_queued +65536 +SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued; diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_max_queued_basic.test b/mysql-test/suite/sys_vars/t/slave_parallel_max_queued_basic.test new file mode 100644 index 00000000000..e3d3a9365f1 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/slave_parallel_max_queued_basic.test @@ -0,0 +1,14 @@ +--source include/not_embedded.inc + +SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued; + +SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default'; +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +SELECT @@SESSION.slave_parallel_max_queued as 'no session var'; + +SET GLOBAL slave_parallel_max_queued= 0; +SET GLOBAL slave_parallel_max_queued= DEFAULT; +SET GLOBAL slave_parallel_max_queued= 65536; +SELECT @@GLOBAL.slave_parallel_max_queued; + +SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1e7deef8d89..12ee904cc1b 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -548,6 +548,7 @@ ulong stored_program_cache_size= 0; ulong opt_slave_parallel_threads= 0; ulong opt_binlog_commit_wait_count= 0; ulong opt_binlog_commit_wait_usec= 0; +ulong opt_slave_parallel_max_queued= 131072; const double log_10[] = { 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009, diff --git a/sql/mysqld.h b/sql/mysqld.h index e45b48f0332..cbcff93e423 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -177,6 +177,7 @@ extern ulong opt_binlog_rows_event_max_size; extern ulong rpl_recovery_rank, thread_cache_size; extern ulong stored_program_cache_size; extern ulong opt_slave_parallel_threads; +extern ulong opt_slave_parallel_max_queued; extern ulong opt_binlog_commit_wait_count; extern ulong opt_binlog_commit_wait_usec; extern ulong back_log; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 842dcefa0a6..b8d75c7bc82 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -14,10 +14,6 @@ following transactions, so slave binlog position will be correct. And all the retry logic for temporary errors like deadlock. - - We need some user-configurable limit on how far ahead the SQL thread will - fetch and queue events for parallel execution (otherwise if slave gets - behind we will fill up memory with pending malloc()'ed events). - - In GTID replication, we should not need to update master.info and relay-log.info on disk at all except at slave thread stop. They are not used to know where to restart, the updates are not crash-safe, and it @@ -32,6 +28,7 @@ crashes in the middle of writing the event group to the binlog. The slave rolls back the transaction; parallel execution needs to be able to deal with this wrt. commit_orderer and such. + See Format_description_log_event::do_apply_event(). - Retry of failed transactions is not yet implemented for the parallel case. */ @@ -147,8 +144,9 @@ handle_rpl_parallel_thread(void *arg) "Waiting for work from SQL thread"); while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); - rpt->event_queue= rpt->last_in_queue= NULL; + rpt->dequeue(events); thd->exit_cond(old_msg); + mysql_cond_signal(&rpt->COND_rpl_thread); more_events: while (events) @@ -286,7 +284,7 @@ handle_rpl_parallel_thread(void *arg) This is faster than having to wakeup the pool manager thread to give us a new event. */ - rpt->event_queue= rpt->last_in_queue= NULL; + rpt->dequeue(events); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); goto more_events; } @@ -619,7 +617,8 @@ rpl_parallel::wait_for_done() */ bool -rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) +rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, + ulonglong event_size) { rpl_parallel_entry *e; rpl_parallel_thread *cur_thread; @@ -653,6 +652,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) return true; } qev->ev= ev; + qev->event_size= event_size; qev->next= NULL; strcpy(qev->event_relay_log_name, rli->event_relay_log_name); qev->event_relay_log_pos= rli->event_relay_log_pos; @@ -715,17 +715,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) if (cur_thread) { mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - if (cur_thread->current_entry != e) + for (;;) { - /* - The worker thread became idle, and returned to the free list and - possibly was allocated to a different request. This also means - that everything previously queued has already been executed, else - the worker thread would not have become idle. So we should - allocate a new worker thread. - */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - e->rpl_thread= cur_thread= NULL; + if (cur_thread->current_entry != e) + { + /* + The worker thread became idle, and returned to the free list and + possibly was allocated to a different request. This also means + that everything previously queued has already been executed, + else the worker thread would not have become idle. So we should + allocate a new worker thread. + */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + e->rpl_thread= cur_thread= NULL; + break; + } + else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) + break; // The thread is ready to queue into + else + { + /* + We have reached the limit of how much memory we are allowed to + use for queuing events, so wait for the thread to consume some + of its queue. + */ + mysql_cond_wait(&cur_thread->COND_rpl_thread, + &cur_thread->LOCK_rpl_thread); + } } } @@ -819,11 +835,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) /* Queue the event for processing. */ - if (cur_thread->last_in_queue) - cur_thread->last_in_queue->next= qev; - else - cur_thread->event_queue= qev; - cur_thread->last_in_queue= qev; + cur_thread->enqueue(qev); mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); mysql_cond_signal(&cur_thread->COND_rpl_thread); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 7057ec66de2..fe9c6708e97 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -27,7 +27,29 @@ struct rpl_parallel_thread { char event_relay_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN]; ulonglong event_relay_log_pos; + size_t event_size; } *event_queue, *last_in_queue; + uint64 queued_size; + + void enqueue(queued_event *qev) + { + if (last_in_queue) + last_in_queue->next= qev; + else + event_queue= qev; + last_in_queue= qev; + queued_size+= qev->event_size; + } + + void dequeue(queued_event *list) + { + queued_event *tmp; + + DBUG_ASSERT(list == event_queue); + event_queue= last_in_queue= NULL; + for (tmp= list; tmp; tmp= tmp->next) + queued_size-= tmp->event_size; + } }; @@ -87,7 +109,8 @@ struct rpl_parallel { void reset(); rpl_parallel_entry *find(uint32 domain_id); void wait_for_done(); - bool do_event(rpl_group_info *serial_rgi, Log_event *ev); + bool do_event(rpl_group_info *serial_rgi, Log_event *ev, + ulonglong event_size); }; diff --git a/sql/slave.cc b/sql/slave.cc index e73e3e14c10..b2bd8b9423e 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -156,7 +156,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, bool suppress_warnings); static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, bool reconnect, bool suppress_warnings); -static Log_event* next_event(rpl_group_info* rgi); +static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); static int queue_event(Master_info* mi,const char* buf,ulong event_len); static int terminate_slave_thread(THD *thd, mysql_mutex_t *term_lock, @@ -3273,6 +3273,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) static int exec_relay_log_event(THD* thd, Relay_log_info* rli, rpl_group_info *serial_rgi) { + ulonglong event_size; DBUG_ENTER("exec_relay_log_event"); /* @@ -3282,7 +3283,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ mysql_mutex_lock(&rli->data_lock); - Log_event * ev = next_event(serial_rgi); + Log_event *ev= next_event(serial_rgi, &event_size); if (sql_slave_killed(serial_rgi)) { @@ -3344,7 +3345,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0) - DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev)); + DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size)); /* For GTID, allocate a new sub_id for the given domain_id. @@ -5836,8 +5837,10 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg) @return The event read, or NULL on error. If an error occurs, the error is reported through the sql_print_information() or sql_print_error() functions. + + The size of the read event (in bytes) is returned in *event_size. */ -static Log_event* next_event(rpl_group_info *rgi) +static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) { Log_event* ev; Relay_log_info *rli= rgi->rli; @@ -5848,6 +5851,7 @@ static Log_event* next_event(rpl_group_info *rgi) DBUG_ENTER("next_event"); DBUG_ASSERT(thd != 0 && thd == rli->sql_driver_thd); + *event_size= 0; #ifndef DBUG_OFF if (abort_slave_event_count && !rli->events_till_abort--) @@ -5932,11 +5936,13 @@ static Log_event* next_event(rpl_group_info *rgi) opt_slave_sql_verify_checksum))) { + ulonglong old_pos= rli->future_event_relay_log_pos; /* read it while we have a lock, to avoid a mutex lock in inc_event_relay_log_pos() */ rli->future_event_relay_log_pos= my_b_tell(cur_log); + *event_size= rli->future_event_relay_log_pos - old_pos; if (hot_log) mysql_mutex_unlock(log_lock); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index d509a614b6e..1e6c9c69667 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1479,6 +1479,16 @@ static Sys_var_ulong Sys_slave_parallel_threads( VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_slave_parallel_threads), ON_UPDATE(fix_slave_parallel_threads)); + + +static Sys_var_ulong Sys_slave_parallel_max_queued( + "slave_parallel_max_queued", + "Limit on how much memory SQL threads should use per parallel " + "replication thread when reading ahead in the relay log looking for " + "opportunities for parallel replication. Only used when " + "--slave-parallel-threads > 0.", + GLOBAL_VAR(opt_slave_parallel_max_queued), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1)); #endif