1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-01 03:47:19 +03:00

MDEV-13073. This part converts the Ali patch`s identifiers to the MariaDB standard. Also some renaming is done as well as white spaces removal.

This commit is contained in:
Andrei Elkin
2017-11-25 18:54:42 +02:00
committed by Monty
parent 6a84e3407d
commit f279d3c43a
17 changed files with 347 additions and 338 deletions

View File

@ -5,7 +5,7 @@ CALL mtr.add_suppression("Failed to register slave to semi-sync ACK receiver thr
CALL mtr.add_suppression("Failed to stop ack receiver thread on pthread_join.*"); CALL mtr.add_suppression("Failed to stop ack receiver thread on pthread_join.*");
CALL mtr.add_suppression("Got an error reading communication packets:*"); CALL mtr.add_suppression("Got an error reading communication packets:*");
CALL mtr.add_suppression("Timeout waiting for reply of binlog*"); CALL mtr.add_suppression("Timeout waiting for reply of binlog*");
CALL mtr.add_suppression("slaveReadSyncHeader*"); CALL mtr.add_suppression("slave_read_sync_header*");
CALL mtr.add_suppression("Missing magic number for semi-sync*"); CALL mtr.add_suppression("Missing magic number for semi-sync*");
CALL mtr.add_suppression("Got timeout reading communication packets*"); CALL mtr.add_suppression("Got timeout reading communication packets*");
CALL mtr.add_suppression("Failed to call*"); CALL mtr.add_suppression("Failed to call*");

View File

@ -7,7 +7,7 @@ CALL mtr.add_suppression("Failed to register slave to semi-sync ACK receiver thr
CALL mtr.add_suppression("Failed to stop ack receiver thread on pthread_join.*"); CALL mtr.add_suppression("Failed to stop ack receiver thread on pthread_join.*");
CALL mtr.add_suppression("Got an error reading communication packets:*"); CALL mtr.add_suppression("Got an error reading communication packets:*");
CALL mtr.add_suppression("Timeout waiting for reply of binlog*"); CALL mtr.add_suppression("Timeout waiting for reply of binlog*");
CALL mtr.add_suppression("slaveReadSyncHeader*"); CALL mtr.add_suppression("slave_read_sync_header*");
CALL mtr.add_suppression("Missing magic number for semi-sync*"); CALL mtr.add_suppression("Missing magic number for semi-sync*");
CALL mtr.add_suppression("Got timeout reading communication packets*"); CALL mtr.add_suppression("Got timeout reading communication packets*");
CALL mtr.add_suppression("Failed to call*"); CALL mtr.add_suppression("Failed to call*");

View File

@ -1486,7 +1486,7 @@ done:
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered); mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterCommit(thd, all); repl_semisync_master.wait_after_commit(thd, all);
DEBUG_SYNC(thd, "after_group_after_commit"); DEBUG_SYNC(thd, "after_group_after_commit");
#endif #endif
goto end; goto end;
@ -1734,7 +1734,7 @@ int ha_rollback_trans(THD *thd, bool all)
ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)); ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, all); repl_semisync_master.wait_after_rollback(thd, all);
#endif #endif
DBUG_RETURN(error); DBUG_RETURN(error);
} }

View File

@ -6376,8 +6376,8 @@ err:
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered); mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (repl_semisync_master.reportBinlogUpdate(thd, log_file_name, if (repl_semisync_master.report_binlog_update(thd, log_file_name,
file->pos_in_file)) file->pos_in_file))
{ {
sql_print_error("Failed to run 'after_flush' hooks"); sql_print_error("Failed to run 'after_flush' hooks");
error= 1; error= 1;
@ -6409,8 +6409,8 @@ err:
mysql_mutex_assert_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered); mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (repl_semisync_master.waitAfterSync(log_file_name, if (repl_semisync_master.wait_after_sync(log_file_name,
file->pos_in_file)) file->pos_in_file))
{ {
error=1; error=1;
/* error is already printed inside hook */ /* error is already printed inside hook */
@ -7847,10 +7847,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (!current->error && if (!current->error &&
repl_semisync_master. repl_semisync_master.
reportBinlogUpdate(current->thd, report_binlog_update(current->thd,
current->cache_mngr->last_commit_pos_file, current->cache_mngr->last_commit_pos_file,
current->cache_mngr-> current->cache_mngr->
last_commit_pos_offset)) last_commit_pos_offset))
{ {
current->error= ER_ERROR_ON_WRITE; current->error= ER_ERROR_ON_WRITE;
current->commit_errno= -1; current->commit_errno= -1;
@ -7935,10 +7935,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (!current->error) if (!current->error)
current->error= current->error=
repl_semisync_master.waitAfterSync(current->cache_mngr-> repl_semisync_master.wait_after_sync(current->cache_mngr->
last_commit_pos_file, last_commit_pos_file,
current->cache_mngr-> current->cache_mngr->
last_commit_pos_offset); last_commit_pos_offset);
#endif #endif
first= false; first= false;
} }

View File

@ -947,8 +947,7 @@ PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered, PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_background; key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share; PSI_mutex_key key_TABLE_SHARE_LOCK_share;
PSI_mutex_key key_ss_mutex_LOCK_binlog_; PSI_mutex_key key_LOCK_ack_receiver;
PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
static PSI_mutex_info all_server_mutexes[]= static PSI_mutex_info all_server_mutexes[]=
{ {
@ -1027,7 +1026,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0}, { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}, { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
{ &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}, { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0},
{ &key_ss_mutex_Ack_receiver_mutex, "Ack_receiver::m_mutex", 0}, { &key_LOCK_ack_receiver, "Ack_receiver::mutex", 0},
{ &key_LOCK_binlog, "LOCK_binlog", 0} { &key_LOCK_binlog, "LOCK_binlog", 0}
}; };
@ -1082,7 +1081,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_parallel_entry, key_COND_group_commit_orderer, key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_background; key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
PSI_cond_key key_ss_cond_Ack_receiver_cond; PSI_cond_key key_COND_ack_receiver;
static PSI_cond_info all_server_conds[]= static PSI_cond_info all_server_conds[]=
{ {
@ -1136,7 +1135,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL}, { &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}, { &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}, { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0},
{ &key_ss_cond_Ack_receiver_cond, "Ack_receiver::m_cond", 0}, { &key_COND_ack_receiver, "Ack_receiver::cond", 0},
{ &key_COND_binlog_send, "COND_binlog_send", 0} { &key_COND_binlog_send, "COND_binlog_send", 0}
}; };
@ -1144,7 +1143,7 @@ PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main, key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand, key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread; key_thread_slave_background, key_rpl_parallel_thread;
PSI_thread_key key_ss_thread_Ack_receiver_thread; PSI_thread_key key_thread_ack_receiver;
static PSI_thread_info all_server_threads[]= static PSI_thread_info all_server_threads[]=
{ {
@ -1171,7 +1170,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_one_connection, "one_connection", 0}, { &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL}, { &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL}, { &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_ss_thread_Ack_receiver_thread, "Ack_receiver", PSI_FLAG_GLOBAL}, { &key_thread_ack_receiver, "Ack_receiver", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0} { &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
}; };
@ -5184,8 +5183,8 @@ static int init_server_components()
"--log-bin option is not defined."); "--log-bin option is not defined.");
} }
if (repl_semisync_master.initObject() || if (repl_semisync_master.init_object() ||
repl_semisync_slave.initObject()) repl_semisync_slave.init_object())
{ {
sql_print_error("Could not initialize semisync."); sql_print_error("Could not initialize semisync.");
unireg_abort(1); unireg_abort(1);
@ -8256,7 +8255,7 @@ static int show_ssl_get_cipher_list(THD *thd, SHOW_VAR *var, char *buff,
#define DEF_SHOW_FUNC(name, show_type) \ #define DEF_SHOW_FUNC(name, show_type) \
static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \ static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
{ \ { \
repl_semisync_master.setExportStats(); \ repl_semisync_master.set_export_stats(); \
var->type= show_type; \ var->type= show_type; \
var->value= (char *)&rpl_semi_sync_master_##name; \ var->value= (char *)&rpl_semi_sync_master_##name; \
return 0; \ return 0; \

View File

@ -19,14 +19,14 @@
#include <my_global.h> #include <my_global.h>
#include "semisync.h" #include "semisync.h"
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef; const unsigned char Repl_semi_sync_base::k_packet_magic_num= 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01; const unsigned char Repl_semi_sync_base::k_packet_flag_sync= 0x01;
const unsigned long Trace::kTraceGeneral = 0x0001; const unsigned long Trace::k_trace_general= 0x0001;
const unsigned long Trace::kTraceDetail = 0x0010; const unsigned long Trace::k_trace_detail= 0x0010;
const unsigned long Trace::kTraceNetWait = 0x0020; const unsigned long Trace::k_trace_net_wait= 0x0020;
const unsigned long Trace::kTraceFunction = 0x0040; const unsigned long Trace::k_trace_function= 0x0040;
const unsigned char ReplSemiSyncBase::kSyncHeader[2] = const unsigned char Repl_semi_sync_base::k_sync_header[2]=
{ReplSemiSyncBase::kPacketMagicNum, 0}; {Repl_semi_sync_base::k_packet_magic_num, 0};

View File

@ -28,10 +28,10 @@
*/ */
class Trace { class Trace {
public: public:
static const unsigned long kTraceFunction; static const unsigned long k_trace_function;
static const unsigned long kTraceGeneral; static const unsigned long k_trace_general;
static const unsigned long kTraceDetail; static const unsigned long k_trace_detail;
static const unsigned long kTraceNetWait; static const unsigned long k_trace_net_wait;
unsigned long trace_level_; /* the level for tracing */ unsigned long trace_level_; /* the level for tracing */
@ -46,14 +46,14 @@ public:
/** /**
Base class for semi-sync master and slave classes Base class for semi-sync master and slave classes
*/ */
class ReplSemiSyncBase class Repl_semi_sync_base
:public Trace { :public Trace {
public: public:
static const unsigned char kSyncHeader[2]; /* three byte packet header */ static const unsigned char k_sync_header[2]; /* three byte packet header */
/* Constants in network packet header. */ /* Constants in network packet header. */
static const unsigned char kPacketMagicNum; static const unsigned char k_packet_magic_num;
static const unsigned char kPacketFlagSync; static const unsigned char k_packet_flag_sync;
}; };
/* The layout of a semisync slave reply packet: /* The layout of a semisync slave reply packet:

View File

@ -48,7 +48,7 @@ ulong rpl_semi_sync_master_clients = 0;
ulonglong rpl_semi_sync_master_net_wait_time = 0; ulonglong rpl_semi_sync_master_net_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_time = 0; ulonglong rpl_semi_sync_master_trx_wait_time = 0;
ReplSemiSyncMaster repl_semisync_master; Repl_semi_sync_master repl_semisync_master;
Ack_receiver ack_receiver; Ack_receiver ack_receiver;
/* /*
@ -59,7 +59,7 @@ typedef struct Trans_binlog_info {
char log_file[FN_REFLEN]; char log_file[FN_REFLEN];
} Trans_binlog_info; } Trans_binlog_info;
static int getWaitTime(const struct timespec& start_ts); static int get_wait_time(const struct timespec& start_ts);
static ulonglong timespec_to_usec(const struct timespec *ts) static ulonglong timespec_to_usec(const struct timespec *ts)
{ {
@ -68,12 +68,12 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
/******************************************************************************* /*******************************************************************************
* *
* <ActiveTranx> class : manage all active transaction nodes * <Active_tranx> class : manage all active transaction nodes
* *
******************************************************************************/ ******************************************************************************/
ActiveTranx::ActiveTranx(mysql_mutex_t *lock, Active_tranx::Active_tranx(mysql_mutex_t *lock,
ulong trace_level) ulong trace_level)
: Trace(trace_level), allocator_(max_connections), : Trace(trace_level), allocator_(max_connections),
num_entries_(max_connections << 1), /* Transaction hash table size num_entries_(max_connections << 1), /* Transaction hash table size
* is set to double the size * is set to double the size
@ -85,22 +85,22 @@ ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
trx_rear_ = NULL; trx_rear_ = NULL;
/* Create the hash table to find a transaction's ending event. */ /* Create the hash table to find a transaction's ending event. */
trx_htb_ = new TranxNode *[num_entries_]; trx_htb_ = new Tranx_node *[num_entries_];
for (int idx = 0; idx < num_entries_; ++idx) for (int idx = 0; idx < num_entries_; ++idx)
trx_htb_[idx] = NULL; trx_htb_[idx] = NULL;
sql_print_information("Semi-sync replication initialized for transactions."); sql_print_information("Semi-sync replication initialized for transactions.");
} }
ActiveTranx::~ActiveTranx() Active_tranx::~Active_tranx()
{ {
delete [] trx_htb_; delete [] trx_htb_;
trx_htb_ = NULL; trx_htb_ = NULL;
num_entries_ = 0; num_entries_ = 0;
} }
unsigned int ActiveTranx::calc_hash(const unsigned char *key, unsigned int Active_tranx::calc_hash(const unsigned char *key,
unsigned int length) unsigned int length)
{ {
unsigned int nr = 1, nr2 = 4; unsigned int nr = 1, nr2 = 4;
@ -113,8 +113,8 @@ unsigned int ActiveTranx::calc_hash(const unsigned char *key,
return((unsigned int) nr); return((unsigned int) nr);
} }
unsigned int ActiveTranx::get_hash_value(const char *log_file_name, unsigned int Active_tranx::get_hash_value(const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
unsigned int hash1 = calc_hash((const unsigned char *)log_file_name, unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
strlen(log_file_name)); strlen(log_file_name));
@ -124,8 +124,8 @@ unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
return (hash1 + hash2) % num_entries_; return (hash1 + hash2) % num_entries_;
} }
int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1, int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
const char *log_file_name2, my_off_t log_file_pos2) const char *log_file_name2, my_off_t log_file_pos2)
{ {
int cmp = strcmp(log_file_name1, log_file_name2); int cmp = strcmp(log_file_name1, log_file_name2);
@ -139,10 +139,10 @@ int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
return 0; return 0;
} }
int ActiveTranx::insert_tranx_node(const char *log_file_name, int Active_tranx::insert_tranx_node(const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
TranxNode *ins_node; Tranx_node *ins_node;
int result = 0; int result = 0;
unsigned int hash_val; unsigned int hash_val;
@ -206,13 +206,13 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
DBUG_RETURN(result); DBUG_RETURN(result);
} }
bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
DBUG_ENTER("Active_tranx::is_tranx_end_pos"); DBUG_ENTER("Active_tranx::is_tranx_end_pos");
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
TranxNode *entry = trx_htb_[hash_val]; Tranx_node *entry = trx_htb_[hash_val];
while (entry != NULL) while (entry != NULL)
{ {
@ -229,10 +229,10 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
DBUG_RETURN(entry != NULL); DBUG_RETURN(entry != NULL);
} }
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, int Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
TranxNode *new_front; Tranx_node *new_front;
DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes"); DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
@ -258,7 +258,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
/* No active transaction nodes after the call. */ /* No active transaction nodes after the call. */
/* Clear the hash table. */ /* Clear the hash table. */
memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *)); memset(trx_htb_, 0, num_entries_ * sizeof(Tranx_node *));
allocator_.free_all_nodes(); allocator_.free_all_nodes();
/* Clear the active transaction list. */ /* Clear the active transaction list. */
@ -273,7 +273,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
} }
else if (new_front != trx_front_) else if (new_front != trx_front_)
{ {
TranxNode *curr_node, *next_node; Tranx_node *curr_node, *next_node;
/* Delete all transaction nodes before the confirmation point. */ /* Delete all transaction nodes before the confirmation point. */
int n_frees = 0; int n_frees = 0;
@ -285,7 +285,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
/* Remove the node from the hash table. */ /* Remove the node from the hash table. */
unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_); unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
TranxNode **hash_ptr = &(trx_htb_[hash_val]); Tranx_node **hash_ptr = &(trx_htb_[hash_val]);
while ((*hash_ptr) != NULL) while ((*hash_ptr) != NULL)
{ {
if ((*hash_ptr) == curr_node) if ((*hash_ptr) == curr_node)
@ -314,28 +314,28 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
/******************************************************************************* /*******************************************************************************
* *
* <ReplSemiSyncMaster> class: the basic code layer for sync-replication master. * <Repl_semi_sync_master> class: the basic code layer for syncsync master.
* <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave. * <Repl_semi_sync_slave> class: the basic code layer for syncsync slave.
* *
* The most important functions during semi-syn replication listed: * The most important functions during semi-syn replication listed:
* *
* Master: * Master:
* . reportReplyBinlog(): called by the binlog dump thread when it receives * . report_reply_binlog(): called by the binlog dump thread when it receives
* the slave's status information. * the slave's status information.
* . updateSyncHeader(): based on transaction waiting information, decide * . update_sync_header(): based on transaction waiting information, decide
* whether to request the slave to reply. * whether to request the slave to reply.
* . writeTranxInBinlog(): called by the transaction thread when it finishes * . write_tranx_in_binlog(): called by the transaction thread when it finishes
* writing all transaction events in binlog. * writing all transaction events in binlog.
* . commitTrx(): transaction thread wait for the slave reply. * . commit_trx(): transaction thread wait for the slave reply.
* *
* Slave: * Slave:
* . slaveReadSyncHeader(): read the semi-sync header from the master, get the * . slave_read_sync_header(): read the semi-sync header from the master, get
* sync status and get the payload for events. * the sync status and get the payload for events.
* . slaveReply(): reply to the master about the replication progress. * . slave_reply(): reply to the master about the replication progress.
* *
******************************************************************************/ ******************************************************************************/
ReplSemiSyncMaster::ReplSemiSyncMaster() Repl_semi_sync_master::Repl_semi_sync_master()
: active_tranxs_(NULL), : active_tranxs_(NULL),
init_done_(false), init_done_(false),
reply_file_name_inited_(false), reply_file_name_inited_(false),
@ -351,16 +351,16 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
strcpy(wait_file_name_, ""); strcpy(wait_file_name_, "");
} }
int ReplSemiSyncMaster::initObject() int Repl_semi_sync_master::init_object()
{ {
int result; int result;
init_done_ = true; init_done_ = true;
/* References to the parameter works after set_options(). */ /* References to the parameter works after set_options(). */
setWaitTimeout(rpl_semi_sync_master_timeout); set_wait_timeout(rpl_semi_sync_master_timeout);
setTraceLevel(rpl_semi_sync_master_trace_level); set_trace_level(rpl_semi_sync_master_trace_level);
setWaitPoint(rpl_semi_sync_master_wait_point); set_wait_point(rpl_semi_sync_master_wait_point);
/* Mutex initialization can only be done after MY_INIT(). */ /* Mutex initialization can only be done after MY_INIT(). */
mysql_mutex_init(key_LOCK_binlog, mysql_mutex_init(key_LOCK_binlog,
@ -370,13 +370,13 @@ int ReplSemiSyncMaster::initObject()
if (rpl_semi_sync_master_enabled) if (rpl_semi_sync_master_enabled)
{ {
result = enableMaster(); result = enable_master();
if (!result) if (!result)
result= ack_receiver.start(); /* Start the ACK thread. */ result= ack_receiver.start(); /* Start the ACK thread. */
} }
else else
{ {
result = disableMaster(); result = disable_master();
} }
/* /*
@ -389,16 +389,16 @@ int ReplSemiSyncMaster::initObject()
return result; return result;
} }
int ReplSemiSyncMaster::enableMaster() int Repl_semi_sync_master::enable_master()
{ {
int result = 0; int result = 0;
/* Must have the lock when we do enable of disable. */ /* Must have the lock when we do enable of disable. */
lock(); lock();
if (!getMasterEnabled()) if (!get_master_enabled())
{ {
active_tranxs_ = new ActiveTranx(&LOCK_binlog, trace_level_); active_tranxs_ = new Active_tranx(&LOCK_binlog, trace_level_);
if (active_tranxs_ != NULL) if (active_tranxs_ != NULL)
{ {
commit_file_name_inited_ = false; commit_file_name_inited_ = false;
@ -421,12 +421,12 @@ int ReplSemiSyncMaster::enableMaster()
return result; return result;
} }
int ReplSemiSyncMaster::disableMaster() int Repl_semi_sync_master::disable_master()
{ {
/* Must have the lock when we do enable of disable. */ /* Must have the lock when we do enable of disable. */
lock(); lock();
if (getMasterEnabled()) if (get_master_enabled())
{ {
/* Switch off the semi-sync first so that waiting transaction will be /* Switch off the semi-sync first so that waiting transaction will be
* waken up. * waken up.
@ -450,7 +450,7 @@ int ReplSemiSyncMaster::disableMaster()
return 0; return 0;
} }
void ReplSemiSyncMaster::cleanup() void Repl_semi_sync_master::cleanup()
{ {
if (init_done_) if (init_done_)
{ {
@ -462,22 +462,22 @@ void ReplSemiSyncMaster::cleanup()
delete active_tranxs_; delete active_tranxs_;
} }
void ReplSemiSyncMaster::lock() void Repl_semi_sync_master::lock()
{ {
mysql_mutex_lock(&LOCK_binlog); mysql_mutex_lock(&LOCK_binlog);
} }
void ReplSemiSyncMaster::unlock() void Repl_semi_sync_master::unlock()
{ {
mysql_mutex_unlock(&LOCK_binlog); mysql_mutex_unlock(&LOCK_binlog);
} }
void ReplSemiSyncMaster::cond_broadcast() void Repl_semi_sync_master::cond_broadcast()
{ {
mysql_cond_broadcast(&COND_binlog_send); mysql_cond_broadcast(&COND_binlog_send);
} }
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time) int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
{ {
int wait_res; int wait_res;
@ -489,20 +489,20 @@ int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
DBUG_RETURN(wait_res); DBUG_RETURN(wait_res);
} }
void ReplSemiSyncMaster::add_slave() void Repl_semi_sync_master::add_slave()
{ {
lock(); lock();
rpl_semi_sync_master_clients++; rpl_semi_sync_master_clients++;
unlock(); unlock();
} }
void ReplSemiSyncMaster::remove_slave() void Repl_semi_sync_master::remove_slave()
{ {
lock(); lock();
rpl_semi_sync_master_clients--; rpl_semi_sync_master_clients--;
/* Only switch off if semi-sync is enabled and is on */ /* Only switch off if semi-sync is enabled and is on */
if (getMasterEnabled() && is_on()) if (get_master_enabled() && is_on())
{ {
/* If user has chosen not to wait if no semi-sync slave available /* If user has chosen not to wait if no semi-sync slave available
and the last semi-sync slave exits, turn off semi-sync on master and the last semi-sync slave exits, turn off semi-sync on master
@ -515,8 +515,9 @@ void ReplSemiSyncMaster::remove_slave()
unlock(); unlock();
} }
int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar *packet, int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
ulong packet_len) const uchar *packet,
ulong packet_len)
{ {
int result= -1; int result= -1;
char log_file_name[FN_REFLEN+1]; char log_file_name[FN_REFLEN+1];
@ -525,7 +526,8 @@ int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar *packet,
DBUG_ENTER("Repl_semi_sync_master::report_reply_packet"); DBUG_ENTER("Repl_semi_sync_master::report_reply_packet");
if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)) if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
Repl_semi_sync_master::k_packet_magic_num))
{ {
sql_print_error("Read semi-sync reply magic number error"); sql_print_error("Read semi-sync reply magic number error");
goto l_end; goto l_end;
@ -554,16 +556,16 @@ int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar *packet,
log_file_name, (ulong)log_file_pos, server_id)); log_file_name, (ulong)log_file_pos, server_id));
rpl_semi_sync_master_get_ack++; rpl_semi_sync_master_get_ack++;
reportReplyBinlog(server_id, log_file_name, log_file_pos); report_reply_binlog(server_id, log_file_name, log_file_pos);
l_end: l_end:
DBUG_RETURN(result); DBUG_RETURN(result);
} }
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
const char *log_file_name, const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
int cmp; int cmp;
bool can_release_threads = false; bool can_release_threads = false;
@ -571,13 +573,13 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog"); DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
if (!(getMasterEnabled())) if (!(get_master_enabled()))
DBUG_RETURN(0); DBUG_RETURN(0);
lock(); lock();
/* This is the real check inside the mutex. */ /* This is the real check inside the mutex. */
if (!getMasterEnabled()) if (!get_master_enabled())
goto l_end; goto l_end;
if (!is_on()) if (!is_on())
@ -592,7 +594,7 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
*/ */
if (reply_file_name_inited_) if (reply_file_name_inited_)
{ {
cmp = ActiveTranx::compare(log_file_name, log_file_pos, cmp = Active_tranx::compare(log_file_name, log_file_pos,
reply_file_name_, reply_file_pos_); reply_file_name_, reply_file_pos_);
/* If the requested position is behind the sending binlog position, /* If the requested position is behind the sending binlog position,
@ -630,8 +632,8 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
/* Let us check if some of the waiting threads doing a trx /* Let us check if some of the waiting threads doing a trx
* commit can now proceed. * commit can now proceed.
*/ */
cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, cmp = Active_tranx::compare(reply_file_name_, reply_file_pos_,
wait_file_name_, wait_file_pos_); wait_file_name_, wait_file_pos_);
if (cmp >= 0) if (cmp >= 0)
{ {
/* Yes, at least one waiting thread can now proceed: /* Yes, at least one waiting thread can now proceed:
@ -656,22 +658,22 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ReplSemiSyncMaster::waitAfterSync(const char *log_file, my_off_t log_pos) int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
{ {
if (!getMasterEnabled()) if (!get_master_enabled())
return 0; return 0;
int ret= 0; int ret= 0;
if(log_pos && if(log_pos &&
waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
ret= commitTrx(log_file + dirname_length(log_file), log_pos); ret= commit_trx(log_file + dirname_length(log_file), log_pos);
return ret; return ret;
} }
int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all) int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all)
{ {
if (!getMasterEnabled()) if (!get_master_enabled())
return 0; return 0;
int ret= 0; int ret= 0;
@ -682,7 +684,7 @@ int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all)
(all || thd->transaction.all.ha_list == 0); (all || thd->transaction.all.ha_list == 0);
/* /*
The coordinates are propagated to this point having been computed The coordinates are propagated to this point having been computed
in reportBinlogUpdate in report_binlog_update
*/ */
Trans_binlog_info *log_info= thd->semisync_info; Trans_binlog_info *log_info= thd->semisync_info;
log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
@ -692,8 +694,8 @@ int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all)
if (is_real_trans && if (is_real_trans &&
log_pos && log_pos &&
waitPoint() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
ret= commitTrx(log_file, log_pos); ret= commit_trx(log_file, log_pos);
if (is_real_trans && log_info) if (is_real_trans && log_info)
{ {
@ -704,18 +706,18 @@ int ReplSemiSyncMaster::waitAfterCommit(THD* thd, bool all)
return ret; return ret;
} }
int ReplSemiSyncMaster::waitAfterRollback(THD *thd, bool all) int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
{ {
return waitAfterCommit(thd, all); return wait_after_commit(thd, all);
} }
/** /**
The method runs after flush to binary log is done. The method runs after flush to binary log is done.
*/ */
int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file, int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
my_off_t log_pos) my_off_t log_pos)
{ {
if (getMasterEnabled()) if (get_master_enabled())
{ {
Trans_binlog_info *log_info; Trans_binlog_info *log_info;
@ -729,13 +731,13 @@ int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file,
strcpy(log_info->log_file, log_file + dirname_length(log_file)); strcpy(log_info->log_file, log_file + dirname_length(log_file));
log_info->log_pos = log_pos; log_info->log_pos = log_pos;
return writeTranxInBinlog(log_info->log_file, log_pos); return write_tranx_in_binlog(log_info->log_file, log_pos);
} }
return 0; return 0;
} }
int ReplSemiSyncMaster::dump_start(THD* thd, int Repl_semi_sync_master::dump_start(THD* thd,
const char *log_file, const char *log_file,
my_off_t log_pos) my_off_t log_pos)
{ {
@ -751,14 +753,17 @@ int ReplSemiSyncMaster::dump_start(THD* thd,
} }
add_slave(); add_slave();
reportReplyBinlog(thd->variables.server_id, log_file + dirname_length(log_file), log_pos); report_reply_binlog(thd->variables.server_id,
sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), pos(%s, %lu", log_file + dirname_length(log_file), log_pos);
thd->variables.server_id, log_file, (unsigned long)log_pos); sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), "
"pos(%s, %lu",
thd->variables.server_id, log_file,
(unsigned long)log_pos);
return 0; return 0;
} }
void ReplSemiSyncMaster::dump_end(THD* thd) void Repl_semi_sync_master::dump_end(THD* thd)
{ {
if (!thd->semi_sync_slave) if (!thd->semi_sync_slave)
return; return;
@ -771,13 +776,13 @@ void ReplSemiSyncMaster::dump_end(THD* thd)
return; return;
} }
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos) my_off_t trx_wait_binlog_pos)
{ {
DBUG_ENTER("Repl_semi_sync_master::commit_trx"); DBUG_ENTER("Repl_semi_sync_master::commit_trx");
if (getMasterEnabled() && trx_wait_binlog_name) if (get_master_enabled() && trx_wait_binlog_name)
{ {
struct timespec start_ts; struct timespec start_ts;
struct timespec abstime; struct timespec abstime;
@ -796,7 +801,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
& old_stage); & old_stage);
/* This is the real check inside the mutex. */ /* This is the real check inside the mutex. */
if (!getMasterEnabled() || !is_on()) if (!get_master_enabled() || !is_on())
goto l_end; goto l_end;
DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)\n", DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)\n",
@ -808,8 +813,9 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
{ {
if (reply_file_name_inited_) if (reply_file_name_inited_)
{ {
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, int cmp = Active_tranx::compare(reply_file_name_, reply_file_pos_,
trx_wait_binlog_name, trx_wait_binlog_pos); trx_wait_binlog_name,
trx_wait_binlog_pos);
if (cmp >= 0) if (cmp >= 0)
{ {
/* We have already sent the relevant binlog to the slave: no need to /* We have already sent the relevant binlog to the slave: no need to
@ -828,8 +834,9 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
*/ */
if (wait_file_name_inited_) if (wait_file_name_inited_)
{ {
int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, int cmp = Active_tranx::compare(trx_wait_binlog_name,
wait_file_name_, wait_file_pos_); trx_wait_binlog_pos,
wait_file_name_, wait_file_pos_);
if (cmp <= 0) if (cmp <= 0)
{ {
/* This thd has a lower position, let's update the minimum info. */ /* This thd has a lower position, let's update the minimum info. */
@ -894,7 +901,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
{ {
int wait_time; int wait_time;
wait_time = getWaitTime(start_ts); wait_time = get_wait_time(start_ts);
if (wait_time < 0) if (wait_time < 0)
{ {
DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at " DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at "
@ -913,7 +920,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
/* /*
At this point, the binlog file and position of this transaction At this point, the binlog file and position of this transaction
must have been removed from ActiveTranx. must have been removed from Active_tranx.
active_tranxs_ may be NULL if someone disabled semi sync during active_tranxs_ may be NULL if someone disabled semi sync during
cond_timewait() cond_timewait()
*/ */
@ -950,11 +957,11 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
* If semi-sync is disabled, all transactions still update the wait * If semi-sync is disabled, all transactions still update the wait
* position with the last position in binlog. But no transactions will * position with the last position in binlog. But no transactions will
* wait for confirmations and the active transaction list would not be * wait for confirmations and the active transaction list would not be
* maintained. In binlog dump thread, updateSyncHeader() checks whether * maintained. In binlog dump thread, update_sync_header() checks whether
* the current sending event catches up with last wait position. If it * the current sending event catches up with last wait position. If it
* does match, semi-sync will be switched on again. * does match, semi-sync will be switched on again.
*/ */
int ReplSemiSyncMaster::switch_off() int Repl_semi_sync_master::switch_off()
{ {
int result; int result;
@ -975,9 +982,9 @@ int ReplSemiSyncMaster::switch_off()
DBUG_RETURN(result); DBUG_RETURN(result);
} }
int ReplSemiSyncMaster::try_switch_on(int server_id, int Repl_semi_sync_master::try_switch_on(int server_id,
const char *log_file_name, const char *log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
bool semi_sync_on = false; bool semi_sync_on = false;
@ -991,7 +998,7 @@ int ReplSemiSyncMaster::try_switch_on(int server_id,
*/ */
if (commit_file_name_inited_) if (commit_file_name_inited_)
{ {
int cmp = ActiveTranx::compare(log_file_name, log_file_pos, int cmp = Active_tranx::compare(log_file_name, log_file_pos,
commit_file_name_, commit_file_pos_); commit_file_name_, commit_file_pos_);
semi_sync_on = (cmp >= 0); semi_sync_on = (cmp >= 0);
} }
@ -1014,22 +1021,22 @@ int ReplSemiSyncMaster::try_switch_on(int server_id,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ReplSemiSyncMaster::reserveSyncHeader(String* packet) int Repl_semi_sync_master::reserve_sync_header(String* packet)
{ {
DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header"); DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header");
/* Set the magic number and the sync status. By default, no sync /* Set the magic number and the sync status. By default, no sync
* is required. * is required.
*/ */
packet->append(reinterpret_cast<const char*>(kSyncHeader), packet->append(reinterpret_cast<const char*>(k_sync_header),
sizeof(kSyncHeader)); sizeof(k_sync_header));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet, int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
const char *log_file_name, const char *log_file_name,
my_off_t log_file_pos, my_off_t log_file_pos,
bool* need_sync) bool* need_sync)
{ {
int cmp = 0; int cmp = 0;
bool sync = false; bool sync = false;
@ -1039,7 +1046,7 @@ int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
/* If the semi-sync master is not enabled, or the slave is not a semi-sync /* If the semi-sync master is not enabled, or the slave is not a semi-sync
* target, do not request replies from the slave. * target, do not request replies from the slave.
*/ */
if (!getMasterEnabled() || !thd->semi_sync_slave) if (!get_master_enabled() || !thd->semi_sync_slave)
{ {
*need_sync = false; *need_sync = false;
DBUG_RETURN(0); DBUG_RETURN(0);
@ -1048,7 +1055,7 @@ int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
lock(); lock();
/* This is the real check inside the mutex. */ /* This is the real check inside the mutex. */
if (!getMasterEnabled()) if (!get_master_enabled())
{ {
assert(sync == false); assert(sync == false);
goto l_end; goto l_end;
@ -1061,7 +1068,7 @@ int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
if (reply_file_name_inited_) if (reply_file_name_inited_)
{ {
cmp = ActiveTranx::compare(log_file_name, log_file_pos, cmp = Active_tranx::compare(log_file_name, log_file_pos,
reply_file_name_, reply_file_pos_); reply_file_name_, reply_file_pos_);
if (cmp <= 0) if (cmp <= 0)
{ {
@ -1074,7 +1081,7 @@ int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
if (wait_file_name_inited_) if (wait_file_name_inited_)
{ {
cmp = ActiveTranx::compare(log_file_name, log_file_pos, cmp = Active_tranx::compare(log_file_name, log_file_pos,
wait_file_name_, wait_file_pos_); wait_file_name_, wait_file_pos_);
} }
else else
@ -1099,7 +1106,7 @@ int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
{ {
if (commit_file_name_inited_) if (commit_file_name_inited_)
{ {
int cmp = ActiveTranx::compare(log_file_name, log_file_pos, int cmp = Active_tranx::compare(log_file_name, log_file_pos,
commit_file_name_, commit_file_pos_); commit_file_name_, commit_file_pos_);
sync = (cmp >= 0); sync = (cmp >= 0);
} }
@ -1123,14 +1130,14 @@ int ReplSemiSyncMaster::updateSyncHeader(THD* thd, unsigned char *packet,
*/ */
if (sync) if (sync)
{ {
(packet)[2] = kPacketFlagSync; (packet)[2] = k_packet_flag_sync;
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
my_off_t log_file_pos) my_off_t log_file_pos)
{ {
int result = 0; int result = 0;
@ -1139,20 +1146,20 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
lock(); lock();
/* This is the real check inside the mutex. */ /* This is the real check inside the mutex. */
if (!getMasterEnabled()) if (!get_master_enabled())
goto l_end; goto l_end;
/* Update the 'largest' transaction commit position seen so far even /* Update the 'largest' transaction commit position seen so far even
* though semi-sync is switched off. * though semi-sync is switched off.
* It is much better that we update commit_file_* here, instead of * It is much better that we update commit_file_* here, instead of
* inside commitTrx(). This is mostly because updateSyncHeader() * inside commit_trx(). This is mostly because update_sync_header()
* will watch for commit_file_* to decide whether to switch semi-sync * will watch for commit_file_* to decide whether to switch semi-sync
* on. The detailed reason is explained in function updateSyncHeader(). * on. The detailed reason is explained in function update_sync_header().
*/ */
if (commit_file_name_inited_) if (commit_file_name_inited_)
{ {
int cmp = ActiveTranx::compare(log_file_name, log_file_pos, int cmp = Active_tranx::compare(log_file_name, log_file_pos,
commit_file_name_, commit_file_pos_); commit_file_name_, commit_file_pos_);
if (cmp > 0) if (cmp > 0)
{ {
/* This is a larger position, let's update the maximum info. */ /* This is a larger position, let's update the maximum info. */
@ -1194,16 +1201,16 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
DBUG_RETURN(result); DBUG_RETURN(result);
} }
int ReplSemiSyncMaster::flushNet(THD *thd, int Repl_semi_sync_master::flush_net(THD *thd,
const char *event_buf) const char *event_buf)
{ {
int result = -1; int result = -1;
NET* net= &thd->net; NET* net= &thd->net;
DBUG_ENTER("Repl_semi_sync_master::flush_net"); DBUG_ENTER("Repl_semi_sync_master::flush_net");
assert((unsigned char)event_buf[1] == kPacketMagicNum); assert((unsigned char)event_buf[1] == k_packet_magic_num);
if ((unsigned char)event_buf[2] != kPacketFlagSync) if ((unsigned char)event_buf[2] != k_packet_flag_sync)
{ {
/* current event does not require reply */ /* current event does not require reply */
result = 0; result = 0;
@ -1231,7 +1238,7 @@ int ReplSemiSyncMaster::flushNet(THD *thd,
DBUG_RETURN(result); DBUG_RETURN(result);
} }
int ReplSemiSyncMaster::afterResetMaster() int Repl_semi_sync_master::after_reset_master()
{ {
int result = 0; int result = 0;
@ -1240,7 +1247,7 @@ int ReplSemiSyncMaster::afterResetMaster()
if (rpl_semi_sync_master_enabled) if (rpl_semi_sync_master_enabled)
{ {
sql_print_information("Enable Semi-sync Master after reset master"); sql_print_information("Enable Semi-sync Master after reset master");
enableMaster(); enable_master();
} }
lock(); lock();
@ -1249,7 +1256,7 @@ int ReplSemiSyncMaster::afterResetMaster()
!rpl_semi_sync_master_wait_no_slave) !rpl_semi_sync_master_wait_no_slave)
state_ = 0; state_ = 0;
else else
state_ = getMasterEnabled()? 1 : 0; state_ = get_master_enabled()? 1 : 0;
wait_file_name_inited_ = false; wait_file_name_inited_ = false;
reply_file_name_inited_ = false; reply_file_name_inited_ = false;
@ -1271,22 +1278,22 @@ int ReplSemiSyncMaster::afterResetMaster()
DBUG_RETURN(result); DBUG_RETURN(result);
} }
int ReplSemiSyncMaster::beforeResetMaster() int Repl_semi_sync_master::before_reset_master()
{ {
int result = 0; int result = 0;
DBUG_ENTER("Repl_semi_sync_master::before_reset_master"); DBUG_ENTER("Repl_semi_sync_master::before_reset_master");
if (rpl_semi_sync_master_enabled) if (rpl_semi_sync_master_enabled)
disableMaster(); disable_master();
DBUG_RETURN(result); DBUG_RETURN(result);
} }
void ReplSemiSyncMaster::checkAndSwitch() void Repl_semi_sync_master::check_and_switch()
{ {
lock(); lock();
if (getMasterEnabled() && is_on()) if (get_master_enabled() && is_on())
{ {
if (!rpl_semi_sync_master_wait_no_slave if (!rpl_semi_sync_master_wait_no_slave
&& rpl_semi_sync_master_clients == 0) && rpl_semi_sync_master_clients == 0)
@ -1295,7 +1302,7 @@ void ReplSemiSyncMaster::checkAndSwitch()
unlock(); unlock();
} }
void ReplSemiSyncMaster::setExportStats() void Repl_semi_sync_master::set_export_stats()
{ {
lock(); lock();
@ -1318,7 +1325,7 @@ void ReplSemiSyncMaster::setExportStats()
* >= 0: the waiting time in microsecons(us) * >= 0: the waiting time in microsecons(us)
* < 0: error in get time or time back traverse * < 0: error in get time or time back traverse
*/ */
static int getWaitTime(const struct timespec& start_ts) static int get_wait_time(const struct timespec& start_ts)
{ {
ulonglong start_usecs, end_usecs; ulonglong start_usecs, end_usecs;
struct timespec end_ts; struct timespec end_ts;

View File

@ -27,18 +27,18 @@ extern PSI_mutex_key key_LOCK_binlog;
extern PSI_cond_key key_COND_binlog_send; extern PSI_cond_key key_COND_binlog_send;
#endif #endif
struct TranxNode { struct Tranx_node {
char log_name_[FN_REFLEN]; char log_name_[FN_REFLEN];
my_off_t log_pos_; my_off_t log_pos_;
struct TranxNode *next_; /* the next node in the sorted list */ struct Tranx_node *next_; /* the next node in the sorted list */
struct TranxNode *hash_next_; /* the next node during hash collision */ struct Tranx_node *hash_next_; /* the next node during hash collision */
}; };
/** /**
@class TranxNodeAllocator @class Tranx_node_allocator
This class provides memory allocating and freeing methods for This class provides memory allocating and freeing methods for
TranxNode. The main target is performance. Tranx_node. The main target is performance.
@section ALLOCATE How to allocate a node @section ALLOCATE How to allocate a node
The pointer of the first node after 'last_node' in current_block is The pointer of the first node after 'last_node' in current_block is
@ -51,7 +51,7 @@ struct TranxNode {
After some nodes are freed, there probably are some free nodes before After some nodes are freed, there probably are some free nodes before
the sequence of the allocated nodes, but we do not reuse it. It is better the sequence of the allocated nodes, but we do not reuse it. It is better
to keep the allocated nodes are in the sequence, for it is more efficient to keep the allocated nodes are in the sequence, for it is more efficient
for allocating and freeing TranxNode. for allocating and freeing Tranx_node.
@section FREENODE How to free nodes @section FREENODE How to free nodes
There are two methods for freeing nodes. They are free_all_nodes and There are two methods for freeing nodes. They are free_all_nodes and
@ -68,23 +68,23 @@ struct TranxNode {
more efficient. more efficient.
*/ */
#define BLOCK_TRANX_NODES 16 #define BLOCK_TRANX_NODES 16
class TranxNodeAllocator class Tranx_node_allocator
{ {
public: public:
/** /**
@param reserved_nodes @param reserved_nodes
The number of reserved TranxNodes. It is used to set 'reserved_blocks' The number of reserved Tranx_nodes. It is used to set 'reserved_blocks'
which can contain at least 'reserved_nodes' number of TranxNodes. When which can contain at least 'reserved_nodes' number of Tranx_nodes. When
freeing memory, we will reserve at least reserved_blocks of Blocks not freeing memory, we will reserve at least reserved_blocks of Blocks not
freed. freed.
*/ */
TranxNodeAllocator(uint reserved_nodes) : Tranx_node_allocator(uint reserved_nodes) :
reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES + reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES +
(reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)), (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)),
first_block(NULL), last_block(NULL), first_block(NULL), last_block(NULL),
current_block(NULL), last_node(-1), block_num(0) {} current_block(NULL), last_node(-1), block_num(0) {}
~TranxNodeAllocator() ~Tranx_node_allocator()
{ {
Block *block= first_block; Block *block= first_block;
while (block != NULL) while (block != NULL)
@ -101,11 +101,11 @@ public:
it are in use. A new Block is allocated and is put into the rear of the it are in use. A new Block is allocated and is put into the rear of the
Block link table if no Block is free. Block link table if no Block is free.
@return Return a TranxNode *, or NULL if an error occurred. @return Return a Tranx_node *, or NULL if an error occurred.
*/ */
TranxNode *allocate_node() Tranx_node *allocate_node()
{ {
TranxNode *trx_node; Tranx_node *trx_node;
Block *block= current_block; Block *block= current_block;
if (last_node == BLOCK_TRANX_NODES-1) if (last_node == BLOCK_TRANX_NODES-1)
@ -151,7 +151,7 @@ public:
@return Return 0, or 1 if an error occurred. @return Return 0, or 1 if an error occurred.
*/ */
int free_nodes_before(TranxNode* node) int free_nodes_before(Tranx_node* node)
{ {
Block *block; Block *block;
Block *prev_block= NULL; Block *prev_block= NULL;
@ -186,16 +186,16 @@ private:
uint reserved_blocks; uint reserved_blocks;
/** /**
A sequence memory which contains BLOCK_TRANX_NODES TranxNodes. A sequence memory which contains BLOCK_TRANX_NODES Tranx_nodes.
BLOCK_TRANX_NODES The number of TranxNodes which are in a Block. BLOCK_TRANX_NODES The number of Tranx_nodes which are in a Block.
next Every Block has a 'next' pointer which points to the next Block. next Every Block has a 'next' pointer which points to the next Block.
These linking Blocks constitute a Block link table. These linking Blocks constitute a Block link table.
*/ */
struct Block { struct Block {
Block *next; Block *next;
TranxNode nodes[BLOCK_TRANX_NODES]; Tranx_node nodes[BLOCK_TRANX_NODES];
}; };
/** /**
@ -290,20 +290,20 @@ private:
/** /**
This class manages memory for active transaction list. This class manages memory for active transaction list.
We record each active transaction with a TranxNode, each session We record each active transaction with a Tranx_node, each session
can have only one open transaction. Because of EVENT, the total can have only one open transaction. Because of EVENT, the total
active transaction nodes can exceed the maximum allowed active transaction nodes can exceed the maximum allowed
connections. connections.
*/ */
class ActiveTranx class Active_tranx
:public Trace { :public Trace {
private: private:
TranxNodeAllocator allocator_; Tranx_node_allocator allocator_;
/* These two record the active transaction list in sort order. */ /* These two record the active transaction list in sort order. */
TranxNode *trx_front_, *trx_rear_; Tranx_node *trx_front_, *trx_rear_;
TranxNode **trx_htb_; /* A hash table on active transactions. */ Tranx_node **trx_htb_; /* A hash table on active transactions. */
int num_entries_; /* maximum hash table entries */ int num_entries_; /* maximum hash table entries */
mysql_mutex_t *lock_; /* mutex lock */ mysql_mutex_t *lock_; /* mutex lock */
@ -314,23 +314,23 @@ private:
unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
int compare(const char *log_file_name1, my_off_t log_file_pos1, int compare(const char *log_file_name1, my_off_t log_file_pos1,
const TranxNode *node2) { const Tranx_node *node2) {
return compare(log_file_name1, log_file_pos1, return compare(log_file_name1, log_file_pos1,
node2->log_name_, node2->log_pos_); node2->log_name_, node2->log_pos_);
} }
int compare(const TranxNode *node1, int compare(const Tranx_node *node1,
const char *log_file_name2, my_off_t log_file_pos2) { const char *log_file_name2, my_off_t log_file_pos2) {
return compare(node1->log_name_, node1->log_pos_, return compare(node1->log_name_, node1->log_pos_,
log_file_name2, log_file_pos2); log_file_name2, log_file_pos2);
} }
int compare(const TranxNode *node1, const TranxNode *node2) { int compare(const Tranx_node *node1, const Tranx_node *node2) {
return compare(node1->log_name_, node1->log_pos_, return compare(node1->log_name_, node1->log_pos_,
node2->log_name_, node2->log_pos_); node2->log_name_, node2->log_pos_);
} }
public: public:
ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); Active_tranx(mysql_mutex_t *lock, unsigned long trace_level);
~ActiveTranx(); ~Active_tranx();
/* Insert an active transaction node with the specified position. /* Insert an active transaction node with the specified position.
* *
@ -366,13 +366,13 @@ public:
/** /**
The extension class for the master of semi-synchronous replication The extension class for the master of semi-synchronous replication
*/ */
class ReplSemiSyncMaster class Repl_semi_sync_master
:public ReplSemiSyncBase { :public Repl_semi_sync_base {
private: private:
ActiveTranx *active_tranxs_; /* active transaction list: the list will Active_tranx *active_tranxs_; /* active transaction list: the list will
be cleared when semi-sync switches off. */ be cleared when semi-sync switches off. */
/* True when initObject has been called */ /* True when init_object has been called */
bool init_done_; bool init_done_;
/* This cond variable is signaled when enough binlog has been sent to slave, /* This cond variable is signaled when enough binlog has been sent to slave,
@ -456,32 +456,32 @@ class ReplSemiSyncMaster
const char *log_file_name, my_off_t log_file_pos); const char *log_file_name, my_off_t log_file_pos);
public: public:
ReplSemiSyncMaster(); Repl_semi_sync_master();
~ReplSemiSyncMaster() {} ~Repl_semi_sync_master() {}
void cleanup(); void cleanup();
bool getMasterEnabled() { bool get_master_enabled() {
return master_enabled_; return master_enabled_;
} }
void setTraceLevel(unsigned long trace_level) { void set_trace_level(unsigned long trace_level) {
trace_level_ = trace_level; trace_level_ = trace_level;
if (active_tranxs_) if (active_tranxs_)
active_tranxs_->trace_level_ = trace_level; active_tranxs_->trace_level_ = trace_level;
} }
/* Set the transaction wait timeout period, in milliseconds. */ /* Set the transaction wait timeout period, in milliseconds. */
void setWaitTimeout(unsigned long wait_timeout) { void set_wait_timeout(unsigned long wait_timeout) {
wait_timeout_ = wait_timeout; wait_timeout_ = wait_timeout;
} }
/*set the ACK point, after binlog sync or after transaction commit*/ /*set the ACK point, after binlog sync or after transaction commit*/
void setWaitPoint(unsigned long ack_point) void set_wait_point(unsigned long ack_point)
{ {
wait_point_ = ack_point; wait_point_ = ack_point;
} }
ulong waitPoint() //no cover line ulong wait_point() //no cover line
{ {
return wait_point_; //no cover line return wait_point_; //no cover line
} }
@ -489,13 +489,13 @@ class ReplSemiSyncMaster
/* Initialize this class after MySQL parameters are initialized. this /* Initialize this class after MySQL parameters are initialized. this
* function should be called once at bootstrap time. * function should be called once at bootstrap time.
*/ */
int initObject(); int init_object();
/* Enable the object to enable semi-sync replication inside the master. */ /* Enable the object to enable semi-sync replication inside the master. */
int enableMaster(); int enable_master();
/* Enable the object to enable semi-sync replication inside the master. */ /* Enable the object to enable semi-sync replication inside the master. */
int disableMaster(); int disable_master();
/* Add a semi-sync replication slave */ /* Add a semi-sync replication slave */
void add_slave(); void add_slave();
@ -503,8 +503,8 @@ class ReplSemiSyncMaster
/* Remove a semi-sync replication slave */ /* Remove a semi-sync replication slave */
void remove_slave(); void remove_slave();
/* It parses a reply packet and call reportReplyBinlog to handle it. */ /* It parses a reply packet and call report_reply_binlog to handle it. */
int reportReplyPacket(uint32 server_id, const uchar *packet, int report_reply_packet(uint32 server_id, const uchar *packet,
ulong packet_len); ulong packet_len);
/* In semi-sync replication, reports up to which binlog position we have /* In semi-sync replication, reports up to which binlog position we have
@ -519,9 +519,9 @@ class ReplSemiSyncMaster
* Return: * Return:
* 0: success; non-zero: error * 0: success; non-zero: error
*/ */
int reportReplyBinlog(uint32 server_id, int report_reply_binlog(uint32 server_id,
const char* log_file_name, const char* log_file_name,
my_off_t end_offset); my_off_t end_offset);
/* Commit a transaction in the final step. This function is called from /* Commit a transaction in the final step. This function is called from
* InnoDB before returning from the low commit. If semi-sync is switch on, * InnoDB before returning from the low commit. If semi-sync is switch on,
@ -538,20 +538,20 @@ class ReplSemiSyncMaster
* Return: * Return:
* 0: success; non-zero: error * 0: success; non-zero: error
*/ */
int commitTrx(const char* trx_wait_binlog_name, int commit_trx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos); my_off_t trx_wait_binlog_pos);
/*Wait for ACK after writing/sync binlog to file*/ /*Wait for ACK after writing/sync binlog to file*/
int waitAfterSync(const char* log_file, my_off_t log_pos); int wait_after_sync(const char* log_file, my_off_t log_pos);
/*Wait for ACK after commting the transaction*/ /*Wait for ACK after commting the transaction*/
int waitAfterCommit(THD* thd, bool all); int wait_after_commit(THD* thd, bool all);
/*Wait after the transaction is rollback*/ /*Wait after the transaction is rollback*/
int waitAfterRollback(THD *thd, bool all); int wait_after_rollback(THD *thd, bool all);
/*Store the current binlog position in active_tranxs_. This position should /*Store the current binlog position in active_tranxs_. This position should
* be acked by slave*/ * be acked by slave*/
int reportBinlogUpdate(THD *thd, const char *log_file,my_off_t log_pos); int report_binlog_update(THD *thd, const char *log_file,my_off_t log_pos);
int dump_start(THD* thd, int dump_start(THD* thd,
const char *log_file, const char *log_file,
@ -569,7 +569,7 @@ class ReplSemiSyncMaster
* Return: * Return:
* size of the bytes reserved for header * size of the bytes reserved for header
*/ */
int reserveSyncHeader(String* packet); int reserve_sync_header(String* packet);
/* Update the sync bit in the packet header to indicate to the slave whether /* Update the sync bit in the packet header to indicate to the slave whether
* the master will wait for the reply of the event. If semi-sync is switched * the master will wait for the reply of the event. If semi-sync is switched
@ -580,16 +580,16 @@ class ReplSemiSyncMaster
* packet - (IN) the packet containing the replication event * packet - (IN) the packet containing the replication event
* log_file_name - (IN) the event ending position's file name * log_file_name - (IN) the event ending position's file name
* log_file_pos - (IN) the event ending position's file offset * log_file_pos - (IN) the event ending position's file offset
* need_sync - (IN) identify if flushNet is needed to call. * need_sync - (IN) identify if flush_net is needed to call.
* server_id - (IN) master server id number * server_id - (IN) master server id number
* *
* Return: * Return:
* 0: success; non-zero: error * 0: success; non-zero: error
*/ */
int updateSyncHeader(THD* thd, unsigned char *packet, int update_sync_header(THD* thd, unsigned char *packet,
const char *log_file_name, const char *log_file_name,
my_off_t log_file_pos, my_off_t log_file_pos,
bool* need_sync); bool* need_sync);
/* Called when a transaction finished writing binlog events. /* Called when a transaction finished writing binlog events.
* . update the 'largest' transactions' binlog event position * . update the 'largest' transactions' binlog event position
@ -603,25 +603,25 @@ class ReplSemiSyncMaster
* Return: * Return:
* 0: success; non-zero: error * 0: success; non-zero: error
*/ */
int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos); int write_tranx_in_binlog(const char* log_file_name, my_off_t log_file_pos);
/* Read the slave's reply so that we know how much progress the slave makes /* Read the slave's reply so that we know how much progress the slave makes
* on receive replication events. * on receive replication events.
*/ */
int flushNet(THD* thd, const char *event_buf); int flush_net(THD* thd, const char *event_buf);
/* Export internal statistics for semi-sync replication. */ /* Export internal statistics for semi-sync replication. */
void setExportStats(); void set_export_stats();
/* 'reset master' command is issued from the user and semi-sync need to /* 'reset master' command is issued from the user and semi-sync need to
* go off for that. * go off for that.
*/ */
int afterResetMaster(); int after_reset_master();
/*called before reset master*/ /*called before reset master*/
int beforeResetMaster(); int before_reset_master();
void checkAndSwitch(); void check_and_switch();
}; };
enum rpl_semi_sync_master_wait_point_t { enum rpl_semi_sync_master_wait_point_t {
@ -629,7 +629,7 @@ enum rpl_semi_sync_master_wait_point_t {
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
}; };
extern ReplSemiSyncMaster repl_semisync_master; extern Repl_semi_sync_master repl_semisync_master;
extern Ack_receiver ack_receiver; extern Ack_receiver ack_receiver;
/* System and status variables for the master component */ /* System and status variables for the master component */
@ -663,7 +663,7 @@ extern unsigned long long rpl_semi_sync_master_get_ack;
1 (default) : keep waiting until timeout even no available semi-sync slave. 1 (default) : keep waiting until timeout even no available semi-sync slave.
*/ */
extern char rpl_semi_sync_master_wait_no_slave; extern char rpl_semi_sync_master_wait_no_slave;
extern ReplSemiSyncMaster repl_semisync_master; extern Repl_semi_sync_master repl_semisync_master;
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
extern PSI_stage_info stage_reading_semi_sync_ack; extern PSI_stage_info stage_reading_semi_sync_ack;

View File

@ -17,10 +17,10 @@
#include "semisync_master.h" #include "semisync_master.h"
#include "semisync_master_ack_receiver.h" #include "semisync_master_ack_receiver.h"
extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex; extern PSI_mutex_key key_LOCK_ack_receiver;
extern PSI_cond_key key_ss_cond_Ack_receiver_cond; extern PSI_cond_key key_COND_ack_receiver;
extern PSI_thread_key key_ss_thread_Ack_receiver_thread; extern PSI_thread_key key_thread_ack_receiver;
extern ReplSemiSyncMaster repl_semisync; extern Repl_semi_sync_master repl_semisync;
/* Callback function of ack receive thread */ /* Callback function of ack receive thread */
pthread_handler_t ack_receive_handler(void *arg) pthread_handler_t ack_receive_handler(void *arg)
@ -39,9 +39,9 @@ Ack_receiver::Ack_receiver()
DBUG_ENTER("Ack_receiver::Ack_receiver"); DBUG_ENTER("Ack_receiver::Ack_receiver");
m_status= ST_DOWN; m_status= ST_DOWN;
mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex, mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex,
MY_MUTEX_INIT_FAST); MY_MUTEX_INIT_FAST);
mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL); mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
m_pid= 0; m_pid= 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
@ -75,7 +75,7 @@ bool Ack_receiver::start()
#ifndef _WIN32 #ifndef _WIN32
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
#endif #endif
mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid, mysql_thread_create(key_thread_ack_receiver, &m_pid,
&attr, ack_receive_handler, this)) &attr, ack_receive_handler, this))
{ {
sql_print_error("Failed to start semi-sync ACK receiver thread, " sql_print_error("Failed to start semi-sync ACK receiver thread, "
@ -283,8 +283,8 @@ void Ack_receiver::run()
len= my_net_read(&net); len= my_net_read(&net);
if (likely(len != packet_error)) if (likely(len != packet_error))
repl_semisync_master.reportReplyPacket(slave->server_id(), repl_semisync_master.report_reply_packet(slave->server_id(),
net.read_pos, len); net.read_pos, len);
else if (net.last_errno == ER_NET_READ_ERROR) else if (net.last_errno == ER_NET_READ_ERROR)
FD_CLR(slave->sock_fd(), &read_fds); FD_CLR(slave->sock_fd(), &read_fds);
} }

View File

@ -30,7 +30,7 @@
add_slave: maintain a new semisync slave's information add_slave: maintain a new semisync slave's information
remove_slave: remove a semisync slave's information remove_slave: remove a semisync slave's information
*/ */
class Ack_receiver : public ReplSemiSyncBase class Ack_receiver : public Repl_semi_sync_base
{ {
public: public:
Ack_receiver(); Ack_receiver();
@ -76,7 +76,7 @@ public:
*/ */
void run(); void run();
void setTraceLevel(unsigned long trace_level) void set_trace_level(unsigned long trace_level)
{ {
trace_level_= trace_level; trace_level_= trace_level;
} }

View File

@ -18,7 +18,7 @@
#include <my_global.h> #include <my_global.h>
#include "semisync_slave.h" #include "semisync_slave.h"
ReplSemiSyncSlave repl_semisync_slave; Repl_semi_sync_slave repl_semisync_slave;
my_bool rpl_semi_sync_slave_enabled= 0; my_bool rpl_semi_sync_slave_enabled= 0;
@ -37,26 +37,26 @@ bool semi_sync_need_reply= false;
unsigned int rpl_semi_sync_slave_kill_conn_timeout; unsigned int rpl_semi_sync_slave_kill_conn_timeout;
unsigned long long rpl_semi_sync_slave_send_ack = 0; unsigned long long rpl_semi_sync_slave_send_ack = 0;
int ReplSemiSyncSlave::initObject() int Repl_semi_sync_slave::init_object()
{ {
int result= 0; int result= 0;
init_done_ = true; init_done_ = true;
/* References to the parameter works after set_options(). */ /* References to the parameter works after set_options(). */
setSlaveEnabled(rpl_semi_sync_slave_enabled); set_slave_enabled(rpl_semi_sync_slave_enabled);
setTraceLevel(rpl_semi_sync_slave_trace_level); set_trace_level(rpl_semi_sync_slave_trace_level);
setDelayMaster(rpl_semi_sync_slave_delay_master); set_delay_master(rpl_semi_sync_slave_delay_master);
setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout); set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout);
return result; return result;
} }
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, int Repl_semi_sync_slave::slave_read_sync_header(const char *header,
unsigned long total_len, unsigned long total_len,
int *semi_flags, int *semi_flags,
const char **payload, const char **payload,
unsigned long *payload_len) unsigned long *payload_len)
{ {
int read_res = 0; int read_res = 0;
DBUG_ENTER("Repl_semi_sync_slave::slave_read_sync_header"); DBUG_ENTER("Repl_semi_sync_slave::slave_read_sync_header");
@ -64,9 +64,9 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
if (rpl_semi_sync_slave_status) if (rpl_semi_sync_slave_status)
{ {
if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1) if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1)
&& (unsigned char)(header[0]) == kPacketMagicNum) && (unsigned char)(header[0]) == k_packet_magic_num)
{ {
semi_sync_need_reply = (header[1] & kPacketFlagSync); semi_sync_need_reply = (header[1] & k_packet_flag_sync);
*payload_len = total_len - 2; *payload_len = total_len - 2;
*payload = header + 2; *payload = header + 2;
@ -76,7 +76,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
if (semi_sync_need_reply) if (semi_sync_need_reply)
*semi_flags |= SEMI_SYNC_NEED_ACK; *semi_flags |= SEMI_SYNC_NEED_ACK;
if (isDelayMaster()) if (is_delay_master())
*semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC; *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC;
} }
else else
@ -93,9 +93,9 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
DBUG_RETURN(read_res); DBUG_RETURN(read_res);
} }
int ReplSemiSyncSlave::slaveStart(Master_info *mi) int Repl_semi_sync_slave::slave_start(Master_info *mi)
{ {
bool semi_sync= getSlaveEnabled(); bool semi_sync= get_slave_enabled();
sql_print_information("Slave I/O thread: Start %s replication to\ sql_print_information("Slave I/O thread: Start %s replication to\
master '%s@%s:%d' in log '%s' at position %lu", master '%s@%s:%d' in log '%s' at position %lu",
@ -112,21 +112,21 @@ int ReplSemiSyncSlave::slaveStart(Master_info *mi)
return 0; return 0;
} }
int ReplSemiSyncSlave::slaveStop(Master_info *mi) int Repl_semi_sync_slave::slave_stop(Master_info *mi)
{ {
if (rpl_semi_sync_slave_status) if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0; rpl_semi_sync_slave_status= 0;
if (getSlaveEnabled()) if (get_slave_enabled())
killConnection(mi->mysql); kill_connection(mi->mysql);
return 0; return 0;
} }
int ReplSemiSyncSlave::resetSlave(Master_info *mi) int Repl_semi_sync_slave::reset_slave(Master_info *mi)
{ {
return 0; return 0;
} }
void ReplSemiSyncSlave::killConnection(MYSQL *mysql) void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
{ {
if (!mysql) if (!mysql)
return; return;
@ -154,14 +154,14 @@ void ReplSemiSyncSlave::killConnection(MYSQL *mysql)
mysql_close(kill_mysql); mysql_close(kill_mysql);
} }
int ReplSemiSyncSlave::requestTransmit(Master_info *mi) int Repl_semi_sync_slave::request_transmit(Master_info *mi)
{ {
MYSQL *mysql= mi->mysql; MYSQL *mysql= mi->mysql;
MYSQL_RES *res= 0; MYSQL_RES *res= 0;
MYSQL_ROW row; MYSQL_ROW row;
const char *query; const char *query;
if (!getSlaveEnabled()) if (!get_slave_enabled())
return 0; return 0;
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
@ -201,7 +201,7 @@ int ReplSemiSyncSlave::requestTransmit(Master_info *mi)
return 0; return 0;
} }
int ReplSemiSyncSlave::slaveReply(Master_info *mi) int Repl_semi_sync_slave::slave_reply(Master_info *mi)
{ {
MYSQL* mysql= mi->mysql; MYSQL* mysql= mi->mysql;
const char *binlog_filename= const_cast<char *>(mi->master_log_name); const char *binlog_filename= const_cast<char *>(mi->master_log_name);
@ -219,7 +219,7 @@ int ReplSemiSyncSlave::slaveReply(Master_info *mi)
if (rpl_semi_sync_slave_status && semi_sync_need_reply) if (rpl_semi_sync_slave_status && semi_sync_need_reply)
{ {
/* Prepare the buffer of the reply. */ /* Prepare the buffer of the reply. */
reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum; reply_buffer[REPLY_MAGIC_NUM_OFFSET] = k_packet_magic_num;
int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
binlog_filename, binlog_filename,

View File

@ -29,39 +29,38 @@ class Master_info;
/** /**
The extension class for the slave of semi-synchronous replication The extension class for the slave of semi-synchronous replication
*/ */
class ReplSemiSyncSlave class Repl_semi_sync_slave
:public ReplSemiSyncBase { :public Repl_semi_sync_base {
public: public:
ReplSemiSyncSlave() Repl_semi_sync_slave() :slave_enabled_(false) {}
:slave_enabled_(false) ~Repl_semi_sync_slave() {}
{}
~ReplSemiSyncSlave() {}
void setTraceLevel(unsigned long trace_level) { void set_trace_level(unsigned long trace_level) {
trace_level_ = trace_level; trace_level_ = trace_level;
} }
/* Initialize this class after MySQL parameters are initialized. this /* Initialize this class after MySQL parameters are initialized. this
* function should be called once at bootstrap time. * function should be called once at bootstrap time.
*/ */
int initObject(); int init_object();
bool getSlaveEnabled() { bool get_slave_enabled() {
return slave_enabled_; return slave_enabled_;
} }
void setSlaveEnabled(bool enabled) {
void set_slave_enabled(bool enabled) {
slave_enabled_ = enabled; slave_enabled_ = enabled;
} }
bool isDelayMaster(){ bool is_delay_master(){
return delay_master_; return delay_master_;
} }
void setDelayMaster(bool enabled) { void set_delay_master(bool enabled) {
delay_master_ = enabled; delay_master_ = enabled;
} }
void setKillConnTimeout(unsigned int timeout) { void set_kill_conn_timeout(unsigned int timeout) {
kill_conn_timeout_ = timeout; kill_conn_timeout_ = timeout;
} }
@ -71,29 +70,31 @@ public:
* Input: * Input:
* header - (IN) packet header pointer * header - (IN) packet header pointer
* total_len - (IN) total packet length: metadata + payload * total_len - (IN) total packet length: metadata + payload
* semi_flags - (IN) store flags: SEMI_SYNC_SLAVE_DELAY_SYNC and SEMI_SYNC_NEED_ACK * semi_flags - (IN) store flags: SEMI_SYNC_SLAVE_DELAY_SYNC and
SEMI_SYNC_NEED_ACK
* payload - (IN) payload: the replication event * payload - (IN) payload: the replication event
* payload_len - (IN) payload length * payload_len - (IN) payload length
* *
* Return: * Return:
* 0: success; non-zero: error * 0: success; non-zero: error
*/ */
int slaveReadSyncHeader(const char *header, unsigned long total_len, int *semi_flags, int slave_read_sync_header(const char *header, unsigned long total_len,
const char **payload, unsigned long *payload_len); int *semi_flags,
const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It /* A slave replies to the master indicating its replication process. It
* indicates that the slave has received all events before the specified * indicates that the slave has received all events before the specified
* binlog position. * binlog position.
*/ */
int slaveReply(Master_info* mi); int slave_reply(Master_info* mi);
int slaveStart(Master_info *mi); int slave_start(Master_info *mi);
int slaveStop(Master_info *mi); int slave_stop(Master_info *mi);
int requestTransmit(Master_info*); int request_transmit(Master_info*);
void killConnection(MYSQL *mysql); void kill_connection(MYSQL *mysql);
int resetSlave(Master_info *mi); int reset_slave(Master_info *mi);
private: private:
/* True when initObject has been called */ /* True when init_object has been called */
bool init_done_; bool init_done_;
bool slave_enabled_; /* semi-sycn is enabled on the slave */ bool slave_enabled_; /* semi-sycn is enabled on the slave */
bool delay_master_; bool delay_master_;
@ -105,7 +106,7 @@ private:
extern my_bool rpl_semi_sync_slave_enabled; extern my_bool rpl_semi_sync_slave_enabled;
extern my_bool rpl_semi_sync_slave_status; extern my_bool rpl_semi_sync_slave_status;
extern ulong rpl_semi_sync_slave_trace_level; extern ulong rpl_semi_sync_slave_trace_level;
extern ReplSemiSyncSlave repl_semisync_slave; extern Repl_semi_sync_slave repl_semisync_slave;
extern char rpl_semi_sync_slave_delay_master; extern char rpl_semi_sync_slave_delay_master;
extern unsigned int rpl_semi_sync_slave_kill_conn_timeout; extern unsigned int rpl_semi_sync_slave_kill_conn_timeout;

View File

@ -3586,7 +3586,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
if (opt_log_slave_updates && opt_replicate_annotate_row_events) if (opt_log_slave_updates && opt_replicate_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (repl_semisync_slave.requestTransmit(mi)) if (repl_semisync_slave.request_transmit(mi))
DBUG_RETURN(1); DBUG_RETURN(1);
// TODO if big log files: Change next to int8store() // TODO if big log files: Change next to int8store()
@ -4614,7 +4614,7 @@ pthread_handler_t handle_slave_io(void *arg)
if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
|| repl_semisync_slave.slaveStart(mi)) || repl_semisync_slave.slave_start(mi))
{ {
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR), ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@ -4806,8 +4806,8 @@ Stopping slave I/O thread due to out-of-memory error from master");
event_buf= (const char*)mysql->net.read_pos + 1; event_buf= (const char*)mysql->net.read_pos + 1;
mi->semi_ack= 0; mi->semi_ack= 0;
if (repl_semisync_slave. if (repl_semisync_slave.
slaveReadSyncHeader((const char*)mysql->net.read_pos + 1, event_len, slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len,
&(mi->semi_ack), &event_buf, &event_len)) &(mi->semi_ack), &event_buf, &event_len))
{ {
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR), ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@ -4865,7 +4865,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
} }
if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK) && if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK) &&
repl_semisync_slave.slaveReply(mi)) repl_semisync_slave.slave_reply(mi))
{ {
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR), ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@ -4879,7 +4879,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
master info only when ack is needed. This may lead to at least one master info only when ack is needed. This may lead to at least one
group transaction delay but affords better performance improvement. group transaction delay but affords better performance improvement.
*/ */
(!repl_semisync_slave.getSlaveEnabled() || (!repl_semisync_slave.get_slave_enabled() ||
(!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) || (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) ||
(mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) && (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) &&
(DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) || (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) ||
@ -4937,7 +4937,7 @@ err:
IO_RPL_LOG_NAME, mi->master_log_pos, IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe()); tmp.c_ptr_safe());
} }
repl_semisync_slave.slaveStop(mi); repl_semisync_slave.slave_stop(mi);
thd->reset_query(); thd->reset_query();
thd->reset_db(NULL, 0); thd->reset_db(NULL, 0);
if (mysql) if (mysql)

View File

@ -316,7 +316,7 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
if (info->thd->semi_sync_slave) if (info->thd->semi_sync_slave)
{ {
if (repl_semisync_master.reserveSyncHeader(packet)) if (repl_semisync_master.reserve_sync_header(packet))
{ {
info->error= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'"; *errmsg= "Failed to run hook 'reserve_header'";
@ -1945,9 +1945,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave); THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log); pos= my_b_tell(log);
if (repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(), if (repl_semisync_master.update_sync_header(info->thd,
info->log_file_name + info->dirlen, (uchar*) packet->c_ptr(),
pos, &need_sync)) info->log_file_name + info->dirlen,
pos, &need_sync))
{ {
info->error= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed"; return "run 'before_send_event' hook failed";
@ -1969,7 +1970,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
} }
} }
if (need_sync && repl_semisync_master.flushNet(info->thd, packet->c_ptr())) if (need_sync && repl_semisync_master.flush_net(info->thd, packet->c_ptr()))
{ {
info->error= ER_UNKNOWN_ERROR; info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'"; return "Failed to run hook 'after_send_event'";
@ -3341,7 +3342,7 @@ int reset_slave(THD *thd, Master_info* mi)
sql_print_information("Deleted Master_info file '%s'.", fname); sql_print_information("Deleted Master_info file '%s'.", fname);
if (rpl_semi_sync_slave_enabled) if (rpl_semi_sync_slave_enabled)
repl_semisync_slave.resetSlave(mi); repl_semisync_slave.reset_slave(mi);
err: err:
mi->unlock_slave_threads(); mi->unlock_slave_threads();
if (error) if (error)
@ -3845,10 +3846,10 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
bool ret= 0; bool ret= 0;
/* Temporarily disable master semisync before reseting master. */ /* Temporarily disable master semisync before reseting master. */
repl_semisync_master.beforeResetMaster(); repl_semisync_master.before_reset_master();
ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
next_log_number); next_log_number);
repl_semisync_master.afterResetMaster(); repl_semisync_master.after_reset_master();
return ret; return ret;
} }

View File

@ -3049,17 +3049,17 @@ static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
{ {
if (rpl_semi_sync_master_enabled) if (rpl_semi_sync_master_enabled)
{ {
if (repl_semisync_master.enableMaster() != 0) if (repl_semisync_master.enable_master() != 0)
rpl_semi_sync_master_enabled= false; rpl_semi_sync_master_enabled= false;
else if (ack_receiver.start()) else if (ack_receiver.start())
{ {
repl_semisync_master.disableMaster(); repl_semisync_master.disable_master();
rpl_semi_sync_master_enabled= false; rpl_semi_sync_master_enabled= false;
} }
} }
else else
{ {
if (repl_semisync_master.disableMaster() != 0) if (repl_semisync_master.disable_master() != 0)
rpl_semi_sync_master_enabled= true; rpl_semi_sync_master_enabled= true;
if (!rpl_semi_sync_master_enabled) if (!rpl_semi_sync_master_enabled)
ack_receiver.stop(); ack_receiver.stop();
@ -3070,29 +3070,29 @@ static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
static bool fix_rpl_semi_sync_master_timeout(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_master_timeout(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_master.setWaitTimeout(rpl_semi_sync_master_timeout); repl_semisync_master.set_wait_timeout(rpl_semi_sync_master_timeout);
return false; return false;
} }
static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_master.setTraceLevel(rpl_semi_sync_master_trace_level); repl_semisync_master.set_trace_level(rpl_semi_sync_master_trace_level);
ack_receiver.setTraceLevel(rpl_semi_sync_master_trace_level); ack_receiver.set_trace_level(rpl_semi_sync_master_trace_level);
return false; return false;
} }
static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_master.setWaitPoint(rpl_semi_sync_master_wait_point); repl_semisync_master.set_wait_point(rpl_semi_sync_master_wait_point);
return false; return false;
} }
static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_master.checkAndSwitch(); repl_semisync_master.check_and_switch();
return false; return false;
} }
@ -3147,28 +3147,29 @@ static Sys_var_enum Sys_semisync_master_wait_point(
static bool fix_rpl_semi_sync_slave_enabled(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_slave_enabled(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_slave.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0); repl_semisync_slave.set_slave_enabled(rpl_semi_sync_slave_enabled != 0);
return false; return false;
} }
static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_slave.setTraceLevel(rpl_semi_sync_slave_trace_level); repl_semisync_slave.set_trace_level(rpl_semi_sync_slave_trace_level);
return false; return false;
} }
static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_slave.setDelayMaster(rpl_semi_sync_slave_delay_master); repl_semisync_slave.set_delay_master(rpl_semi_sync_slave_delay_master);
return false; return false;
} }
static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd, static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd,
enum_var_type type) enum_var_type type)
{ {
repl_semisync_slave.setKillConnTimeout(rpl_semi_sync_slave_kill_conn_timeout); repl_semisync_slave.
set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout);
return false; return false;
} }

View File

@ -319,13 +319,13 @@ bool trans_commit(THD *thd)
if (res) if (res)
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE); repl_semisync_master.wait_after_rollback(thd, FALSE);
#endif #endif
} }
else else
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterCommit(thd, FALSE); repl_semisync_master.wait_after_commit(thd, FALSE);
#endif #endif
} }
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
@ -421,7 +421,7 @@ bool trans_rollback(THD *thd)
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= ha_rollback_trans(thd, TRUE); res= ha_rollback_trans(thd, TRUE);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE); repl_semisync_master.wait_after_rollback(thd, FALSE);
#endif #endif
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/* Reset the binlog transaction marker */ /* Reset the binlog transaction marker */
@ -537,13 +537,13 @@ bool trans_commit_stmt(THD *thd)
if (res) if (res)
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE); repl_semisync_master.wait_after_rollback(thd, FALSE);
#endif #endif
} }
else else
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterCommit(thd, FALSE); repl_semisync_master.wait_after_commit(thd, FALSE);
#endif #endif
} }
@ -585,7 +585,7 @@ bool trans_rollback_stmt(THD *thd)
} }
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE); repl_semisync_master.wait_after_rollback(thd, FALSE);
#endif #endif
thd->transaction.stmt.reset(); thd->transaction.stmt.reset();