mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
MDEV-30128 remove support for 5.1- replication events
including patches from Andrei Elkin
This commit is contained in:
@@ -81,7 +81,7 @@ DYNAMIC_ARRAY events_in_stmt; // Storing the events that in one statement
|
||||
String stop_event_string; // Storing the STOP_EVENT output string
|
||||
|
||||
extern "C" {
|
||||
char server_version[SERVER_VERSION_LENGTH];
|
||||
char server_version[SERVER_VERSION_LENGTH]="5.0.0";
|
||||
}
|
||||
|
||||
static char *server_id_str;
|
||||
@@ -277,16 +277,10 @@ class Load_log_processor
|
||||
When we see first event corresponding to some LOAD DATA statement in
|
||||
binlog, we create temporary file to store data to be loaded.
|
||||
We add name of this file to file_names array using its file_id as index.
|
||||
If we have Create_file event (i.e. we have binary log in pre-5.0.3
|
||||
format) we also store save event object to be able which is needed to
|
||||
emit LOAD DATA statement when we will meet Exec_load_data event.
|
||||
If we have Begin_load_query event we simply store 0 in
|
||||
File_name_record::event field.
|
||||
*/
|
||||
struct File_name_record
|
||||
{
|
||||
char *fname;
|
||||
Create_file_log_event *event;
|
||||
};
|
||||
/*
|
||||
@todo Should be a map (e.g., a hash map), not an array. With the
|
||||
@@ -356,7 +350,6 @@ public:
|
||||
if (ptr->fname)
|
||||
{
|
||||
my_free(ptr->fname);
|
||||
delete ptr->event;
|
||||
bzero((char *)ptr, sizeof(File_name_record));
|
||||
}
|
||||
}
|
||||
@@ -364,34 +357,6 @@ public:
|
||||
delete_dynamic(&file_names);
|
||||
}
|
||||
|
||||
/**
|
||||
Obtain Create_file event for LOAD DATA statement by its file_id
|
||||
and remove it from this Load_log_processor's list of events.
|
||||
|
||||
Checks whether we have already seen a Create_file_log_event with
|
||||
the given file_id. If yes, returns a pointer to the event and
|
||||
removes the event from array describing active temporary files.
|
||||
From this moment, the caller is responsible for freeing the memory
|
||||
occupied by the event.
|
||||
|
||||
@param[in] file_id File id identifying LOAD DATA statement.
|
||||
|
||||
@return Pointer to Create_file_log_event, or NULL if we have not
|
||||
seen any Create_file_log_event with this file_id.
|
||||
*/
|
||||
Create_file_log_event *grab_event(uint file_id)
|
||||
{
|
||||
File_name_record *ptr;
|
||||
Create_file_log_event *res;
|
||||
|
||||
if (file_id >= file_names.elements)
|
||||
return 0;
|
||||
ptr= dynamic_element(&file_names, file_id, File_name_record*);
|
||||
if ((res= ptr->event))
|
||||
bzero((char *)ptr, sizeof(File_name_record));
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
Obtain file name of temporary file for LOAD DATA statement by its
|
||||
file_id and remove it from this Load_log_processor's list of events.
|
||||
@@ -415,124 +380,18 @@ public:
|
||||
if (file_id >= file_names.elements)
|
||||
return 0;
|
||||
ptr= dynamic_element(&file_names, file_id, File_name_record*);
|
||||
if (!ptr->event)
|
||||
{
|
||||
res= ptr->fname;
|
||||
bzero((char *)ptr, sizeof(File_name_record));
|
||||
}
|
||||
res= ptr->fname;
|
||||
bzero((char *)ptr, sizeof(File_name_record));
|
||||
return res;
|
||||
}
|
||||
Exit_status process(Create_file_log_event *ce);
|
||||
Exit_status process(Begin_load_query_log_event *ce);
|
||||
Exit_status process(Begin_load_query_log_event *blqe);
|
||||
Exit_status process(Append_block_log_event *ae);
|
||||
File prepare_new_file_for_old_format(Load_log_event *le, char *filename);
|
||||
Exit_status load_old_format_file(NET* net, const char *server_fname,
|
||||
uint server_fname_len, File file);
|
||||
Exit_status process_first_event(const char *bname, size_t blen,
|
||||
const uchar *block,
|
||||
size_t block_len, uint file_id,
|
||||
Create_file_log_event *ce);
|
||||
size_t block_len, uint file_id);
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
Creates and opens a new temporary file in the directory specified by previous call to init_by_dir_name() or init_by_cur_dir().
|
||||
|
||||
@param[in] le The basename of the created file will start with the
|
||||
basename of the file pointed to by this Load_log_event.
|
||||
|
||||
@param[out] filename Buffer to save the filename in.
|
||||
|
||||
@return File handle >= 0 on success, -1 on error.
|
||||
*/
|
||||
File Load_log_processor::prepare_new_file_for_old_format(Load_log_event *le,
|
||||
char *filename)
|
||||
{
|
||||
size_t len;
|
||||
char *tail;
|
||||
File file;
|
||||
|
||||
fn_format(filename, le->fname, target_dir_name, "", MY_REPLACE_DIR);
|
||||
len= strlen(filename);
|
||||
tail= filename + len;
|
||||
|
||||
if ((file= create_unique_file(filename,tail)) < 0)
|
||||
{
|
||||
error("Could not construct local filename %s.",filename);
|
||||
return -1;
|
||||
}
|
||||
|
||||
le->set_fname_outside_temp_buf(filename,len+strlen(tail));
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Reads a file from a server and saves it locally.
|
||||
|
||||
@param[in,out] net The server to read from.
|
||||
|
||||
@param[in] server_fname The name of the file that the server should
|
||||
read.
|
||||
|
||||
@param[in] server_fname_len The length of server_fname.
|
||||
|
||||
@param[in,out] file The file to write to.
|
||||
|
||||
@retval ERROR_STOP An error occurred - the program should terminate.
|
||||
@retval OK_CONTINUE No error, the program should continue.
|
||||
*/
|
||||
Exit_status Load_log_processor::load_old_format_file(NET* net,
|
||||
const char*server_fname,
|
||||
uint server_fname_len,
|
||||
File file)
|
||||
{
|
||||
uchar buf[FN_REFLEN+1];
|
||||
buf[0] = 0;
|
||||
memcpy(buf + 1, server_fname, server_fname_len + 1);
|
||||
if (my_net_write(net, buf, server_fname_len +2) || net_flush(net))
|
||||
{
|
||||
error("Failed requesting the remote dump of %s.", server_fname);
|
||||
return ERROR_STOP;
|
||||
}
|
||||
|
||||
for (;;)
|
||||
{
|
||||
ulong packet_len = my_net_read(net);
|
||||
if (packet_len == 0)
|
||||
{
|
||||
if (my_net_write(net, (uchar*) "", 0) || net_flush(net))
|
||||
{
|
||||
error("Failed sending the ack packet.");
|
||||
return ERROR_STOP;
|
||||
}
|
||||
/*
|
||||
we just need to send something, as the server will read but
|
||||
not examine the packet - this is because mysql_load() sends
|
||||
an OK when it is done
|
||||
*/
|
||||
break;
|
||||
}
|
||||
else if (packet_len == packet_error)
|
||||
{
|
||||
error("Failed reading a packet during the dump of %s.", server_fname);
|
||||
return ERROR_STOP;
|
||||
}
|
||||
|
||||
if (packet_len > UINT_MAX)
|
||||
{
|
||||
error("Illegal length of packet read from net.");
|
||||
return ERROR_STOP;
|
||||
}
|
||||
if (my_write(file, net->read_pos, (uint) packet_len, MYF(MY_WME|MY_NABP)))
|
||||
return ERROR_STOP;
|
||||
}
|
||||
|
||||
return OK_CONTINUE;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Process the first event in the sequence of events representing a
|
||||
LOAD DATA statement.
|
||||
@@ -556,8 +415,7 @@ Exit_status Load_log_processor::process_first_event(const char *bname,
|
||||
size_t blen,
|
||||
const uchar *block,
|
||||
size_t block_len,
|
||||
uint file_id,
|
||||
Create_file_log_event *ce)
|
||||
uint file_id)
|
||||
{
|
||||
size_t full_len= target_dir_name_len + blen + 9 + 9 + 1;
|
||||
Exit_status retval= OK_CONTINUE;
|
||||
@@ -569,7 +427,6 @@ Exit_status Load_log_processor::process_first_event(const char *bname,
|
||||
if (!(fname= (char*) my_malloc(PSI_NOT_INSTRUMENTED, full_len,MYF(MY_WME))))
|
||||
{
|
||||
error("Out of memory.");
|
||||
delete ce;
|
||||
DBUG_RETURN(ERROR_STOP);
|
||||
}
|
||||
|
||||
@@ -584,12 +441,10 @@ Exit_status Load_log_processor::process_first_event(const char *bname,
|
||||
error("Could not construct local filename %s%s.",
|
||||
target_dir_name,bname);
|
||||
my_free(fname);
|
||||
delete ce;
|
||||
DBUG_RETURN(ERROR_STOP);
|
||||
}
|
||||
|
||||
rec.fname= fname;
|
||||
rec.event= ce;
|
||||
|
||||
/*
|
||||
fname is freed in process_event()
|
||||
@@ -600,13 +455,9 @@ Exit_status Load_log_processor::process_first_event(const char *bname,
|
||||
{
|
||||
error("Out of memory.");
|
||||
my_free(fname);
|
||||
delete ce;
|
||||
DBUG_RETURN(ERROR_STOP);
|
||||
}
|
||||
|
||||
if (ce)
|
||||
ce->set_fname_outside_temp_buf(fname, strlen(fname));
|
||||
|
||||
if (my_write(file, (uchar*)block, block_len, MYF(MY_WME|MY_NABP)))
|
||||
{
|
||||
error("Failed writing to file.");
|
||||
@@ -621,32 +472,12 @@ Exit_status Load_log_processor::process_first_event(const char *bname,
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Process the given Create_file_log_event.
|
||||
|
||||
@see Load_log_processor::process_first_event(const char*,uint,const char*,uint,uint,Create_file_log_event*)
|
||||
|
||||
@param ce Create_file_log_event to process.
|
||||
|
||||
@retval ERROR_STOP An error occurred - the program should terminate.
|
||||
@retval OK_CONTINUE No error, the program should continue.
|
||||
*/
|
||||
Exit_status Load_log_processor::process(Create_file_log_event *ce)
|
||||
{
|
||||
const char *bname= ce->fname + dirname_length(ce->fname);
|
||||
size_t blen= ce->fname_len - (bname-ce->fname);
|
||||
|
||||
return process_first_event(bname, blen, ce->block, ce->block_len,
|
||||
ce->file_id, ce);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Process the given Begin_load_query_log_event.
|
||||
|
||||
@see Load_log_processor::process_first_event(const char*,uint,const char*,uint,uint,Create_file_log_event*)
|
||||
|
||||
@param ce Begin_load_query_log_event to process.
|
||||
@param blqe Begin_load_query_log_event to process.
|
||||
|
||||
@retval ERROR_STOP An error occurred - the program should terminate.
|
||||
@retval OK_CONTINUE No error, the program should continue.
|
||||
@@ -654,7 +485,7 @@ Exit_status Load_log_processor::process(Create_file_log_event *ce)
|
||||
Exit_status Load_log_processor::process(Begin_load_query_log_event *blqe)
|
||||
{
|
||||
return process_first_event("SQL_LOAD_MB", 11, blqe->block, blqe->block_len,
|
||||
blqe->file_id, 0);
|
||||
blqe->file_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -1245,41 +1076,6 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
|
||||
break;
|
||||
}
|
||||
|
||||
case CREATE_FILE_EVENT:
|
||||
{
|
||||
Create_file_log_event* ce= (Create_file_log_event*)ev;
|
||||
/*
|
||||
We test if this event has to be ignored. If yes, we don't save
|
||||
this event; this will have the good side-effect of ignoring all
|
||||
related Append_block and Exec_load.
|
||||
Note that Load event from 3.23 is not tested.
|
||||
*/
|
||||
if (shall_skip_database(ce->db))
|
||||
goto end; // Next event
|
||||
/*
|
||||
We print the event, but with a leading '#': this is just to inform
|
||||
the user of the original command; the command we want to execute
|
||||
will be a derivation of this original command (we will change the
|
||||
filename and use LOCAL), prepared in the 'case EXEC_LOAD_EVENT'
|
||||
below.
|
||||
*/
|
||||
print_skip_replication_statement(print_event_info, ev);
|
||||
if (ce->print(result_file, print_event_info, TRUE))
|
||||
goto err;
|
||||
// If this binlog is not 3.23 ; why this test??
|
||||
if (glob_description_event->binlog_version >= 3)
|
||||
{
|
||||
/*
|
||||
transfer the responsibility for destroying the event to
|
||||
load_processor
|
||||
*/
|
||||
ev= NULL;
|
||||
if ((retval= load_processor.process(ce)) != OK_CONTINUE)
|
||||
goto end;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case APPEND_BLOCK_EVENT:
|
||||
/*
|
||||
Append_block_log_events can safely print themselves even if
|
||||
@@ -1293,36 +1089,6 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
|
||||
goto end;
|
||||
break;
|
||||
|
||||
case EXEC_LOAD_EVENT:
|
||||
{
|
||||
if (ev->print(result_file, print_event_info))
|
||||
goto err;
|
||||
Execute_load_log_event *exv= (Execute_load_log_event*)ev;
|
||||
Create_file_log_event *ce= load_processor.grab_event(exv->file_id);
|
||||
/*
|
||||
if ce is 0, it probably means that we have not seen the Create_file
|
||||
event (a bad binlog, or most probably --start-position is after the
|
||||
Create_file event). Print a warning comment.
|
||||
*/
|
||||
if (ce)
|
||||
{
|
||||
bool error;
|
||||
/*
|
||||
We must not convert earlier, since the file is used by
|
||||
my_open() in Load_log_processor::append().
|
||||
*/
|
||||
convert_path_to_forward_slashes((char*) ce->fname);
|
||||
error= ce->print(result_file, print_event_info, TRUE);
|
||||
my_free((void*)ce->fname);
|
||||
delete ce;
|
||||
if (error)
|
||||
goto err;
|
||||
}
|
||||
else
|
||||
warning("Ignoring Execute_load_log_event as there is no "
|
||||
"Create_file event for file_id: %u", exv->file_id);
|
||||
break;
|
||||
}
|
||||
case FORMAT_DESCRIPTION_EVENT:
|
||||
delete glob_description_event;
|
||||
glob_description_event= (Format_description_log_event*) ev;
|
||||
@@ -1579,23 +1345,14 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
|
||||
destroy_evt= FALSE;
|
||||
break;
|
||||
}
|
||||
case PRE_GA_WRITE_ROWS_EVENT:
|
||||
case PRE_GA_DELETE_ROWS_EVENT:
|
||||
case PRE_GA_UPDATE_ROWS_EVENT:
|
||||
{
|
||||
Old_rows_log_event *e= (Old_rows_log_event*) ev;
|
||||
bool is_stmt_end= e->get_flags(Rows_log_event::STMT_END_F);
|
||||
if (print_row_event(print_event_info, ev, e->get_table_id(),
|
||||
e->get_flags(Old_rows_log_event::STMT_END_F)))
|
||||
goto err;
|
||||
DBUG_PRINT("info", ("is_stmt_end: %d", (int) is_stmt_end));
|
||||
if (!is_stmt_end && opt_flashback)
|
||||
destroy_evt= FALSE;
|
||||
break;
|
||||
}
|
||||
case START_ENCRYPTION_EVENT:
|
||||
glob_description_event->start_decryption((Start_encryption_log_event*)ev);
|
||||
/* fall through */
|
||||
case PRE_GA_WRITE_ROWS_EVENT:
|
||||
case PRE_GA_DELETE_ROWS_EVENT:
|
||||
case PRE_GA_UPDATE_ROWS_EVENT:
|
||||
case CREATE_FILE_EVENT:
|
||||
case EXEC_LOAD_EVENT:
|
||||
default:
|
||||
print_skip_replication_statement(print_event_info, ev);
|
||||
if (ev->print(result_file, print_event_info))
|
||||
@@ -2809,22 +2566,10 @@ static Exit_status check_master_version()
|
||||
glob_description_event= NULL;
|
||||
|
||||
switch (version) {
|
||||
case 3:
|
||||
glob_description_event= new Format_description_log_event(1);
|
||||
break;
|
||||
case 4:
|
||||
glob_description_event= new Format_description_log_event(3);
|
||||
break;
|
||||
case 5:
|
||||
case 10:
|
||||
case 11:
|
||||
/*
|
||||
The server is soon going to send us its Format_description log
|
||||
event, unless it is a 5.0 server with 3.23 or 4.0 binlogs.
|
||||
So we first assume that this is 4.0 (which is enough to read the
|
||||
Format_desc event if one comes).
|
||||
*/
|
||||
glob_description_event= new Format_description_log_event(3);
|
||||
glob_description_event= new Format_description_log_event(4);
|
||||
break;
|
||||
default:
|
||||
error("Could not find server version: "
|
||||
@@ -2883,8 +2628,6 @@ static Exit_status handle_event_text_mode(PRINT_EVENT_INFO *print_event_info,
|
||||
}
|
||||
|
||||
Log_event_type type= ev->get_type_code();
|
||||
if (glob_description_event->binlog_version >= 3 ||
|
||||
(type != LOAD_EVENT && type != CREATE_FILE_EVENT))
|
||||
{
|
||||
/*
|
||||
If this is a Rotate event, maybe it's the end of the requested binlog;
|
||||
@@ -2943,31 +2686,6 @@ static Exit_status handle_event_text_mode(PRINT_EVENT_INFO *print_event_info,
|
||||
if (retval != OK_CONTINUE)
|
||||
DBUG_RETURN(retval);
|
||||
}
|
||||
else
|
||||
{
|
||||
Load_log_event *le= (Load_log_event*)ev;
|
||||
const char *old_fname= le->fname;
|
||||
uint old_len= le->fname_len;
|
||||
File file;
|
||||
Exit_status retval;
|
||||
char fname[FN_REFLEN+1];
|
||||
|
||||
if ((file= load_processor.prepare_new_file_for_old_format(le,fname)) < 0)
|
||||
{
|
||||
DBUG_RETURN(ERROR_STOP);
|
||||
}
|
||||
|
||||
retval= process_event(print_event_info, ev, old_off, logname);
|
||||
if (retval != OK_CONTINUE)
|
||||
{
|
||||
my_close(file,MYF(MY_WME));
|
||||
DBUG_RETURN(retval);
|
||||
}
|
||||
retval= load_processor.load_old_format_file(net,old_fname,old_len,file);
|
||||
my_close(file,MYF(MY_WME));
|
||||
if (retval != OK_CONTINUE)
|
||||
DBUG_RETURN(retval);
|
||||
}
|
||||
|
||||
DBUG_RETURN(OK_CONTINUE);
|
||||
}
|
||||
@@ -3240,7 +2958,7 @@ static Exit_status check_header(IO_CACHE* file,
|
||||
MY_STAT my_file_stat;
|
||||
|
||||
delete glob_description_event;
|
||||
if (!(glob_description_event= new Format_description_log_event(3)))
|
||||
if (!(glob_description_event= new Format_description_log_event(4)))
|
||||
{
|
||||
error("Failed creating Format_description_log_event; out of memory?");
|
||||
return ERROR_STOP;
|
||||
@@ -3312,25 +3030,7 @@ static Exit_status check_header(IO_CACHE* file,
|
||||
{
|
||||
DBUG_PRINT("info",("buf[EVENT_TYPE_OFFSET=%d]=%d",
|
||||
EVENT_TYPE_OFFSET, buf[EVENT_TYPE_OFFSET]));
|
||||
/* always test for a Start_v3, even if no --start-position */
|
||||
if (buf[EVENT_TYPE_OFFSET] == START_EVENT_V3)
|
||||
{
|
||||
/* This is 3.23 or 4.x */
|
||||
if (uint4korr(buf + EVENT_LEN_OFFSET) <
|
||||
(LOG_EVENT_MINIMAL_HEADER_LEN + START_V3_HEADER_LEN))
|
||||
{
|
||||
/* This is 3.23 (format 1) */
|
||||
delete glob_description_event;
|
||||
if (!(glob_description_event= new Format_description_log_event(1)))
|
||||
{
|
||||
error("Failed creating Format_description_log_event; "
|
||||
"out of memory?");
|
||||
return ERROR_STOP;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
else if (tmp_pos >= start_position)
|
||||
if (tmp_pos >= start_position)
|
||||
break;
|
||||
else if (buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
@@ -3803,7 +3503,6 @@ struct encryption_service_st encryption_handler=
|
||||
#include "password.c"
|
||||
#include "log_event.cc"
|
||||
#include "log_event_client.cc"
|
||||
#include "log_event_old.cc"
|
||||
#include "rpl_utility.cc"
|
||||
#include "sql_string.cc"
|
||||
#include "sql_list.cc"
|
||||
|
Reference in New Issue
Block a user