diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test new file mode 100644 index 00000000000..b3f970c909a --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel2.test @@ -0,0 +1,70 @@ +--source include/have_binlog_format_statement.inc +--source include/have_xtradb.inc + +connect (m1,127.0.0.1,root,,test,$MASTER_MYPORT,); +connect (m2,127.0.0.1,root,,test,$MASTER_MYPORT,); +connect (m3,127.0.0.1,root,,test,$MASTER_MYPORT,); +connect (m4,127.0.0.1,root,,test,$MASTER_MYPORT,); +connect (s1,127.0.0.1,root,,test,$SLAVE_MYPORT,); +connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,); +connect (s3,127.0.0.1,root,,test,$SLAVE_MYPORT,); +connect (s4,127.0.0.1,root,,test,$SLAVE_MYPORT,); + +--connection m1 +SELECT @@server_id; +SET sql_log_bin=0; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET sql_log_bin=1; +SET @old_count= @@GLOBAL.binlog_commit_wait_count; +SET @old_usec= @@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec = 30*1000000; + +--connection s1 +SELECT @@server_id; +SET sql_log_bin=0; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET sql_log_bin=1; + +--replace_result $MASTER_MYPORT MASTER_PORT +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, + master_user='root', master_use_gtid=current_pos; + +--connection m1 +SET GLOBAL binlog_commit_wait_count = 4; + +send INSERT INTO t1 VALUES (1); + +--connection m2 +send INSERT INTO t1 VALUES (2); +--connection m3 +send INSERT INTO t1 VALUES (3); +--connection m4 +INSERT INTO t1 VALUES (4); +--connection m1 +reap; +--connection m2 +reap; +--connection m3 +reap; + +--connection m1 +SHOW BINLOG EVENTS; + +--connection s1 +--source include/start_slave.inc +SELECT * FROM t1; +--source include/stop_slave.inc +SELECT * FROM t1; + +--connection m1 +SET sql_log_bin=0; +DROP TABLE t1; +SET sql_log_bin=1; +SET GLOBAL binlog_commit_wait_count= @old_count; +SET GLOBAL binlog_commit_wait_usec= @old_usec; + +--connection s1 +RESET SLAVE ALL; +SET sql_log_bin=0; +DROP TABLE t1; +SET sql_log_bin=1; diff --git a/sql/log.cc b/sql/log.cc index e3eb5f9a331..61d4428fc18 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -88,6 +88,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0; #endif mysql_mutex_t LOCK_prepare_ordered; +mysql_cond_t COND_prepare_ordered; mysql_mutex_t LOCK_commit_ordered; static ulonglong binlog_status_var_num_commits; @@ -6679,6 +6680,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, } } + if (opt_binlog_commit_wait_count > 0) + mysql_cond_signal(&COND_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered); DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); @@ -6840,6 +6843,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) binlog_id= current_binlog_id; mysql_mutex_lock(&LOCK_prepare_ordered); + if (opt_binlog_commit_wait_count) + wait_for_sufficient_commits(); current= group_commit_queue; group_commit_queue= NULL; mysql_mutex_unlock(&LOCK_prepare_ordered); @@ -7135,6 +7140,48 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, return 0; } + +void +MYSQL_BIN_LOG::wait_for_sufficient_commits() +{ + size_t count; + group_commit_entry *e; + group_commit_entry *last_head; + struct timespec wait_until; + + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_prepare_ordered); + + count= 0; + for (e= last_head= group_commit_queue; e; e= e->next) + ++count; + if (count >= opt_binlog_commit_wait_count) + return; + + mysql_mutex_unlock(&LOCK_log); + set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec); + + for (;;) + { + int err; + group_commit_entry *head; + + err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered, + &wait_until); + if (err == ETIMEDOUT) + break; + head= group_commit_queue; + for (e= head; e && e != last_head; e= e->next) + ++count; + if (count >= opt_binlog_commit_wait_count) + break; + last_head= head; + } + + mysql_mutex_lock(&LOCK_log); +} + + /** Wait until we get a signal that the relay log has been updated. diff --git a/sql/log.h b/sql/log.h index 2345f0acf9c..48cc568da11 100644 --- a/sql/log.h +++ b/sql/log.h @@ -85,9 +85,11 @@ protected: prepare_ordered() or commit_ordered() methods. */ extern mysql_mutex_t LOCK_prepare_ordered; +extern mysql_cond_t COND_prepare_ordered; extern mysql_mutex_t LOCK_commit_ordered; #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; +extern PSI_cond_key key_COND_prepare_ordered; #endif class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging @@ -685,6 +687,7 @@ public: } void set_max_size(ulong max_size_arg); void signal_update(); + void wait_for_sufficient_commits(); void wait_for_update_relay_log(THD* thd); int wait_for_update_bin_log(THD* thd, const struct timespec * timeout); void init(ulong max_size); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 816756338a4..a7fa78838a9 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -544,6 +544,8 @@ ulong rpl_recovery_rank=0; ulong stored_program_cache_size= 0; ulong opt_slave_parallel_threads= 0; +ulong opt_binlog_commit_wait_count= 0; +ulong opt_binlog_commit_wait_usec= 0; const double log_10[] = { 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009, @@ -895,7 +897,7 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, - key_COND_parallel_entry; + key_COND_parallel_entry, key_COND_prepare_ordered; static PSI_cond_info all_server_conds[]= { @@ -940,7 +942,8 @@ static PSI_cond_info all_server_conds[]= { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_rpl_thread, "COND_rpl_thread", 0}, { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0}, - { &key_COND_parallel_entry, "COND_parallel_entry", 0} + { &key_COND_parallel_entry, "COND_parallel_entry", 0}, + { &key_COND_prepare_ordered, "COND_prepare_ordered", 0} }; PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, @@ -2017,6 +2020,7 @@ static void clean_up_mutexes() mysql_mutex_destroy(&LOCK_server_started); mysql_cond_destroy(&COND_server_started); mysql_mutex_destroy(&LOCK_prepare_ordered); + mysql_cond_destroy(&COND_prepare_ordered); mysql_mutex_destroy(&LOCK_commit_ordered); DBUG_VOID_RETURN; } @@ -4117,6 +4121,7 @@ static int init_thread_environment() &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL); mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, MY_MUTEX_INIT_SLOW); diff --git a/sql/mysqld.h b/sql/mysqld.h index 3475835c67b..345e9fa74c9 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -177,6 +177,8 @@ extern ulong opt_binlog_rows_event_max_size; extern ulong rpl_recovery_rank, thread_cache_size; extern ulong stored_program_cache_size; extern ulong opt_slave_parallel_threads; +extern ulong opt_binlog_commit_wait_count; +extern ulong opt_binlog_commit_wait_usec; extern ulong back_log; extern ulong executed_events; extern char language[FN_REFLEN]; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index e5c700041ef..8f97c19e5ad 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -210,8 +210,7 @@ handle_rpl_parallel_thread(void *arg) if (entry->last_committed_sub_id < event_gtid_sub_id) { entry->last_committed_sub_id= event_gtid_sub_id; - if (entry->need_signal) - mysql_cond_broadcast(&entry->COND_parallel_entry); + mysql_cond_broadcast(&entry->COND_parallel_entry); } mysql_mutex_unlock(&entry->LOCK_parallel_entry); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 09bde20f5af..a84722e9263 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -50,7 +50,6 @@ struct rpl_parallel_entry { uint64 last_seq_no; uint64 last_commit_id; bool active; - bool need_signal; rpl_parallel_thread *rpl_thread; /* The sub_id of the last transaction to commit within this domain_id. diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index f63960a4e36..1273bff1750 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1483,6 +1483,26 @@ static Sys_var_ulong Sys_slave_parallel_threads( #endif +static Sys_var_ulong Sys_binlog_commit_wait_count( + "binlog_commit_wait_count", + "If non-zero, binlog write will wait at most binlog_commit_wait_usec " + "microseconds for at least this many commits to queue up for group " + "commit to the binlog. This can reduce I/O on the binlog and provide " + "increased opportunity for parallel apply on the slave, but too high " + "a value will decrease commit throughput.", + GLOBAL_VAR(opt_binlog_commit_wait_count), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1)); + + +static Sys_var_ulong Sys_binlog_commit_wait_usec( + "binlog_commit_wait_usec", + "Maximum time, in microseconds, to wait for more commits to queue up " + " for binlog group commit. Only takes effect if the value of " + "binlog_commit_wait_count is non-zero.", + GLOBAL_VAR(opt_binlog_commit_wait_usec), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(100000), BLOCK_SIZE(1)); + + static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type) { SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables;