1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-29 05:21:33 +03:00

Here comes a nasty patch, although I am not ready to push it yet. I will

first pull, merge,test, and get it to work.

The main change is the new replication code - now we have two slave threads
SQL thread and I/O thread. I have also re-written a lot of the code to 
prepare for multi-master implementation. 

I also documented IO_CACHE quite extensively and to some extend, THD class.


Makefile.am:
  moved tags target script into a separate file
include/my_sys.h:
  fixes in IO_CACHE for SEQ_READ_APPEND + some documentation
libmysqld/lib_sql.cc:
  updated replication locks, but now I see I did it wrong and it won't compile. Will fix
  before the push.
mysql-test/r/rpl000014.result:
  test result update
mysql-test/r/rpl000015.result:
  test result update
mysql-test/r/rpl000016.result:
  test result update
mysql-test/r/rpl_log.result:
  test result update
mysql-test/t/rpl000016-slave.sh:
  remove relay logs
mysql-test/t/rpl000017-slave.sh:
  remove relay logs
mysql-test/t/rpl_log.test:
  updated test
mysys/mf_iocache.c:
  IO_CACHE updates to make replication work
mysys/mf_iocache2.c:
  IO_CACHE update to make replication work
mysys/thr_mutex.c:
  cosmetic change
sql/item_func.cc:
  new replication code
sql/lex.h:
  new replication
sql/log.cc:
  new replication
sql/log_event.cc:
  new replication
sql/log_event.h:
  new replication
sql/mini_client.cc:
  new replication
sql/mini_client.h:
  new replication
sql/mysql_priv.h:
  new replication
sql/mysqld.cc:
  new replication
sql/repl_failsafe.cc:
  new replication
sql/slave.cc:
  new replication
sql/slave.h:
  new replication
sql/sql_class.cc:
  new replication
sql/sql_class.h:
  new replication
sql/sql_lex.h:
  new replication
sql/sql_parse.cc:
  new replication
sql/sql_repl.cc:
  new replication
sql/sql_repl.h:
  new replication
sql/sql_show.cc:
  new replication
sql/sql_yacc.yy:
  new replication
sql/stacktrace.c:
  more robust stack tracing
sql/structs.h:
  new replication code
BitKeeper/etc/ignore:
  Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
This commit is contained in:
unknown
2002-01-19 19:16:52 -07:00
parent 0831ce1c61
commit 5df61c3cdc
39 changed files with 2464 additions and 1032 deletions

View File

