mirror of
https://github.com/MariaDB/server.git
synced 2025-07-07 06:01:31 +03:00
MDEV-13073 This part merges the Ali semisync related changes
and specifically the ack receiving functionality. Semisync is now static instead of a plugin, so its functions are invoked at the same points as RUN_HOOKS. The RUN_HOOKS and the observer interface remain to be removed by a later patch. Todo: React to the killed status in repl_semisync_master.wait_after_sync(). Currently Repl_semi_sync_master::commit_trx does not check the killed status. A few bugfixes that are present in MySQL were found, and it is unclear whether/how they are covered. Those include: Bug#15985893: GTID SKIPPED EVENTS ON MASTER CAUSE SEMI SYNC TIME-OUTS Bug#17932935 CALLING IS_SEMI_SYNC_SLAVE() IN EACH FUNCTION CALL HAS BAD PERFORMANCE Bug#20574628: SEMI-SYNC REPLICATION PERFORMANCE DEGRADES WITH A HIGH NUMBER OF THREADS
This commit is contained in:
@ -16,6 +16,7 @@ connection master;
|
||||
call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
|
||||
call mtr.add_suppression("mysqld: Got an error reading communication packets");
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
|
@ -4,6 +4,7 @@ connection master;
|
||||
call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
|
||||
call mtr.add_suppression("mysqld: Got an error reading communication packets");
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
@ -176,7 +177,7 @@ Variable_name Value
|
||||
Rpl_semi_sync_master_yes_tx 14
|
||||
show status like 'Rpl_semi_sync_master_clients';
|
||||
Variable_name Value
|
||||
Rpl_semi_sync_master_clients 1
|
||||
Rpl_semi_sync_master_clients 0
|
||||
[ semi-sync replication of these transactions will fail ]
|
||||
insert into t1 values (500);
|
||||
[ master status should be OFF ]
|
||||
@ -321,7 +322,6 @@ connection slave;
|
||||
include/stop_slave.inc
|
||||
reset slave;
|
||||
connection master;
|
||||
kill query _tid;
|
||||
connection slave;
|
||||
include/start_slave.inc
|
||||
connection master;
|
||||
@ -353,7 +353,6 @@ include/stop_slave.inc
|
||||
reset slave;
|
||||
connection master;
|
||||
reset master;
|
||||
kill query _tid;
|
||||
set sql_log_bin=0;
|
||||
grant replication slave on *.* to rpl@127.0.0.1 identified by 'rpl_password';
|
||||
flush privileges;
|
||||
@ -404,7 +403,6 @@ SHOW STATUS LIKE 'Rpl_semi_sync_slave_status';
|
||||
Variable_name Value
|
||||
Rpl_semi_sync_slave_status OFF
|
||||
connection master;
|
||||
kill query _tid;
|
||||
[ Semi-sync status on master should be ON ]
|
||||
show status like 'Rpl_semi_sync_master_clients';
|
||||
Variable_name Value
|
||||
|
@ -4,6 +4,7 @@ where name like 'Wait/Synch/Mutex/sql/%'
|
||||
and name not in ('wait/synch/mutex/sql/DEBUG_SYNC::mutex')
|
||||
order by name limit 10;
|
||||
NAME ENABLED TIMED
|
||||
wait/synch/mutex/sql/Ack_receiver::m_mutex YES YES
|
||||
wait/synch/mutex/sql/Cversion_lock YES YES
|
||||
wait/synch/mutex/sql/Delayed_insert::mutex YES YES
|
||||
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES
|
||||
@ -13,7 +14,6 @@ wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES
|
||||
wait/synch/mutex/sql/LOCK_active_mi YES YES
|
||||
wait/synch/mutex/sql/LOCK_after_binlog_sync YES YES
|
||||
wait/synch/mutex/sql/LOCK_audit_mask YES YES
|
||||
wait/synch/mutex/sql/LOCK_binlog YES YES
|
||||
select * from performance_schema.setup_instruments
|
||||
where name like 'Wait/Synch/Rwlock/sql/%'
|
||||
and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock')
|
||||
@ -36,6 +36,7 @@ where name like 'Wait/Synch/Cond/sql/%'
|
||||
'wait/synch/cond/sql/DEBUG_SYNC::cond')
|
||||
order by name limit 10;
|
||||
NAME ENABLED TIMED
|
||||
wait/synch/cond/sql/Ack_receiver::m_cond YES YES
|
||||
wait/synch/cond/sql/COND_binlog_send YES YES
|
||||
wait/synch/cond/sql/COND_flush_thread_cache YES YES
|
||||
wait/synch/cond/sql/COND_group_commit_orderer YES YES
|
||||
@ -45,7 +46,6 @@ wait/synch/cond/sql/COND_parallel_entry YES YES
|
||||
wait/synch/cond/sql/COND_prepare_ordered YES YES
|
||||
wait/synch/cond/sql/COND_queue_state YES YES
|
||||
wait/synch/cond/sql/COND_rpl_thread YES YES
|
||||
wait/synch/cond/sql/COND_rpl_thread_pool YES YES
|
||||
select * from performance_schema.setup_instruments
|
||||
where name='Wait';
|
||||
select * from performance_schema.setup_instruments
|
||||
|
@ -86,6 +86,7 @@ EVENT_NAME COUNT_STAR SUM_TIMER_WAIT MIN_TIMER_WAIT AVG_TIMER_WAIT MAX_TIMER_WAI
|
||||
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_bin_log_updated 0 0 0 0 0
|
||||
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_queue_busy 0 0 0 0 0
|
||||
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_relay_log_updated 0 0 0 0 0
|
||||
wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_binlog_end_pos 0 0 0 0 0
|
||||
wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index 0 0 0 0 0
|
||||
connection slave;
|
||||
"============ Performance schema on slave ============"
|
||||
@ -193,5 +194,6 @@ EVENT_NAME COUNT_STAR
|
||||
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_bin_log_updated NONE
|
||||
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_queue_busy NONE
|
||||
wait/synch/cond/sql/MYSQL_RELAY_LOG::COND_relay_log_updated MANY
|
||||
wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_binlog_end_pos NONE
|
||||
wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index MANY
|
||||
include/stop_slave.inc
|
||||
|
@ -4,6 +4,7 @@ connection master;
|
||||
call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
|
||||
call mtr.add_suppression("mysqld: Got an error reading communication packets");
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
@ -176,7 +177,7 @@ Variable_name Value
|
||||
Rpl_semi_sync_master_yes_tx 14
|
||||
show status like 'Rpl_semi_sync_master_clients';
|
||||
Variable_name Value
|
||||
Rpl_semi_sync_master_clients 1
|
||||
Rpl_semi_sync_master_clients 0
|
||||
[ semi-sync replication of these transactions will fail ]
|
||||
insert into t1 values (500);
|
||||
[ master status should be OFF ]
|
||||
@ -321,7 +322,6 @@ connection slave;
|
||||
include/stop_slave.inc
|
||||
reset slave;
|
||||
connection master;
|
||||
kill query _tid;
|
||||
connection slave;
|
||||
include/start_slave.inc
|
||||
connection master;
|
||||
@ -353,7 +353,6 @@ include/stop_slave.inc
|
||||
reset slave;
|
||||
connection master;
|
||||
reset master;
|
||||
kill query _tid;
|
||||
set sql_log_bin=0;
|
||||
grant replication slave on *.* to rpl@127.0.0.1 identified by 'rpl_password';
|
||||
flush privileges;
|
||||
@ -404,7 +403,6 @@ SHOW STATUS LIKE 'Rpl_semi_sync_slave_status';
|
||||
Variable_name Value
|
||||
Rpl_semi_sync_slave_status OFF
|
||||
connection master;
|
||||
kill query _tid;
|
||||
[ Semi-sync status on master should be ON ]
|
||||
show status like 'Rpl_semi_sync_master_clients';
|
||||
Variable_name Value
|
||||
|
@ -5,6 +5,7 @@ call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
|
||||
call mtr.add_suppression("mysqld: Got an error reading communication packets");
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
|
@ -6,6 +6,7 @@ call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
|
||||
call mtr.add_suppression("mysqld: Got an error reading communication packets");
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
|
@ -10,6 +10,8 @@ call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.");
|
||||
call mtr.add_suppression("mysqld: Got an error reading communication packets");
|
||||
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
|
@ -65,6 +65,12 @@ set global rpl_semi_sync_master_enabled=1e1;
|
||||
ERROR 42000: Incorrect argument type to variable 'rpl_semi_sync_master_enabled'
|
||||
set global rpl_semi_sync_master_enabled="some text";
|
||||
ERROR 42000: Variable 'rpl_semi_sync_master_enabled' can't be set to the value of 'some text'
|
||||
connect con1,localhost,root,,;
|
||||
connect con2,localhost,root,,;
|
||||
disconnect con1;
|
||||
disconnect con2;
|
||||
connection default;
|
||||
SET @@global.rpl_semi_sync_master_enabled = 1;
|
||||
SET @@global.rpl_semi_sync_master_enabled = @start_global_value;
|
||||
select @@global.rpl_semi_sync_master_enabled;
|
||||
@@global.rpl_semi_sync_master_enabled
|
||||
|
@ -51,6 +51,35 @@ set global rpl_semi_sync_master_enabled=1e1;
|
||||
--error ER_WRONG_VALUE_FOR_VAR
|
||||
set global rpl_semi_sync_master_enabled="some text";
|
||||
|
||||
#
|
||||
# Test conflicting concurrent setting
|
||||
#
|
||||
--let $val_saved= `SELECT @@global.rpl_semi_sync_master_enabled`
|
||||
connect (con1,localhost,root,,);
|
||||
connect (con2,localhost,root,,);
|
||||
--let $iter=100
|
||||
--disable_query_log
|
||||
while ($iter)
|
||||
{
|
||||
--connection con1
|
||||
--send_eval SET @@global.rpl_semi_sync_master_enabled = $iter % 2
|
||||
|
||||
--connection con2
|
||||
--send_eval SET @@global.rpl_semi_sync_master_enabled = ($iter + 1) % 2
|
||||
|
||||
--connection con1
|
||||
reap;
|
||||
--connection con2
|
||||
reap;
|
||||
|
||||
--dec $iter
|
||||
}
|
||||
--enable_query_log
|
||||
disconnect con1;
|
||||
disconnect con2;
|
||||
|
||||
--connection default
|
||||
--eval SET @@global.rpl_semi_sync_master_enabled = $val_saved
|
||||
|
||||
#
|
||||
# Cleanup
|
||||
|
@ -139,6 +139,7 @@ SET (SQL_SOURCE
|
||||
my_json_writer.cc
|
||||
rpl_gtid.cc rpl_parallel.cc
|
||||
semisync.cc semisync_master.cc semisync_slave.cc
|
||||
semisync_master_ack_receiver.cc
|
||||
sql_type.cc
|
||||
item_windowfunc.cc sql_window.cc
|
||||
sql_cte.cc
|
||||
|
@ -50,6 +50,7 @@
|
||||
#ifdef WITH_ARIA_STORAGE_ENGINE
|
||||
#include "../storage/maria/ha_maria.h"
|
||||
#endif
|
||||
#include "semisync_master.h"
|
||||
|
||||
#include "wsrep_mysqld.h"
|
||||
#include "wsrep.h"
|
||||
@ -1485,6 +1486,10 @@ done:
|
||||
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
|
||||
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
|
||||
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
|
||||
#ifdef REPLICATION
|
||||
repl_semisync_master.waitAfterCommit(thd, all);
|
||||
DEBUG_SYNC(thd, "after_group_after_commit");
|
||||
#endif
|
||||
goto end;
|
||||
|
||||
/* Come here if error and we need to rollback. */
|
||||
@ -1730,6 +1735,9 @@ int ha_rollback_trans(THD *thd, bool all)
|
||||
ER_WARNING_NOT_COMPLETE_ROLLBACK,
|
||||
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
|
||||
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
|
||||
#ifdef REPLICATION
|
||||
repl_semisync_master.waitAfterRollback(thd, all);
|
||||
#endif
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
|
80
sql/log.cc
80
sql/log.cc
@ -53,6 +53,7 @@
|
||||
#include "debug_sync.h"
|
||||
#include "sql_show.h"
|
||||
#include "my_pthread.h"
|
||||
#include "semisync_master.h"
|
||||
#include "wsrep_mysqld.h"
|
||||
#include "sp_rcontext.h"
|
||||
#include "sp_head.h"
|
||||
@ -3329,7 +3330,7 @@ void MYSQL_BIN_LOG::init_pthread_objects()
|
||||
mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end,
|
||||
&COND_binlog_background_thread_end, 0);
|
||||
|
||||
mysql_mutex_init(key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
|
||||
mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
|
||||
MY_MUTEX_INIT_SLOW);
|
||||
}
|
||||
|
||||
@ -5250,6 +5251,7 @@ end:
|
||||
close(LOG_CLOSE_INDEX);
|
||||
sql_print_error(fatal_log_error, new_name_ptr, errno);
|
||||
}
|
||||
|
||||
mysql_mutex_unlock(&LOCK_index);
|
||||
if (need_lock)
|
||||
mysql_mutex_unlock(&LOCK_log);
|
||||
@ -6376,7 +6378,12 @@ err:
|
||||
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
|
||||
if ((error= RUN_HOOK(binlog_storage, after_flush,
|
||||
(thd, log_file_name, file->pos_in_file,
|
||||
synced, true, true))))
|
||||
synced, true, true)))
|
||||
#ifdef REPLICATION
|
||||
|| repl_semisync_master.reportBinlogUpdate(thd, log_file_name,
|
||||
file->pos_in_file)
|
||||
#endif
|
||||
)
|
||||
{
|
||||
sql_print_error("Failed to run 'after_flush' hooks");
|
||||
error= 1;
|
||||
@ -6408,7 +6415,12 @@ err:
|
||||
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
|
||||
if (RUN_HOOK(binlog_storage, after_sync,
|
||||
(thd, log_file_name, file->pos_in_file,
|
||||
true, true)))
|
||||
true, true))
|
||||
#ifdef REPLICATION
|
||||
|| repl_semisync_master.waitAfterSync(log_file_name,
|
||||
file->pos_in_file)
|
||||
#endif
|
||||
)
|
||||
{
|
||||
error=1;
|
||||
/* error is already printed inside hook */
|
||||
@ -7589,7 +7601,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
|
||||
else if (is_leader)
|
||||
trx_group_commit_leader(entry);
|
||||
else if (!entry->queued_by_other)
|
||||
{
|
||||
DEBUG_SYNC(entry->thd, "after_semisync_queue");
|
||||
|
||||
entry->thd->wait_for_wakeup_ready();
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
@ -7840,11 +7856,20 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
|
||||
{
|
||||
last= current->next == NULL;
|
||||
if (!current->error &&
|
||||
RUN_HOOK(binlog_storage, after_flush,
|
||||
(current->thd,
|
||||
current->cache_mngr->last_commit_pos_file,
|
||||
current->cache_mngr->last_commit_pos_offset, synced,
|
||||
first, last)))
|
||||
(RUN_HOOK(binlog_storage, after_flush,
|
||||
(current->thd,
|
||||
current->cache_mngr->last_commit_pos_file,
|
||||
current->cache_mngr->last_commit_pos_offset, synced,
|
||||
first, last))
|
||||
#ifdef REPLICATION
|
||||
|| (DBUG_EVALUATE_IF("failed_report_binlog_update", 1, 0) ||
|
||||
repl_semisync_master.
|
||||
reportBinlogUpdate(current->thd,
|
||||
current->cache_mngr->last_commit_pos_file,
|
||||
current->cache_mngr->
|
||||
last_commit_pos_offset))
|
||||
#endif
|
||||
))
|
||||
{
|
||||
current->error= ER_ERROR_ON_WRITE;
|
||||
current->commit_errno= -1;
|
||||
@ -7927,12 +7952,22 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
|
||||
{
|
||||
last= current->next == NULL;
|
||||
if (!current->error &&
|
||||
RUN_HOOK(binlog_storage, after_sync,
|
||||
(RUN_HOOK(binlog_storage, after_sync,
|
||||
(current->thd, current->cache_mngr->last_commit_pos_file,
|
||||
current->cache_mngr->last_commit_pos_offset,
|
||||
first, last)))
|
||||
first, last))
|
||||
#ifdef REPLICATION
|
||||
|| (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) ||
|
||||
repl_semisync_master.waitAfterSync(current->cache_mngr->
|
||||
last_commit_pos_file,
|
||||
current->cache_mngr->
|
||||
last_commit_pos_offset))
|
||||
#endif
|
||||
))
|
||||
{
|
||||
/* error is already printed inside hook */
|
||||
const char *hook_name= rpl_semi_sync_master_enabled ?
|
||||
"'waitAfterSync'" : "binlog_storage 'after_sync'";
|
||||
sql_print_error("Failed to call '%s'", hook_name);
|
||||
}
|
||||
first= false;
|
||||
}
|
||||
@ -8268,24 +8303,6 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
|
||||
LOCK_log is released by the caller.
|
||||
*/
|
||||
|
||||
int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
|
||||
const struct timespec *timeout)
|
||||
{
|
||||
int ret= 0;
|
||||
DBUG_ENTER("wait_for_update_bin_log");
|
||||
|
||||
thd_wait_begin(thd, THD_WAIT_BINLOG);
|
||||
mysql_mutex_assert_owner(&LOCK_binlog_end_pos);
|
||||
if (!timeout)
|
||||
mysql_cond_wait(&COND_bin_log_updated, &LOCK_binlog_end_pos);
|
||||
else
|
||||
ret= mysql_cond_timedwait(&COND_bin_log_updated, &LOCK_binlog_end_pos,
|
||||
const_cast<struct timespec *>(timeout));
|
||||
thd_wait_end(thd);
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
|
||||
int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
|
||||
struct timespec *timeout)
|
||||
{
|
||||
@ -10427,7 +10444,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
|
||||
|
||||
*out_gtid_list= NULL;
|
||||
|
||||
if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
|
||||
if (!(ev= Log_event::read_log_event(cache, &init_fdle,
|
||||
opt_master_verify_checksum)) ||
|
||||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
@ -10443,7 +10460,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
|
||||
{
|
||||
Log_event_type typ;
|
||||
|
||||
ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
|
||||
ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum);
|
||||
if (!ev)
|
||||
{
|
||||
errormsg= "Could not read GTID list event while looking for GTID "
|
||||
@ -10473,6 +10490,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
|
||||
return errormsg;
|
||||
}
|
||||
|
||||
|
||||
struct st_mysql_storage_engine binlog_storage_engine=
|
||||
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
|
||||
|
||||
|
11
sql/log.h
11
sql/log.h
@ -349,6 +349,11 @@ public:
|
||||
/* for documentation of mutexes held in various places in code */
|
||||
};
|
||||
|
||||
/* Tell the io thread if we can delay the master info sync. */
|
||||
#define SEMI_SYNC_SLAVE_DELAY_SYNC 1
|
||||
/* Tell the io thread if the current event needs an ack. */
|
||||
#define SEMI_SYNC_NEED_ACK 2
|
||||
|
||||
class MYSQL_QUERY_LOG: public MYSQL_LOG
|
||||
{
|
||||
public:
|
||||
@ -435,6 +440,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
|
||||
PSI_file_key m_key_file_log_index;
|
||||
|
||||
PSI_file_key m_key_COND_queue_busy;
|
||||
/** The instrumentation key to use for LOCK_binlog_end_pos. */
|
||||
PSI_mutex_key m_key_LOCK_binlog_end_pos;
|
||||
#endif
|
||||
|
||||
struct group_commit_entry
|
||||
@ -667,7 +674,8 @@ public:
|
||||
PSI_cond_key key_bin_log_update,
|
||||
PSI_file_key key_file_log,
|
||||
PSI_file_key key_file_log_index,
|
||||
PSI_file_key key_COND_queue_busy)
|
||||
PSI_file_key key_COND_queue_busy,
|
||||
PSI_mutex_key key_LOCK_binlog_end_pos)
|
||||
{
|
||||
m_key_LOCK_index= key_LOCK_index;
|
||||
m_key_relay_log_update= key_relay_log_update;
|
||||
@ -675,6 +683,7 @@ public:
|
||||
m_key_file_log= key_file_log;
|
||||
m_key_file_log_index= key_file_log_index;
|
||||
m_key_COND_queue_busy= key_COND_queue_busy;
|
||||
m_key_LOCK_binlog_end_pos= key_LOCK_binlog_end_pos;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -933,6 +933,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
|
||||
key_LOCK_thread_count, key_LOCK_thread_cache,
|
||||
key_PARTITION_LOCK_auto_inc;
|
||||
PSI_mutex_key key_RELAYLOG_LOCK_index;
|
||||
PSI_mutex_key key_LOCK_relaylog_end_pos;
|
||||
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
|
||||
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
|
||||
PSI_mutex_key key_LOCK_binlog;
|
||||
@ -947,6 +948,8 @@ PSI_mutex_key key_LOCK_after_binlog_sync;
|
||||
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
|
||||
key_LOCK_slave_background;
|
||||
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
|
||||
PSI_mutex_key key_ss_mutex_LOCK_binlog_;
|
||||
PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
|
||||
|
||||
static PSI_mutex_info all_server_mutexes[]=
|
||||
{
|
||||
@ -967,6 +970,7 @@ static PSI_mutex_info all_server_mutexes[]=
|
||||
{ &key_BINLOG_LOCK_binlog_background_thread, "MYSQL_BIN_LOG::LOCK_binlog_background_thread", 0},
|
||||
{ &key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 },
|
||||
{ &key_RELAYLOG_LOCK_index, "MYSQL_RELAY_LOG::LOCK_index", 0},
|
||||
{ &key_LOCK_relaylog_end_pos, "MYSQL_RELAY_LOG::LOCK_binlog_end_pos", 0},
|
||||
{ &key_delayed_insert_mutex, "Delayed_insert::mutex", 0},
|
||||
{ &key_hash_filo_lock, "hash_filo::lock", 0},
|
||||
{ &key_LOCK_active_mi, "LOCK_active_mi", PSI_FLAG_GLOBAL},
|
||||
@ -1024,6 +1028,7 @@ static PSI_mutex_info all_server_mutexes[]=
|
||||
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
|
||||
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
|
||||
{ &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0},
|
||||
{ &key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0},
|
||||
{ &key_LOCK_binlog, "LOCK_binlog", 0}
|
||||
};
|
||||
|
||||
@ -1078,6 +1083,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
|
||||
key_COND_parallel_entry, key_COND_group_commit_orderer,
|
||||
key_COND_prepare_ordered, key_COND_slave_background;
|
||||
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
|
||||
PSI_cond_key key_ss_cond_Ack_receiver_cond;
|
||||
|
||||
static PSI_cond_info all_server_conds[]=
|
||||
{
|
||||
@ -1131,6 +1137,7 @@ static PSI_cond_info all_server_conds[]=
|
||||
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
|
||||
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
|
||||
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0},
|
||||
{ &key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0},
|
||||
{ &key_COND_binlog_send, "COND_binlog_send", 0}
|
||||
};
|
||||
|
||||
@ -1138,6 +1145,7 @@ PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
|
||||
key_thread_handle_manager, key_thread_main,
|
||||
key_thread_one_connection, key_thread_signal_hand,
|
||||
key_thread_slave_background, key_rpl_parallel_thread;
|
||||
PSI_thread_key key_ss_thread_Ack_receiver_thread;
|
||||
|
||||
static PSI_thread_info all_server_threads[]=
|
||||
{
|
||||
@ -1164,6 +1172,7 @@ static PSI_thread_info all_server_threads[]=
|
||||
{ &key_thread_one_connection, "one_connection", 0},
|
||||
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
|
||||
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
|
||||
{ &key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_GLOBAL},
|
||||
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
|
||||
};
|
||||
|
||||
@ -1743,6 +1752,7 @@ static void close_connections(void)
|
||||
Events::deinit();
|
||||
slave_prepare_for_shutdown();
|
||||
mysql_bin_log.stop_background_thread();
|
||||
ack_receiver.stop();
|
||||
|
||||
/*
|
||||
Give threads time to die.
|
||||
@ -2226,7 +2236,6 @@ void clean_up(bool print_message)
|
||||
tc_log->close();
|
||||
#ifdef HAVE_REPLICATION
|
||||
semi_sync_master_deinit();
|
||||
semi_sync_slave_deinit();
|
||||
#endif
|
||||
delegates_destroy();
|
||||
xid_cache_free();
|
||||
@ -4251,7 +4260,8 @@ static int init_common_variables()
|
||||
key_BINLOG_COND_bin_log_updated,
|
||||
key_file_binlog,
|
||||
key_file_binlog_index,
|
||||
key_BINLOG_COND_queue_busy);
|
||||
key_BINLOG_COND_queue_busy,
|
||||
key_LOCK_binlog_end_pos);
|
||||
#endif
|
||||
|
||||
/*
|
||||
@ -5183,8 +5193,12 @@ static int init_server_components()
|
||||
"--log-bin option is not defined.");
|
||||
}
|
||||
|
||||
semi_sync_master_init();
|
||||
semi_sync_slave_init();
|
||||
if (repl_semisync_master.initObject() ||
|
||||
repl_semisync_slave.initObject())
|
||||
{
|
||||
sql_print_error("Could not initialize semisync.");
|
||||
unireg_abort(1);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (opt_bin_log)
|
||||
@ -8599,14 +8613,10 @@ SHOW_VAR status_vars[]= {
|
||||
{"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC},
|
||||
{"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC},
|
||||
{"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC},
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
{"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG},
|
||||
{"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG},
|
||||
#endif
|
||||
{"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
{"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG},
|
||||
#endif
|
||||
#endif /* HAVE_REPLICATION */
|
||||
#ifdef HAVE_QUERY_CACHE
|
||||
{"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH},
|
||||
@ -8950,7 +8960,7 @@ static int mysql_init_variables(void)
|
||||
transactions_multi_engine= 0;
|
||||
rpl_transactions_multi_engine= 0;
|
||||
transactions_gtid_foreign_engine= 0;
|
||||
run_hooks_enabled= 0;
|
||||
run_hooks_enabled= 0; // don't run hooks, semisync does not need 'em
|
||||
log_bin_basename= NULL;
|
||||
log_bin_index= NULL;
|
||||
|
||||
@ -10348,6 +10358,8 @@ PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update"
|
||||
PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0};
|
||||
PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
|
||||
{ 0, "Waiting for semi-sync ACK from slave", 0};
|
||||
PSI_stage_info stage_waiting_for_semi_sync_slave={ 0, "Waiting for semi-sync slave connection", 0};
|
||||
PSI_stage_info stage_reading_semi_sync_ack={ 0, "Reading semi-sync ACK from slave", 0};
|
||||
PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0};
|
||||
PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0};
|
||||
PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0};
|
||||
@ -10508,6 +10520,9 @@ PSI_stage_info *all_server_stages[]=
|
||||
& stage_gtid_wait_other_connection,
|
||||
& stage_slave_background_process_request,
|
||||
& stage_slave_background_wait_request,
|
||||
& stage_waiting_for_semi_sync_ack_from_slave,
|
||||
& stage_waiting_for_semi_sync_slave,
|
||||
& stage_reading_semi_sync_ack,
|
||||
& stage_waiting_for_deadlock_kill
|
||||
};
|
||||
|
||||
|
@ -310,6 +310,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
|
||||
key_LOCK_start_thread,
|
||||
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
|
||||
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
|
||||
extern PSI_mutex_key key_LOCK_relaylog_end_pos;
|
||||
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
|
||||
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
|
||||
|
||||
|
@ -209,6 +209,10 @@ extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
|
||||
if semisync replication is not enabled, we can return immediately.
|
||||
*/
|
||||
#ifdef HAVE_REPLICATION
|
||||
/*
|
||||
As semisync is no longer a plugin and its hooks are turned into static
|
||||
invocations, all other hooks are not run for the sake of optimization.
|
||||
*/
|
||||
#define RUN_HOOK(group, hook, args) \
|
||||
(unlikely(run_hooks_enabled) ? group ##_delegate->hook args : 0)
|
||||
#else
|
||||
|
@ -311,6 +311,11 @@ class Master_info : public Slave_reporting_capability
|
||||
|
||||
/* The parallel replication mode. */
|
||||
enum_slave_parallel_mode parallel_mode;
|
||||
/*
|
||||
semi_ack is used to identify if the current binlog event needs an
|
||||
ACK from slave, or if delay_master is enabled.
|
||||
*/
|
||||
int semi_ack;
|
||||
};
|
||||
|
||||
int init_master_info(Master_info* mi, const char* master_info_fname,
|
||||
|
@ -74,7 +74,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
|
||||
key_RELAYLOG_COND_bin_log_updated,
|
||||
key_file_relaylog,
|
||||
key_file_relaylog_index,
|
||||
key_RELAYLOG_COND_queue_busy);
|
||||
key_RELAYLOG_COND_queue_busy,
|
||||
key_LOCK_relaylog_end_pos);
|
||||
#endif
|
||||
|
||||
group_relay_log_name[0]= event_relay_log_name[0]=
|
||||
|
@ -47,6 +47,20 @@ public:
|
||||
return exit_code;
|
||||
}
|
||||
|
||||
/*
  Trace helper for leaving a function that returns a bool.
  If the kTraceFunction bit is set in trace_level_, logs func_name together
  with exit_code rendered as "True" or "False" via sql_print_information().
  Always returns exit_code unchanged.
*/
inline bool function_exit(const char *func_name, bool exit_code)
|
||||
{
|
||||
if (trace_level_ & kTraceFunction)
|
||||
sql_print_information("<--- %s exit (%s)", func_name,
|
||||
exit_code ? "True" : "False");
|
||||
return exit_code;
|
||||
}
|
||||
|
||||
/*
  Trace helper for leaving a function that returns nothing.
  If the kTraceFunction bit is set in trace_level_, logs func_name via
  sql_print_information(); otherwise does nothing.
*/
inline void function_exit(const char *func_name)
|
||||
{
|
||||
if (trace_level_ & kTraceFunction)
|
||||
sql_print_information("<--- %s exit", func_name);
|
||||
}
|
||||
|
||||
Trace()
|
||||
:trace_level_(0L)
|
||||
{}
|
||||
@ -79,5 +93,7 @@ public:
|
||||
#define REPLY_MAGIC_NUM_OFFSET 0
|
||||
#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
|
||||
#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
|
||||
#define REPLY_MESSAGE_MAX_LENGTH \
|
||||
(REPLY_MAGIC_NUM_LEN + REPLY_BINLOG_POS_LEN + REPLY_BINLOG_NAME_LEN)
|
||||
|
||||
#endif /* SEMISYNC_H */
|
||||
|
@ -24,7 +24,9 @@
|
||||
#define TIME_BILLION 1000000000
|
||||
|
||||
/* This indicates whether semi-synchronous replication is enabled. */
|
||||
my_bool rpl_semi_sync_master_enabled;
|
||||
my_bool rpl_semi_sync_master_enabled= 0;
|
||||
unsigned long long rpl_semi_sync_master_request_ack = 0;
|
||||
unsigned long long rpl_semi_sync_master_get_ack = 0;
|
||||
my_bool rpl_semi_sync_master_wait_no_slave = 1;
|
||||
my_bool rpl_semi_sync_master_status = 0;
|
||||
ulong rpl_semi_sync_master_wait_point =
|
||||
@ -47,6 +49,15 @@ ulonglong rpl_semi_sync_master_net_wait_time = 0;
|
||||
ulonglong rpl_semi_sync_master_trx_wait_time = 0;
|
||||
|
||||
ReplSemiSyncMaster repl_semisync_master;
|
||||
Ack_receiver ack_receiver;
|
||||
|
||||
/*
|
||||
structure to save transaction log filename and position
|
||||
*/
|
||||
/* Pairs a transaction's log position with the name of its log file. */
typedef struct Trans_binlog_info {
|
||||
/* Position in the log; presumably an offset within log_file -- confirm. */
my_off_t log_pos;
|
||||
/* Log file name, stored in a buffer of FN_REFLEN bytes. */
char log_file[FN_REFLEN];
|
||||
} Trans_binlog_info;
|
||||
|
||||
static int getWaitTime(const struct timespec& start_ts);
|
||||
|
||||
@ -336,7 +347,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
|
||||
wait_file_pos_(0),
|
||||
master_enabled_(false),
|
||||
wait_timeout_(0L),
|
||||
state_(0)
|
||||
state_(0),
|
||||
wait_point_(0)
|
||||
{
|
||||
strcpy(reply_file_name_, "");
|
||||
strcpy(wait_file_name_, "");
|
||||
@ -345,18 +357,13 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
|
||||
int ReplSemiSyncMaster::initObject()
|
||||
{
|
||||
int result;
|
||||
const char *kWho = "ReplSemiSyncMaster::initObject";
|
||||
|
||||
if (init_done_)
|
||||
{
|
||||
fprintf(stderr, "%s called twice\n", kWho);
|
||||
return 1;
|
||||
}
|
||||
init_done_ = true;
|
||||
|
||||
/* References to the parameter works after set_options(). */
|
||||
setWaitTimeout(rpl_semi_sync_master_timeout);
|
||||
setTraceLevel(rpl_semi_sync_master_trace_level);
|
||||
setWaitPoint(rpl_semi_sync_master_wait_point);
|
||||
|
||||
/* Mutex initialization can only be done after MY_INIT(). */
|
||||
mysql_mutex_init(key_LOCK_binlog,
|
||||
@ -365,9 +372,22 @@ int ReplSemiSyncMaster::initObject()
|
||||
&COND_binlog_send, NULL);
|
||||
|
||||
if (rpl_semi_sync_master_enabled)
|
||||
{
|
||||
result = enableMaster();
|
||||
if (!result)
|
||||
result= ack_receiver.start(); /* Start the ACK thread. */
|
||||
}
|
||||
else
|
||||
{
|
||||
result = disableMaster();
|
||||
}
|
||||
|
||||
/*
|
||||
If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
|
||||
switch off semisync to avoid hang if there's none active slave.
|
||||
*/
|
||||
if (!rpl_semi_sync_master_wait_no_slave)
|
||||
switch_off();
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -390,7 +410,6 @@ int ReplSemiSyncMaster::enableMaster()
|
||||
|
||||
set_master_enabled(true);
|
||||
state_ = true;
|
||||
run_hooks_enabled= 1;
|
||||
sql_print_information("Semi-sync replication enabled on the master.");
|
||||
}
|
||||
else
|
||||
@ -498,12 +517,50 @@ void ReplSemiSyncMaster::remove_slave()
|
||||
unlock();
|
||||
}
|
||||
|
||||
bool ReplSemiSyncMaster::is_semi_sync_slave()
|
||||
/**
  Parse a semi-sync reply (ACK) packet and hand the acknowledged binlog
  coordinates to reportReplyBinlog().

  @param server_id   id of the server the reply came from; used in the trace
                     message and passed on to reportReplyBinlog()
  @param packet      raw reply packet
  @param packet_len  number of bytes available in packet

  @return -1 if the packet is rejected (too short, wrong magic number or
          binlog name too long); otherwise the value returned by
          reportReplyBinlog().
*/
int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar *packet,
                                          ulong packet_len)
{
  const char *kWho = "ReplSemiSyncMaster::reportReplyPacket";
  char log_file_name[FN_REFLEN+1];
  my_off_t log_file_pos;
  ulong log_file_len;
  int result;

  function_enter(kWho);

  /*
    Validate the length before touching any byte of the packet, so that a
    truncated (or empty) reply is never read past its end.
    NOTE(review): presumably REPLY_MAGIC_NUM_OFFSET and the 8-byte position
    at REPLY_BINLOG_POS_OFFSET both lie before REPLY_BINLOG_NAME_OFFSET --
    confirm against the offset definitions.
  */
  if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET))
  {
    sql_print_error("Read semi-sync reply length error: packet is too small");
    return function_exit(kWho, -1);
  }

  if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
               ReplSemiSyncMaster::kPacketMagicNum))
  {
    sql_print_error("Read semi-sync reply magic number error");
    return function_exit(kWho, -1);
  }

  log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);

  /* Everything after the fixed-size part of the packet is the file name. */
  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
  if (unlikely(log_file_len >= FN_REFLEN))
  {
    sql_print_error("Read semi-sync reply binlog file length too large");
    return function_exit(kWho, -1);
  }
  strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET,
          log_file_len);
  log_file_name[log_file_len] = 0;

  /* The reply must carry a bare file name, without a directory part. */
  DBUG_ASSERT(dirname_length(log_file_name) == 0);

  if (trace_level_ & kTraceDetail)
    sql_print_information("%s: Got reply(%s, %lu) from server %u",
                          kWho, log_file_name, (ulong)log_file_pos, server_id);

  rpl_semi_sync_master_get_ack++;

  /* Report the outcome of processing the ACK instead of a constant -1. */
  result= reportReplyBinlog(server_id, log_file_name, log_file_pos);

  return function_exit(kWho, result);
}
|
||||
|
||||
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
|
||||
@ -602,6 +659,121 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
|
||||
return function_exit(kWho, 0);
|
||||
}
|
||||
|
||||
int ReplSemiSyncMaster::waitAfterSync(const char *log_file, my_off_t log_pos)
|
||||
{
|
||||
if (!getMasterEnabled())
|
||||
return 0;
|
||||
|
||||
int ret= 0;
|
||||
if(log_pos &&
|
||||
waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
|
||||
ret= commitTrx(log_file + dirname_length(log_file), log_pos);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
  Wait for the slave ACK of the coordinates saved in thd->semisync_info when
  the wait point is SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, then
  clear the saved coordinates.

  Nothing is waited for, and nothing is cleared, unless this is a "real"
  transaction: either `all` is set or thd->transaction.all.ha_list is empty.

  Returns 0 when semi-sync master is disabled or no wait was needed,
  otherwise the result of commitTrx().
*/
int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all)
{
  if (!getMasterEnabled())
    return 0;

  const bool real_trans= all || thd->transaction.all.ha_list == 0;

  /* The coordinates stored here were computed in reportBinlogUpdate. */
  Trans_binlog_info *info= thd->semisync_info;

  const char *file= 0;
  my_off_t pos= 0;
  if (info)
  {
    if (info->log_file[0])
      file= info->log_file;
    pos= info->log_pos;
  }

  DBUG_ASSERT(!file || dirname_length(file) == 0);

  int ret= 0;
  if (real_trans && pos &&
      waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
    ret= commitTrx(file, pos);

  /* Reset the saved coordinates once the real transaction is over. */
  if (real_trans && info)
  {
    info->log_file[0]= 0;
    info->log_pos= 0;
  }

  return ret;
}
|
||||
|
||||
/* A rollback is handled exactly like a commit: delegate to waitAfterCommit(). */
int ReplSemiSyncMaster::waitAfterRollback(THD *thd, bool all)
{
  const int ret= waitAfterCommit(thd, all);
  return ret;
}
|
||||
|
||||
/**
|
||||
The method runs after flush to binary log is done.
|
||||
*/
|
||||
int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file,
|
||||
my_off_t log_pos)
|
||||
{
|
||||
if (getMasterEnabled())
|
||||
{
|
||||
Trans_binlog_info *log_info;
|
||||
|
||||
if (!(log_info= thd->semisync_info))
|
||||
{
|
||||
if(!(log_info=
|
||||
(Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
|
||||
return 1;
|
||||
thd->semisync_info= log_info;
|
||||
}
|
||||
strcpy(log_info->log_file, log_file + dirname_length(log_file));
|
||||
log_info->log_pos = log_pos;
|
||||
|
||||
return writeTranxInBinlog(log_info->log_file, log_pos);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
  Called when a binlog dump to a slave starts.

  Does nothing unless thd->semi_sync_slave is set. Otherwise the dump thread
  is registered with the ACK receiver; if that fails, an error is logged and
  semi-sync is switched off for this slave (thd->semi_sync_slave is cleared).
  On success the slave is counted via add_slave() and the requested start
  position is reported as already acknowledged via reportReplyBinlog().
*/
void ReplSemiSyncMaster::dump_start(THD* thd,
                                    const char *log_file,
                                    my_off_t log_pos)
{
  if (!thd->semi_sync_slave)
    return;

  if (ack_receiver.add_slave(thd))
  {
    sql_print_error("Failed to register slave to semi-sync ACK receiver "
                    "thread. Turning off semisync");
    thd->semi_sync_slave= 0;
    return;
  }

  add_slave();
  reportReplyBinlog(thd->variables.server_id,
                    log_file + dirname_length(log_file), log_pos);
  /* The closing parenthesis of "pos(...)" was missing from the message. */
  sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), pos(%s, %lu)",
                        thd->variables.server_id, log_file, (unsigned long)log_pos);
}
|
||||
|
||||
/*
  Called when a binlog dump to a slave ends. For a semi-sync slave the end
  of the dump is logged, the slave is uncounted via remove_slave() and the
  dump thread is unregistered from the ACK receiver.
*/
void ReplSemiSyncMaster::dump_end(THD* thd)
{
  if (thd->semi_sync_slave)
  {
    sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %d)", thd->variables.server_id);

    remove_slave();
    ack_receiver.remove_slave(thd);
  }
}
|
||||
|
||||
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
|
||||
my_off_t trx_wait_binlog_pos)
|
||||
{
|
||||
@ -850,42 +1022,23 @@ int ReplSemiSyncMaster::try_switch_on(int server_id,
|
||||
return function_exit(kWho, 0);
|
||||
}
|
||||
|
||||
int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
|
||||
ulong size)
|
||||
/*
  Append the semi-sync header (kSyncHeader) to the packet. The header holds
  the magic number and the sync status; by default no sync is required.
  Always returns 0.
*/
int ReplSemiSyncMaster::reserveSyncHeader(String* packet)
{
  const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
  function_enter(kWho);

  const char *header= reinterpret_cast<const char*>(kSyncHeader);
  packet->append(header, sizeof(kSyncHeader));

  return function_exit(kWho, 0);
}
|
||||
|
||||
int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
|
||||
int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
|
||||
const char *log_file_name,
|
||||
my_off_t log_file_pos,
|
||||
uint32 server_id)
|
||||
bool* need_sync)
|
||||
{
|
||||
const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
|
||||
int cmp = 0;
|
||||
@ -894,8 +1047,11 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
|
||||
/* If the semi-sync master is not enabled, or the slave is not a semi-sync
|
||||
* target, do not request replies from the slave.
|
||||
*/
|
||||
if (!getMasterEnabled() || !is_semi_sync_slave())
|
||||
if (!getMasterEnabled() || !thd->semi_sync_slave)
|
||||
{
|
||||
*need_sync = false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
function_enter(kWho);
|
||||
|
||||
@ -903,12 +1059,15 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
|
||||
|
||||
/* This is the real check inside the mutex. */
|
||||
if (!getMasterEnabled())
|
||||
goto l_end; // sync= false at this point in time
|
||||
{
|
||||
assert(sync == false);
|
||||
goto l_end;
|
||||
}
|
||||
|
||||
if (is_on())
|
||||
{
|
||||
/* semi-sync is ON */
|
||||
/* sync= false; No sync unless a transaction is involved. */
|
||||
sync = false; /* No sync unless a transaction is involved. */
|
||||
|
||||
if (reply_file_name_inited_)
|
||||
{
|
||||
@ -962,8 +1121,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
|
||||
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
|
||||
kWho, server_id, log_file_name,
|
||||
kWho, thd->variables.server_id, log_file_name,
|
||||
(ulong)log_file_pos, sync, (int)is_on());
|
||||
*need_sync= sync;
|
||||
|
||||
l_end:
|
||||
unlock();
|
||||
@ -1033,6 +1193,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
|
||||
log_file_name, (ulong)log_file_pos);
|
||||
switch_off();
|
||||
}
|
||||
else
|
||||
{
|
||||
rpl_semi_sync_master_request_ack++;
|
||||
}
|
||||
}
|
||||
|
||||
l_end:
|
||||
@ -1041,19 +1205,12 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
|
||||
return function_exit(kWho, result);
|
||||
}
|
||||
|
||||
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
|
||||
const char *event_buf)
|
||||
int ReplSemiSyncMaster::flushNet(THD *thd,
|
||||
const char *event_buf)
|
||||
{
|
||||
const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
|
||||
const unsigned char *packet;
|
||||
char log_file_name[FN_REFLEN];
|
||||
my_off_t log_file_pos;
|
||||
ulong log_file_len = 0;
|
||||
ulong packet_len;
|
||||
const char *kWho = "ReplSemiSyncMaster::flushNet";
|
||||
int result = -1;
|
||||
struct timespec start_ts;
|
||||
ulong trc_level = trace_level_;
|
||||
LINT_INIT_STRUCT(start_ts);
|
||||
NET* net= &thd->net;
|
||||
|
||||
function_enter(kWho);
|
||||
|
||||
@ -1065,9 +1222,6 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
|
||||
goto l_end;
|
||||
}
|
||||
|
||||
if (trc_level & kTraceNetWait)
|
||||
set_timespec(start_ts, 0);
|
||||
|
||||
/* We flush to make sure that the current event is sent to the network,
|
||||
* instead of being buffered in the TCP/IP stack.
|
||||
*/
|
||||
@ -1079,82 +1233,35 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
|
||||
}
|
||||
|
||||
net_clear(net, 0);
|
||||
if (trc_level & kTraceDetail)
|
||||
sql_print_information("%s: Wait for replica's reply", kWho);
|
||||
|
||||
/* Wait for the network here. Though binlog dump thread can indefinitely wait
|
||||
* here, transactions would not wait indefintely.
|
||||
* Transactions wait on binlog replies detected by binlog dump threads. If
|
||||
* binlog dump threads wait too long, transactions will timeout and continue.
|
||||
*/
|
||||
packet_len = my_net_read(net);
|
||||
|
||||
if (trc_level & kTraceNetWait)
|
||||
{
|
||||
int wait_time = getWaitTime(start_ts);
|
||||
if (wait_time < 0)
|
||||
{
|
||||
sql_print_error("Semi-sync master wait for reply "
|
||||
"fail to get wait time.");
|
||||
rpl_semi_sync_master_timefunc_fails++;
|
||||
}
|
||||
else
|
||||
{
|
||||
rpl_semi_sync_master_net_wait_num++;
|
||||
rpl_semi_sync_master_net_wait_time += wait_time;
|
||||
}
|
||||
}
|
||||
|
||||
if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
|
||||
{
|
||||
if (packet_len == packet_error)
|
||||
sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
|
||||
net->last_error, net->last_errno);
|
||||
else
|
||||
sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
|
||||
net->last_error, net->last_errno);
|
||||
goto l_end;
|
||||
}
|
||||
|
||||
packet = net->read_pos;
|
||||
if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
|
||||
{
|
||||
sql_print_error("Read semi-sync reply magic number error");
|
||||
goto l_end;
|
||||
}
|
||||
|
||||
log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
|
||||
log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
|
||||
if (log_file_len >= FN_REFLEN)
|
||||
{
|
||||
sql_print_error("Read semi-sync reply binlog file length too large");
|
||||
goto l_end;
|
||||
}
|
||||
strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
|
||||
log_file_name[log_file_len] = 0;
|
||||
|
||||
if (trc_level & kTraceDetail)
|
||||
sql_print_information("%s: Got reply (%s, %lu)",
|
||||
kWho, log_file_name, (ulong)log_file_pos);
|
||||
|
||||
result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
|
||||
net->pkt_nr++;
|
||||
result = 0;
|
||||
rpl_semi_sync_master_net_wait_num++;
|
||||
|
||||
l_end:
|
||||
thd->clear_error();
|
||||
return function_exit(kWho, result);
|
||||
}
|
||||
|
||||
|
||||
int ReplSemiSyncMaster::resetMaster()
|
||||
int ReplSemiSyncMaster::afterResetMaster()
|
||||
{
|
||||
const char *kWho = "ReplSemiSyncMaster::resetMaster";
|
||||
const char *kWho = "ReplSemiSyncMaster::afterResetMaster";
|
||||
int result = 0;
|
||||
|
||||
function_enter(kWho);
|
||||
|
||||
if (rpl_semi_sync_master_enabled)
|
||||
{
|
||||
sql_print_information("Enable Semi-sync Master after reset master");
|
||||
enableMaster();
|
||||
}
|
||||
|
||||
lock();
|
||||
|
||||
state_ = getMasterEnabled()? 1 : 0;
|
||||
if (rpl_semi_sync_master_clients == 0 &&
|
||||
!rpl_semi_sync_master_wait_no_slave)
|
||||
state_ = 0;
|
||||
else
|
||||
state_ = getMasterEnabled()? 1 : 0;
|
||||
|
||||
wait_file_name_inited_ = false;
|
||||
reply_file_name_inited_ = false;
|
||||
@ -1176,6 +1283,31 @@ int ReplSemiSyncMaster::resetMaster()
|
||||
return function_exit(kWho, result);
|
||||
}
|
||||
|
||||
int ReplSemiSyncMaster::beforeResetMaster()
|
||||
{
|
||||
const char *kWho = "ReplSemiSyncMaster::beforeResetMaster";
|
||||
int result = 0;
|
||||
|
||||
function_enter(kWho);
|
||||
|
||||
if (rpl_semi_sync_master_enabled)
|
||||
disableMaster();
|
||||
|
||||
return function_exit(kWho, result);
|
||||
}
|
||||
|
||||
void ReplSemiSyncMaster::checkAndSwitch()
|
||||
{
|
||||
lock();
|
||||
if (getMasterEnabled() && is_on())
|
||||
{
|
||||
if (!rpl_semi_sync_master_wait_no_slave
|
||||
&& rpl_semi_sync_master_clients == 0)
|
||||
switch_off();
|
||||
}
|
||||
unlock();
|
||||
}
|
||||
|
||||
void ReplSemiSyncMaster::setExportStats()
|
||||
{
|
||||
lock();
|
||||
@ -1219,212 +1351,8 @@ static int getWaitTime(const struct timespec& start_ts)
|
||||
return (int)(end_usecs - start_usecs);
|
||||
}
|
||||
|
||||
/***************************************************************************
|
||||
Semisync master interface setup and deinit
|
||||
***************************************************************************/
|
||||
|
||||
C_MODE_START
|
||||
|
||||
int repl_semi_report_binlog_update(Binlog_storage_param *param,
|
||||
const char *log_file,
|
||||
my_off_t log_pos, uint32 flags)
|
||||
{
|
||||
int error= 0;
|
||||
|
||||
if (repl_semisync_master.getMasterEnabled())
|
||||
{
|
||||
/*
|
||||
Let us store the binlog file name and the position, so that
|
||||
we know how long to wait for the binlog to the replicated to
|
||||
the slave in synchronous replication.
|
||||
*/
|
||||
error= repl_semisync_master.writeTranxInBinlog(log_file,
|
||||
log_pos);
|
||||
}
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
int repl_semi_request_commit(Trans_param *param)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_report_binlog_sync(Binlog_storage_param *param,
|
||||
const char *log_file,
|
||||
my_off_t log_pos, uint32 flags)
|
||||
{
|
||||
int error= 0;
|
||||
if (rpl_semi_sync_master_wait_point ==
|
||||
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
|
||||
{
|
||||
error= repl_semisync_master.commitTrx(log_file, log_pos);
|
||||
}
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
int repl_semi_report_commit(Trans_param *param)
|
||||
{
|
||||
if (rpl_semi_sync_master_wait_point !=
|
||||
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
|
||||
|
||||
if (is_real_trans && param->log_pos)
|
||||
{
|
||||
const char *binlog_name= param->log_file;
|
||||
return repl_semisync_master.commitTrx(binlog_name, param->log_pos);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_report_rollback(Trans_param *param)
|
||||
{
|
||||
return repl_semi_report_commit(param);
|
||||
}
|
||||
|
||||
int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
|
||||
const char *log_file,
|
||||
my_off_t log_pos)
|
||||
{
|
||||
bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave();
|
||||
|
||||
if (semi_sync_slave)
|
||||
{
|
||||
/* One more semi-sync slave */
|
||||
repl_semisync_master.add_slave();
|
||||
|
||||
/*
|
||||
Let's assume this semi-sync slave has already received all
|
||||
binlog events before the filename and position it requests.
|
||||
*/
|
||||
repl_semisync_master.reportReplyBinlog(param->server_id, log_file, log_pos);
|
||||
}
|
||||
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
|
||||
semi_sync_slave ? "semi-sync" : "asynchronous",
|
||||
param->server_id, log_file, (ulong)log_pos);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_binlog_dump_end(Binlog_transmit_param *param)
|
||||
{
|
||||
bool semi_sync_slave= repl_semisync_master.is_semi_sync_slave();
|
||||
|
||||
sql_print_information("Stop %s binlog_dump to slave (server_id: %d)",
|
||||
semi_sync_slave ? "semi-sync" : "asynchronous",
|
||||
param->server_id);
|
||||
if (semi_sync_slave)
|
||||
{
|
||||
/* One less semi-sync slave */
|
||||
repl_semisync_master.remove_slave();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_reserve_header(Binlog_transmit_param *param,
|
||||
unsigned char *header,
|
||||
ulong size, ulong *len)
|
||||
{
|
||||
*len += repl_semisync_master.reserveSyncHeader(header, size);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_before_send_event(Binlog_transmit_param *param,
|
||||
unsigned char *packet, ulong len,
|
||||
const char *log_file, my_off_t log_pos)
|
||||
{
|
||||
return repl_semisync_master.updateSyncHeader(packet,
|
||||
log_file,
|
||||
log_pos,
|
||||
param->server_id);
|
||||
}
|
||||
|
||||
int repl_semi_after_send_event(Binlog_transmit_param *param,
|
||||
const char *event_buf, ulong len)
|
||||
{
|
||||
if (repl_semisync_master.is_semi_sync_slave())
|
||||
{
|
||||
THD *thd= current_thd;
|
||||
/*
|
||||
Possible errors in reading slave reply are ignored deliberately
|
||||
because we do not want dump thread to quit on this. Error
|
||||
messages are already reported.
|
||||
*/
|
||||
(void) repl_semisync_master.readSlaveReply(&thd->net,
|
||||
param->server_id, event_buf);
|
||||
thd->clear_error();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_reset_master(Binlog_transmit_param *param)
|
||||
{
|
||||
if (repl_semisync_master.resetMaster())
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
C_MODE_END
|
||||
|
||||
Trans_observer trans_observer=
|
||||
{
|
||||
sizeof(Trans_observer), // len
|
||||
|
||||
repl_semi_report_commit, // after_commit
|
||||
repl_semi_report_rollback, // after_rollback
|
||||
};
|
||||
|
||||
Binlog_storage_observer storage_observer=
|
||||
{
|
||||
sizeof(Binlog_storage_observer), // len
|
||||
|
||||
repl_semi_report_binlog_update, // report_update
|
||||
repl_semi_report_binlog_sync, // after_sync
|
||||
};
|
||||
|
||||
Binlog_transmit_observer transmit_observer=
|
||||
{
|
||||
sizeof(Binlog_transmit_observer), // len
|
||||
|
||||
repl_semi_binlog_dump_start, // start
|
||||
repl_semi_binlog_dump_end, // stop
|
||||
repl_semi_reserve_header, // reserve_header
|
||||
repl_semi_before_send_event, // before_send_event
|
||||
repl_semi_after_send_event, // after_send_event
|
||||
repl_semi_reset_master, // reset
|
||||
};
|
||||
|
||||
static bool semi_sync_master_inited= 0;
|
||||
|
||||
int semi_sync_master_init()
|
||||
{
|
||||
void *p= 0;
|
||||
if (repl_semisync_master.initObject())
|
||||
return 1;
|
||||
if (register_trans_observer(&trans_observer, p))
|
||||
return 1;
|
||||
if (register_binlog_storage_observer(&storage_observer, p))
|
||||
return 1;
|
||||
if (register_binlog_transmit_observer(&transmit_observer, p))
|
||||
return 1;
|
||||
semi_sync_master_inited= 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void semi_sync_master_deinit()
|
||||
{
|
||||
void *p= 0;
|
||||
if (!semi_sync_master_inited)
|
||||
return;
|
||||
|
||||
unregister_trans_observer(&trans_observer, p);
|
||||
unregister_binlog_storage_observer(&storage_observer, p);
|
||||
unregister_binlog_transmit_observer(&transmit_observer, p);
|
||||
repl_semisync_master.cleanup();
|
||||
semi_sync_master_inited= 0;
|
||||
ack_receiver.cleanup();
|
||||
}
|
||||
|
@ -20,14 +20,13 @@
|
||||
#define SEMISYNC_MASTER_H
|
||||
|
||||
#include "semisync.h"
|
||||
#include "semisync_master_ack_receiver.h"
|
||||
|
||||
#ifdef HAVE_PSI_INTERFACE
|
||||
extern PSI_mutex_key key_LOCK_binlog;
|
||||
extern PSI_cond_key key_COND_binlog_send;
|
||||
#endif
|
||||
|
||||
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
|
||||
|
||||
struct TranxNode {
|
||||
char log_name_[FN_REFLEN];
|
||||
my_off_t log_pos_;
|
||||
@ -432,6 +431,9 @@ class ReplSemiSyncMaster
|
||||
|
||||
bool state_; /* whether semi-sync is switched */
|
||||
|
||||
/*Waiting for ACK before/after innodb commit*/
|
||||
ulong wait_point_;
|
||||
|
||||
void lock();
|
||||
void unlock();
|
||||
void cond_broadcast();
|
||||
@ -473,6 +475,17 @@ class ReplSemiSyncMaster
|
||||
wait_timeout_ = wait_timeout;
|
||||
}
|
||||
|
||||
/*set the ACK point, after binlog sync or after transaction commit*/
|
||||
void setWaitPoint(unsigned long ack_point)
|
||||
{
|
||||
wait_point_ = ack_point;
|
||||
}
|
||||
|
||||
/* Return the wait point previously stored by setWaitPoint(). */
ulong waitPoint() //no cover line
{
  return this->wait_point_; //no cover line
}
|
||||
|
||||
/* Initialize this class after MySQL parameters are initialized. this
|
||||
* function should be called once at bootstrap time.
|
||||
*/
|
||||
@ -490,8 +503,9 @@ class ReplSemiSyncMaster
|
||||
/* Remove a semi-sync replication slave */
|
||||
void remove_slave();
|
||||
|
||||
/* Is the slave servered by the thread requested semi-sync */
|
||||
bool is_semi_sync_slave();
|
||||
/* It parses a reply packet and call reportReplyBinlog to handle it. */
|
||||
int reportReplyPacket(uint32 server_id, const uchar *packet,
|
||||
ulong packet_len);
|
||||
|
||||
/* In semi-sync replication, reports up to which binlog position we have
|
||||
* received replies from the slave indicating that it already get the events.
|
||||
@ -527,42 +541,61 @@ class ReplSemiSyncMaster
|
||||
int commitTrx(const char* trx_wait_binlog_name,
|
||||
my_off_t trx_wait_binlog_pos);
|
||||
|
||||
/*Wait for ACK after writing/sync binlog to file*/
|
||||
int waitAfterSync(const char* log_file, my_off_t log_pos);
|
||||
|
||||
/*Wait for ACK after committing the transaction*/
|
||||
int waitAfterCommit(THD* thd, bool all);
|
||||
|
||||
/*Wait after the transaction is rolled back*/
|
||||
int waitAfterRollback(THD *thd, bool all);
|
||||
/*Store the current binlog position in active_tranxs_. This position should
|
||||
* be acked by slave*/
|
||||
int reportBinlogUpdate(THD *thd, const char *log_file,my_off_t log_pos);
|
||||
|
||||
void dump_start(THD* thd,
|
||||
const char *log_file,
|
||||
my_off_t log_pos);
|
||||
|
||||
void dump_end(THD* thd);
|
||||
|
||||
/* Reserve space in the replication event packet header:
|
||||
* . slave semi-sync off: 1 byte - (0)
|
||||
* . slave semi-sync on: 3 byte - (0, 0xef, 0/1}
|
||||
*
|
||||
*
|
||||
* Input:
|
||||
* header - (IN) the header buffer
|
||||
* size - (IN) size of the header buffer
|
||||
* packet - (IN) the header buffer
|
||||
*
|
||||
* Return:
|
||||
* size of the bytes reserved for header
|
||||
*/
|
||||
int reserveSyncHeader(unsigned char *header, unsigned long size);
|
||||
int reserveSyncHeader(String* packet);
|
||||
|
||||
/* Update the sync bit in the packet header to indicate to the slave whether
|
||||
* the master will wait for the reply of the event. If semi-sync is switched
|
||||
* off and we detect that the slave is catching up, we switch semi-sync on.
|
||||
*
|
||||
* Input:
|
||||
* THD - (IN) current dump thread
|
||||
* packet - (IN) the packet containing the replication event
|
||||
* log_file_name - (IN) the event ending position's file name
|
||||
* log_file_pos - (IN) the event ending position's file offset
|
||||
* need_sync - (OUT) set to indicate whether flushNet needs to be called.
|
||||
* server_id - (IN) master server id number
|
||||
*
|
||||
* Return:
|
||||
* 0: success; non-zero: error
|
||||
*/
|
||||
int updateSyncHeader(unsigned char *packet,
|
||||
int updateSyncHeader(THD* thd, unsigned char *packet,
|
||||
const char *log_file_name,
|
||||
my_off_t log_file_pos,
|
||||
uint32 server_id);
|
||||
my_off_t log_file_pos,
|
||||
bool* need_sync);
|
||||
|
||||
/* Called when a transaction finished writing binlog events.
|
||||
* . update the 'largest' transactions' binlog event position
|
||||
* . insert the ending position in the active transaction list if
|
||||
* semi-sync is on
|
||||
*
|
||||
*
|
||||
* Input: (the transaction events' ending binlog position)
|
||||
* log_file_name - (IN) transaction ending position's file name
|
||||
* log_file_pos - (IN) transaction ending position's file offset
|
||||
@ -574,16 +607,8 @@ class ReplSemiSyncMaster
|
||||
|
||||
/* Read the slave's reply so that we know how much progress the slave makes
|
||||
* on receive replication events.
|
||||
*
|
||||
* Input:
|
||||
* net - (IN) the connection to master
|
||||
* server_id - (IN) master server id number
|
||||
* event_buf - (IN) pointer to the event packet
|
||||
*
|
||||
* Return:
|
||||
* 0: success; non-zero: error
|
||||
*/
|
||||
int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
|
||||
int flushNet(THD* thd, const char *event_buf);
|
||||
|
||||
/* Export internal statistics for semi-sync replication. */
|
||||
void setExportStats();
|
||||
@ -591,7 +616,12 @@ class ReplSemiSyncMaster
|
||||
/* 'reset master' command is issued from the user and semi-sync need to
|
||||
* go off for that.
|
||||
*/
|
||||
int resetMaster();
|
||||
int afterResetMaster();
|
||||
|
||||
/*called before reset master*/
|
||||
int beforeResetMaster();
|
||||
|
||||
void checkAndSwitch();
|
||||
};
|
||||
|
||||
enum rpl_semi_sync_master_wait_point_t {
|
||||
@ -599,6 +629,9 @@ enum rpl_semi_sync_master_wait_point_t {
|
||||
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
|
||||
};
|
||||
|
||||
extern ReplSemiSyncMaster repl_semisync_master;
|
||||
extern Ack_receiver ack_receiver;
|
||||
|
||||
/* System and status variables for the master component */
|
||||
extern my_bool rpl_semi_sync_master_enabled;
|
||||
extern my_bool rpl_semi_sync_master_status;
|
||||
@ -620,6 +653,8 @@ extern ulonglong rpl_semi_sync_master_net_wait_num;
|
||||
extern ulonglong rpl_semi_sync_master_trx_wait_num;
|
||||
extern ulonglong rpl_semi_sync_master_net_wait_time;
|
||||
extern ulonglong rpl_semi_sync_master_trx_wait_time;
|
||||
extern unsigned long long rpl_semi_sync_master_request_ack;
|
||||
extern unsigned long long rpl_semi_sync_master_get_ack;
|
||||
|
||||
/*
|
||||
This indicates whether we should keep waiting if no semi-sync slave
|
||||
@ -630,7 +665,10 @@ extern ulonglong rpl_semi_sync_master_trx_wait_time;
|
||||
extern char rpl_semi_sync_master_wait_no_slave;
|
||||
extern ReplSemiSyncMaster repl_semisync_master;
|
||||
|
||||
int semi_sync_master_init();
|
||||
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
|
||||
extern PSI_stage_info stage_reading_semi_sync_ack;
|
||||
extern PSI_stage_info stage_waiting_for_semi_sync_slave;
|
||||
|
||||
void semi_sync_master_deinit();
|
||||
|
||||
#endif /* SEMISYNC_MASTER_H */
|
||||
|
308
sql/semisync_master_ack_receiver.cc
Normal file
308
sql/semisync_master_ack_receiver.cc
Normal file
@ -0,0 +1,308 @@
|
||||
/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
||||
|
||||
#include <my_global.h>
|
||||
#include "semisync_master.h"
|
||||
#include "semisync_master_ack_receiver.h"
|
||||
|
||||
extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
|
||||
extern PSI_cond_key key_ss_cond_Ack_receiver_cond;
|
||||
extern PSI_thread_key key_ss_thread_Ack_receiver_thread;
|
||||
extern ReplSemiSyncMaster repl_semisync;
|
||||
|
||||
/* Callback function of ack receive thread */
|
||||
pthread_handler_t ack_receive_handler(void *arg)
|
||||
{
|
||||
Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
|
||||
|
||||
my_thread_init();
|
||||
recv->run();
|
||||
my_thread_end();
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
  Put the receiver into the ST_DOWN state with no thread id, and create the
  mutex and condition variable used by the other methods.
*/
Ack_receiver::Ack_receiver()
{
  const char *kWho = "Ack_receiver::Ack_receiver";
  function_enter(kWho);

  m_pid= 0;
  m_status= ST_DOWN;

  mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex,
                   MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL);

  function_exit(kWho);
}
|
||||
|
||||
void Ack_receiver::cleanup()
|
||||
{
|
||||
const char *kWho = "Ack_receiver::~Ack_receiver";
|
||||
function_enter(kWho);
|
||||
|
||||
stop();
|
||||
mysql_mutex_destroy(&m_mutex);
|
||||
mysql_cond_destroy(&m_cond);
|
||||
|
||||
function_exit(kWho);
|
||||
}
|
||||
|
||||
bool Ack_receiver::start()
|
||||
{
|
||||
const char *kWho = "Ack_receiver::start";
|
||||
function_enter(kWho);
|
||||
|
||||
mysql_mutex_lock(&m_mutex);
|
||||
if(m_status == ST_DOWN)
|
||||
{
|
||||
pthread_attr_t attr;
|
||||
|
||||
m_status= ST_UP;
|
||||
|
||||
if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
|
||||
pthread_attr_init(&attr) != 0 ||
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
|
||||
#ifndef _WIN32
|
||||
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
|
||||
#endif
|
||||
mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,
|
||||
&attr, ack_receive_handler, this))
|
||||
{
|
||||
sql_print_error("Failed to start semi-sync ACK receiver thread, "
|
||||
" could not create thread(errno:%d)", errno);
|
||||
|
||||
m_status= ST_DOWN;
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
|
||||
return function_exit(kWho, true);
|
||||
}
|
||||
(void) pthread_attr_destroy(&attr);
|
||||
}
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
|
||||
return function_exit(kWho, false);
|
||||
}
|
||||
|
||||
void Ack_receiver::stop()
|
||||
{
|
||||
const char *kWho = "Ack_receiver::stop";
|
||||
function_enter(kWho);
|
||||
|
||||
mysql_mutex_lock(&m_mutex);
|
||||
if (m_status == ST_UP)
|
||||
{
|
||||
m_status= ST_STOPPING;
|
||||
mysql_cond_broadcast(&m_cond);
|
||||
|
||||
while (m_status == ST_STOPPING)
|
||||
mysql_cond_wait(&m_cond, &m_mutex);
|
||||
|
||||
DBUG_ASSERT(m_status == ST_DOWN);
|
||||
|
||||
m_pid= 0;
|
||||
}
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
|
||||
function_exit(kWho);
|
||||
}
|
||||
|
||||
/**
  Register a dump thread's connection for ACK reception.

  A new Slave entry is allocated that remembers @a thd and holds its own copy
  of the connection's Vio. The entry is appended to m_slaves, the list is
  flagged as changed and m_cond is broadcast.

  @param thd  THD of the dump thread; thd->net.vio is dereferenced.

  @return false on success, true if the Slave entry could not be allocated.
*/
bool Ack_receiver::add_slave(THD *thd)
{
  const char *who = "Ack_receiver::add_slave";
  function_enter(who);

  Slave *entry= new Slave;
  if (!entry)
    return function_exit(who, true);

  entry->thd= thd;
  /* Work on a private copy of the Vio, not on the dump thread's own. */
  entry->vio= *thd->net.vio;
  /* presumably detaches the copy from socket instrumentation - TODO confirm */
  entry->vio.mysql_socket.m_psi= NULL;
  entry->vio.read_timeout= 1;

  /* Publish the entry and wake the receiver thread. */
  mysql_mutex_lock(&m_mutex);
  m_slaves.push_back(entry);
  m_slaves_changed= true;
  mysql_cond_broadcast(&m_cond);
  mysql_mutex_unlock(&m_mutex);

  return function_exit(who, false);
}
|
||||
|
||||
/**
  Unregister a dump thread's connection.

  The first entry of m_slaves whose thd equals @a thd is deleted and the list
  is flagged as changed. Nothing happens if no entry matches.

  @param thd  THD of the dump thread to remove.
*/
void Ack_receiver::remove_slave(THD *thd)
{
  const char *kWho = "Ack_receiver::remove_slave";
  function_enter(kWho);

  mysql_mutex_lock(&m_mutex);
  {
    /*
      The iterator is created only after m_mutex is held, so that every
      access to m_slaves happens inside the critical section, as in run()
      and get_slave_sockets().
    */
    I_List_iterator<Slave> it(m_slaves);
    Slave *slave;

    while ((slave= it++))
    {
      if (slave->thd == thd)
      {
        /* presumably the ilink base unlinks the entry on destruction -
           TODO confirm */
        delete slave;
        m_slaves_changed= true;
        break;
      }
    }
  }
  mysql_mutex_unlock(&m_mutex);

  function_exit(kWho);
}
|
||||
|
||||
inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
|
||||
{
|
||||
MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
inline void Ack_receiver::wait_for_slave_connection()
|
||||
{
|
||||
set_stage_info(stage_waiting_for_semi_sync_slave);
|
||||
mysql_cond_wait(&m_cond, &m_mutex);
|
||||
}
|
||||
|
||||
/**
  Build the select() descriptor set from the current slave list.

  @param[out] fds    cleared, then filled with the socket of every entry
                     in m_slaves.
  @param[out] count  number of entries in m_slaves.

  @return the largest socket descriptor added, or INVALID_SOCKET if no
          descriptor compared greater than INVALID_SOCKET (e.g. empty list).
*/
my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
{
  my_socket highest= INVALID_SOCKET;
  uint seen= 0;
  I_List_iterator<Slave> it(m_slaves);

  FD_ZERO(fds);
  for (Slave *slave= it++; slave; slave= it++)
  {
    const my_socket fd= slave->sock_fd();

    if (fd > highest)
      highest= fd;
    /* NOTE(review): FD_SET is only defined for descriptors below
       FD_SETSIZE - confirm that slave sockets are bounded accordingly. */
    FD_SET(fd, fds);
    seen++;
  }
  *count= seen;

  return highest;
}
|
||||
|
||||
/* Auxilary function to initialize a NET object with given net buffer. */
|
||||
static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
|
||||
{
|
||||
memset(net, 0, sizeof(NET));
|
||||
net->max_packet= buff_len;
|
||||
net->buff= buff;
|
||||
net->buff_end= buff + buff_len;
|
||||
net->read_pos= net->buff;
|
||||
}
|
||||
|
||||
void Ack_receiver::run()
|
||||
{
|
||||
// skip LOCK_global_system_variables due to the 3rd arg
|
||||
THD *thd= new THD(next_thread_id(), false, true);
|
||||
NET net;
|
||||
unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
|
||||
fd_set read_fds;
|
||||
my_socket max_fd= INVALID_SOCKET;
|
||||
Slave *slave;
|
||||
|
||||
my_thread_init();
|
||||
|
||||
DBUG_ENTER("Ack_receiver::run");
|
||||
|
||||
sql_print_information("Starting ack receiver thread");
|
||||
thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
|
||||
thd->thread_stack= (char*) &thd;
|
||||
thd->store_globals();
|
||||
thd->security_ctx->skip_grants();
|
||||
thread_safe_increment32(&service_thread_count);
|
||||
thd->set_command(COM_DAEMON);
|
||||
init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
|
||||
|
||||
mysql_mutex_lock(&m_mutex);
|
||||
m_slaves_changed= true;
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
|
||||
while (1)
|
||||
{
|
||||
fd_set fds;
|
||||
int ret;
|
||||
uint slave_count;
|
||||
|
||||
mysql_mutex_lock(&m_mutex);
|
||||
if (unlikely(m_status == ST_STOPPING))
|
||||
goto end;
|
||||
|
||||
set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
|
||||
if (unlikely(m_slaves_changed))
|
||||
{
|
||||
if (unlikely(m_slaves.is_empty()))
|
||||
{
|
||||
wait_for_slave_connection();
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
continue;
|
||||
}
|
||||
|
||||
max_fd= get_slave_sockets(&read_fds, &slave_count);
|
||||
m_slaves_changed= false;
|
||||
DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd));
|
||||
}
|
||||
|
||||
struct timeval tv= {1, 0};
|
||||
fds= read_fds;
|
||||
/* select requires max fd + 1 for the first argument */
|
||||
ret= select(max_fd+1, &fds, NULL, NULL, &tv);
|
||||
if (ret <= 0)
|
||||
{
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
|
||||
ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
|
||||
|
||||
if (ret == -1)
|
||||
sql_print_information("Failed to select() on semi-sync dump sockets, "
|
||||
"error: errno=%d", socket_errno);
|
||||
/* Sleep 1us, so other threads can catch the m_mutex easily. */
|
||||
my_sleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
set_stage_info(stage_reading_semi_sync_ack);
|
||||
I_List_iterator<Slave> it(m_slaves);
|
||||
|
||||
while ((slave= it++))
|
||||
{
|
||||
if (FD_ISSET(slave->sock_fd(), &fds))
|
||||
{
|
||||
ulong len;
|
||||
|
||||
net_clear(&net, 0);
|
||||
net.vio= &slave->vio;
|
||||
|
||||
len= my_net_read(&net);
|
||||
if (likely(len != packet_error))
|
||||
repl_semisync_master.reportReplyPacket(slave->server_id(),
|
||||
net.read_pos, len);
|
||||
else if (net.last_errno == ER_NET_READ_ERROR)
|
||||
FD_CLR(slave->sock_fd(), &read_fds);
|
||||
}
|
||||
}
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
}
|
||||
end:
|
||||
sql_print_information("Stopping ack receiver thread");
|
||||
m_status= ST_DOWN;
|
||||
delete thd;
|
||||
thread_safe_decrement32(&service_thread_count);
|
||||
signal_thd_deleted();
|
||||
mysql_cond_broadcast(&m_cond);
|
||||
mysql_mutex_unlock(&m_mutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
119
sql/semisync_master_ack_receiver.h
Normal file
119
sql/semisync_master_ack_receiver.h
Normal file
@ -0,0 +1,119 @@
|
||||
/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
||||
|
||||
#ifndef SEMISYNC_MASTER_ACK_RECEIVER_DEFINED
|
||||
#define SEMISYNC_MASTER_ACK_RECEIVER_DEFINED
|
||||
|
||||
#include "my_global.h"
|
||||
#include "my_pthread.h"
|
||||
#include "sql_class.h"
|
||||
#include "semisync.h"
|
||||
/**
|
||||
Ack_receiver is responsible to control ack receive thread and maintain
|
||||
slave information used by ack receive thread.
|
||||
|
||||
There are mainly four operations on ack receive thread:
|
||||
start: start ack receive thread
|
||||
stop: stop ack receive thread
|
||||
add_slave: maintain a new semisync slave's information
|
||||
remove_slave: remove a semisync slave's information
|
||||
*/
|
||||
class Ack_receiver : public ReplSemiSyncBase
|
||||
{
|
||||
public:
|
||||
Ack_receiver();
|
||||
~Ack_receiver() {}
|
||||
void cleanup();
|
||||
/**
|
||||
Notify ack receiver to receive acks on the dump session.
|
||||
|
||||
It adds the given dump thread into the slave list and wakes
|
||||
up ack thread if it is waiting for any slave coming.
|
||||
|
||||
@param[in] thd THD of a dump thread.
|
||||
|
||||
@return it return false if succeeds, otherwise true is returned.
|
||||
*/
|
||||
bool add_slave(THD *thd);
|
||||
|
||||
/**
|
||||
Notify ack receiver not to receive ack on the dump session.
|
||||
|
||||
it removes the given dump thread from slave list.
|
||||
|
||||
@param[in] thd THD of a dump thread.
|
||||
*/
|
||||
void remove_slave(THD *thd);
|
||||
|
||||
/**
|
||||
Start ack receive thread
|
||||
|
||||
@return it return false if succeeds, otherwise true is returned.
|
||||
*/
|
||||
bool start();
|
||||
|
||||
/**
|
||||
Stop ack receive thread
|
||||
*/
|
||||
void stop();
|
||||
|
||||
/**
|
||||
The core of ack receive thread.
|
||||
|
||||
It monitors all slaves' sockets and receives acks when they come.
|
||||
*/
|
||||
void run();
|
||||
|
||||
void setTraceLevel(unsigned long trace_level)
|
||||
{
|
||||
trace_level_= trace_level;
|
||||
}
|
||||
private:
|
||||
enum status {ST_UP, ST_DOWN, ST_STOPPING};
|
||||
uint8 m_status;
|
||||
/*
|
||||
Protect m_status, m_slaves_changed and m_slaves. ack thread and other
|
||||
session may access the variables at the same time.
|
||||
*/
|
||||
mysql_mutex_t m_mutex;
|
||||
mysql_cond_t m_cond;
|
||||
/* If slave list is updated(add or remove). */
|
||||
bool m_slaves_changed;
|
||||
|
||||
class Slave :public ilink
|
||||
{
|
||||
public:
|
||||
THD *thd;
|
||||
Vio vio;
|
||||
|
||||
my_socket sock_fd() { return vio.mysql_socket.fd; }
|
||||
uint server_id() { return thd->variables.server_id; }
|
||||
};
|
||||
|
||||
I_List<Slave> m_slaves;
|
||||
|
||||
pthread_t m_pid;
|
||||
|
||||
/* Declare them private, so no one can copy the object. */
|
||||
Ack_receiver(const Ack_receiver &ack_receiver);
|
||||
Ack_receiver& operator=(const Ack_receiver &ack_receiver);
|
||||
|
||||
void set_stage_info(const PSI_stage_info &stage);
|
||||
void wait_for_slave_connection();
|
||||
my_socket get_slave_sockets(fd_set *fds, uint *count);
|
||||
};
|
||||
|
||||
extern Ack_receiver ack_receiver;
|
||||
#endif
|
@ -18,10 +18,13 @@
|
||||
#include <my_global.h>
|
||||
#include "semisync_slave.h"
|
||||
|
||||
my_bool rpl_semi_sync_slave_enabled;
|
||||
ReplSemiSyncSlave repl_semisync_slave;
|
||||
|
||||
my_bool rpl_semi_sync_slave_enabled= 0;
|
||||
|
||||
char rpl_semi_sync_slave_delay_master;
|
||||
my_bool rpl_semi_sync_slave_status= 0;
|
||||
ulong rpl_semi_sync_slave_trace_level;
|
||||
ReplSemiSyncSlave repl_semisync_slave;
|
||||
|
||||
/*
|
||||
indicate whether or not the slave should send a reply to the master.
|
||||
@ -31,30 +34,27 @@ ReplSemiSyncSlave repl_semisync_slave;
|
||||
checked in repl_semi_slave_queue_event.
|
||||
*/
|
||||
bool semi_sync_need_reply= false;
|
||||
|
||||
unsigned int rpl_semi_sync_slave_kill_conn_timeout;
|
||||
unsigned long long rpl_semi_sync_slave_send_ack = 0;
|
||||
|
||||
int ReplSemiSyncSlave::initObject()
|
||||
{
|
||||
int result= 0;
|
||||
const char *kWho = "ReplSemiSyncSlave::initObject";
|
||||
|
||||
if (init_done_)
|
||||
{
|
||||
fprintf(stderr, "%s called twice\n", kWho);
|
||||
return 1;
|
||||
}
|
||||
init_done_ = true;
|
||||
|
||||
/* References to the parameter works after set_options(). */
|
||||
setSlaveEnabled(rpl_semi_sync_slave_enabled);
|
||||
setTraceLevel(rpl_semi_sync_slave_trace_level);
|
||||
setDelayMaster(rpl_semi_sync_slave_delay_master);
|
||||
setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
|
||||
unsigned long total_len,
|
||||
bool *need_reply,
|
||||
int *semi_flags,
|
||||
const char **payload,
|
||||
unsigned long *payload_len)
|
||||
{
|
||||
@ -62,128 +62,119 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
|
||||
int read_res = 0;
|
||||
function_enter(kWho);
|
||||
|
||||
if ((unsigned char)(header[0]) == kPacketMagicNum)
|
||||
if (rpl_semi_sync_slave_status)
|
||||
{
|
||||
*need_reply = (header[1] & kPacketFlagSync);
|
||||
*payload_len = total_len - 2;
|
||||
*payload = header + 2;
|
||||
if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1)
|
||||
&& (unsigned char)(header[0]) == kPacketMagicNum)
|
||||
{
|
||||
semi_sync_need_reply = (header[1] & kPacketFlagSync);
|
||||
*payload_len = total_len - 2;
|
||||
*payload = header + 2;
|
||||
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: reply - %d", kWho, *need_reply);
|
||||
}
|
||||
else
|
||||
{
|
||||
sql_print_error("Missing magic number for semi-sync packet, packet "
|
||||
"len: %lu", total_len);
|
||||
read_res = -1;
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: reply - %d", kWho, semi_sync_need_reply);
|
||||
|
||||
if (semi_sync_need_reply)
|
||||
*semi_flags |= SEMI_SYNC_NEED_ACK;
|
||||
if (isDelayMaster())
|
||||
*semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC;
|
||||
}
|
||||
else
|
||||
{
|
||||
sql_print_error("Missing magic number for semi-sync packet, packet "
|
||||
"len: %lu", total_len);
|
||||
read_res = -1;
|
||||
}
|
||||
} else {
|
||||
*payload= header;
|
||||
*payload_len= total_len;
|
||||
}
|
||||
|
||||
return function_exit(kWho, read_res);
|
||||
}
|
||||
|
||||
int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
|
||||
int ReplSemiSyncSlave::slaveStart(Master_info *mi)
|
||||
{
|
||||
bool semi_sync= getSlaveEnabled();
|
||||
|
||||
sql_print_information("Slave I/O thread: Start %s replication to\
|
||||
master '%s@%s:%d' in log '%s' at position %lu",
|
||||
semi_sync ? "semi-sync" : "asynchronous",
|
||||
param->user, param->host, param->port,
|
||||
param->master_log_name[0] ? param->master_log_name : "FIRST",
|
||||
(unsigned long)param->master_log_pos);
|
||||
const_cast<char *>(mi->user), mi->host, mi->port,
|
||||
const_cast<char *>(mi->master_log_name),
|
||||
(unsigned long)(mi->master_log_pos));
|
||||
|
||||
if (semi_sync && !rpl_semi_sync_slave_status)
|
||||
rpl_semi_sync_slave_status= 1;
|
||||
|
||||
/*clear the counter*/
|
||||
rpl_semi_sync_slave_send_ack= 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
|
||||
int ReplSemiSyncSlave::slaveStop(Master_info *mi)
|
||||
{
|
||||
if (rpl_semi_sync_slave_status)
|
||||
rpl_semi_sync_slave_status= 0;
|
||||
if (mysql_reply)
|
||||
mysql_close(mysql_reply);
|
||||
mysql_reply= 0;
|
||||
if (getSlaveEnabled())
|
||||
killConnection(mi->mysql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
|
||||
const char *binlog_filename,
|
||||
my_off_t binlog_filepos)
|
||||
int ReplSemiSyncSlave::resetSlave(Master_info *mi)
|
||||
{
|
||||
const char *kWho = "ReplSemiSyncSlave::slaveReply";
|
||||
NET *net= &mysql->net;
|
||||
uchar reply_buffer[REPLY_MAGIC_NUM_LEN
|
||||
+ REPLY_BINLOG_POS_LEN
|
||||
+ REPLY_BINLOG_NAME_LEN];
|
||||
int reply_res, name_len = strlen(binlog_filename);
|
||||
|
||||
function_enter(kWho);
|
||||
|
||||
/* Prepare the buffer of the reply. */
|
||||
reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
|
||||
int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
|
||||
memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
|
||||
binlog_filename,
|
||||
name_len + 1 /* including trailing '\0' */);
|
||||
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: reply (%s, %lu)", kWho,
|
||||
binlog_filename, (ulong)binlog_filepos);
|
||||
|
||||
net_clear(net, 0);
|
||||
/* Send the reply. */
|
||||
reply_res = my_net_write(net, reply_buffer,
|
||||
name_len + REPLY_BINLOG_NAME_OFFSET);
|
||||
if (!reply_res)
|
||||
{
|
||||
reply_res = net_flush(net);
|
||||
if (reply_res)
|
||||
sql_print_error("Semi-sync slave net_flush() reply failed");
|
||||
}
|
||||
else
|
||||
{
|
||||
sql_print_error("Semi-sync slave send reply failed: %s (%d)",
|
||||
net->last_error, net->last_errno);
|
||||
}
|
||||
|
||||
return function_exit(kWho, reply_res);
|
||||
}
|
||||
|
||||
/***************************************************************************
|
||||
Semisync slave interface setup and deinit
|
||||
***************************************************************************/
|
||||
|
||||
C_MODE_START
|
||||
|
||||
int repl_semi_reset_slave(Binlog_relay_IO_param *param)
|
||||
{
|
||||
// TODO: reset semi-sync slave status here
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
|
||||
uint32 flags)
|
||||
void ReplSemiSyncSlave::killConnection(MYSQL *mysql)
|
||||
{
|
||||
MYSQL *mysql= param->mysql;
|
||||
if (!mysql)
|
||||
return;
|
||||
|
||||
char kill_buffer[30];
|
||||
MYSQL *kill_mysql = NULL;
|
||||
kill_mysql = mysql_init(kill_mysql);
|
||||
mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &kill_conn_timeout_);
|
||||
mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &kill_conn_timeout_);
|
||||
mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &kill_conn_timeout_);
|
||||
|
||||
bool ret= (!mysql_real_connect(kill_mysql, mysql->host,
|
||||
mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0));
|
||||
if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret)
|
||||
{
|
||||
sql_print_information("cannot connect to master to kill slave io_thread's "
|
||||
"connection");
|
||||
if (!ret)
|
||||
mysql_close(kill_mysql);
|
||||
return;
|
||||
}
|
||||
uint kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
|
||||
mysql->thread_id);
|
||||
mysql_real_query(kill_mysql, kill_buffer, kill_buffer_length);
|
||||
mysql_close(kill_mysql);
|
||||
}
|
||||
|
||||
int ReplSemiSyncSlave::requestTransmit(Master_info *mi)
|
||||
{
|
||||
MYSQL *mysql= mi->mysql;
|
||||
MYSQL_RES *res= 0;
|
||||
MYSQL_ROW row;
|
||||
const char *query;
|
||||
|
||||
if (!repl_semisync_slave.getSlaveEnabled())
|
||||
if (!getSlaveEnabled())
|
||||
return 0;
|
||||
|
||||
/* Check if master server has semi-sync plugin installed */
|
||||
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
|
||||
if (mysql_real_query(mysql, query, strlen(query)) ||
|
||||
!(res= mysql_store_result(mysql)))
|
||||
{
|
||||
sql_print_error("Execution failed on master: %s", query);
|
||||
sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql));
|
||||
return 1;
|
||||
}
|
||||
|
||||
row= mysql_fetch_row(res);
|
||||
if (!row)
|
||||
if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0)
|
||||
|| !row)
|
||||
{
|
||||
/* Master does not support semi-sync */
|
||||
sql_print_warning("Master server does not support semi-sync, "
|
||||
@ -195,8 +186,8 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
|
||||
mysql_free_result(res);
|
||||
|
||||
/*
|
||||
Tell master dump thread that we want to do semi-sync
|
||||
replication
|
||||
Tell master dump thread that we want to do semi-sync
|
||||
replication
|
||||
*/
|
||||
query= "SET @rpl_semi_sync_slave= 1";
|
||||
if (mysql_real_query(mysql, query, strlen(query)))
|
||||
@ -206,83 +197,56 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
|
||||
}
|
||||
mysql_free_result(mysql_store_result(mysql));
|
||||
rpl_semi_sync_slave_status= 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
|
||||
const char *packet, unsigned long len,
|
||||
const char **event_buf, unsigned long *event_len)
|
||||
int ReplSemiSyncSlave::slaveReply(Master_info *mi)
|
||||
{
|
||||
if (rpl_semi_sync_slave_status)
|
||||
return repl_semisync_slave.slaveReadSyncHeader(packet, len,
|
||||
&semi_sync_need_reply,
|
||||
event_buf, event_len);
|
||||
*event_buf= packet;
|
||||
*event_len= len;
|
||||
return 0;
|
||||
}
|
||||
const char *kWho = "ReplSemiSyncSlave::slaveReply";
|
||||
MYSQL* mysql= mi->mysql;
|
||||
const char *binlog_filename= const_cast<char *>(mi->master_log_name);
|
||||
my_off_t binlog_filepos= mi->master_log_pos;
|
||||
|
||||
NET *net= &mysql->net;
|
||||
uchar reply_buffer[REPLY_MAGIC_NUM_LEN
|
||||
+ REPLY_BINLOG_POS_LEN
|
||||
+ REPLY_BINLOG_NAME_LEN];
|
||||
int reply_res = 0;
|
||||
int name_len = strlen(binlog_filename);
|
||||
|
||||
function_enter(kWho);
|
||||
|
||||
int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
|
||||
const char *event_buf,
|
||||
unsigned long event_len,
|
||||
uint32 flags)
|
||||
{
|
||||
if (rpl_semi_sync_slave_status && semi_sync_need_reply)
|
||||
{
|
||||
/*
|
||||
We deliberately ignore the error in slaveReply, such error
|
||||
should not cause the slave IO thread to stop, and the error
|
||||
messages are already reported.
|
||||
*/
|
||||
(void) repl_semisync_slave.slaveReply(param->mysql,
|
||||
param->master_log_name,
|
||||
param->master_log_pos);
|
||||
/* Prepare the buffer of the reply. */
|
||||
reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
|
||||
int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
|
||||
memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
|
||||
binlog_filename,
|
||||
name_len + 1 /* including trailing '\0' */);
|
||||
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: reply (%s, %lu)", kWho,
|
||||
binlog_filename, (ulong)binlog_filepos);
|
||||
|
||||
net_clear(net, 0);
|
||||
/* Send the reply. */
|
||||
reply_res = my_net_write(net, reply_buffer,
|
||||
name_len + REPLY_BINLOG_NAME_OFFSET);
|
||||
if (!reply_res)
|
||||
{
|
||||
reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net));
|
||||
if (reply_res)
|
||||
sql_print_error("Semi-sync slave net_flush() reply failed");
|
||||
rpl_semi_sync_slave_send_ack++;
|
||||
}
|
||||
else
|
||||
{
|
||||
sql_print_error("Semi-sync slave send reply failed: %s (%d)",
|
||||
net->last_error, net->last_errno);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
|
||||
{
|
||||
return repl_semisync_slave.slaveStart(param);
|
||||
}
|
||||
|
||||
int repl_semi_slave_io_end(Binlog_relay_IO_param *param)
|
||||
{
|
||||
return repl_semisync_slave.slaveStop(param);
|
||||
}
|
||||
|
||||
C_MODE_END
|
||||
|
||||
Binlog_relay_IO_observer relay_io_observer=
|
||||
{
|
||||
sizeof(Binlog_relay_IO_observer), // len
|
||||
|
||||
repl_semi_slave_io_start, // start
|
||||
repl_semi_slave_io_end, // stop
|
||||
repl_semi_slave_request_dump, // request_transmit
|
||||
repl_semi_slave_read_event, // after_read_event
|
||||
repl_semi_slave_queue_event, // after_queue_event
|
||||
repl_semi_reset_slave, // reset
|
||||
};
|
||||
|
||||
static bool semi_sync_slave_inited= 0;
|
||||
|
||||
int semi_sync_slave_init()
|
||||
{
|
||||
void *p= 0;
|
||||
if (repl_semisync_slave.initObject())
|
||||
return 1;
|
||||
if (register_binlog_relay_io_observer(&relay_io_observer, p))
|
||||
return 1;
|
||||
semi_sync_slave_inited= 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void semi_sync_slave_deinit()
|
||||
{
|
||||
void *p= 0;
|
||||
if (!semi_sync_slave_inited)
|
||||
return;
|
||||
unregister_binlog_relay_io_observer(&relay_io_observer, p);
|
||||
semi_sync_slave_inited= 0;
|
||||
|
||||
return function_exit(kWho, reply_res);
|
||||
}
|
||||
|
@ -19,6 +19,10 @@
|
||||
#define SEMISYNC_SLAVE_H
|
||||
|
||||
#include "semisync.h"
|
||||
#include "my_global.h"
|
||||
#include "sql_priv.h"
|
||||
#include "rpl_mi.h"
|
||||
#include "mysql.h"
|
||||
|
||||
/**
|
||||
The extension class for the slave of semi-synchronous replication
|
||||
@ -44,49 +48,54 @@ public:
|
||||
return slave_enabled_;
|
||||
}
|
||||
void setSlaveEnabled(bool enabled) {
|
||||
run_hooks_enabled|= enabled;
|
||||
slave_enabled_ = enabled;
|
||||
}
|
||||
|
||||
bool isDelayMaster(){
|
||||
return delay_master_;
|
||||
}
|
||||
|
||||
void setDelayMaster(bool enabled) {
|
||||
delay_master_ = enabled;
|
||||
}
|
||||
|
||||
void setKillConnTimeout(unsigned int timeout) {
|
||||
kill_conn_timeout_ = timeout;
|
||||
}
|
||||
|
||||
/* A slave reads the semi-sync packet header and separate the metadata
|
||||
* from the payload data.
|
||||
*
|
||||
*
|
||||
* Input:
|
||||
* header - (IN) packet header pointer
|
||||
* total_len - (IN) total packet length: metadata + payload
|
||||
* need_reply - (IN) whether the master is waiting for the reply
|
||||
* semi_flags - (IN) store flags: SEMI_SYNC_SLAVE_DELAY_SYNC and SEMI_SYNC_NEED_ACK
|
||||
* payload - (IN) payload: the replication event
|
||||
* payload_len - (IN) payload length
|
||||
*
|
||||
* Return:
|
||||
* 0: success; non-zero: error
|
||||
*/
|
||||
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
|
||||
int slaveReadSyncHeader(const char *header, unsigned long total_len, int *semi_flags,
|
||||
const char **payload, unsigned long *payload_len);
|
||||
|
||||
/* A slave replies to the master indicating its replication process. It
|
||||
* indicates that the slave has received all events before the specified
|
||||
* binlog position.
|
||||
*
|
||||
* Input:
|
||||
* mysql - (IN) the mysql network connection
|
||||
* binlog_filename - (IN) the reply point's binlog file name
|
||||
* binlog_filepos - (IN) the reply point's binlog file offset
|
||||
*
|
||||
* Return:
|
||||
* 0: success; non-zero: error
|
||||
*/
|
||||
int slaveReply(MYSQL *mysql, const char *binlog_filename,
|
||||
my_off_t binlog_filepos);
|
||||
|
||||
int slaveStart(Binlog_relay_IO_param *param);
|
||||
int slaveStop(Binlog_relay_IO_param *param);
|
||||
int slaveReply(Master_info* mi);
|
||||
int slaveStart(Master_info *mi);
|
||||
int slaveStop(Master_info *mi);
|
||||
int requestTransmit(Master_info*);
|
||||
void killConnection(MYSQL *mysql);
|
||||
int resetSlave(Master_info *mi);
|
||||
|
||||
private:
|
||||
/* True when initObject has been called */
|
||||
bool init_done_;
|
||||
bool slave_enabled_; /* semi-sycn is enabled on the slave */
|
||||
MYSQL *mysql_reply; /* connection to send reply */
|
||||
bool delay_master_;
|
||||
unsigned int kill_conn_timeout_;
|
||||
};
|
||||
|
||||
|
||||
@ -96,7 +105,8 @@ extern my_bool rpl_semi_sync_slave_status;
|
||||
extern ulong rpl_semi_sync_slave_trace_level;
|
||||
extern ReplSemiSyncSlave repl_semisync_slave;
|
||||
|
||||
int semi_sync_slave_init();
|
||||
void semi_sync_slave_deinit();
|
||||
extern char rpl_semi_sync_slave_delay_master;
|
||||
extern unsigned int rpl_semi_sync_slave_kill_conn_timeout;
|
||||
extern unsigned long long rpl_semi_sync_slave_send_ack;
|
||||
|
||||
#endif /* SEMISYNC_SLAVE_H */
|
||||
|
34
sql/slave.cc
34
sql/slave.cc
@ -61,6 +61,7 @@
|
||||
#include "debug_sync.h"
|
||||
#include "rpl_parallel.h"
|
||||
#include "sql_show.h"
|
||||
#include "semisync_slave.h"
|
||||
|
||||
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
|
||||
|
||||
@ -3590,7 +3591,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
|
||||
before_request_transmit,
|
||||
(thd, mi, binlog_flags)))
|
||||
DBUG_RETURN(1);
|
||||
|
||||
if (repl_semisync_slave.requestTransmit(mi))
|
||||
DBUG_RETURN(1);
|
||||
|
||||
// TODO if big log files: Change next to int8store()
|
||||
int4store(buf, (ulong) mi->master_log_pos);
|
||||
int2store(buf + 4, binlog_flags);
|
||||
@ -4615,7 +4618,9 @@ pthread_handler_t handle_slave_io(void *arg)
|
||||
}
|
||||
|
||||
|
||||
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
|
||||
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)) ||
|
||||
(DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
|
||||
|| repl_semisync_slave.slaveStart(mi)))
|
||||
{
|
||||
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
|
||||
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
|
||||
@ -4805,9 +4810,13 @@ Stopping slave I/O thread due to out-of-memory error from master");
|
||||
retry_count=0; // ok event, reset retry counter
|
||||
THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
|
||||
event_buf= (const char*)mysql->net.read_pos + 1;
|
||||
mi->semi_ack= 0;
|
||||
if (RUN_HOOK(binlog_relay_io, after_read_event,
|
||||
(thd, mi,(const char*)mysql->net.read_pos + 1,
|
||||
event_len, &event_buf, &event_len)))
|
||||
event_len, &event_buf, &event_len)) ||
|
||||
repl_semisync_slave.
|
||||
slaveReadSyncHeader((const char*)mysql->net.read_pos + 1, event_len,
|
||||
&(mi->semi_ack), &event_buf, &event_len))
|
||||
{
|
||||
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
|
||||
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
|
||||
@ -4868,7 +4877,10 @@ Stopping slave I/O thread due to out-of-memory error from master");
|
||||
}
|
||||
|
||||
if (RUN_HOOK(binlog_relay_io, after_queue_event,
|
||||
(thd, mi, event_buf, event_len, synced)))
|
||||
(thd, mi, event_buf, event_len, synced)) ||
|
||||
(rpl_semi_sync_slave_status &&
|
||||
(mi->semi_ack & SEMI_SYNC_NEED_ACK) &&
|
||||
repl_semisync_slave.slaveReply(mi)))
|
||||
{
|
||||
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
|
||||
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
|
||||
@ -4877,7 +4889,16 @@ Stopping slave I/O thread due to out-of-memory error from master");
|
||||
}
|
||||
|
||||
if (mi->using_gtid == Master_info::USE_GTID_NO &&
|
||||
flush_master_info(mi, TRUE, TRUE))
|
||||
/*
|
||||
If rpl_semi_sync_slave_delay_master is enabled, we will flush
|
||||
master info only when ack is needed. This may lead to at least one
|
||||
group transaction delay but affords better performance improvement.
|
||||
*/
|
||||
(!repl_semisync_slave.getSlaveEnabled() ||
|
||||
(!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) ||
|
||||
(mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) &&
|
||||
(DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) ||
|
||||
flush_master_info(mi, TRUE, TRUE)))
|
||||
{
|
||||
sql_print_error("Failed to flush master info file");
|
||||
goto err;
|
||||
@ -4931,7 +4952,8 @@ err:
|
||||
IO_RPL_LOG_NAME, mi->master_log_pos,
|
||||
tmp.c_ptr_safe());
|
||||
}
|
||||
RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
|
||||
(void) RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
|
||||
repl_semisync_slave.slaveStop(mi);
|
||||
thd->reset_query();
|
||||
thd->reset_db(NULL, 0);
|
||||
if (mysql)
|
||||
|
@ -706,7 +706,7 @@ extern "C" void thd_kill_timeout(THD* thd)
|
||||
}
|
||||
|
||||
|
||||
THD::THD(my_thread_id id, bool is_wsrep_applier)
|
||||
THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock)
|
||||
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
|
||||
/* statement id */ 0),
|
||||
rli_fake(0), rgi_fake(0), rgi_slave(NULL),
|
||||
@ -893,7 +893,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
|
||||
/* Call to init() below requires fully initialized Open_tables_state. */
|
||||
reset_open_tables_state(this);
|
||||
|
||||
init();
|
||||
init(skip_global_sys_var_lock);
|
||||
#if defined(ENABLED_PROFILING)
|
||||
profiling.set_thd(this);
|
||||
#endif
|
||||
@ -1264,10 +1264,11 @@ const Type_handler *THD::type_handler_for_date() const
|
||||
Init common variables that has to be reset on start and on change_user
|
||||
*/
|
||||
|
||||
void THD::init(void)
|
||||
void THD::init(bool skip_lock)
|
||||
{
|
||||
DBUG_ENTER("thd::init");
|
||||
mysql_mutex_lock(&LOCK_global_system_variables);
|
||||
if (!skip_lock)
|
||||
mysql_mutex_lock(&LOCK_global_system_variables);
|
||||
plugin_thdvar_init(this);
|
||||
/*
|
||||
plugin_thd_var_init() sets variables= global_system_variables, which
|
||||
@ -1280,8 +1281,8 @@ void THD::init(void)
|
||||
::strmake(default_master_connection_buff,
|
||||
global_system_variables.default_master_connection.str,
|
||||
variables.default_master_connection.length);
|
||||
|
||||
mysql_mutex_unlock(&LOCK_global_system_variables);
|
||||
if (!skip_lock)
|
||||
mysql_mutex_unlock(&LOCK_global_system_variables);
|
||||
|
||||
user_time.val= start_time= start_time_sec_part= 0;
|
||||
|
||||
@ -4193,7 +4194,8 @@ my_bool thd_net_is_killed()
|
||||
|
||||
void thd_increment_bytes_received(void *thd, ulong length)
|
||||
{
|
||||
((THD*) thd)->status_var.bytes_received+= length;
|
||||
if (thd != NULL) // MDEV-13073 Ack collector having NULL
|
||||
((THD*) thd)->status_var.bytes_received+= length;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1569,7 +1569,8 @@ enum enum_thread_type
|
||||
SYSTEM_THREAD_EVENT_WORKER= 16,
|
||||
SYSTEM_THREAD_BINLOG_BACKGROUND= 32,
|
||||
SYSTEM_THREAD_SLAVE_BACKGROUND= 64,
|
||||
SYSTEM_THREAD_GENERIC= 128
|
||||
SYSTEM_THREAD_GENERIC= 128,
|
||||
SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND= 256
|
||||
};
|
||||
|
||||
inline char const *
|
||||
@ -1585,6 +1586,7 @@ show_system_thread(enum_thread_type thread)
|
||||
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_SCHEDULER);
|
||||
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_WORKER);
|
||||
RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_BACKGROUND);
|
||||
RETURN_NAME_AS_STRING(SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND);
|
||||
default:
|
||||
sprintf(buf, "<UNKNOWN SYSTEM THREAD: %d>", thread);
|
||||
return buf;
|
||||
@ -2261,7 +2263,8 @@ public:
|
||||
|
||||
/* Needed by MariaDB semi sync replication */
|
||||
Trans_binlog_info *semisync_info;
|
||||
|
||||
/* If this is a semisync slave connection. */
|
||||
bool semi_sync_slave;
|
||||
ulonglong client_capabilities; /* What the client supports */
|
||||
ulong max_client_packet_length;
|
||||
|
||||
@ -3147,11 +3150,20 @@ public:
|
||||
/* Debug Sync facility. See debug_sync.cc. */
|
||||
struct st_debug_sync_control *debug_sync_control;
|
||||
#endif /* defined(ENABLED_DEBUG_SYNC) */
|
||||
THD(my_thread_id id, bool is_wsrep_applier= false);
|
||||
/**
|
||||
@param id thread identifier
|
||||
@param is_wsrep_applier thread type
|
||||
@param skip_lock true if @c LOCK_global_system_variables is already
|
||||
held by the caller, in which case it is not acquired again.
|
||||
*/
|
||||
THD(my_thread_id id, bool is_wsrep_applier= false, bool skip_lock= false);
|
||||
|
||||
~THD();
|
||||
|
||||
void init(void);
|
||||
/**
|
||||
@param skip_lock true if @c LOCK_global_system_variables is already
|
||||
held by the caller, in which case it is not acquired again.
|
||||
*/
|
||||
void init(bool skip_lock= false);
|
||||
/*
|
||||
Initialize memory roots necessary for query processing and (!)
|
||||
pre-allocate memory for it. We can't do that in THD constructor because
|
||||
|
126
sql/sql_repl.cc
126
sql/sql_repl.cc
@ -30,7 +30,8 @@
|
||||
#include <my_dir.h>
|
||||
#include "rpl_handler.h"
|
||||
#include "debug_sync.h"
|
||||
#include "log.h" // get_gtid_list_event
|
||||
#include "semisync_master.h"
|
||||
#include "semisync_slave.h"
|
||||
|
||||
enum enum_gtid_until_state {
|
||||
GTID_UNTIL_NOT_DONE,
|
||||
@ -160,6 +161,7 @@ struct binlog_send_info {
|
||||
|
||||
bool clear_initial_log_pos;
|
||||
bool should_stop;
|
||||
size_t dirlen;
|
||||
|
||||
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
|
||||
char *lfn)
|
||||
@ -315,14 +317,30 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
|
||||
|
||||
if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
|
||||
{
|
||||
/* RUN_HOOK() must return zero when thd->semi_sync_slave */
|
||||
DBUG_ASSERT(!info->thd->semi_sync_slave);
|
||||
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
*errmsg= "Failed to run hook 'reserve_header'";
|
||||
ret= 1;
|
||||
}
|
||||
if (info->thd->semi_sync_slave)
|
||||
{
|
||||
repl_semisync_master.reserveSyncHeader(packet);
|
||||
}
|
||||
|
||||
*ev_offset= packet->length();
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
  Tell whether the current connection belongs to a slave that has
  semi-sync replication enabled.

  Reads the integer value of the user variable "rpl_semi_sync_slave"
  through get_user_var_int() and returns it converted to bool. val starts
  at 0, so the result is false unless get_user_var_int() stores a non-zero
  value into it. Neither the return status of get_user_var_int() nor
  null_value is checked.

  NOTE(review): presumably the slave sets this user variable on its
  connection before requesting the binlog dump -- confirm on the slave side.
