mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
MDEV-4506: Parallel replication.
Add another test case, using DEBUG_SYNC. Fix one bug found.
This commit is contained in:
@ -63,5 +63,5 @@ if (`SELECT '$binlog_limit' <> ''`)
|
||||
|
||||
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR $_binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /block_len=[0-9]+/block_len=#/ /Server ver:.*$/SERVER_VERSION, BINLOG_VERSION/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/ /\[([0-9]-[0-9]-[0-9]+)\]/[#-#-#]/
|
||||
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /block_len=[0-9]+/block_len=#/ /Server ver:.*$/SERVER_VERSION, BINLOG_VERSION/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/ /\[([0-9]-[0-9]-[0-9]+)\]/[#-#-#]/ /cid=[0-9]+/cid=#/
|
||||
--eval $_statement
|
||||
|
@ -38,8 +38,84 @@ SELECT * FROM t1 ORDER BY a;
|
||||
a
|
||||
1
|
||||
2
|
||||
*** Test two transactions in different domains committed in opposite order on slave but in a single group commit. ***
|
||||
include/stop_slave.inc
|
||||
SET sql_log_bin=0;
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
SET sql_log_bin=1;
|
||||
SET @old_format= @@SESSION.binlog_format;
|
||||
SET binlog_format='statement';
|
||||
SET gtid_domain_id=1;
|
||||
INSERT INTO t2 VALUES (foo(10,
|
||||
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1',
|
||||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2'));
|
||||
FLUSH LOGS;
|
||||
SET sql_log_bin=0;
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
IF d1 != '' THEN
|
||||
SET debug_sync = d1;
|
||||
END IF;
|
||||
IF d2 != '' THEN
|
||||
SET debug_sync = d2;
|
||||
END IF;
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
SET sql_log_bin=1;
|
||||
SET @old_format=@@GLOBAL.binlog_format;
|
||||
SET GLOBAL binlog_format=statement;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
include/start_slave.inc
|
||||
SET debug_sync='now WAIT_FOR ready1';
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t2 VALUES (foo(11,
|
||||
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
|
||||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'));
|
||||
SET gtid_domain_id=0;
|
||||
SET binlog_format=@old_format;
|
||||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
|
||||
a
|
||||
10
|
||||
11
|
||||
SET debug_sync='now WAIT_FOR ready3';
|
||||
SET debug_sync='now SIGNAL cont3';
|
||||
SET debug_sync='now WAIT_FOR ready4';
|
||||
SET debug_sync='now SIGNAL cont1';
|
||||
SET debug_sync='now WAIT_FOR ready2';
|
||||
SET debug_sync='now SIGNAL cont4';
|
||||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
|
||||
a
|
||||
10
|
||||
11
|
||||
show binlog events in 'slave-bin.000002' from <binlog_start>;
|
||||
Log_name Pos Event_type Server_id End_log_pos Info
|
||||
slave-bin.000002 # Binlog_checkpoint # # slave-bin.000002
|
||||
slave-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
|
||||
slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(11,
|
||||
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
|
||||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'))
|
||||
slave-bin.000002 # Xid # # COMMIT /* XID */
|
||||
slave-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
|
||||
slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(10,
|
||||
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1',
|
||||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2'))
|
||||
slave-bin.000002 # Xid # # COMMIT /* XID */
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
include/start_slave.inc
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||
include/start_slave.inc
|
||||
DROP function foo;
|
||||
DROP TABLE t1,t2;
|
||||
include/rpl_end.inc
|
||||
|
@ -63,12 +63,106 @@ UNLOCK TABLES;
|
||||
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
|
||||
|
||||
--echo *** Test two transactions in different domains committed in opposite order on slave but in a single group commit. ***
|
||||
--connection server_2
|
||||
--source include/stop_slave.inc
|
||||
|
||||
--connection server_1
|
||||
# Use a stored function to inject a debug_sync into the appropriate THD.
|
||||
# The function does nothing on the master, and on the slave it injects the
|
||||
# desired debug_sync action(s).
|
||||
SET sql_log_bin=0;
|
||||
--delimiter ||
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
--delimiter ;
|
||||
SET sql_log_bin=1;
|
||||
|
||||
SET @old_format= @@SESSION.binlog_format;
|
||||
SET binlog_format='statement';
|
||||
SET gtid_domain_id=1;
|
||||
INSERT INTO t2 VALUES (foo(10,
|
||||
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1',
|
||||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2'));
|
||||
|
||||
--connection server_2
|
||||
FLUSH LOGS;
|
||||
SET sql_log_bin=0;
|
||||
--delimiter ||
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
IF d1 != '' THEN
|
||||
SET debug_sync = d1;
|
||||
END IF;
|
||||
IF d2 != '' THEN
|
||||
SET debug_sync = d2;
|
||||
END IF;
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
--delimiter ;
|
||||
SET sql_log_bin=1;
|
||||
SET @old_format=@@GLOBAL.binlog_format;
|
||||
SET GLOBAL binlog_format=statement;
|
||||
# We need to restart all parallel threads for the new global setting to
|
||||
# be copied to the session-level values.
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
--source include/start_slave.inc
|
||||
|
||||
# First make sure the first insert is ready to commit, but not queued yet.
|
||||
SET debug_sync='now WAIT_FOR ready1';
|
||||
|
||||
--connection server_1
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t2 VALUES (foo(11,
|
||||
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
|
||||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'));
|
||||
SET gtid_domain_id=0;
|
||||
SET binlog_format=@old_format;
|
||||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
|
||||
|
||||
--connection server_2
|
||||
# Now wait for the second insert to queue itself as the leader, and then
|
||||
# wait for more commits to queue up.
|
||||
SET debug_sync='now WAIT_FOR ready3';
|
||||
SET debug_sync='now SIGNAL cont3';
|
||||
SET debug_sync='now WAIT_FOR ready4';
|
||||
# Now allow the first insert to queue up to participate in group commit.
|
||||
SET debug_sync='now SIGNAL cont1';
|
||||
SET debug_sync='now WAIT_FOR ready2';
|
||||
# Finally allow the second insert to proceed and do the group commit.
|
||||
SET debug_sync='now SIGNAL cont4';
|
||||
|
||||
--let $wait_condition= SELECT COUNT(*) = 2 FROM t2 WHERE a >= 10
|
||||
--source include/wait_condition.inc
|
||||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
|
||||
# The two INSERT transactions should have been committed in opposite order,
|
||||
# but in the same group commit (seen by precense of cid=# in the SHOW
|
||||
# BINLOG output).
|
||||
--let $binlog_file= slave-bin.000002
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
--source include/start_slave.inc
|
||||
|
||||
|
||||
--connection server_2
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||
--source include/start_slave.inc
|
||||
|
||||
--connection server_1
|
||||
DROP function foo;
|
||||
DROP TABLE t1,t2;
|
||||
|
||||
--source include/rpl_end.inc
|
||||
|
@ -6623,6 +6623,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
|
||||
return false;
|
||||
|
||||
/* Now enqueue ourselves in the group commit queue. */
|
||||
DEBUG_SYNC(entry->thd, "commit_before_enqueue");
|
||||
entry->thd->clear_wakeup_ready();
|
||||
mysql_mutex_lock(&LOCK_prepare_ordered);
|
||||
orig_queue= group_commit_queue;
|
||||
|
@ -507,6 +507,14 @@ rpl_parallel::rpl_parallel() :
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
rpl_parallel::reset()
|
||||
{
|
||||
my_hash_reset(&domain_hash);
|
||||
current= NULL;
|
||||
}
|
||||
|
||||
|
||||
rpl_parallel::~rpl_parallel()
|
||||
{
|
||||
my_hash_free(&domain_hash);
|
||||
|
@ -76,6 +76,7 @@ struct rpl_parallel {
|
||||
|
||||
rpl_parallel();
|
||||
~rpl_parallel();
|
||||
void reset();
|
||||
rpl_parallel_entry *find(uint32 domain_id);
|
||||
void wait_for_done();
|
||||
bool do_event(rpl_group_info *serial_rgi, Log_event *ev);
|
||||
|
@ -4066,6 +4066,7 @@ pthread_handler_t handle_slave_sql(void *arg)
|
||||
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
|
||||
*/
|
||||
rli->clear_error();
|
||||
rli->parallel.reset();
|
||||
|
||||
//tell the I/O thread to take relay_log_space_limit into account from now on
|
||||
mysql_mutex_lock(&rli->log_space_lock);
|
||||
|
Reference in New Issue
Block a user