@ -24,6 +24,8 @@
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
#include <assert.h>
#ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len)
{
@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg):
if (thd)
{
server_id = thd->server_id;
log_seq = thd->log_seq;
when = thd->start_time;
log_pos = thd->log_pos;
}
else
{
server_id = ::server_id;
log_seq = 0;
when = time(NULL);
log_pos=0;
}
}
@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format):
server_id = uint4korr(buf + SERVER_ID_OFFSET);
if (old_format)
{
log_seq=0;
log_pos=0;
flags=0;
}
else
{
log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
log_pos = uint4korr(buf + LOG_POS_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT
@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format):
#ifndef MYSQL_CLIENT
int Log_event::exec_event(struct st_master_info* mi)
int Log_event::exec_event(struct st_relay_log_info* rli)
{
if (mi)
if (rli)
{
thd->log_seq = 0;
mi->inc_pos(get_event_len(), log_seq);
flush_master_info(mi);
rli->inc_pos(get_event_len(),log_pos);
DBUG_ASSERT(rli->sql_thd != 0);
flush_relay_log_info(rli);
}
return 0;
}
@ -193,14 +195,14 @@ void Query_log_event::pack_info(String* packet)
char buf[256];
String tmp(buf, sizeof(buf));
tmp.length(0);
if(db && db_len)
if (db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
tmp.append("; ", 2);
}
if(query && q_len)
if (query && q_len)
tmp.append(query, q_len);
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
@ -345,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Pos", 20));
field_list->push_back(new Item_empty_string("Event_type", 20));
field_list->push_back(new Item_empty_string("Server_id", 20));
field_list->push_back(new Item_empty_string("Log_seq", 20));
field_list->push_back(new Item_empty_string("Orig_log_pos", 20));
field_list->push_back(new Item_empty_string("Info", 20));
}
@ -363,7 +365,7 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos)
event_type = get_type_str();
net_store_data(packet, event_type, strlen(event_type));
net_store_data(packet, server_id);
net_store_data(packet, log_seq);
net_store_data(packet, log_pos);
pack_info(packet);
return my_net_write(&thd->net, (char*)packet->ptr(), packet->length());
}
@ -392,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file)
long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4;
int4store(pos, log_seq);
int4store(pos, log_pos);
pos += 4;
int2store(pos, flags);
pos += 2;
@ -456,7 +458,6 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define LOCK_MUTEX
#endif
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file,
@ -501,7 +502,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
}
buf[data_len] = 0;
memcpy(buf, head, header_size);
if(my_b_read(file, (byte*) buf + header_size,
if (my_b_read(file, (byte*) buf + header_size,
data_len - header_size))
{
error = "read error";
@ -511,9 +512,10 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
res->register_temp_buf(buf);
err:
UNLOCK_MUTEX;
if(error)
if (error)
{
sql_print_error(error);
sql_print_error("Error in Log_event::read_log_event(): '%s', \
data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
return res;
@ -581,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
#ifdef MYSQL_CLIENT
void Log_event::print_header(FILE* file)
{
char llbuff[22];
fputc('#', file);
print_timestamp(file);
fprintf(file, " server id %d ", server_id);
fprintf(file, " server id %d log_pos %s ", server_id,
llstr(log_pos,llbuff));
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
@ -1187,12 +1191,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
#ifndef MYSQL_CLIENT
void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log)
void Log_event::set_log_pos(MYSQL_LOG* log)
{
log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++;
if (!log_pos)
log_pos = my_b_tell(&log->log_file);
}
void Load_log_event::set_fields(List<Item> &fields)
{
uint i;
@ -1205,14 +1209,20 @@ void Load_log_event::set_fields(List<Item> &fields)
}
Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
Slave_log_event::Slave_log_event(THD* thd_arg,
struct st_relay_log_info* rli):
Log_event(thd_arg),mem_pool(0),master_host(0)
{
if(!mi->inited)
if(!rli->inited)
return;
pthread_mutex_lock(&mi->lock);
MASTER_INFO* mi = rli->mi;
// TODO: re-write this better without holding both
// locks at the same time
pthread_mutex_lock(&mi->data_lock);
pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
master_log_len = strlen(mi->log_file_name);
master_log_len = strlen(rli->master_log_name);
// on OOM, just do not initialize the structure and print the error
if((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME))))
@ -1220,13 +1230,14 @@ Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1;
memcpy(master_log, mi->log_file_name, master_log_len + 1);
memcpy(master_log, rli->master_log_name, master_log_len + 1);
master_port = mi->port;
master_pos = mi->pos;
master_pos = rli->master_log_pos;
}
else
sql_print_error("Out of memory while recording slave event");
pthread_mutex_unlock(&mi->lock);
pthread_mutex_unlock(&rli->data_lock);
pthread_mutex_unlock(&mi->data_lock);
}
@ -1533,7 +1544,7 @@ void Execute_load_log_event::pack_info(String* packet)
#endif
#ifndef MYSQL_CLIENT
int Query_log_event::exec_event(struct st_master_info* mi)
int Query_log_event::exec_event(struct st_relay_log_info* rli)
{
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
@ -1553,7 +1564,7 @@ int Query_log_event::exec_event(struct st_master_info* mi)
// sanity check to make sure the master did not get a really bad
// error on the query
if (!check_expected_error(thd, (expected_error = error_code)))
if (!check_expected_error(thd,rli,(expected_error = error_code)))
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
@ -1570,8 +1581,8 @@ int Query_log_event::exec_event(struct st_master_info* mi)
else if (expected_error == actual_error)
{
thd->query_error = 0;
*last_slave_error = 0;
last_slave_errno = 0;
*rli->last_slave_error = 0;
rli->last_slave_errno = 0;
}
}
else
@ -1592,17 +1603,17 @@ int Query_log_event::exec_event(struct st_master_info* mi)
if (thd->query_error || thd->fatal_error)
{
slave_print_error(actual_error, "error '%s' on query '%s'",
slave_print_error(rli,actual_error, "error '%s' on query '%s'",
actual_error ? thd->net.last_error :
"unexpected success or fatal error", query);
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
return Log_event::exec_event(mi);
return Log_event::exec_event(rli);
}
int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
{
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
@ -1625,6 +1636,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
// the table will be opened in mysql_load
if(table_rules_on && !tables_ok(thd, &tables))
{
// TODO: this is a bug - this needs to be moved to the I/O thread
if (net)
skip_load_data_infile(net);
}
@ -1632,7 +1644,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
char llbuff[22];
enum enum_duplicates handle_dup = DUP_IGNORE;
if(sql_ex.opt_flags && REPLACE_FLAG)
if (sql_ex.opt_flags && REPLACE_FLAG)
handle_dup = DUP_REPLACE;
sql_exchange ex((char*)fname, sql_ex.opt_flags &&
DUMPFILE_FLAG );
@ -1663,7 +1675,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
thd->query_error = 1;
if(thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME,
'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
if(net)
net->pkt_nr = thd->net.pkt_nr;
@ -1673,6 +1685,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
// we will just ask the master to send us /dev/null if we do not
// want to load the data
// TODO: this a bug - needs to be done in I/O thread
if (net)
skip_load_data_infile(net);
}
@ -1683,10 +1696,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
if(thd->query_error)
{
int sql_error = thd->net.last_errno;
if(!sql_error)
if (!sql_error)
sql_error = ER_UNKNOWN_ERROR;
slave_print_error(sql_error, "Slave: Error '%s' running load data infile ",
slave_print_error(rli,sql_error,
"Slave: Error '%s' running load data infile ",
ER_SAFE(sql_error));
free_root(&thd->mem_root,0);
return 1;
@ -1699,38 +1713,43 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
return 1;
}
return Log_event::exec_event(mi);
return Log_event::exec_event(rli);
}
int Start_log_event::exec_event(struct st_master_info* mi)
int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
if (!mi->old_format)
if (!rli->mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
return Log_event::exec_event(mi);
return Log_event::exec_event(rli);
}
int Stop_log_event::exec_event(struct st_master_info* mi)
int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{
if (mi->pos > 4) // stop event should be ignored after rotate event
// do not clean up immediately after rotate event
if (rli->master_log_pos > 4)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
mi->inc_pos(get_event_len(), log_seq);
flush_master_info(mi);
}
thd->log_seq = 0;
// we do not want to update master_log pos because we get a rotate event
// before stop, so by now master_log_name is set to the next log
// if we updated it, we will have incorrect master coordinates and this
// could give false triggers in MASTER_POS_WAIT() that we have reached
// the targed position when in fact we have not
rli->inc_pos(get_event_len(), 0);
flush_relay_log_info(rli);
return 0;
}
int Rotate_log_event::exec_event(struct st_master_info* mi)
int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{
bool rotate_binlog = 0, write_slave_event = 0;
char* log_name = mi->log_file_name;
pthread_mutex_lock(&mi->lock);
char* log_name = rli->master_log_name;
pthread_mutex_lock(&rli->data_lock);
// TODO: probably needs re-write
// rotate local binlog only if the name of remote has changed
if (!*log_name || !(log_name[ident_len] == 0 &&
!memcmp(log_name, new_log_ident, ident_len)))
@ -1738,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi)
write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F)
&& mysql_bin_log.is_open());
rotate_binlog = (*log_name && write_slave_event);
memcpy(log_name, new_log_ident,ident_len );
if (ident_len >= sizeof(rli->master_log_name))
return 1;
memcpy(log_name, new_log_ident,ident_len);
log_name[ident_len] = 0;
}
mi->pos = pos;
mi->last_log_seq = log_seq;
#ifndef DBUG_OFF
if (abort_slave_event_count)
++events_till_abort;
#endif
rli->master_log_pos = pos;
rli->relay_log_pos += get_event_len();
if (rotate_binlog)
{
mysql_bin_log.new_file();
mi->last_log_seq = 0;
rli->master_log_pos = 4;
}
pthread_cond_broadcast(&mi->cond);
pthread_mutex_unlock(&mi->lock);
flush_master_info(mi);
pthread_cond_broadcast(&rli->data_cond);
pthread_mutex_unlock(&rli->data_lock);
flush_relay_log_info(rli);
if (write_slave_event)
{
Slave_log_event s(thd, mi);
Slave_log_event s(thd, rli);
if (s.master_host)
{
s.set_log_seq(0, &mysql_bin_log);
s.set_log_pos(&mysql_bin_log);
s.server_id = ::server_id;
mysql_bin_log.write(&s);
}
}
thd->log_seq = 0;
return 0;
}
int Intvar_log_event::exec_event(struct st_master_info* mi)
int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
{
switch(type)
switch (type)
{
case LAST_INSERT_ID_EVENT:
thd->last_insert_id_used = 1;
@ -1782,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi)
thd->next_insert_id = val;
break;
}
mi->inc_pending(get_event_len());
rli->inc_pending(get_event_len());
return 0;
}
int Slave_log_event::exec_event(struct st_master_info* mi)
int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{
if(mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(mi);
return Log_event::exec_event(rli);
}
int Create_file_log_event::exec_event(struct st_master_info* mi)
int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname_buf[FN_REFLEN+10];
char *p;
@ -1809,7 +1825,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
@ -1820,7 +1836,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if (write_base(&file))
{
strmov(p, ".info"); // to have it right in the error message
slave_print_error(my_errno, "Could not write to file '%s'", fname_buf);
slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf);
goto err;
}
end_io_cache(&file);
@ -1830,12 +1846,12 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0)
{
slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
slave_print_error(my_errno, "Write to '%s' failed", fname_buf);
slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf);
goto err;
}
if (mysql_bin_log.is_open())
@ -1846,10 +1862,10 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
return error ? 1 : Log_event::exec_event(mi);
return error ? 1 : Log_event::exec_event(rli);
}
int Delete_file_log_event::exec_event(struct st_master_info* mi)
int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@ -1860,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi)
(void)my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(mi);
return Log_event::exec_event(rli);
}
int Append_block_log_event::exec_event(struct st_master_info* mi)
int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@ -1873,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{
slave_print_error(my_errno, "Could not open file '%s'", fname);
slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
slave_print_error(my_errno, "Write to '%s' failed", fname);
slave_print_error(rli,my_errno, "Write to '%s' failed", fname);
goto err;
}
if (mysql_bin_log.is_open())
@ -1887,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
err:
if (fd >= 0)
my_close(fd, MYF(0));
return error ? error : Log_event::exec_event(mi);
return error ? error : Log_event::exec_event(rli);
}
int Execute_load_log_event::exec_event(struct st_master_info* mi)
int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@ -1906,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
slave_print_error(my_errno, "Could not open file '%s'", fname);
slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
@ -1914,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT)
{
slave_print_error(0, "File '%s' appears corrupted", fname);
slave_print_error(rli,0, "File '%s' appears corrupted", fname);
goto err;
}
// we want to disable binary logging in slave thread
@ -1927,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
lev->thd = thd;
if (lev->exec_event(0,0))
{
slave_print_error(my_errno, "Failed executing load from '%s'", fname);
slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname);
thd->options = save_options;
goto err;
}
@ -1943,7 +1959,7 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
return error ? error : Log_event::exec_event(mi);
return error ? error : Log_event::exec_event(rli);
}