mirror of
https://github.com/MariaDB/server.git
synced 2025-09-02 09:41:40 +03:00
MDEV-5657: Parallel replication.
Clean up and improve the parallel implementation code, mainly related to scheduling of work to threads and handling of stop and errors.

Fix a lot of bugs in various corner cases that could lead to crashes or corruption.

Fix that a single replication domain could easily grab all worker threads and stall all other domains; now a configuration variable --slave-domain-parallel-threads allows limiting the number of workers a single domain can use.

Allow the next event group to start as soon as the previous group begins the commit phase (as opposed to when it ends it); this allows multiple event groups on the slave to participate in group commit, even when no other opportunities for parallelism are available.

Various fixes:
- Fix some races in the rpl.rpl_parallel test case.
- Fix an old incorrect assertion in Log_event iocache read.
- Fix repeated malloc/free of wait_for_commit and rpl_group_info objects.
- Simplify wait_for_commit wakeup logic.
- Fix one case in queue_for_group_commit() where killing one thread would fail to correctly signal the error to the next, causing loss of the transaction after slave restart.
- Fix leaking of pthreads (and their allocated stack) due to missing PTHREAD_CREATE_DETACHED attribute.
- Fix how one batch of group-committed transactions waits for the previous batch before starting to execute themselves. The old code had a very complex scheduling where the first transaction was handled differently, with subtle bugs in corner cases. Now each event group is always scheduled for a new worker (in a round-robin fashion amongst available workers). Keep a count of how many transactions have started to commit, and wait for that counter to reach the appropriate value.
- Fix slave stop to wait for all workers to actually complete processing; before, the wait was for update of last_committed_sub_id, which happens a bit earlier, and could leave worker threads potentially accessing bits of the replication state that are no longer valid after slave stop.
- Fix a couple of places where the test suite would kill a thread waiting inside enter_cond() in connection with debug_sync; debug_sync + kill can crash in rare cases due to a race with mysys_var_current_mutex.
- Fix some corner cases where we had enter_cond() but no exit_cond().
- Fix that we could get a failure in wait_for_prior_commit() but forget to flag the error with my_error().
- Fix slave stop (both for normal stop and stop due to error). Now, at stop we pick a specific safe point (in terms of event groups executed) and make sure that all event groups before that point are executed to completion, and that no event group after it starts executing; this ensures a safe place to restart replication, even for non-transactional stuff/DDL. In error stop, make sure that all prior event groups are allowed to execute to completion, and that any later event groups that have started are rolled back, if possible. The old code could leave e.g. T1 and T3 committed but T2 not, or it could even leave half a transaction not rolled back in some random worker, which would cause big problems when that worker was later reused after slave restart.
- Fix the accounting of amount of events queued for one worker. Before, the amount was reduced immediately as soon as the events were dequeued (which happens all at once); this allowed twice the amount of events to be queued in memory for each single worker, which is not what users would expect.
- Fix that an error set during execution of one event was sometimes not cleared before executing the next, causing problems with the error reporting.
- Fix incorrect handling of thd->killed in worker threads.
This commit is contained in:
@@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10;
|
||||
SET debug_sync='RESET';
|
||||
include/start_slave.inc
|
||||
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
|
||||
SET debug_sync='RESET';
|
||||
FLUSH LOGS;
|
||||
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
|
||||
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
|
||||
@@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16,
|
||||
''));
|
||||
SET debug_sync='now WAIT_FOR master_queued3';
|
||||
SET debug_sync='now SIGNAL master_cont1';
|
||||
SET debug_sync='RESET';
|
||||
SELECT * FROM t3 ORDER BY a;
|
||||
a b
|
||||
1 1
|
||||
@@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
|
||||
slave-bin.000003 # Xid # # COMMIT /* XID */
|
||||
*** Test STOP SLAVE in parallel mode ***
|
||||
include/stop_slave.inc
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET binlog_direct_non_transactional_updates=0;
|
||||
SET sql_log_bin=0;
|
||||
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
|
||||
@@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21);
|
||||
INSERT INTO t3 VALUES(22, 22);
|
||||
SET binlog_format=@old_format;
|
||||
BEGIN;
|
||||
INSERT INTO t2 VALUES (21);
|
||||
INSERT INTO t2 VALUES (21);
|
||||
START SLAVE;
|
||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
|
||||
STOP SLAVE;
|
||||
SET debug_sync='now WAIT_FOR wait_for_done_waiting';
|
||||
ROLLBACK;
|
||||
SET GLOBAL debug_dbug=@old_dbug;
|
||||
SET debug_sync='RESET';
|
||||
include/wait_for_slave_to_stop.inc
|
||||
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
|
||||
a
|
||||
@@ -292,6 +302,7 @@ a b
|
||||
32 32
|
||||
33 33
|
||||
34 34
|
||||
SET debug_sync='RESET';
|
||||
SET sql_log_bin=0;
|
||||
CALL mtr.add_suppression("Query execution was interrupted");
|
||||
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
|
||||
@@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
|
||||
a b
|
||||
31 31
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@@ -379,6 +391,7 @@ a b
|
||||
42 42
|
||||
43 43
|
||||
44 44
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='now WAIT_FOR t2_query';
|
||||
SET debug_sync='now SIGNAL t2_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_ready';
|
||||
@@ -386,9 +399,7 @@ KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR t2_killed';
|
||||
SET debug_sync='now SIGNAL t1_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1963]
|
||||
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
|
||||
a b
|
||||
41 41
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@@ -463,6 +474,7 @@ a b
|
||||
52 52
|
||||
53 53
|
||||
54 54
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='now WAIT_FOR t2_query';
|
||||
SET debug_sync='now SIGNAL t2_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_ready';
|
||||
@@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1963]
|
||||
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
|
||||
a b
|
||||
51 51
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@@ -514,14 +527,18 @@ include/start_slave.inc
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=3;
|
||||
SET GLOBAL slave_parallel_threads=4;
|
||||
include/start_slave.inc
|
||||
*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
|
||||
SET binlog_format=statement;
|
||||
SET gtid_domain_id=2;
|
||||
BEGIN;
|
||||
INSERT INTO t3 VALUES (70, foo(70,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
|
||||
INSERT INTO t3 VALUES (60, foo(60,
|
||||
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
SET debug_sync='now WAIT_FOR d2_query';
|
||||
SET gtid_domain_id=1;
|
||||
@@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63,
|
||||
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
|
||||
SET debug_sync='now WAIT_FOR d0_query';
|
||||
SET gtid_domain_id=3;
|
||||
BEGIN;
|
||||
INSERT INTO t3 VALUES (68, foo(68,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
|
||||
INSERT INTO t3 VALUES (69, foo(69,
|
||||
'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
SET debug_sync='now WAIT_FOR d3_query';
|
||||
SET debug_sync='now SIGNAL d2_cont2';
|
||||
SET debug_sync='now WAIT_FOR d2_done';
|
||||
SET debug_sync='now SIGNAL d1_cont2';
|
||||
SET debug_sync='now WAIT_FOR d1_done';
|
||||
SET debug_sync='now SIGNAL d0_cont2';
|
||||
SET debug_sync='now WAIT_FOR d0_done';
|
||||
SET debug_sync='now SIGNAL d3_cont2';
|
||||
SET debug_sync='now WAIT_FOR d3_done';
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (64, foo(64,
|
||||
'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
|
||||
INSERT INTO t3 VALUES (65, foo(65, '', ''));
|
||||
SET debug_sync='now WAIT_FOR master_queued2';
|
||||
@@ -569,23 +598,34 @@ a b
|
||||
65 65
|
||||
66 66
|
||||
67 67
|
||||
68 68
|
||||
69 69
|
||||
70 70
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='now SIGNAL d0_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_waiting';
|
||||
SET debug_sync='now SIGNAL d3_cont';
|
||||
SET debug_sync='now WAIT_FOR t2_waiting';
|
||||
SET debug_sync='now SIGNAL d1_cont';
|
||||
SET debug_sync='now WAIT_FOR t3_waiting';
|
||||
SET debug_sync='now SIGNAL d2_cont';
|
||||
SET debug_sync='now WAIT_FOR t4_waiting';
|
||||
KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR t3_killed';
|
||||
SET debug_sync='now SIGNAL t1_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
64 64
|
||||
68 68
|
||||
69 69
|
||||
70 70
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@@ -597,11 +637,11 @@ RETURN x;
|
||||
END
|
||||
||
|
||||
SET sql_log_bin=1;
|
||||
INSERT INTO t3 VALUES (69,0);
|
||||
UPDATE t3 SET b=b+1 WHERE a=60;
|
||||
include/start_slave.inc
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
60 61
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
@@ -609,7 +649,9 @@ a b
|
||||
65 65
|
||||
66 66
|
||||
67 67
|
||||
69 0
|
||||
68 68
|
||||
69 69
|
||||
70 70
|
||||
SET sql_log_bin=0;
|
||||
DROP FUNCTION foo;
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
@@ -634,37 +676,31 @@ include/start_slave.inc
|
||||
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
|
||||
SET GLOBAL slave_parallel_max_queued=9000;
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (70, foo(0,
|
||||
INSERT INTO t3 VALUES (80, foo(0,
|
||||
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
|
||||
SET debug_sync='now WAIT_FOR query_waiting';
|
||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
|
||||
INSERT INTO t3 VALUES (72, 0);
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
|
||||
a b
|
||||
70 0
|
||||
71 10000
|
||||
72 0
|
||||
80 0
|
||||
81 10000
|
||||
SET debug_sync='now WAIT_FOR wait_queue_ready';
|
||||
KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR wait_queue_killed';
|
||||
SET debug_sync='now SIGNAL query_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
a b
|
||||
70 0
|
||||
71 10000
|
||||
SET GLOBAL debug_dbug=@old_dbug;
|
||||
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
|
||||
INSERT INTO t3 VALUES (73,0);
|
||||
INSERT INTO t3 VALUES (82,0);
|
||||
SET debug_sync='RESET';
|
||||
include/start_slave.inc
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
|
||||
a b
|
||||
70 0
|
||||
71 10000
|
||||
72 0
|
||||
73 0
|
||||
80 0
|
||||
81 10000
|
||||
82 0
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
|
Reference in New Issue
Block a user