*/
inline bool is_semi_sync_slave()
|
||||
{
|
||||
int null_value;
|
||||
long long val= 0;
|
||||
get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
|
||||
return val;
|
||||
}
|
||||
|
||||
static int send_file(THD *thd)
|
||||
{
|
||||
NET* net = &thd->net;
|
||||
@ -875,73 +893,6 @@ get_binlog_list(MEM_ROOT *memroot)
|
||||
DBUG_RETURN(current_list);
|
||||
}
|
||||
|
||||
/*
|
||||
Find the Gtid_list_log_event at the start of a binlog.
|
||||
|
||||
NULL for ok, non-NULL error message for error.
|
||||
|
||||
If ok, then the event is returned in *out_gtid_list. This can be NULL if we
|
||||
get back to binlogs written by old server version without GTID support. If
|
||||
so, it means we have reached the point to start from, as no GTID events can
|
||||
exist in earlier binlogs.
|
||||
*/
|
||||
|
||||
static const char *
|
||||
get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
|
||||
{
|
||||
Format_description_log_event init_fdle(BINLOG_VERSION);
|
||||
Format_description_log_event *fdle;
|
||||
Log_event *ev;
|
||||
const char *errormsg = NULL;
|
||||
|
||||
*out_gtid_list= NULL;
|
||||
|
||||
if (!(ev= Log_event::read_log_event(cache, &init_fdle,
|
||||
opt_master_verify_checksum)) ||
|
||||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
if (ev)
|
||||
delete ev;
|
||||
return "Could not read format description log event while looking for "
|
||||
"GTID position in binlog";
|
||||
}
|
||||
|
||||
fdle= static_cast<Format_description_log_event *>(ev);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
Log_event_type typ;
|
||||
|
||||
ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum);
|
||||
if (!ev)
|
||||
{
|
||||
errormsg= "Could not read GTID list event while looking for GTID "
|
||||
"position in binlog";
|
||||
break;
|
||||
}
|
||||
typ= ev->get_type_code();
|
||||
if (typ == GTID_LIST_EVENT)
|
||||
break; /* Done, found it */
|
||||
if (typ == START_ENCRYPTION_EVENT)
|
||||
{
|
||||
if (fdle->start_decryption((Start_encryption_log_event*) ev))
|
||||
errormsg= "Could not set up decryption for binlog.";
|
||||
}
|
||||
delete ev;
|
||||
if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
|
||||
typ == FORMAT_DESCRIPTION_EVENT || typ == START_ENCRYPTION_EVENT)
|
||||
continue; /* Continue looking */
|
||||
|
||||
/* We did not find any Gtid_list_log_event, must be old binlog. */
|
||||
ev= NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
delete fdle;
|
||||
*out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
|
||||
return errormsg;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Check if every GTID requested by the slave is contained in this (or a later)
|
||||
@ -1673,6 +1624,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
|
||||
enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
|
||||
slave_connection_state *gtid_state= &info->gtid_state;
|
||||
slave_connection_state *until_gtid_state= info->until_gtid_state;
|
||||
bool need_sync= false;
|
||||
|
||||
if (event_type == GTID_LIST_EVENT &&
|
||||
info->using_gtid_state && until_gtid_state)
|
||||
@ -1984,7 +1936,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
|
||||
|
||||
pos= my_b_tell(log);
|
||||
if (RUN_HOOK(binlog_transmit, before_send_event,
|
||||
(info->thd, info->flags, packet, info->log_file_name, pos)))
|
||||
(info->thd, info->flags, packet, info->log_file_name, pos)) ||
|
||||
repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(),
|
||||
info->log_file_name + info->dirlen,
|
||||
pos, &need_sync))
|
||||
{
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
return "run 'before_send_event' hook failed";
|
||||
@ -2012,6 +1967,8 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
return "Failed to run hook 'after_send_event'";
|
||||
}
|
||||
if (need_sync)
|
||||
repl_semisync_master.flushNet(info->thd, packet->c_ptr());
|
||||
|
||||
return NULL; /* Success */
|
||||
}
|
||||
@ -2748,7 +2705,7 @@ static int send_one_binlog_file(binlog_send_info *info,
|
||||
/** end of file or error */
|
||||
return (int)end_pos;
|
||||
}
|
||||
|
||||
info->dirlen= dirname_length(info->log_file_name);
|
||||
/**
|
||||
* send events from current position up to end_pos
|
||||
*/
|
||||
@ -2770,6 +2727,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
|
||||
binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
|
||||
binlog_send_info *info= &infoobj;
|
||||
bool has_transmit_started= false;
|
||||
|
||||
int old_max_allowed_packet= thd->variables.max_allowed_packet;
|
||||
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
|
||||
@ -2792,6 +2750,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
info->error= ER_UNKNOWN_ERROR;
|
||||
goto err;
|
||||
}
|
||||
has_transmit_started= true;
|
||||
|
||||
/* Check if the dump thread is created by a slave with semisync enabled. */
|
||||
thd->semi_sync_slave = is_semi_sync_slave();
|
||||
repl_semisync_master.dump_start(thd, log_ident, pos);
|
||||
|
||||
/*
|
||||
heartbeat_period from @master_heartbeat_period user variable
|
||||
@ -2908,7 +2871,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
|
||||
|
||||
err:
|
||||
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
|
||||
RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
|
||||
(void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
|
||||
if (has_transmit_started)
|
||||
{
|
||||
repl_semisync_master.dump_end(thd);
|
||||
}
|
||||
|
||||
if (info->thd->killed == KILL_SLAVE_SAME_ID)
|
||||
{
|
||||
@ -3374,7 +3341,9 @@ int reset_slave(THD *thd, Master_info* mi)
|
||||
else if (global_system_variables.log_warnings > 1)
|
||||
sql_print_information("Deleted Master_info file '%s'.", fname);
|
||||
|
||||
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
|
||||
(void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
|
||||
if (rpl_semi_sync_slave_enabled)
|
||||
repl_semisync_slave.resetSlave(mi);
|
||||
err:
|
||||
mi->unlock_slave_threads();
|
||||
if (error)
|
||||
@ -3876,11 +3845,14 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
|
||||
next_log_number))
|
||||
return 1;
|
||||
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
|
||||
return 0;
|
||||
bool ret= 0;
|
||||
/* Temporarily disable master semisync before resetting master. */
|
||||
repl_semisync_master.beforeResetMaster();
|
||||
ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
|
||||
next_log_number);
|
||||
(void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
|
||||
repl_semisync_master.afterResetMaster();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
@ -3051,22 +3051,18 @@ static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
|
||||
{
|
||||
if (repl_semisync_master.enableMaster() != 0)
|
||||
rpl_semi_sync_master_enabled= false;
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
else if (ack_receiver.start())
|
||||
{
|
||||
repl_semisync_master.disableMaster();
|
||||
rpl_semi_sync_master_enabled= false;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
if (repl_semisync_master.disableMaster() != 0)
|
||||
rpl_semi_sync_master_enabled= true;
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
if (!rpl_semi_sync_master_enabled)
|
||||
ack_receiver.stop();
|
||||
#endif
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -3082,27 +3078,21 @@ static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd,
|
||||
enum_var_type type)
|
||||
{
|
||||
repl_semisync_master.setTraceLevel(rpl_semi_sync_master_trace_level);
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level);
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
|
||||
enum_var_type type)
|
||||
{
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
repl_semisync_master.setWaitPoint(rpl_semi_sync_master_wait_point);
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd,
|
||||
enum_var_type type)
|
||||
{
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
repl_semisync_master.checkAndSwitch();
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -3168,7 +3158,6 @@ static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd,
|
||||
enum_var_type type)
|
||||
{
|
||||
@ -3182,8 +3171,6 @@ static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd,
|
||||
repl_semisync_slave.setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout);
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
static Sys_var_mybool Sys_semisync_slave_enabled(
|
||||
"rpl_semi_sync_slave_enabled",
|
||||
@ -3202,7 +3189,6 @@ static Sys_var_ulong Sys_semisync_slave_trace_level(
|
||||
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
|
||||
ON_UPDATE(fix_rpl_semi_sync_slave_trace_level));
|
||||
|
||||
#ifdef HAVE_ACC_RECEIVER
|
||||
static Sys_var_mybool Sys_semisync_slave_delay_master(
|
||||
"rpl_semi_sync_slave_delay_master",
|
||||
"Only write master info file when ack is needed.",
|
||||
@ -3221,7 +3207,6 @@ static Sys_var_uint Sys_semisync_slave_kill_conn_timeout(
|
||||
VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1),
|
||||
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
|
||||
ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout));
|
||||
#endif
|
||||
#endif /* HAVE_REPLICATION */
|
||||
|
||||
static Sys_var_ulong Sys_slow_launch_time(
|
||||
|
@ -24,7 +24,7 @@
|
||||
#include "rpl_handler.h"
|
||||
#include "debug_sync.h" // DEBUG_SYNC
|
||||
#include "sql_acl.h"
|
||||
|
||||
#include "semisync_master.h"
|
||||
|
||||
#ifndef EMBEDDED_LIBRARY
|
||||
/**
|
||||
@ -318,9 +318,19 @@ bool trans_commit(THD *thd)
|
||||
transaction, so the hooks for rollback will be called.
|
||||
*/
|
||||
if (res)
|
||||
{
|
||||
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.waitAfterRollback(thd, FALSE);
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.waitAfterCommit(thd, FALSE);
|
||||
#endif
|
||||
}
|
||||
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
|
||||
thd->transaction.all.reset();
|
||||
thd->lex->start_transaction_opt= 0;
|
||||
@ -414,6 +424,9 @@ bool trans_rollback(THD *thd)
|
||||
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
|
||||
res= ha_rollback_trans(thd, TRUE);
|
||||
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.waitAfterRollback(thd, FALSE);
|
||||
#endif
|
||||
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
|
||||
/* Reset the binlog transaction marker */
|
||||
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
|
||||
@ -526,9 +539,19 @@ bool trans_commit_stmt(THD *thd)
|
||||
transaction, so the hooks for rollback will be called.
|
||||
*/
|
||||
if (res)
|
||||
{
|
||||
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.waitAfterRollback(thd, FALSE);
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.waitAfterCommit(thd, FALSE);
|
||||
#endif
|
||||
}
|
||||
|
||||
thd->transaction.stmt.reset();
|
||||
|
||||
@ -568,6 +591,9 @@ bool trans_rollback_stmt(THD *thd)
|
||||
}
|
||||
|
||||
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.waitAfterRollback(thd, FALSE);
|
||||
#endif
|
||||
|
||||
thd->transaction.stmt.reset();
|
||||
|
||||
|
Reference in New Issue
Block a user