1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

Implementation of Multi-source replication (MDEV:253)

Documentation of the feature can be found at: http://kb.askmonty.org/en/multi-source-replication/
This code is based on code from Taobao, developed by Plinux

BUILD/SETUP.sh:
  Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it)
client/mysqltest.cc:
  Added support for error names starting with 'W'
  Added connection_name support to --sync_with_master
cmake/maintainer.cmake:
  Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it)
mysql-test/r/mysqltest.result:
  Updated results
mysql-test/r/parser.result:
  Updated results
mysql-test/suite/multi_source/my.cnf:
  Setup of multi-master tests
mysql-test/suite/multi_source/simple.result:
  Simple basic test of multi-source functionality
mysql-test/suite/multi_source/simple.test:
  Simple basic test of multi-source functionality
mysql-test/suite/multi_source/syntax.result:
  Test of multi-source syntax
mysql-test/suite/multi_source/syntax.test:
  Test of multi-source syntax
mysql-test/suite/rpl/r/rpl_rotate_logs.result:
  Updated results because of new error messages
mysql-test/t/parser.test:
  Updated test as master_pos_wait() now takes more arguments than before
sql/event_scheduler.cc:
  No reason to initialize slave_thread (it's guaranteed to be zero here)
sql/item_create.cc:
  Added connection_name argument to master_pos_wait()
  Simplified code
sql/item_func.cc:
  Added connection_name argument to master_pos_wait()
sql/item_func.h:
  Added connection_name argument to master_pos_wait()
sql/log.cc:
  Added tag "Master 'connection_name'" to slave errors that has a connection name.
sql/mysqld.cc:
  Added variable mysqld_server_initialized so that other functions can test if server is fully initialized.
  Free all slave data in one place (fewer ifdef's)
  Removed not needed call to close_active_mi()
  Initialize slaves() later in startup to ensure that everthing is really initialized when slaves start.
  Made status variable slave_running multi-source safe
sql/mysqld.h:
  Added mysqld_server_initialized
sql/rpl_mi.cc:
  Store connection name and cmp_connection_name (only used for show full slave status) in Master_info
  Added code for Master_info_index, which handles storage of multi-master information
  Don't write the empty "" connection_name to multi-master.info file. This is handled by the original code.
sql/rpl_mi.h:
  Added connection_name and Master_info_index
sql/rpl_rli.cc:
  Added connection_name to relay log files.
sql/rpl_rli.h:
  Fixed type of slave_skip_counter as we now access it directly in sys_vars.cc, so it must be uint
sql/share/errmsg-utf8.txt:
  Added new error messages needed for multi-source
  Added multi-source name to error ER_MASTER_INFO and WARN_NO_MASTER_INFO
sql/slave.cc:
  Moved things a bit around to make it easier to handle error conditions.
  Create a global master_info_index and add the "" connection to it
  Ensure that new Master_info doesn't fail.
  Don't call terminate_slave_threads(active_mi..) on end_slave() as this is now done automaticly when deleting master_info_index.
  Delete not needed function close_active_mi(). One can achive same thing by calling end_slave().
  Added support for SHOW FULL SLAVE STATUS (show status for all master connections with connection_name as first column)
sql/slave.h:
  Added new prototypes
sql/sql_base.cc:
  More DBUG_PRINT
sql/sql_class.cc:
  Reset thd->connection_name and thd-->default_master_connection
sql/sql_class.h:
  Added thd->connection_name and thd-->default_master_connection
  Added slave_skip_count to variables to make changing the @@sql_slave_skip_count variable thread safe
sql/sql_const.h:
  Added MAX_CONNECTION_NAME
sql/sql_lex.cc:
  Reset 'lex->verbose' (to simplify some sql_yacc.yy code)
sql/sql_lex.h:
  Added connection_name
sql/sql_parse.cc:
  Added support for connection_name to all SLAVE commands.
  - Instead of using active_mi, we now get the current Master_info from master_info_index.
  - Create new replication threads with CHANGE MASTER
  - Added support for show_all_master_info()
sql/sql_reload.cc:
  Made reset/full slave use master_info_index->get_master_info() instead of active_mi.
  If one uses 'RESET SLAVE "connection_name" all' the connection is removed from master_info_index.
sql/sql_repl.cc:
  sql_slave_skip_counter is moved to thd->variables to make it thread safe and fix some bugs with it
  Add connection name to relay log files.
  Added connection name to errors.
  Added some logging for multi-master if log_warnings > 1
  stop_slave():
  - Don't check if thd is set. It's guaranteed to always be set.
  change_master():
  - Check for duplicate connection names in change_master()
  - Check for wrong arguments first in file (to simplify error handling)
  - Register new connections in master_info_index
sql/sql_yacc.yy:
  Added optional connection_name to a all relevant master/slave commands
sql/strfunc.cc:
  my_global.h shoud always be included first.
sql/sys_vars.cc:
  Added variable default_master_connection
  Made variable sql_slave_skip_counter multi-source safe
sql/sys_vars.h:
  Added Sys_var_session_lexstring (needed for default_master_connection)
  Added Sys_var_multi_source_uint (needed for sql_slave_skip_counter).
This commit is contained in:
Michael Widenius
2012-09-28 02:06:56 +03:00
parent 620d14f8c3
commit 1864d9596d
41 changed files with 1833 additions and 234 deletions

View File

@ -20,6 +20,7 @@
#include "unireg.h" // REQUIRED by other includes
#include "rpl_mi.h"
#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD
#include "strfunc.h"
#ifdef HAVE_REPLICATION
@ -27,7 +28,8 @@
static void init_master_log_pos(Master_info* mi);
Master_info::Master_info(bool is_slave_recovery)
Master_info::Master_info(LEX_STRING *connection_name_arg,
bool is_slave_recovery)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0),
rli(is_slave_recovery), port(MYSQL_PORT),
@ -40,6 +42,21 @@ Master_info::Master_info(bool is_slave_recovery)
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
/* Store connection name and lower case connection name */
connection_name.length= cmp_connection_name.length=
connection_name_arg->length;
if ((connection_name.str= (char*) my_malloc(connection_name_arg->length*2+2,
MYF(MY_WME))))
{
cmp_connection_name.str= (connection_name.str +
connection_name_arg->length+1);
strmake(connection_name.str, connection_name_arg->str,
connection_name.length);
memcpy(cmp_connection_name.str, connection_name_arg->str,
connection_name.length+1);
my_casedn_str(system_charset_info, cmp_connection_name.str);
}
my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16);
bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
@ -55,6 +72,7 @@ Master_info::Master_info(bool is_slave_recovery)
Master_info::~Master_info()
{
my_free(connection_name.str);
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
@ -407,7 +425,7 @@ file '%s')", fname);
mi->master_log_name,
(ulong) mi->master_log_pos));
mi->rli.mi = mi;
mi->rli.mi= mi;
if (init_relay_log_info(&mi->rli, slave_info_fname))
goto err;
@ -560,5 +578,455 @@ void end_master_info(Master_info* mi)
DBUG_VOID_RETURN;
}
/* Multi-Master By P.Linux */
uchar *get_key_master_info(Master_info *mi, size_t *length,
my_bool not_used __attribute__((unused)))
{
/* Return lower case name */
*length= mi->cmp_connection_name.length;
return (uchar*) mi->cmp_connection_name.str;
}
void free_key_master_info(Master_info *mi)
{
DBUG_ENTER("free_key_master_info");
terminate_slave_threads(mi,SLAVE_FORCE_ALL);
end_master_info(mi);
delete mi;
DBUG_VOID_RETURN;
}
/**
Check if connection name for master_info is valid.
It's valid if it's a valid system name, is less than
MAX_CONNECTION_NAME.
@return
0 ok
1 error
*/
bool check_master_connection_name(LEX_STRING *name)
{
if (name->length >= MAX_CONNECTION_NAME)
return 1;
return 0;
}
/**
Create a log file with a signed suffix.
@param
res_file_name Store result here
length Length of res_file_name buffer
info_file Original file name (prefix)
separator Separator character
suffix Suffix
@note
If suffix is an empty string, then we don't add any suffix.
This is to allow one to use this function also to generate old
file names without a prefix.
*/
void create_signed_file_name(char *res_file_name, uint length,
const char *info_file,
char separator, LEX_STRING *suffix)
{
char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p;
p= strmake(res_file_name, info_file, length);
if (suffix->length != 0 && p != info_file + length)
{
uint errors;
size_t res_length;
*p++= separator;
/* Create null terminated string */
strmake(buff, suffix->str, suffix->length);
/* Convert to lower case */
my_casedn_str(system_charset_info, buff);
/* Convert to characters usable in a file name */
res_length= strconvert(system_charset_info, buff,
&my_charset_filename, res, sizeof(res), &errors);
strmake(p, res, min(length - (p - res_file_name), res_length));
}
}
Master_info_index::Master_info_index()
{
index_file_name[0] = 0;
bzero((char*) &index_file, sizeof(index_file));
}
Master_info_index::~Master_info_index()
{
/* This will close connection for all objects in the cache */
my_hash_free(&master_info_hash);
end_io_cache(&index_file);
if (index_file.file > 0)
my_close(index_file.file, MYF(MY_WME));
}
/* Load All Master_info from master.info.index File
* RETURN:
* 0 - All Success
* 1 - All Fail
* 2 - Some Success, Some Fail
*/
bool Master_info_index::init_all_master_info()
{
int thread_mask;
int err_num= 0, succ_num= 0; // The number of success read Master_info
char sign[MAX_CONNECTION_NAME];
File index_file_nr;
size_t filename_length, dir_length;
DBUG_ENTER("init_all_master_info");
/*
Create the Master_info index file by prepending 'multi-' before
the master_info_file file name.
*/
fn_format(index_file_name, master_info_file, mysql_data_home,
"", MY_UNPACK_FILENAME);
filename_length= strlen(index_file_name) + 1; /* Count 0 byte */
dir_length= dirname_length(index_file_name);
bmove_upp((uchar*) index_file_name + filename_length + 6,
(uchar*) index_file_name + filename_length,
filename_length - dir_length);
memcpy(index_file_name + dir_length, "multi-", 6);
if ((index_file_nr= my_open(index_file_name,
O_RDWR | O_CREAT | O_BINARY ,
MYF(MY_WME | ME_NOREFRESH))) < 0 ||
my_sync(index_file_nr, MYF(MY_WME)) ||
init_io_cache(&index_file, index_file_nr,
IO_SIZE, READ_CACHE,
my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
0, MYF(MY_WME | MY_WAIT_IF_FULL)))
{
if (index_file_nr >= 0)
my_close(index_file_nr,MYF(0));
sql_print_error("Creation of Master_info index file '%s' failed",
index_file_name);
DBUG_RETURN(1);
}
/* Initialize Master_info Hash Table */
if (my_hash_init(&master_info_hash, system_charset_info,
MAX_REPLICATION_THREAD, 0, 0,
(my_hash_get_key) get_key_master_info,
(my_hash_free_key)free_key_master_info, HASH_UNIQUE))
{
sql_print_error("Initializing Master_info hash table failed");
DBUG_RETURN(1);
}
reinit_io_cache(&index_file, READ_CACHE, 0L,0,0);
while (!init_strvar_from_file(sign, sizeof(sign),
&index_file, NULL))
{
LEX_STRING connection_name;
Master_info *mi;
char buf_master_info_file[FN_REFLEN];
char buf_relay_log_info_file[FN_REFLEN];
connection_name.str= sign;
connection_name.length= strlen(sign);
if (!(mi= new Master_info(&connection_name, relay_log_recovery)) ||
mi->error())
{
delete mi;
DBUG_RETURN(1);
}
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
create_signed_file_name(buf_master_info_file, sizeof(buf_master_info_file),
master_info_file, '.', &connection_name);
create_signed_file_name(buf_relay_log_info_file,
sizeof(buf_relay_log_info_file),
relay_log_info_file, '.', &connection_name);
if (global_system_variables.log_warnings > 1)
sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
buf_master_info_file, buf_relay_log_info_file);
if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
0, thread_mask))
{
err_num++;
sql_print_error("Initialized Master_info from '%s' failed",
buf_master_info_file);
if (!master_info_index->get_master_info(&connection_name,
MYSQL_ERROR::WARN_LEVEL_NOTE))
{
/* Master_info is not in HASH; Add it */
if (master_info_index->add_master_info(mi, FALSE))
return 1;
succ_num++;
unlock_slave_threads(mi);
}
else
{
/* Master_info already in HASH */
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) connection_name.length, connection_name.str);
unlock_slave_threads(mi);
delete mi;
}
continue;
}
else
{
/* Initialization of Master_info succeded. Add it to HASH */
if (global_system_variables.log_warnings > 1)
sql_print_information("Initialized Master_info from '%s'",
buf_master_info_file);
if (master_info_index->get_master_info(&connection_name,
MYSQL_ERROR::WARN_LEVEL_NOTE))
{
/* Master_info was already registered */
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) connection_name.length, connection_name.str);
unlock_slave_threads(mi);
delete mi;
continue;
}
/* Master_info was not registered; add it */
if (master_info_index->add_master_info(mi, FALSE))
return 1;
succ_num++;
unlock_slave_threads(mi);
if (!opt_skip_slave_start)
{
if (start_slave_threads(1 /* need mutex */,
0 /* no wait for start*/,
mi,
buf_master_info_file,
buf_relay_log_info_file,
SLAVE_IO | SLAVE_SQL))
{
sql_print_error("Failed to create slave threads for connection %.*s",
(int) connection_name.length,
connection_name.str);
continue;
}
if (global_system_variables.log_warnings)
sql_print_information("Started replication for '%.*s'",
(int) connection_name.length,
connection_name.str);
}
}
}
if (!err_num) // No Error on read Master_info
{
if (global_system_variables.log_warnings > 1)
sql_print_information("Reading of all Master_info entries succeded");
DBUG_RETURN(0);
}
else if (succ_num) // Have some Error and some Success
{
sql_print_warning("Reading of some Master_info entries failed");
DBUG_RETURN(2);
}
else // All failed
{
sql_print_error("Reading of all Master_info entries failed!");
DBUG_RETURN(1);
}
}
/* Write new master.info to master.info.index File */
bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name,
bool do_sync)
{
DBUG_ASSERT(my_b_inited(&index_file) != 0);
DBUG_ENTER("write_master_name_to_index_file");
/* Don't write default slave to master_info.index */
if (name->length == 0)
DBUG_RETURN(0);
reinit_io_cache(&index_file, WRITE_CACHE,
my_b_filelength(&index_file), 0, 0);
if (my_b_write(&index_file, (uchar*) name->str, name->length) ||
my_b_write(&index_file, (uchar*) "\n", 1) ||
flush_io_cache(&index_file) ||
(do_sync && my_sync(index_file.file, MYF(MY_WME))))
{
sql_print_error("Write of new Master_info for '%.*s' to index file failed",
(int) name->length, name->str);
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
/**
Get Master_info for a connection
@param
connection_name Connection name
warning WARN_LEVEL_NOTE -> Don't print anything
WARN_LEVEL_WARN -> Issue warning if not exists
WARN_LEVEL_ERROR-> Issue error if not exists
*/
Master_info *
Master_info_index::get_master_info(LEX_STRING *connection_name,
MYSQL_ERROR::enum_warning_level warning)
{
Master_info *mi;
char buff[MAX_CONNECTION_NAME+1], *res;
uint buff_length;
/* Make name lower case for comparison */
res= strmake(buff, connection_name->str, connection_name->length);
my_casedn_str(system_charset_info, buff);
buff_length= (size_t) (res-buff);
mi= (Master_info*) my_hash_search(&master_info_hash,
(uchar*) buff, buff_length);
if (!mi && warning != MYSQL_ERROR::WARN_LEVEL_NOTE)
{
my_error(WARN_NO_MASTER_INFO,
MYF(warning == MYSQL_ERROR::WARN_LEVEL_WARN ? ME_JUST_WARNING :
0),
(int) connection_name->length,
connection_name->str);
}
return mi;
}
/* Check Master_host & Master_port is duplicated or not */
bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg,
const char *host,
uint port)
{
Master_info *mi;
/* Get full host and port name */
if ((mi= master_info_index->get_master_info(name_arg,
MYSQL_ERROR::WARN_LEVEL_NOTE)))
{
if (!host)
host= mi->host;
if (!port)
port= mi->port;
}
if (!host || !port)
return FALSE; // Not comparable yet
for (uint i= 0; i < master_info_hash.records; ++i)
{
Master_info *tmp_mi;
tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
if (tmp_mi == mi)
continue; // Current connection
if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port)
{
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) tmp_mi->connection_name.length,
tmp_mi->connection_name.str);
return TRUE;
}
}
return FALSE;
}
/* Add a Master_info class to Hash Table */
bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
{
if (!my_hash_insert(&master_info_hash, (uchar*) mi))
{
if (global_system_variables.log_warnings > 1)
sql_print_information("Added new Master_info '%.*s' to hash table",
(int) mi->connection_name.length,
mi->connection_name.str);
if (write_to_file)
return write_master_name_to_index_file(&mi->connection_name, 1);
return FALSE;
}
/* Impossible error (EOM) ? */
sql_print_error("Adding new entry '%.*s' to master_info failed",
(int) mi->connection_name.length,
mi->connection_name.str);
return TRUE;
}
/**
Remove a Master_info class From Hash Table
TODO: Change this to use my_rename() to make the file name creation
atomic
*/
bool Master_info_index::remove_master_info(LEX_STRING *name)
{
Master_info* mi;
DBUG_ENTER("remove_master_info");
if ((mi= get_master_info(name, MYSQL_ERROR::WARN_LEVEL_WARN)))
{
// Delete Master_info and rewrite others to file
if (!my_hash_delete(&master_info_hash, (uchar*) mi))
{
File index_file_nr;
// Close IO_CACHE and FILE handler fisrt
end_io_cache(&index_file);
my_close(index_file.file, MYF(MY_WME));
// Reopen File and truncate it
fn_format(index_file_name, master_info_file, mysql_data_home,
".index", MY_UNPACK_FILENAME | MY_APPEND_EXT);
if ((index_file_nr= my_open(index_file_name,
O_RDWR | O_CREAT | O_TRUNC | O_BINARY ,
MYF(MY_WME))) < 0 ||
init_io_cache(&index_file, index_file_nr,
IO_SIZE, WRITE_CACHE,
my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
0, MYF(MY_WME | MY_WAIT_IF_FULL)))
{
int error= my_errno;
if (index_file_nr >= 0)
my_close(index_file_nr,MYF(0));
sql_print_error("Create of Master Info Index file '%s' failed with "
"error: %M",
index_file_name, error);
DBUG_RETURN(TRUE);
}
// Rewrite Master_info.index
uint i;
for (i= 0; i< master_info_hash.records; ++i)
{
Master_info *tmp_mi;
tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
write_master_name_to_index_file(&tmp_mi->connection_name, 0);
}
my_sync(index_file_nr, MYF(MY_WME));
}
}
DBUG_RETURN(FALSE);
}
#endif /* HAVE_REPLICATION */