mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
Lots of code fixes to the replication code (especially the binary logging and index log file handling)
Fixed bugs in my last changeset that made MySQL hard to compile. Added mutex around some data that could cause table cache corruptions when using OPTIMIZE TABLE / REPAIR TABLE or automatic repair of MyISAM tables. Added mutex around some data in the slave start/stop code that could cause THD linked list corruptions Extended my_chsize() to allow one to specify a filler character. Extend vio_blocking to return the old state (This made some usage of this function much simpler) Added testing for some functions that they caller have got the required mutexes before calling the function. Use setrlimit() to ensure that we can write core file if one specifies --core-file. Added --slave-compressed-protocol Made 2 the minimum length for ft_min_word_len Added variables foreign_key_checks & unique_checks. Less logging from replication code (if not started with --log-warnings) Changed that SHOW INNODB STATUS requre the SUPER privilege More DBUG statements and a lot of new code comments
This commit is contained in:
236
sql/sql_repl.cc
236
sql/sql_repl.cc
@ -180,6 +180,27 @@ err:
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Adjust the position pointer in the binary log file for all running slaves
|
||||
|
||||
SYNOPSIS
|
||||
adjust_linfo_offsets()
|
||||
purge_offset Number of bytes removed from start of log index file
|
||||
|
||||
NOTES
|
||||
- This is called when doing a PURGE when we delete lines from the
|
||||
index log file
|
||||
|
||||
REQUIREMENTS
|
||||
- Before calling this function, we have to ensure that no threads are
|
||||
using any binary log file before purge_offset.a
|
||||
|
||||
TODO
|
||||
- Inform the slave threads that they should sync the position
|
||||
in the binary log file with flush_relay_log_info.
|
||||
Now they sync is done for next read.
|
||||
*/
|
||||
|
||||
void adjust_linfo_offsets(my_off_t purge_offset)
|
||||
{
|
||||
THD *tmp;
|
||||
@ -193,9 +214,10 @@ void adjust_linfo_offsets(my_off_t purge_offset)
|
||||
if ((linfo = tmp->current_linfo))
|
||||
{
|
||||
pthread_mutex_lock(&linfo->lock);
|
||||
/* index file offset can be less that purge offset
|
||||
only if we just started reading the index file. In that case
|
||||
we have nothing to adjust
|
||||
/*
|
||||
Index file offset can be less that purge offset only if
|
||||
we just started reading the index file. In that case
|
||||
we have nothing to adjust
|
||||
*/
|
||||
if (linfo->index_file_offset < purge_offset)
|
||||
linfo->fatal = (linfo->index_file_offset != 0);
|
||||
@ -274,7 +296,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
|
||||
{
|
||||
LOG_INFO linfo;
|
||||
char *log_file_name = linfo.log_file_name;
|
||||
char search_file_name[FN_REFLEN];
|
||||
char search_file_name[FN_REFLEN], *name;
|
||||
IO_CACHE log;
|
||||
File file = -1;
|
||||
String* packet = &thd->packet;
|
||||
@ -295,7 +317,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
if (!mysql_bin_log.is_open())
|
||||
{
|
||||
errmsg = "Binary log is not open";
|
||||
@ -307,17 +328,18 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
|
||||
goto err;
|
||||
}
|
||||
|
||||
name=search_file_name;
|
||||
if (log_ident[0])
|
||||
mysql_bin_log.make_log_name(search_file_name, log_ident);
|
||||
else
|
||||
search_file_name[0] = 0;
|
||||
name=0; // Find first log
|
||||
|
||||
linfo.index_file_offset = 0;
|
||||
thd->current_linfo = &linfo;
|
||||
|
||||
if (mysql_bin_log.find_first_log(&linfo, search_file_name))
|
||||
if (mysql_bin_log.find_log_pos(&linfo, name))
|
||||
{
|
||||
errmsg = "Could not find first log";
|
||||
errmsg = "Could not find first log file name in binary log index file";
|
||||
goto err;
|
||||
}
|
||||
|
||||
@ -332,19 +354,19 @@ impossible position";
|
||||
}
|
||||
|
||||
my_b_seek(&log, pos); // Seek will done on next read
|
||||
packet->length(0);
|
||||
// we need to start a packet with something other than 255
|
||||
// to distiquish it from error
|
||||
packet->append("\0", 1);
|
||||
/*
|
||||
We need to start a packet with something other than 255
|
||||
to distiquish it from error
|
||||
*/
|
||||
packet->set("\0", 1);
|
||||
|
||||
// if we are at the start of the log
|
||||
if (pos == 4)
|
||||
if (pos == BIN_LOG_HEADER_SIZE)
|
||||
{
|
||||
// tell the client log name with a fake rotate_event
|
||||
if (fake_rotate_event(net, packet, log_file_name, &errmsg))
|
||||
goto err;
|
||||
packet->length(0);
|
||||
packet->append("\0", 1);
|
||||
packet->set("\0", 1);
|
||||
}
|
||||
|
||||
while (!net->error && net->vio != 0 && !thd->killed)
|
||||
@ -376,20 +398,21 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
packet->length(0);
|
||||
packet->append("\0",1);
|
||||
packet->set("\0", 1);
|
||||
}
|
||||
// TODO: now that we are logging the offset, check to make sure
|
||||
// the recorded offset and the actual match
|
||||
/*
|
||||
TODO: now that we are logging the offset, check to make sure
|
||||
the recorded offset and the actual match
|
||||
*/
|
||||
if (error != LOG_READ_EOF)
|
||||
{
|
||||
switch(error) {
|
||||
switch (error) {
|
||||
case LOG_READ_BOGUS:
|
||||
errmsg = "bogus data in log event";
|
||||
break;
|
||||
case LOG_READ_TOO_LARGE:
|
||||
errmsg = "log event entry exceeded max_allowed_packet -\
|
||||
increase max_allowed_packet on master";
|
||||
errmsg = "log event entry exceeded max_allowed_packet; \
|
||||
Increase max_allowed_packet on master";
|
||||
break;
|
||||
case LOG_READ_IO:
|
||||
errmsg = "I/O error reading log event";
|
||||
@ -410,18 +433,21 @@ impossible position";
|
||||
if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
|
||||
mysql_bin_log.is_active(log_file_name))
|
||||
{
|
||||
// block until there is more data in the log
|
||||
// unless non-blocking mode requested
|
||||
/*
|
||||
Block until there is more data in the log
|
||||
*/
|
||||
if (net_flush(net))
|
||||
{
|
||||
errmsg = "failed on net_flush()";
|
||||
goto err;
|
||||
}
|
||||
|
||||
// we may have missed the update broadcast from the log
|
||||
// that has just happened, let's try to catch it if it did
|
||||
// if we did not miss anything, we just wait for other threads
|
||||
// to signal us
|
||||
/*
|
||||
We may have missed the update broadcast from the log
|
||||
that has just happened, let's try to catch it if it did.
|
||||
If we did not miss anything, we just wait for other threads
|
||||
to signal us.
|
||||
*/
|
||||
{
|
||||
log.error=0;
|
||||
bool read_packet = 0, fatal_error = 0;
|
||||
@ -435,32 +461,32 @@ impossible position";
|
||||
}
|
||||
#endif
|
||||
|
||||
// no one will update the log while we are reading
|
||||
// now, but we'll be quick and just read one record
|
||||
/*
|
||||
No one will update the log while we are reading
|
||||
now, but we'll be quick and just read one record
|
||||
|
||||
To be able to handle EOF properly, we have to have the
|
||||
pthread_mutex_unlock() statements in the case statements.
|
||||
*/
|
||||
pthread_mutex_lock(log_lock);
|
||||
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0))
|
||||
{
|
||||
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
|
||||
case 0:
|
||||
pthread_mutex_unlock(log_lock);
|
||||
/* we read successfully, so we'll need to send it to the slave */
|
||||
read_packet = 1;
|
||||
// we read successfully, so we'll need to send it to the
|
||||
// slave
|
||||
break;
|
||||
|
||||
case LOG_READ_EOF:
|
||||
DBUG_PRINT("wait",("waiting for data in binary log"));
|
||||
// wait_for_update unlocks the log lock - needed to avoid race
|
||||
if (!thd->killed)
|
||||
mysql_bin_log.wait_for_update(thd);
|
||||
else
|
||||
pthread_mutex_unlock(log_lock);
|
||||
DBUG_PRINT("wait",("binary log received update"));
|
||||
break;
|
||||
|
||||
default:
|
||||
pthread_mutex_unlock(log_lock);
|
||||
fatal_error = 1;
|
||||
break;
|
||||
}
|
||||
pthread_mutex_unlock(log_lock);
|
||||
|
||||
if (read_packet)
|
||||
{
|
||||
@ -479,10 +505,11 @@ impossible position";
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
packet->length(0);
|
||||
packet->append("\0",1);
|
||||
// no need to net_flush because we will get to flush later when
|
||||
// we hit EOF pretty quick
|
||||
packet->set("\0", 1);
|
||||
/*
|
||||
No need to net_flush because we will get to flush later when
|
||||
we hit EOF pretty quick
|
||||
*/
|
||||
}
|
||||
|
||||
if (fatal_error)
|
||||
@ -539,7 +566,6 @@ impossible position";
|
||||
err:
|
||||
thd->proc_info = "waiting to finalize termination";
|
||||
end_io_cache(&log);
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
/*
|
||||
Exclude iteration through thread list
|
||||
this is needed for purge_logs() - it will iterate through
|
||||
@ -547,6 +573,7 @@ impossible position";
|
||||
this mutex will make sure that it never tried to update our linfo
|
||||
after we return from this stack frame
|
||||
*/
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thd->current_linfo = 0;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
if (file >= 0)
|
||||
@ -561,16 +588,19 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
|
||||
if (!thd) thd = current_thd;
|
||||
NET* net = &thd->net;
|
||||
int thread_mask;
|
||||
DBUG_ENTER("start_slave");
|
||||
|
||||
if (check_access(thd, SUPER_ACL, any_db))
|
||||
return 1;
|
||||
DBUG_RETURN(1);
|
||||
lock_slave_threads(mi); // this allows us to cleanly read slave_running
|
||||
init_thread_mask(&thread_mask,mi,1 /* inverse */);
|
||||
if (thd->lex.slave_thd_opt)
|
||||
thread_mask &= thd->lex.slave_thd_opt;
|
||||
if (thread_mask)
|
||||
{
|
||||
if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host)))
|
||||
if (init_master_info(mi,master_info_file,relay_log_info_file, 0))
|
||||
slave_errno=ER_MASTER_INFO;
|
||||
else if (server_id_supplied && *mi->host)
|
||||
slave_errno = start_slave_threads(0 /*no mutex */,
|
||||
1 /* wait for start */,
|
||||
mi,
|
||||
@ -588,12 +618,12 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
|
||||
{
|
||||
if (net_report)
|
||||
send_error(net, slave_errno);
|
||||
return 1;
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
else if (net_report)
|
||||
send_ok(net);
|
||||
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
|
||||
@ -628,7 +658,7 @@ int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
|
||||
return 0;
|
||||
}
|
||||
|
||||
int reset_slave(MASTER_INFO* mi)
|
||||
int reset_slave(THD *thd, MASTER_INFO* mi)
|
||||
{
|
||||
MY_STAT stat_area;
|
||||
char fname[FN_REFLEN];
|
||||
@ -639,7 +669,9 @@ int reset_slave(MASTER_INFO* mi)
|
||||
lock_slave_threads(mi);
|
||||
init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
|
||||
if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
|
||||
|| (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg)))
|
||||
|| (error=purge_relay_logs(&mi->rli, thd,
|
||||
1 /* just reset */,
|
||||
&errmsg)))
|
||||
goto err;
|
||||
|
||||
end_master_info(mi);
|
||||
@ -713,16 +745,17 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
thd->proc_info = "changing master";
|
||||
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
|
||||
// TODO: see if needs re-write
|
||||
if (init_master_info(mi,master_info_file,relay_log_info_file))
|
||||
if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
|
||||
{
|
||||
send_error(&thd->net, 0, "Could not initialize master info");
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
|
||||
/* data lock not needed since we have already stopped the running threads,
|
||||
and we have the hold on the run locks which will keep all threads that
|
||||
could possibly modify the data structures from running
|
||||
/*
|
||||
Data lock not needed since we have already stopped the running threads,
|
||||
and we have the hold on the run locks which will keep all threads that
|
||||
could possibly modify the data structures from running
|
||||
*/
|
||||
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
|
||||
{
|
||||
@ -772,7 +805,8 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
{
|
||||
mi->rli.skip_log_purge=0;
|
||||
thd->proc_info="purging old relay logs";
|
||||
if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
|
||||
if (purge_relay_logs(&mi->rli, thd,
|
||||
0 /* not only reset, but also reinit */,
|
||||
&errmsg))
|
||||
{
|
||||
net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
|
||||
@ -782,12 +816,13 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
else
|
||||
{
|
||||
const char* msg;
|
||||
if (init_relay_log_pos(&mi->rli,0/*log already inited*/,
|
||||
0 /*pos already inited*/,
|
||||
/* Relay log is already initialized */
|
||||
if (init_relay_log_pos(&mi->rli,
|
||||
mi->rli.relay_log_name,
|
||||
mi->rli.relay_log_pos,
|
||||
0 /*no data lock*/,
|
||||
&msg))
|
||||
{
|
||||
//Sasha: note that I had to change net_printf() to make this work
|
||||
net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
|
||||
unlock_slave_threads(mi);
|
||||
DBUG_RETURN(1);
|
||||
@ -857,26 +892,27 @@ int show_binlog_events(THD* thd)
|
||||
|
||||
if (mysql_bin_log.is_open())
|
||||
{
|
||||
LOG_INFO linfo;
|
||||
char search_file_name[FN_REFLEN];
|
||||
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
|
||||
LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
|
||||
uint event_count, limit_start, limit_end;
|
||||
const char* log_file_name = lex_mi->log_file_name;
|
||||
Log_event* ev;
|
||||
my_off_t pos = lex_mi->pos;
|
||||
char search_file_name[FN_REFLEN], *name;
|
||||
const char *log_file_name = lex_mi->log_file_name;
|
||||
LOG_INFO linfo;
|
||||
Log_event* ev;
|
||||
|
||||
limit_start = thd->lex.select->offset_limit;
|
||||
limit_end = thd->lex.select->select_limit + limit_start;
|
||||
|
||||
name= search_file_name;
|
||||
if (log_file_name)
|
||||
mysql_bin_log.make_log_name(search_file_name, log_file_name);
|
||||
else
|
||||
search_file_name[0] = 0;
|
||||
name=0; // Find first log
|
||||
|
||||
linfo.index_file_offset = 0;
|
||||
thd->current_linfo = &linfo;
|
||||
|
||||
if (mysql_bin_log.find_first_log(&linfo, search_file_name))
|
||||
if (mysql_bin_log.find_log_pos(&linfo, name))
|
||||
{
|
||||
errmsg = "Could not find target log";
|
||||
goto err;
|
||||
@ -981,71 +1017,65 @@ int show_binlog_info(THD* thd)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Send a lost of all binary logs to client
|
||||
|
||||
SYNOPSIS
|
||||
show_binlogs()
|
||||
thd Thread specific variable
|
||||
|
||||
RETURN VALUES
|
||||
0 ok
|
||||
1 error (Error message sent to client)
|
||||
*/
|
||||
|
||||
int show_binlogs(THD* thd)
|
||||
{
|
||||
const char* errmsg = 0;
|
||||
File index_file;
|
||||
const char *errmsg;
|
||||
IO_CACHE *index_file;
|
||||
char fname[FN_REFLEN];
|
||||
NET* net = &thd->net;
|
||||
List<Item> field_list;
|
||||
String* packet = &thd->packet;
|
||||
IO_CACHE io_cache;
|
||||
String *packet = &thd->packet;
|
||||
uint length;
|
||||
|
||||
if (!mysql_bin_log.is_open())
|
||||
{
|
||||
errmsg = "binlog is not open";
|
||||
goto err;
|
||||
//TODO: Replace with ER() error message
|
||||
errmsg= "You are not using binary logging";
|
||||
goto err_with_msg;
|
||||
}
|
||||
|
||||
field_list.push_back(new Item_empty_string("Log_name", 128));
|
||||
field_list.push_back(new Item_empty_string("Log_name", 255));
|
||||
if (send_fields(thd, field_list, 1))
|
||||
{
|
||||
sql_print_error("Failed in send_fields");
|
||||
return 1;
|
||||
}
|
||||
|
||||
mysql_bin_log.lock_index();
|
||||
index_file = mysql_bin_log.get_index_file();
|
||||
if (index_file < 0)
|
||||
index_file=mysql_bin_log.get_index_file();
|
||||
|
||||
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
|
||||
|
||||
/* The file ends with EOF or empty line */
|
||||
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
|
||||
{
|
||||
errmsg = "Uninitialized index file pointer";
|
||||
goto err2;
|
||||
}
|
||||
if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, 0, 0,
|
||||
MYF(MY_WME)))
|
||||
{
|
||||
errmsg = "Failed on init_io_cache()";
|
||||
goto err2;
|
||||
}
|
||||
while ((length=my_b_gets(&io_cache, fname, sizeof(fname))))
|
||||
{
|
||||
fname[--length]=0;
|
||||
int dir_len = dirname_length(fname);
|
||||
packet->length(0);
|
||||
net_store_data(packet, fname + dir_len, length-dir_len);
|
||||
/* The -1 is for removing newline from fname */
|
||||
net_store_data(packet, fname + dir_len, length-1-dir_len);
|
||||
if (my_net_write(net, (char*) packet->ptr(), packet->length()))
|
||||
{
|
||||
sql_print_error("Failed in my_net_write");
|
||||
end_io_cache(&io_cache);
|
||||
mysql_bin_log.unlock_index();
|
||||
return 1;
|
||||
}
|
||||
goto err;
|
||||
}
|
||||
|
||||
mysql_bin_log.unlock_index();
|
||||
end_io_cache(&io_cache);
|
||||
send_eof(net);
|
||||
return 0;
|
||||
|
||||
err2:
|
||||
mysql_bin_log.unlock_index();
|
||||
end_io_cache(&io_cache);
|
||||
err:
|
||||
err_with_msg:
|
||||
send_error(net, 0, errmsg);
|
||||
err:
|
||||
mysql_bin_log.unlock_index();
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int log_loaded_block(IO_CACHE* file)
|
||||
{
|
||||
LOAD_FILE_INFO* lf_info;
|
||||
|
Reference in New Issue
Block a user