1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

Merge various replication-related patches into MariaDB 5.3:

- MWL#116 Group commit
 - MWL#136 Enhancements for START TRANSACTION WITH CONSISTENT SNAPSHOT
 - MWL#47 Annotate_rows_log_event
 - MWL#163 innodb_release_locks_early
 - Percona patch enhancing row-based replication for tables with no primary key
This commit is contained in:
unknown
2011-04-08 09:39:33 +02:00
105 changed files with 7078 additions and 905 deletions

View File

@ -108,6 +108,7 @@ static ulonglong rec_count= 0;
static short binlog_flags = 0;
static MYSQL* mysql = NULL;
static const char* dirname_for_local_load= 0;
static bool opt_skip_annotate_rows_events= 0;
/**
Pointer to the Format_description_log_event of the currently active binlog.
@ -129,6 +130,70 @@ enum Exit_status {
OK_STOP
};
/**
Pointer to the last read Annotate_rows_log_event. Having read an
Annotate_rows event, we should not print it immediatedly because all
subsequent rbr events can be filtered away, and have to keep it for a while.
Also because of that when reading a remote Annotate event we have to keep
its binary log representation in a separately allocated buffer.
*/
static Annotate_rows_log_event *annotate_event= NULL;
void free_annotate_event()
{
if (annotate_event)
{
delete annotate_event;
annotate_event= 0;
}
}
Log_event* read_remote_annotate_event(uchar* net_buf, ulong event_len,
const char **error_msg)
{
uchar *event_buf;
Log_event* event;
if (!(event_buf= (uchar*) my_malloc(event_len + 1, MYF(MY_WME))))
{
error("Out of memory");
return 0;
}
memcpy(event_buf, net_buf, event_len);
event_buf[event_len]= 0;
if (!(event= Log_event::read_log_event((const char*) event_buf, event_len,
error_msg, glob_description_event)))
{
my_free(event_buf, MYF(0));
return 0;
}
/*
Ensure the event->temp_buf is pointing to the allocated buffer.
(TRUE = free temp_buf on the event deletion)
*/
event->register_temp_buf((char*)event_buf, TRUE);
return event;
}
void keep_annotate_event(Annotate_rows_log_event* event)
{
free_annotate_event();
annotate_event= event;
}
void print_annotate_event(PRINT_EVENT_INFO *print_event_info)
{
if (annotate_event)
{
annotate_event->print(result_file, print_event_info);
delete annotate_event; // the event should not be printed more than once
annotate_event= 0;
}
}
static Exit_status dump_local_log_entries(PRINT_EVENT_INFO *print_event_info,
const char* logname);
static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
@ -783,8 +848,19 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
case QUERY_EVENT:
{
Query_log_event *qe= (Query_log_event*)ev;
if (!qe->is_trans_keyword() && shall_skip_database(qe->db))
goto end;
if (!qe->is_trans_keyword())
{
if (shall_skip_database(qe->db))
goto end;
}
else
{
/*
In case the event for one of these statements is obtained
from binary log 5.0, make it compatible with 5.1
*/
qe->flags|= LOG_EVENT_SUPPRESS_USE_F;
}
print_use_stmt(print_event_info, qe);
if (opt_base64_output_mode == BASE64_OUTPUT_ALWAYS)
{
@ -933,6 +1009,19 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
my_free(fname, MYF(MY_WME));
break;
}
case ANNOTATE_ROWS_EVENT:
if (!opt_skip_annotate_rows_events)
{
/*
We don't print Annotate event just now because all subsequent
rbr-events can be filtered away. Instead we'll keep the event
till it will be printed together with the first not filtered
away Table map or the last rbr will be processed.
*/
keep_annotate_event((Annotate_rows_log_event*) ev);
destroy_evt= FALSE;
}
break;
case TABLE_MAP_EVENT:
{
Table_map_log_event *map= ((Table_map_log_event *)ev);
@ -942,6 +1031,13 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
destroy_evt= FALSE;
goto end;
}
/*
The Table map is to be printed, so it's just the time when we may
print the kept Annotate event (if there is any).
print_annotate_event() also deletes the kept Annotate event.
*/
print_annotate_event(print_event_info);
size_t len_to= 0;
const char* db_to= binlog_filter->get_rewrite_db(map->get_db_name(), &len_to);
if (len_to && map->rewrite_db(db_to, len_to, glob_description_event))
@ -978,6 +1074,13 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
if (print_event_info->m_table_map_ignored.count() > 0)
print_event_info->m_table_map_ignored.clear_tables();
/*
If there is a kept Annotate event and all corresponding
rbr-events were filtered away, the Annotate event was not
freed and it is just the time to do it.
*/
free_annotate_event();
/*
One needs to take into account an event that gets
filtered but was last event in the statement. If this is
@ -1212,6 +1315,11 @@ that may lead to an endless loop.",
"Updates to a database with a different name than the original. \
Example: rewrite-db='from->to'.",
0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"skip-annotate-rows-events", OPT_SKIP_ANNOTATE_ROWS_EVENTS,
"Don't print Annotate_rows events stored in the binary log.",
(uchar**) &opt_skip_annotate_rows_events,
(uchar**) &opt_skip_annotate_rows_events,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
@ -1683,6 +1791,8 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
cast to uint32.
*/
int4store(buf, (uint32)start_position);
if (!opt_skip_annotate_rows_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags);
size_t tlen = strlen(logname);
@ -1715,18 +1825,30 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
break; // end of data
DBUG_PRINT("info",( "len: %lu net->read_pos[5]: %d\n",
len, net->read_pos[5]));
if (!(ev= Log_event::read_log_event((const char*) net->read_pos + 1 ,
len - 1, &error_msg,
glob_description_event)))
if (net->read_pos[5] == ANNOTATE_ROWS_EVENT)
{
error("Could not construct log event object: %s", error_msg);
DBUG_RETURN(ERROR_STOP);
}
/*
If reading from a remote host, ensure the temp_buf for the
Log_event class is pointing to the incoming stream.
*/
ev->register_temp_buf((char *) net->read_pos + 1, FALSE);
if (!(ev= read_remote_annotate_event(net->read_pos + 1, len - 1,
&error_msg)))
{
error("Could not construct annotate event object: %s", error_msg);
DBUG_RETURN(ERROR_STOP);
}
}
else
{
if (!(ev= Log_event::read_log_event((const char*) net->read_pos + 1 ,
len - 1, &error_msg,
glob_description_event)))
{
error("Could not construct log event object: %s", error_msg);
DBUG_RETURN(ERROR_STOP);
}
/*
If reading from a remote host, ensure the temp_buf for the
Log_event class is pointing to the incoming stream.
*/
ev->register_temp_buf((char *) net->read_pos + 1, FALSE);
}
Log_event_type type= ev->get_type_code();
if (glob_description_event->binlog_version >= 3 ||
@ -2236,6 +2358,7 @@ int main(int argc, char** argv)
if (result_file != stdout)
my_fclose(result_file, MYF(0));
cleanup();
free_annotate_event();
delete binlog_filter;
free_root(&s_mem_root, MYF(0));
free_defaults(defaults_argv);