diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index ef88ecfef44..7a9fd69b2fb 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -630,6 +630,46 @@ SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; include/start_slave.inc +*** 5. Test killing thread that is waiting for queue of max length to shorten *** +SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued; +SET GLOBAL slave_parallel_max_queued=9000; +SET binlog_format=statement; +INSERT INTO t3 VALUES (70, foo(0, +'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', '')); +SET debug_sync='now WAIT_FOR query_waiting'; +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max"; +INSERT INTO t3 VALUES (72, 0); +SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +a b +70 0 +71 10000 +72 0 +SET debug_sync='now WAIT_FOR wait_queue_ready'; +KILL THD_ID; +SET debug_sync='now WAIT_FOR wait_queue_killed'; +SET debug_sync='now SIGNAL query_cont'; +include/wait_for_slave_sql_error.inc [errno=1317,1927,1963] +STOP SLAVE IO_THREAD; +SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +a b +70 0 +71 10000 +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL slave_parallel_max_queued= @old_max_queued; +INSERT INTO t3 VALUES (73,0); +include/start_slave.inc +SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +a b +70 0 +71 10000 +72 0 +73 0 +include/stop_slave.inc +SET GLOBAL binlog_format=@old_format; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +include/start_slave.inc include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 94d3b53974a..a0232ac71e0 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -940,6 +940,75 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) --delimiter ; SET sql_log_bin=1; +--connection server_2 +# Respawn all worker threads to clear any left-over debug_sync or other stuff. +--source include/stop_slave.inc +SET GLOBAL binlog_format=@old_format; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +--source include/start_slave.inc + + +--echo *** 5. Test killing thread that is waiting for queue of max length to shorten *** + +--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%' +--source include/wait_condition.inc +--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'` +SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued; +SET GLOBAL slave_parallel_max_queued=9000; + +--connection server_1 +--let bigstring= `SELECT REPEAT('x', 10000)` +SET binlog_format=statement; +# Create an event that will wait to be signalled. +INSERT INTO t3 VALUES (70, foo(0, + 'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', '')); +--disable_query_log +# Create an event that will fill up the queue. +eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring')); +--enable_query_log + +--connection server_2 +SET debug_sync='now WAIT_FOR query_waiting'; +# Inject that the SQL driver thread will signal `wait_queue_ready' to debug_sync +# as it goes to wait for the event queue to become smaller than the value of +# @@slave_parallel_max_queued. +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max"; + +--connection server_1 +# This event will have to wait for the queue to become shorter before it can +# be queued. We will test that things work when we kill the SQL driver thread +# during this wait. +INSERT INTO t3 VALUES (72, 0); +SELECT * FROM t3 WHERE a >= 70 ORDER BY a; + +--connection server_2 +SET debug_sync='now WAIT_FOR wait_queue_ready'; + +--replace_result $thd_id THD_ID +eval KILL $thd_id; + +SET debug_sync='now WAIT_FOR wait_queue_killed'; +SET debug_sync='now SIGNAL query_cont'; + +--let $slave_sql_errno= 1317,1927,1963 +--source include/wait_for_slave_sql_error.inc +STOP SLAVE IO_THREAD; +SELECT * FROM t3 WHERE a >= 70 ORDER BY a; + +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL slave_parallel_max_queued= @old_max_queued; + +--connection server_1 +INSERT INTO t3 VALUES (73,0); +--save_master_pos + +--connection server_2 +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t3 WHERE a >= 70 ORDER BY a; + --connection server_2 --source include/stop_slave.inc diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 3cdd1f5ec8d..af44d79038c 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -801,6 +801,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, sql_thread_stopping= true; if (sql_thread_stopping) { + delete ev; /* QQ: Need a better comment why we return false here */ return false; } @@ -809,6 +810,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, MYF(0)))) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); + delete ev; return true; } qev->ev= ev; @@ -831,6 +833,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); delete rgi; + my_free(qev); + delete ev; return true; } rgi->is_parallel_exec = true; @@ -903,6 +907,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); my_error(ER_CONNECTION_KILLED, MYF(0)); delete rgi; + my_free(qev); + delete ev; + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_killed")); + };); + slave_output_error_info(rli, rli->sql_driver_thd); return true; } else @@ -918,6 +930,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread, "Waiting for room in worker thread event queue"); did_enter_cond= true; + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_ready")); + };); } mysql_cond_wait(&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread);