mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
MDEV-26: Global Transaction ID.
Implement CHANGE MASTER TO ... MASTER_GTID_POS = "x-y-z,a-b-c".
This commit is contained in:
@ -6076,13 +6076,12 @@ rpl_slave_state::init()
|
|||||||
inited= true;
|
inited= true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
rpl_slave_state::deinit()
|
rpl_slave_state::truncate_hash()
|
||||||
{
|
{
|
||||||
uint32 i;
|
uint32 i;
|
||||||
|
|
||||||
if (!inited)
|
|
||||||
return;
|
|
||||||
for (i= 0; i < hash.records; ++i)
|
for (i= 0; i < hash.records; ++i)
|
||||||
{
|
{
|
||||||
element *e= (element *)my_hash_element(&hash, i);
|
element *e= (element *)my_hash_element(&hash, i);
|
||||||
@ -6094,8 +6093,17 @@ rpl_slave_state::deinit()
|
|||||||
my_free(l);
|
my_free(l);
|
||||||
l= next;
|
l= next;
|
||||||
}
|
}
|
||||||
/* The element itself is freed by my_hash_free(). */
|
/* The element itself is freed by the hash element free function. */
|
||||||
}
|
}
|
||||||
|
my_hash_reset(&hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
rpl_slave_state::deinit()
|
||||||
|
{
|
||||||
|
if (!inited)
|
||||||
|
return;
|
||||||
|
truncate_hash();
|
||||||
my_hash_free(&hash);
|
my_hash_free(&hash);
|
||||||
mysql_mutex_destroy(&LOCK_slave_state);
|
mysql_mutex_destroy(&LOCK_slave_state);
|
||||||
}
|
}
|
||||||
@ -6148,6 +6156,42 @@ rpl_slave_state::get_element(uint32 domain_id)
|
|||||||
|
|
||||||
#ifdef MYSQL_SERVER
|
#ifdef MYSQL_SERVER
|
||||||
#ifdef HAVE_REPLICATION
|
#ifdef HAVE_REPLICATION
|
||||||
|
int
|
||||||
|
rpl_slave_state::truncate_state_table(THD *thd)
|
||||||
|
{
|
||||||
|
TABLE_LIST tlist;
|
||||||
|
int err= 0;
|
||||||
|
TABLE *table;
|
||||||
|
|
||||||
|
mysql_reset_thd_for_next_command(thd, 0);
|
||||||
|
|
||||||
|
tlist.init_one_table(STRING_WITH_LEN("mysql"),
|
||||||
|
rpl_gtid_slave_state_table_name.str,
|
||||||
|
rpl_gtid_slave_state_table_name.length,
|
||||||
|
NULL, TL_WRITE);
|
||||||
|
if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
|
||||||
|
{
|
||||||
|
table= tlist.table;
|
||||||
|
err= table->file->ha_truncate();
|
||||||
|
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
ha_rollback_trans(thd, FALSE);
|
||||||
|
close_thread_tables(thd);
|
||||||
|
ha_rollback_trans(thd, TRUE);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ha_commit_trans(thd, FALSE);
|
||||||
|
close_thread_tables(thd);
|
||||||
|
ha_commit_trans(thd, TRUE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Write a gtid to the replication slave state table.
|
Write a gtid to the replication slave state table.
|
||||||
|
|
||||||
@ -6442,13 +6486,23 @@ gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
|
|||||||
Update the slave replication state with the GTID position obtained from
|
Update the slave replication state with the GTID position obtained from
|
||||||
master when connecting with old-style (filename,offset) position.
|
master when connecting with old-style (filename,offset) position.
|
||||||
|
|
||||||
|
If RESET is true then all existing entries are removed. Otherwise only
|
||||||
|
domain_ids mentioned in the STATE_FROM_MASTER are changed.
|
||||||
|
|
||||||
Returns 0 if ok, non-zero if error.
|
Returns 0 if ok, non-zero if error.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
rpl_slave_state::load(THD *thd, char *state_from_master)
|
rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
|
||||||
|
bool reset)
|
||||||
{
|
{
|
||||||
char *end= state_from_master + strlen(state_from_master);
|
char *end= state_from_master + len;
|
||||||
|
|
||||||
|
if (reset)
|
||||||
|
{
|
||||||
|
if (truncate_state_table(thd))
|
||||||
|
return 1;
|
||||||
|
truncate_hash();
|
||||||
|
}
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
rpl_gtid gtid;
|
rpl_gtid gtid;
|
||||||
@ -6463,6 +6517,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master)
|
|||||||
break;
|
break;
|
||||||
if (*state_from_master != ',')
|
if (*state_from_master != ',')
|
||||||
return 1;
|
return 1;
|
||||||
|
++state_from_master;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3003,13 +3003,15 @@ struct rpl_slave_state
|
|||||||
|
|
||||||
void init();
|
void init();
|
||||||
void deinit();
|
void deinit();
|
||||||
|
void truncate_hash();
|
||||||
ulong count() const { return hash.records; }
|
ulong count() const { return hash.records; }
|
||||||
int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
|
int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
|
||||||
|
int truncate_state_table(THD *thd);
|
||||||
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
|
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
|
||||||
bool in_transaction);
|
bool in_transaction);
|
||||||
uint64 next_subid(uint32 domain_id);
|
uint64 next_subid(uint32 domain_id);
|
||||||
int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
|
int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
|
||||||
int load(THD *thd, char *state_from_master);
|
int load(THD *thd, char *state_from_master, size_t len, bool reset);
|
||||||
bool is_empty();
|
bool is_empty();
|
||||||
|
|
||||||
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
|
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
|
||||||
|
@ -6600,3 +6600,7 @@ ER_SLAVE_STARTED
|
|||||||
eng "SLAVE '%.*s' started"
|
eng "SLAVE '%.*s' started"
|
||||||
ER_SLAVE_STOPPED
|
ER_SLAVE_STOPPED
|
||||||
eng "SLAVE '%.*s' stopped"
|
eng "SLAVE '%.*s' stopped"
|
||||||
|
ER_FAILED_GTID_STATE_INIT
|
||||||
|
eng "Failed initializing replication GTID state"
|
||||||
|
ER_INCORRECT_GTID_STATE
|
||||||
|
eng "Could not parse GTID list for MASTER_GTID_POS"
|
||||||
|
@ -1864,7 +1864,8 @@ after_set_capability:
|
|||||||
(master_row= mysql_fetch_row(master_res)) &&
|
(master_row= mysql_fetch_row(master_res)) &&
|
||||||
(master_row[0] != NULL))
|
(master_row[0] != NULL))
|
||||||
{
|
{
|
||||||
rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0]);
|
rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
|
||||||
|
strlen(master_row[0]), false);
|
||||||
}
|
}
|
||||||
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
|
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
|
||||||
goto slave_killed_err;
|
goto slave_killed_err;
|
||||||
|
@ -315,6 +315,8 @@ struct LEX_MASTER_INFO
|
|||||||
host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
|
host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
|
||||||
ssl_capath= ssl_cipher= relay_log_name= 0;
|
ssl_capath= ssl_cipher= relay_log_name= 0;
|
||||||
pos= relay_log_pos= server_id= port= connect_retry= 0;
|
pos= relay_log_pos= server_id= port= connect_retry= 0;
|
||||||
|
gtid_pos_str.str= NULL;
|
||||||
|
gtid_pos_str.length= 0;
|
||||||
gtid_pos_auto= FALSE;
|
gtid_pos_auto= FALSE;
|
||||||
heartbeat_period= 0;
|
heartbeat_period= 0;
|
||||||
ssl= ssl_verify_server_cert= heartbeat_opt=
|
ssl= ssl_verify_server_cert= heartbeat_opt=
|
||||||
|
@ -2231,6 +2231,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
|||||||
char relay_log_info_file_tmp[FN_REFLEN];
|
char relay_log_info_file_tmp[FN_REFLEN];
|
||||||
my_off_t saved_log_pos;
|
my_off_t saved_log_pos;
|
||||||
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
|
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
|
||||||
|
slave_connection_state tmp_slave_state;
|
||||||
DBUG_ENTER("change_master");
|
DBUG_ENTER("change_master");
|
||||||
|
|
||||||
*master_info_added= false;
|
*master_info_added= false;
|
||||||
@ -2261,6 +2262,27 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
|||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (lex_mi->gtid_pos_str.str)
|
||||||
|
{
|
||||||
|
if (master_info_index->give_error_if_slave_running())
|
||||||
|
{
|
||||||
|
ret= TRUE;
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
First load it into a dummy object, to check for parse errors.
|
||||||
|
We do not want to wipe the previous state if there is an error
|
||||||
|
in the syntax of the new state!
|
||||||
|
*/
|
||||||
|
if (tmp_slave_state.load(lex_mi->gtid_pos_str.str,
|
||||||
|
lex_mi->gtid_pos_str.length))
|
||||||
|
{
|
||||||
|
my_error(ER_INCORRECT_GTID_STATE, MYF(0));
|
||||||
|
ret= TRUE;
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
thd_proc_info(thd, "Changing master");
|
thd_proc_info(thd, "Changing master");
|
||||||
|
|
||||||
create_logfile_name_with_suffix(master_info_file_tmp,
|
create_logfile_name_with_suffix(master_info_file_tmp,
|
||||||
@ -2426,7 +2448,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
|||||||
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
|
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lex_mi->gtid_pos_auto)
|
if (lex_mi->gtid_pos_auto || lex_mi->gtid_pos_str.str)
|
||||||
mi->gtid_pos_auto= true;
|
mi->gtid_pos_auto= true;
|
||||||
else if (lex_mi->gtid_pos_str.str ||
|
else if (lex_mi->gtid_pos_str.str ||
|
||||||
lex_mi->log_file_name || lex_mi->pos ||
|
lex_mi->log_file_name || lex_mi->pos ||
|
||||||
@ -2463,6 +2485,18 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
|
|||||||
strmake(mi->master_log_name, mi->rli.group_master_log_name,
|
strmake(mi->master_log_name, mi->rli.group_master_log_name,
|
||||||
sizeof(mi->master_log_name)-1);
|
sizeof(mi->master_log_name)-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (lex_mi->gtid_pos_str.str)
|
||||||
|
{
|
||||||
|
if (rpl_global_gtid_slave_state.load(thd, lex_mi->gtid_pos_str.str,
|
||||||
|
lex_mi->gtid_pos_str.length, true))
|
||||||
|
{
|
||||||
|
my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
|
||||||
|
ret= TRUE;
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
|
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
|
||||||
a slave before).
|
a slave before).
|
||||||
|
Reference in New Issue
Block a user