diff --git a/include/errmsg.h b/include/errmsg.h index 3e91b6bb..6a145446 100644 --- a/include/errmsg.h +++ b/include/errmsg.h @@ -108,9 +108,14 @@ extern const char *mariadb_client_errors[]; /* Error messages */ #define CR_ERR_NET_READ 5013 #define CR_ERR_NET_WRITE 5014 #define CR_ERR_NET_UNCOMPRESS 5015 +#define CR_ERR_CHECKSUM_VERIFICATION_ERROR 5016 +#define CR_ERR_UNSUPPORTED_BINLOG_FORMAT 5017 +#define CR_UNKNOWN_BINLOG_EVENT 5018 +#define CR_BINLOG_ERROR 5019 +#define CR_BINLOG_INVALID_FILE 5020 /* Always last, if you add new error codes please update the value for CR_MARIADB_LAST_ERROR */ -#define CR_MARIADB_LAST_ERROR CR_ERR_NET_UNCOMPRESS +#define CR_MARIADB_LAST_ERROR CR_BINLOG_INVALID_FILE #endif diff --git a/include/mariadb_rpl.h b/include/mariadb_rpl.h index 7c416c30..5534dca7 100644 --- a/include/mariadb_rpl.h +++ b/include/mariadb_rpl.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2018-2021 MariaDB Corporation AB +/* Copyright (C) 2018-2022 MariaDB Corporation AB This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public @@ -23,8 +23,11 @@ extern "C" { #include -#define MARIADB_RPL_VERSION 0x0001 -#define MARIADB_RPL_REQUIRED_VERSION 0x0001 +#define MARIADB_RPL_VERSION 0x0002 +#define MARIADB_RPL_REQUIRED_VERSION 0x0002 + +#define RPL_BINLOG_MAGIC (const uchar*) "\xFE\x62\x69\x6E" +#define RPL_BINLOG_MAGIC_SIZE 4 /* Protocol flags */ #define MARIADB_RPL_BINLOG_DUMP_NON_BLOCK 1 @@ -33,10 +36,37 @@ extern "C" { #define EVENT_HEADER_OFS 20 -#define FL_GROUP_COMMIT_ID 2 #define FL_STMT_END 1 -#define LOG_EVENT_ARTIFICIAL_F 0x20 +/* GTID flags */ + +/* FL_STANDALONE is set in case there is no terminating COMMIT event. */ +#define FL_STANDALONE 0x01 + +/* FL_GROUP_COMMIT_ID is set when event group is part of a group commit */ +#define FL_GROUP_COMMIT_ID 0x02 + +/* FL_TRANSACTIONAL is set for an event group that can be safely rolled back + (no MyISAM, eg.). + */ +#define FL_TRANSACTIONAL 0x04 + /* + FL_ALLOW_PARALLEL reflects the (negation of the) value of + @@SESSION.skip_parallel_replication at the time of commit. + */ +#define FL_ALLOW_PARALLEL 0x08; + /* + FL_WAITED is set if a row lock wait (or other wait) is detected during the + execution of the transaction. + */ +#define FL_WAITED 0x10 + /* FL_DDL is set for event group containing DDL. */ +#define FL_DDL 0x20 + /* FL_PREPARED_XA is set for XA transaction. */ +#define FL_PREPARED_XA 0x40 + /* FL_"COMMITTED or ROLLED-BACK"_XA is set for XA transaction. */ +#define FL_COMPLETED_XA 0x80 + /* SEMI SYNCHRONOUS REPLICATION */ #define SEMI_SYNC_INDICATOR 0xEF @@ -50,7 +80,8 @@ enum mariadb_rpl_option { MARIADB_RPL_FLAGS, /* Protocol flags */ MARIADB_RPL_GTID_CALLBACK, /* GTID callback function */ MARIADB_RPL_GTID_DATA, /* GTID data */ - MARIADB_RPL_BUFFER + MARIADB_RPL_BUFFER, + MARIADB_RPL_VERIFY_CHECKSUM }; /* Event types: From MariaDB Server sql/log_event.h */ @@ -125,6 +156,83 @@ enum mariadb_rpl_event { ENUM_END_EVENT /* end marker */ }; +/* ROWS_EVENT flags */ + +#define STMT_END_F 0x01 +#define NO_FOREIGN_KEY_CHECKS_F 0x02 +#define RELAXED_UNIQUE_KEY_CHECKS_F 0x04 +#define COMPLETE_ROWS_F 0x08 +#define NO_CHECK_CONSTRAINT_CHECKS_F 0x80 + +enum mariadb_rpl_status_code { + Q_FLAGS2_CODE= 0x00, + Q_SQL_MODE_CODE= 0x01, + Q_CATALOG_CODE= 0x02, + Q_AUTO_INCREMENT_CODE= 0x03, + Q_CHARSET_CODE= 0x04, + Q_TIMEZONE_CODE= 0x05, + Q_CATALOG_NZ_CODE= 0x06, + Q_LC_TIME_NAMES_CODE= 0x07, + Q_CHARSET_DATABASE_CODE= 0x08, + Q_TABLE_MAP_FOR_UPDATE_CODE= 0x09, + Q_MASTER_DATA_WRITTEN_CODE= 0x0A, + Q_INVOKERS_CODE= 0x0B, + Q_UPDATED_DB_NAMES_CODE= 0x0C, + Q_MICROSECONDS_CODE= 0x0D, + Q_COMMIT_TS_CODE= 0x0E, /* unused */ + Q_COMMIT_TS2_CODE= 0x0F, /* unused */ + Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP_CODE= 0x10, + Q_DDL_LOGGED_WITH_XID_CODE= 0x11, + Q_DEFAULT_COLLATION_FOR_UTF8_CODE= 0x12, + Q_SQL_REQUIRE_PRIMARY_KEY_CODE= 0x13, + Q_DEFAULT_TABLE_ENCRYPTION_CODE= 0x14, + Q_HRNOW= 128, /* second part: 3 bytes */ + Q_XID= 129 /* xid: 8 bytes */ +}; + +/* Log Event flags */ + +/* used in FOMRAT_DESCRIPTION_EVENT. Indicates if it + is the active binary log. + Note: When reading data via COM_BINLOG_DUMP this + flag is never set. +*/ +#define LOG_EVENT_BINLOG_IN_USE_F 0x0001 + +/* Looks like this flag is no longer in use */ +#define LOG_EVENT_FORCED_ROTATE_F 0x0002 + +/* Log entry depends on thread, e.g. when using user variables + or temporary tables */ +#define LOG_EVENT_THREAD_SPECIFIC_F 0x0004 + +/* Indicates that the USE command can be suppressed before + executing a statement: e.g. DRIP SCHEMA */ +#define LOG_EVENT_SUPPRESS_USE_F 0x0008 + +/* ??? */ +#define LOG_EVENT_UPDATE_TABLE_MAP_F 0x0010 + +/* Artifical event */ +#define LOG_EVENT_ARTIFICIAL_F 0x0020 + +/* ??? */ +#define LOG_EVENT_RELAY_LOG_F 0x0040 + +/* If an event is not supported, and LOG_EVENT_IGNORABLE_F was not + set, an error will be reported. */ +#define LOG_EVENT_IGNORABLE_F 0x0080 + +/* ??? */ +#define LOG_EVENT_NO_FILTER_F 0x0100 + +/* ?? */ +#define LOG_EVENT_MTS_ISOLATE_F 0x0200 + +/* if session variable @@skip_repliation was set, this flag will be + reported for events which should be skipped. */ +#define LOG_EVENT_SKIP_REPLICATION_F 0x8000 + typedef struct { char *str; size_t length; @@ -157,6 +265,11 @@ typedef struct st_mariadb_rpl { uint8_t fd_header_len; /* header len from last format description event */ uint8_t use_checksum; uint8_t artificial_checksun; + uint8_t verify_checksum; + uint8_t post_header_len[ENUM_END_EVENT]; + FILE *fp; + uint32_t error_no; + char error_msg[MYSQL_ERRMSG_SIZE]; } MARIADB_RPL; /* Event header */ @@ -185,6 +298,7 @@ struct st_mariadb_rpl_format_description_event char *server_version; uint32_t timestamp; uint8_t header_len; + MARIADB_STRING post_header_lengths; }; struct st_mariadb_rpl_checkpoint_event { @@ -213,7 +327,7 @@ struct st_mariadb_rpl_table_map_event { unsigned int column_count; MARIADB_STRING column_types; MARIADB_STRING metadata; - char *null_indicator; + unsigned char *null_indicator; }; struct st_mariadb_rpl_rand_event { @@ -221,15 +335,33 @@ struct st_mariadb_rpl_rand_event { unsigned long long second_seed; }; -struct st_mariadb_rpl_encryption_event { - char scheme; - unsigned int key_version; - char *nonce; +struct st_mariadb_rpl_intvar_event { + unsigned long long value; + uint8_t type; }; -struct st_mariadb_rpl_intvar_event { - char type; - unsigned long long value; +struct st_mariadb_begin_load_query_event { + uint32_t file_id; + unsigned char *data; +}; + +struct st_mariadb_start_encryption_event { + uint8_t scheme; + uint32_t key_version; + char nonce[12]; +}; + +struct st_mariadb_execute_load_query_event { + uint32_t thread_id; + uint32_t execution_time; + MARIADB_STRING schema; + uint16_t error_code; + uint32_t file_id; + uint32_t ofs1; + uint32_t ofs2; + uint8_t duplicate_flag; + MARIADB_STRING status_vars; + MARIADB_STRING statement; }; struct st_mariadb_rpl_uservar_event { @@ -246,12 +378,14 @@ struct st_mariadb_rpl_rows_event { uint64_t table_id; uint16_t flags; uint32_t column_count; - char *column_bitmap; - char *column_update_bitmap; + unsigned char *column_bitmap; + unsigned char *column_update_bitmap; + unsigned char *null_bitmap; size_t row_data_size; void *row_data; size_t extra_data_size; void *extra_data; + uint8_t compressed; }; struct st_mariadb_rpl_heartbeat_event { @@ -261,10 +395,26 @@ struct st_mariadb_rpl_heartbeat_event { uint16_t flags; }; +struct st_mariadb_rpl_xa_prepare_log_event { + uint8_t one_phase; + uint32_t format_id; + uint32_t gtrid_len; + uint32_t bqual_len; + MARIADB_STRING xid; +}; + +struct st_mariadb_gtid_log_event { + uint8_t commit_flag; + char source_id[16]; + uint64_t sequence_nr; +}; + typedef struct st_mariadb_rpl_event { /* common header */ MA_MEM_ROOT memroot; + unsigned char *raw_data; + size_t raw_data_size; unsigned int checksum; char ok; enum mariadb_rpl_event event_type; @@ -273,6 +423,7 @@ typedef struct st_mariadb_rpl_event unsigned int event_length; unsigned int next_event_pos; unsigned short flags; + void *raw_data; /****************/ union { struct st_mariadb_rpl_rotate_event rotate; @@ -285,18 +436,54 @@ typedef struct st_mariadb_rpl_event struct st_mariadb_rpl_annotate_rows_event annotate_rows; struct st_mariadb_rpl_table_map_event table_map; struct st_mariadb_rpl_rand_event rand; - struct st_mariadb_rpl_encryption_event encryption; struct st_mariadb_rpl_intvar_event intvar; struct st_mariadb_rpl_uservar_event uservar; struct st_mariadb_rpl_rows_event rows; struct st_mariadb_rpl_heartbeat_event heartbeat; + struct st_mariadb_rpl_xa_prepare_log_event xa_prepare_log; + struct st_mariadb_begin_load_query_event begin_load_query; + struct st_mariadb_execute_load_query_event execute_load_query; + struct st_mariadb_gtid_log_event gtid_log; + struct st_mariadb_start_encryption_event start_encryption; } event; /* Added in C/C 3.3.0 */ uint8_t is_semi_sync; uint8_t semi_sync_flags; } MARIADB_RPL_EVENT; +/* compression uses myisampack format */ +#define myisam_uint1korr(B) ((uint8_t)(*B)) +#define myisam_uint2korr(B)\ +((uint16_t)(((uint16_t)(((const uchar*)(B))[1])) | ((uint16_t) (((const uchar*) (B))[0]) << 8))) +#define myisam_uint3korr(B)\ +((uint32_t)(((uint32_t)(((const uchar*)(B))[2])) |\ +(((uint32_t)(((const uchar*)(B))[1])) << 8) |\ +(((uint32_t)(((const uchar*)(B))[0])) << 16))) +#define myisam_uint4korr(B)\ +((uint32_t)(((uint32_t)(((const uchar*)(B))[3])) |\ +(((uint32_t)(((const uchar*)(B))[2])) << 8) |\ +(((uint32_t)(((const uchar*) (B))[1])) << 16) |\ +(((uint32_t)(((const uchar*) (B))[0])) << 24))) + +#define RPL_SAFEGUARD(rpl, event, condition) \ +if (!(condition))\ +{\ + my_set_error((rpl)->mysql, CR_BINLOG_ERROR, SQLSTATE_UNKNOWN, 0,\ + (rpl)->filename_length, (rpl)->filename,\ + (rpl)->start_position,\ + "Packet corrupted");\ + mariadb_free_rpl_event((event));\ + return 0;\ +} + #define mariadb_rpl_init(a) mariadb_rpl_init_ex((a), MARIADB_RPL_VERSION) +#define rpl_clear_error(rpl)\ +(rpl)->error_no= (rpl)->error_msg[0]= 0 + +#define IS_ROW_VERSION2(a)\ + ((a) == WRITE_ROWS_EVENT || (a) == UPDATE_ROWS_EVENT || \ + (a) == DELETE_ROWS_EVENT || (a) == WRITE_ROWS_COMPRESSED_EVENT ||\ + (a) == UPDATE_ROWS_COMPRESSED_EVENT || (a) == DELETE_ROWS_COMPRESSED_EVENT) /* Function prototypes */ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version); diff --git a/libmariadb/ma_errmsg.c b/libmariadb/ma_errmsg.c index 23b38afb..65eb546a 100644 --- a/libmariadb/ma_errmsg.c +++ b/libmariadb/ma_errmsg.c @@ -109,7 +109,12 @@ const char *mariadb_client_errors[] = /* 5012 */ "Error while loading plugin '%s'", /* 5013 */ "Read error: %s (%d)", /* 5014 */ "Write error: %s (%d)", - /* 5015 */ "Error while uncompressing packet", + /* 5015 */ "Error while uncompressing packet (File: %.*s start_pos=%ld).", + /* 5016 */ "Checksum verification failed (File: %.*s start_pos=%ld). Reported checksum is %x, while calculated checksum is %x.", + /* 5017 */ "Binary log protocol error (File: %.*s start_pos=%ld): Log format %d not supported.", + /* 5018 */ "Binary log error (File: %.*s start_pos=%ld): Unknown event type (%d) with flag 'not_ignorable'.", + /* 5019 */ "Binary log error (File: %.*s start_pos=%ld): %s.", + /* 5020 */ "File '%s' is not a binary log file", "" }; diff --git a/libmariadb/mariadb_rpl.c b/libmariadb/mariadb_rpl.c index 3a0a4e12..e314ac8d 100644 --- a/libmariadb/mariadb_rpl.c +++ b/libmariadb/mariadb_rpl.c @@ -29,22 +29,95 @@ #include #include + #ifdef WIN32 #define alloca _alloca #endif -static int rpl_alloc_string(MARIADB_RPL_EVENT *event, +void rpl_set_error(MARIADB_RPL *rpl, + unsigned int error_nr, + const char *format, + ...) +{ + va_list ap; + + const char *errmsg; + + if (!format) + { + if (error_nr >= CR_MIN_ERROR && error_nr <= CR_MYSQL_LAST_ERROR) + errmsg= ER(error_nr); + else if (error_nr >= CER_MIN_ERROR && error_nr <= CR_MARIADB_LAST_ERROR) + errmsg= CER(error_nr); + else + errmsg= ER(CR_UNKNOWN_ERROR); + } + + rpl->error_no= error_nr; + va_start(ap, format); + vsnprintf(rpl->error_msg, MYSQL_ERRMSG_SIZE - 1, + format ? format : errmsg, ap); + va_end(ap); + return; +} + + +static void *ma_calloc_root(void *memroot, size_t len) +{ + void *p; + + if ((p= ma_alloc_root(memroot, len))) + memset(p, 0, len); + return p; +} + +static int rpl_set_string_and_len(MARIADB_RPL_EVENT *event, MARIADB_STRING *s, unsigned char *buffer, size_t len) { - if (!(s->str= ma_alloc_root(&event->memroot, len))) + if (!buffer || !len) + { + s->length= 0; + return 0; + } + if (!(s->str= ma_calloc_root(&event->memroot, len))) return 1; memcpy(s->str, buffer, len); s->length= len; return 0; } +static int rpl_set_string0(MARIADB_RPL_EVENT *event, + MARIADB_STRING *s, + const char *buffer) +{ + size_t len; + if (!buffer || !buffer[0]) + { + s->length= 0; + return 0; + } + + len= strlen(buffer); + + if (!(s->str= ma_calloc_root(&event->memroot, len))) + return 1; + strcpy(s->str, buffer); + s->length= len; + return 0; +} + +static int rpl_set_data(MARIADB_RPL_EVENT *event, unsigned char **buf, void *val, size_t len) +{ + if (!val || !len) + return 0; + if (!(*buf= ma_calloc_root(&event->memroot, len))) + return 1; + memcpy(*buf, val, len); + return 0; +} + MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) { MARIADB_RPL *rpl; @@ -57,8 +130,9 @@ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) return 0; } + /* if there is no connection, we read a file if (!mysql) - return NULL; + return NULL; */ if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL)))) { @@ -66,22 +140,23 @@ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) return 0; } rpl->version= version; - rpl->mysql= mysql; - if (!mysql_query(mysql, "select @@binlog_checksum")) + if ((rpl->mysql= mysql)) { - MYSQL_RES *result; - if ((result= mysql_store_result(mysql))) + if (!mysql_query(mysql, "select @@binlog_checksum")) { - MYSQL_ROW row= mysql_fetch_row(result); - if (!strcmp(row[0], "CRC32")) + MYSQL_RES *result; + if ((result= mysql_store_result(mysql))) { - rpl->artificial_checksun= 1; + MYSQL_ROW row= mysql_fetch_row(result); + if (!strcmp(row[0], "CRC32")) + { + rpl->artificial_checksun= 1; + } + mysql_free_result(result); } - mysql_free_result(result); } } - return rpl; } @@ -97,7 +172,8 @@ void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event) int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) { unsigned char *ptr, *buf; - if (!rpl || !rpl->mysql) + + if (!rpl) return 1; /* COM_BINLOG_DUMP: @@ -111,10 +187,11 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) * = filename length */ + rpl_clear_error(rpl); /* if replica was specified, we will register replica via COM_REGISTER_SLAVE */ - if (rpl->mysql->options.extension && rpl->mysql->options.extension->rpl_host) + if (rpl->mysql && rpl->mysql->options.extension && rpl->mysql->options.extension->rpl_host) { /* Protocol: Ofs Len Data @@ -155,25 +232,53 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) return 1; } - ptr= buf= -#ifdef WIN32 - (unsigned char *)_alloca(rpl->filename_length + 11); -#else - (unsigned char *)alloca(rpl->filename_length + 11); -#endif + if (rpl->mysql) + { + ptr= buf= + #ifdef WIN32 + (unsigned char *)malloca(rpl->filename_length + 11); + #else + (unsigned char *)alloca(rpl->filename_length + 11); + #endif - int4store(ptr, (unsigned int)rpl->start_position); - ptr+= 4; - int2store(ptr, rpl->flags); - ptr+= 2; - int4store(ptr, rpl->server_id); - ptr+= 4; - memcpy(ptr, rpl->filename, rpl->filename_length); - ptr+= rpl->filename_length; + int4store(ptr, (unsigned int)rpl->start_position); + ptr+= 4; + int2store(ptr, rpl->flags); + ptr+= 2; + int4store(ptr, rpl->server_id); + ptr+= 4; + memcpy(ptr, rpl->filename, rpl->filename_length); + ptr+= rpl->filename_length; - if (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0)) - return 1; - return 0; + return (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0)); + } else + { + char *buf[RPL_BINLOG_FILEHEADER_SIZE]; + + if (rpl->fp) + fclose(rpl->fp); + + if (!(rpl->fp= fopen((const char *)rpl->filename, "r"))) + { + rpl_set_error(rpl, CR_FILE_NOT_FOUND, 0, rpl->filename, errno); + return errno; + } + + if (fread(buf, 1, RPL_BINLOG_MAGIC_SIZE, rpl->fp) != 4) + { + rpl_set_error(rpl, CR_FILE_READ, 0, rpl->filename, errno); + return errno; + } + + /* check if it is a valid binlog file */ + if (memcmp(buf, RPL_BINLOG_MAGIC, RPL_BINLOG_MAGIC_SIZE) != 0) + { + rpl_set_error(rpl, CR_BINLOG_INVALID_FILE, 0, rpl->filename, errno); + return errno; + } + + return 0; + } } static int ma_set_rpl_filename(MARIADB_RPL *rpl, const unsigned char *filename, size_t len) @@ -188,48 +293,146 @@ static int ma_set_rpl_filename(MARIADB_RPL *rpl, const unsigned char *filename, return 0; } +/* + * + * + * + */ +static uint32_t get_compression_info(const unsigned char *buf, + uint8_t *algorithm, + uint8_t *header_size) +{ + uint8_t alg, header; + uint32 len= 0; + + if (!algorithm) + algorithm= &alg; + if (!header_size) + header_size= &header; + + *header_size= 0; + *algorithm= 0; + + if (!buf) + return len; + + if ((buf[0] & 0xe0) != 0x80) + return len; + + *header_size= buf[0] & 0x07; + *algorithm = (buf[0] & 0x07) >> 4; + + buf++; + + /* Attention: we can't use uint*korr, here, we need myisam macros since + length is stored in high byte first order + */ + switch(*header_size) { + case 1: + len= *buf; + break; + case 2: + len= myisam_uint2korr(buf); + break; + case 3: + len= myisam_uint3korr(buf); + break; + case 4: + len= myisam_uint4korr(buf); + break; + default: + len= 0; + break; + } + + *header_size += 1; + return len; +} + MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event) { - unsigned char *ev; - size_t len; + unsigned char *ev= 0; + unsigned char *checksum_start= 0; + unsigned char *ev_start= 0; + unsigned char *ev_end= 0; + size_t len= 0; MARIADB_RPL_EVENT *rpl_event= 0; - if (!rpl || !rpl->mysql) + if (!rpl || (!rpl->mysql && !rpl->fp)) return 0; while (1) { - unsigned long pkt_len= ma_net_safe_read(rpl->mysql); + unsigned long pkt_len; - if (pkt_len == packet_error) + if (rpl->mysql) { - rpl->buffer_size= 0; - return 0; - } + pkt_len= ma_net_safe_read(rpl->mysql); - /* EOF packet: - see https://mariadb.com/kb/en/library/eof_packet/ - Packet length must be less than 9 bytes, EOF header - is 0xFE. - */ - if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE) - { - rpl->buffer_size= 0; - return 0; - } + if (pkt_len == packet_error) + { + rpl->buffer_size= 0; + return 0; + } + + /* EOF packet: + see https://mariadb.com/kb/en/library/eof_packet/ + Packet length must be less than 9 bytes, EOF header + is 0xFE. + */ + if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE) + { + rpl->buffer_size= 0; + return 0; + } /* if ignore heartbeat flag was set, we ignore this record and continue to fetch next record. The first byte is always status byte (0x00) For event header description see https://mariadb.com/kb/en/library/2-binlog-event-header/ */ - if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT) - { - if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT) - continue; - } + if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT) + { + if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT) + continue; + } - rpl->buffer_size= pkt_len; - rpl->buffer= rpl->mysql->net.read_pos; + rpl->buffer_size= pkt_len; + ev= rpl->buffer= rpl->mysql->net.read_pos; + } else if (rpl->fp) { + char buf[EVENT_HEADER_OFS]; /* header */ + size_t rc; + uint32_t len= 0; + char *p= buf; + + memset(buf, 0, EVENT_HEADER_OFS); + if (fread(buf, 1, EVENT_HEADER_OFS, rpl->fp) != EVENT_HEADER_OFS) + { + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Can't read event header"); + return NULL; + } + len= uint4korr(p + 9); + + if (rpl->buffer_size < len) + { + if (!(rpl->buffer= realloc(rpl->buffer, len))) + { + rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0); + return NULL; + } + } + + rpl->buffer_size= len; + memcpy(rpl->buffer, buf, EVENT_HEADER_OFS); + len-= EVENT_HEADER_OFS; + rc= fread(rpl->buffer + EVENT_HEADER_OFS, 1, len, rpl->fp); + if (rc != len) + { + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Error while reading post header"); + return NULL; + } + ev= rpl->buffer; + } + ev_end= rpl->buffer + rpl->buffer_size; if (event) { @@ -244,34 +447,63 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); ma_init_alloc_root(&rpl_event->memroot, 8192, 0); } - rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4); - rpl_event->ok= rpl->buffer[0]; - - /* CONC-470: add support for semi snychronous replication */ - if ((rpl_event->is_semi_sync= (rpl->buffer[1] == SEMI_SYNC_INDICATOR))) + if (rpl->mysql) { - rpl_event->semi_sync_flags= rpl->buffer[2]; - rpl->buffer+= 2; + rpl_event->ok= *ev++; + + /* CONC-470: add support for semi snychronous replication */ + if ((rpl_event->is_semi_sync= (*ev == SEMI_SYNC_INDICATOR))) + { + ev++; + rpl_event->semi_sync_flags= *ev++; + } } - rpl_event->timestamp= uint4korr(rpl->buffer + 1); - rpl_event->event_type= (unsigned char)*(rpl->buffer + 5); - rpl_event->server_id= uint4korr(rpl->buffer + 6); - rpl_event->event_length= uint4korr(rpl->buffer + 10); - rpl_event->next_event_pos= uint4korr(rpl->buffer + 14); - rpl_event->flags= uint2korr(rpl->buffer + 18); + /* check sum verification: + check sum will be calculated from begin of binlog header + */ + checksum_start= ev; - ev= rpl->buffer + EVENT_HEADER_OFS; + /****************************************************************** + Binlog event header: + + All binary log events have the same header: + - uint32_t timestamp: creation time + - uint8_t event_type: type code of the event + - uint32_t server_id: server which created the event + - uint32_t event_len: length of the event. If checksum is + enabled, the length also include 4 bytes + of checksum + - uint32_t next_pos: Position of next binary log event + - uint16_t flags: flags - if (rpl->use_checksum) - { - rpl_event->checksum= *(ev + rpl_event->event_length - 4); - rpl_event->event_length-= 4; - } + The size of binlog event header must match the header size returned + by FORMAT_DESCIPTION_EVENT. In version 4 it is always 19. + ********************************************************************/ + rpl_event->timestamp= uint4korr(ev); + ev+= 4; + rpl_event->event_type= (unsigned char)*ev++; + rpl_event->server_id= uint4korr(ev); + ev+= 4; + rpl_event->event_length= uint4korr(ev); + ev+= 4; + rpl_event->next_event_pos= uint4korr(ev); + ev+= 4; + rpl_event->flags= uint2korr(ev); + ev+=2; + rpl_event->checksum= 0; + + /* start of post_header */ + ev_start= ev; switch(rpl_event->event_type) { + case UNKNOWN_EVENT: + case SLAVE_EVENT: + return rpl_event; + break; case HEARTBEAT_LOG_EVENT: + /* post header */ rpl_event->event.heartbeat.timestamp= uint4korr(ev); ev+= 4; rpl_event->event.heartbeat.next_position= uint4korr(ev); @@ -279,28 +511,151 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN rpl_event->event.heartbeat.type= (uint8_t)*ev; ev+= 1; rpl_event->event.heartbeat.flags= uint2korr(ev); + ev+= 2; + break; + + case BEGIN_LOAD_QUERY_EVENT: + /* Post header */ + rpl_event->event.begin_load_query.file_id= uint4korr(ev); + ev+= 4; + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type]); + + /* Payload: query_data (zero terminated) */ + if (rpl_set_data(event, &rpl_event->event.begin_load_query.data, ev, strlen((char *)ev))) + goto mem_error; + ev+= strlen((char *)ev) + 1; + break; + + case START_ENCRYPTION_EVENT: + /* Post header */ + rpl_event->event.start_encryption.scheme= *ev++; + rpl_event->event.start_encryption.key_version= uint4korr(ev); + ev+= 4; + memcpy(rpl_event->event.start_encryption.nonce, ev, 12); + ev+= 12; + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + break; + + case EXECUTE_LOAD_QUERY_EVENT: + { + uint16_t status_len; + uint8_t schema_len; + + /* Post header */ + rpl_event->event.execute_load_query.thread_id= uint4korr(ev); + ev+= 4; + rpl_event->event.execute_load_query.execution_time= uint4korr(ev); + ev+= 4; + schema_len= *ev++; + rpl_event->event.execute_load_query.error_code= uint2korr(ev); + ev+= 2; + status_len= uint2korr(ev); + ev+= 2; + rpl_event->event.execute_load_query.file_id= uint4korr(ev); + ev+= 4; + rpl_event->event.execute_load_query.ofs1= uint4korr(ev); + ev+= 4; + rpl_event->event.execute_load_query.ofs2= uint4korr(ev); + ev+= 4; + rpl_event->event.execute_load_query.duplicate_flag= *ev++; + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + + /* Payload: + - status variables + - query schema + - statement */ + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.execute_load_query.status_vars, ev, status_len)) + goto mem_error; + ev+= status_len; + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.execute_load_query.schema, ev, schema_len)) + goto mem_error; + ev+= (schema_len + 1); + len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1); + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.execute_load_query.statement, ev, len)) + goto mem_error; + ev+= len; + break; + } case BINLOG_CHECKPOINT_EVENT: + /* Post header */ len= uint4korr(ev); ev+= 4; - if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len) || + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + + /* payload: filname */ + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.checkpoint.filename, ev, len) || ma_set_rpl_filename(rpl, ev, len)) goto mem_error; + ev+= len; break; + case FORMAT_DESCRIPTION_EVENT: - rpl_event->event.format_description.format = uint2korr(ev); + /* + FORMAT_DESCRIPTION_EVENT: + + Header: + uint<2> binary log version + (we support only version 4) + str<50> server version, right padded with \0 + uint<4> timestamp + uint<1> header length + byte post header lengths. Length can be calculated by + ev_end - end - 1 - 4 + uint<1> check sum algorithm byte + uint<4> CRC32 checksum + */ + + + /* We don't speak bing log protocol version < 4, in case it's an older + protocol version an error will be returned. */ + if ((rpl_event->event.format_description.format = uint2korr(ev)) < 4) + { + mariadb_free_rpl_event(rpl_event); + my_set_error(rpl->mysql, CR_ERR_UNSUPPORTED_BINLOG_FORMAT, SQLSTATE_UNKNOWN, 0, + rpl->filename_length, rpl->filename, rpl->start_position, uint2korr(ev)); + return 0; + } + ev+= 2; rpl_event->event.format_description.server_version = (char *)(ev); ev+= 50; rpl_event->event.format_description.timestamp= uint4korr(ev); ev+= 4; - rpl->fd_header_len= rpl_event->event.format_description.header_len= (uint8_t)*ev; - ev= rpl->buffer + rpl->buffer_size - 5; - rpl->use_checksum= *ev; + rpl->fd_header_len= rpl_event->event.format_description.header_len= *ev; + ev+= 1; + /*Post header lengths: 1 byte for each event, non used events/gaps in enum should + have a zero value */ + len= ev_end - ev - 5; + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.format_description.post_header_lengths, ev, len)) + goto mem_error; + memset(rpl->post_header_len, 0, ENUM_END_EVENT); + memcpy(rpl->post_header_len, rpl_event->event.format_description.post_header_lengths.str, + MIN(len, ENUM_END_EVENT)); + ev+= len; + if ((rpl->use_checksum= *ev++)) + { + rpl_event->checksum= uint4korr(ev); + ev+= 4; + } break; + + case QUERY_COMPRESSED_EVENT: case QUERY_EVENT: { size_t db_len, status_len; + + /*********** + post_header + ***********/ rpl_event->event.query.thread_id= uint4korr(ev); ev+= 4; rpl_event->event.query.seconds= uint4korr(ev); @@ -311,56 +666,132 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN ev+= 2; status_len= uint2korr(ev); ev+= 2; - if (rpl_alloc_string(rpl_event, &rpl_event->event.query.status, ev, status_len)) + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + + /******* + payload + ******/ + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.query.status, ev, status_len)) goto mem_error; ev+= status_len; - if (rpl_alloc_string(rpl_event, &rpl_event->event.query.database, ev, db_len)) + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.query.database, ev, db_len)) goto mem_error; ev+= db_len + 1; /* zero terminated */ - /* calculate statement size: buffer + buffer_size - current_ofs (ev) - crc_size */ - len= (size_t)(rpl->buffer + rpl->buffer_size - ev - (rpl->use_checksum ? 4 : 0)); - if (rpl_alloc_string(rpl_event, &rpl_event->event.query.statement, ev, len)) - goto mem_error; + len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1); + + if (rpl_event->event_type == QUERY_EVENT) { + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.query.statement, ev, len)) + goto mem_error; + } + + if (rpl_event->event_type == QUERY_COMPRESSED_EVENT) + { + uint8_t header_size= 0, + algorithm= 0; + + uint32_t uncompressed_len= get_compression_info(ev, &algorithm, &header_size); + + len-= header_size; + if (!(rpl_event->event.query.statement.str = ma_calloc_root(&rpl_event->memroot, uncompressed_len))) + goto mem_error; + + if ((uncompress((Bytef*)rpl_event->event.query.statement.str, (uLongf *)&uncompressed_len, + (Bytef*)ev + header_size, (uLongf)*&len) != Z_OK)) + { + mariadb_free_rpl_event(rpl_event); + SET_CLIENT_ERROR(rpl->mysql, CR_ERR_NET_UNCOMPRESS, SQLSTATE_UNKNOWN, 0); + return 0; + } + rpl_event->event.query.statement.length= uncompressed_len; + } break; } case TABLE_MAP_EVENT: + { + /* + TABLE_MAP_EVENT: + + Header: + uint<6> table_id + uint<2> unused + + Payload: + uint<1> schema_name length + str schema_name (zero terminated) + uint<1> table_name length + str table_name (zero terminated) + int column_count + byte column_types[column_count], 1 byte for + each column + int meta_data_size + byte netadata{metadata_size] + byte bit fields, indicating which column can be null + n= (column_count + 7) / 8; + */ + + /* Post header length */ rpl_event->event.table_map.table_id= uint6korr(ev); - ev+= 8; + ev+= 8; /* 2 byte in header ignored */ + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + + /* Payload */ len= *ev; ev++; - if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.database, ev, len)) + + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.database, ev, len)) goto mem_error; - ev+= len + 1; + ev+= len + 1; /* Zero terminated */ + len= *ev; ev++; - if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.table, ev, len)) + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.table, ev, len)) goto mem_error; - ev+= len + 1; + ev+= len + 1; /* Zero terminated */ + rpl_event->event.table_map.column_count= mysql_net_field_length(&ev); len= rpl_event->event.table_map.column_count; - if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.column_types, ev, len)) + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.column_types, ev, len)) goto mem_error; + ev+= len; len= mysql_net_field_length(&ev); - if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.metadata, ev, len)) + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.metadata, ev, len)) goto mem_error; + ev+= len; + + len= ev_end - ev - (rpl->use_checksum ? 4 : 0); + if (rpl_set_data(rpl_event, &rpl_event->event.table_map.null_indicator, ev, len)) + goto mem_error; + ev+= len; + break; + case RAND_EVENT: rpl_event->event.rand.first_seed= uint8korr(ev); ev+= 8; rpl_event->event.rand.second_seed= uint8korr(ev); + ev+= 8; + break; + } + case INTVAR_EVENT: rpl_event->event.intvar.type= *ev; ev++; rpl_event->event.intvar.value= uint8korr(ev); + ev+= 8; break; + case USER_VAR_EVENT: len= uint4korr(ev); ev+= 4; - if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.name, ev, len)) + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.uservar.name, ev, len)) goto mem_error; ev+= len; if (!(rpl_event->event.uservar.is_null= (uint8)*ev)) @@ -372,55 +803,133 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN ev+= 4; len= uint4korr(ev); ev+= 4; - if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.value, ev, len)) + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.uservar.value, ev, len)) goto mem_error; ev+= len; if ((unsigned long)(ev - rpl->buffer) < rpl->buffer_size) rpl_event->event.uservar.flags= *ev; } break; - case START_ENCRYPTION_EVENT: - rpl_event->event.encryption.scheme= *ev; - ev++; - rpl_event->event.encryption.key_version= uint4korr(ev); - ev+= 4; - rpl_event->event.encryption.nonce= (char *)ev; - break; + case ANNOTATE_ROWS_EVENT: - len= (uint32)(rpl->buffer + rpl->buffer_size - (unsigned char *)ev - (rpl->use_checksum ? 4 : 0)); - if (rpl_alloc_string(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len)) + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + + /* Payload */ + len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1); + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len)) goto mem_error; break; + case ROTATE_EVENT: + /* Payload */ rpl_event->event.rotate.position= uint8korr(ev); ev+= 8; + + len= ev_end - ev - 4; if (rpl_event->timestamp == 0 && rpl_event->flags & LOG_EVENT_ARTIFICIAL_F) { - const uint8_t header_size= 19; - len= rpl_event->event_length - header_size - 8; if (rpl->artificial_checksun) { - len-= 4; - int4store(ev + len, rpl_event->checksum); - rpl->artificial_checksun= 0; + int4store(ev_end - 4, rpl_event->checksum); + if (mariadb_connection(rpl->mysql)) + rpl->artificial_checksun= 0; } } - else - { - len= rpl_event->event_length - (ev - rpl->mysql->net.read_pos) - 1; - } - if (rpl_alloc_string(rpl_event, &rpl_event->event.rotate.filename, ev, len) || + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.rotate.filename, ev, len) || ma_set_rpl_filename(rpl, ev, len)) goto mem_error; + + ev+= len; break; + case XID_EVENT: + /* + XID_EVENT was generated if a transaction which modified tables was + committed. + + Header: + - uint64_t transaction number + */ + rpl_event->event.xid.transaction_nr= uint8korr(ev); break; - case STOP_EVENT: - /* nothing to do here */ + + case XA_PREPARE_LOG_EVENT: + /* + MySQL only! + + Header: + uint8_t one phase commit + uint32_t format_id + uint32_t length of gtrid + uint32_t length of bqual + + Payload: + char xid, where n is sum of gtrid and bqual lengths + */ + + rpl_event->event.xa_prepare_log.one_phase= *ev; + ev++; + rpl_event->event.xa_prepare_log.format_id= uint4korr(ev); + ev+= 4; + len= rpl_event->event.xa_prepare_log.gtrid_len= uint4korr(ev); + ev+= 4; + len+= rpl_event->event.xa_prepare_log.bqual_len= uint4korr(ev); + ev+= 4; + if (rpl_set_string_and_len(rpl_event, &rpl_event->event.xa_prepare_log.xid, ev, len)) + goto mem_error; break; + + case STOP_EVENT: + /* + STOP_EVENT - server shutdown or crash. It's always the last written + event after shutdown or after resuming from crash. + + After starting the server a new binary log file will be created, additionally + a ROTATE_EVENT will be appended to the old log file. + + No data to process. + */ + break; + + case ANONYMOUS_GTID_LOG_EVENT: + case PREVIOUS_GTIDS_LOG_EVENT: + /* + ANONYMOUS_GTID_LOG_EVENT, + PREVIOUS_GTIDS_LOG_EVENT (MySQL only) + + Header: + uint8_t flag: commit flag + uint64_t source_id: numerical representation of server's UUID + uint64_t sequence_nr: sequence number + */ + rpl_event->event.gtid_log.commit_flag= *ev; + ev++; + memcpy(rpl_event->event.gtid_log.source_id, ev, 16); + ev+= 16; + rpl_event->event.gtid_log.sequence_nr= uint8korr(ev); + break; + case GTID_EVENT: + /* + GTID_EVENT (MariaDB Only): + + A New transaction (BEGIN) was started, or a single transaction + (ddl) statement was executed. In case a single transaction was + executed, the FL_GROUP_COMMIT id flag is not set. + + Header: + uint64_t sequence_nr + uint64_t domain_id + uint8_t flags + + if (flags & FL_GROUP_COMMIT_D) + uint64_t commit_id + else + char[6] unused + */ rpl_event->event.gtid.sequence_nr= uint8korr(ev); ev+= 8; rpl_event->event.gtid.domain_id= uint4korr(ev); @@ -428,96 +937,208 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN rpl_event->event.gtid.flags= *ev; ev++; if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID) - rpl_event->event.gtid.commit_id= uint8korr(ev); - break; - case GTID_LIST_EVENT: - { - uint32 i; - rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev); - ev++; - if (!(rpl_event->event.gtid_list.gtid= (MARIADB_GTID *)ma_alloc_root(&rpl_event->memroot, sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt))) - goto mem_error; - for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++) { - rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev); - ev+= 4; - rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev); - ev+= 4; - rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev); + rpl_event->event.gtid.commit_id= uint8korr(ev); ev+= 8; } + else + ev+= 6; break; - } - case WRITE_ROWS_EVENT_V1: - case WRITE_ROWS_EVENT: - case UPDATE_ROWS_EVENT_V1: - case UPDATE_ROWS_EVENT: - case DELETE_ROWS_EVENT_V1: - case DELETE_ROWS_EVENT: - if (rpl_event->event_type >= WRITE_ROWS_EVENT) + + case GTID_LIST_EVENT: + /* + GTID_LIST_EVENT (MariaDB only) + + Logged in every binlog to record the current replication state. + Consists of the last GTID seen for each replication domain. + + The Global Transaction ID, GTID for short, consists of three components: + replication domain id, server id and sequence nr + + Header: + uint32_t gtid_cnt - number of global transaction id's + + Payload: + for i=0; i < gtid_cnt; i++ + uint32_t domain_id + uint32_t server_id + uint64_t sequence_nr + */ + + rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev); + ev+=4; + + /* check post_header_length */ + DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + + /* Payload */ + if (rpl_event->event.gtid_list.gtid_cnt) { - rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT; + uint32 i; + if (!(rpl_event->event.gtid_list.gtid= + (MARIADB_GTID *)ma_calloc_root(&rpl_event->memroot, + sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt))) + goto mem_error; + for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++) + { + rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev); + ev+= 4; + rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev); + ev+= 4; + rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev); + ev+= 8; + } } else - { + ev+=4; + break; + + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + { + /* + WRITE/UPDATE/DELETE_ROWS_EVENT_V1 (MariaDB only) + WRITE/UPDATE/DELETE_ROWS_EVENT_COMPRESSED_V1 (MariaDB only) + WRITE/UPDATE/DELETE_ROWS_EVENT (MySQL only) + + ROWS events are written for row based replicatoin if data is + inserted, deleted or updated. + + Header + uint<6> table_id + uint<2> flags + + if MySQL (version 2) + uint<<2> extra_data_length + char[extra_data_length] extra_data + + uint number of columns + uint8_t Bitmap of columns used. + n= (number of columns + 7) / 8 + + if UPDATE_ROWS_v1 (MariaDB) + uint8_t columns updated + n= (number of columns + 7) / 8 + + uint7_t null bitmap + n= (number of columns + 7) / 8 + + str Column data. If event is not compressed, + length must be calculated. + + if UPDATE_ROWS_v1 (MariaDB) + byte Null bitmap update + n= (number of columns + 7) / 8 + str Update column data + + */ + + uint32_t bitmap_len= 0; + + if (rpl_event->event_type >= WRITE_ROWS_COMPRESSED_EVENT) { + return rpl_event; + rpl_event->event.rows.compressed= 1; + rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_COMPRESSED_EVENT; + } else if (rpl_event->event_type >= WRITE_ROWS_COMPRESSED_EVENT_V1) { + rpl_event->event.rows.compressed= 1; + rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_COMPRESSED_EVENT_V1; + } else if (rpl_event->event_type >= WRITE_ROWS_EVENT) + rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT; + else rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT_V1; - } - if (rpl->fd_header_len == 6) - { - rpl_event->event.rows.table_id= uint4korr(ev); - ev+= 4; - } else { - rpl_event->event.rows.table_id= uint6korr(ev); - ev+= 6; - } + + rpl_event->event.rows.table_id= uint6korr(ev); + ev+= 6; + + /* TODO: Flags not defined yet in rpl.h */ rpl_event->event.rows.flags= uint2korr(ev); ev+= 2; + /* ROWS_EVENT V2 has the extra-data field. See also: https://dev.mysql.com/doc/internals/en/rows-event.html */ - if (rpl_event->event_type >= WRITE_ROWS_EVENT) + if (IS_ROW_VERSION2(rpl_event->event_type)) { - rpl_event->event.rows.extra_data_size= uint2korr(ev) - 2; + rpl_event->event.rows.extra_data_size= uint2korr(ev); ev+= 2; - if (rpl_event->event.rows.extra_data_size > 0) + if (rpl_event->event.rows.extra_data_size - 2 > 0) { if (!(rpl_event->event.rows.extra_data = - (char *)ma_alloc_root(&rpl_event->memroot, - rpl_event->event.rows.extra_data_size))) + (char *)ma_calloc_root(&rpl_event->memroot, + rpl_event->event.rows.extra_data_size - 2))) goto mem_error; memcpy(rpl_event->event.rows.extra_data, ev, - rpl_event->event.rows.extra_data_size); + rpl_event->event.rows.extra_data_size -2); ev+= rpl_event->event.rows.extra_data_size; } } - len= rpl_event->event.rows.column_count= mysql_net_field_length(&ev); - if (!len) - break; - if (!(rpl_event->event.rows.column_bitmap = - (char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8))) + /* END_ROWS_EVENT_V2 */ + + /* number of columns */ + rpl_event->event.rows.column_count= mysql_net_field_length(&ev); + bitmap_len= (rpl_event->event.rows.column_count + 7) / 8; + DBUG_ASSERT(rpl_event->event.rows.column_count > 0); + + /* columns updated bitmap */ + if (rpl_set_data(rpl_event, &rpl_event->event.rows.column_bitmap, ev, bitmap_len)) goto mem_error; - memcpy(rpl_event->event.rows.column_bitmap, ev, (len + 7) / 8); - ev+= (len + 7) / 8; + ev+= bitmap_len; + if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1 || - rpl_event->event_type == UPDATE_ROWS_EVENT) + rpl_event->event_type == UPDATE_ROWS_COMPRESSED_EVENT_V1) { - if (!(rpl_event->event.rows.column_update_bitmap = - (char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8))) + if (rpl_set_data(rpl_event, &rpl_event->event.rows.column_update_bitmap, + ev, bitmap_len)) goto mem_error; - memcpy(rpl_event->event.rows.column_update_bitmap, ev, (len + 7) / 8); - ev+= (len + 7) / 8; + ev+= bitmap_len; } - len= (rpl->buffer + rpl_event->event_length + EVENT_HEADER_OFS - rpl->fd_header_len) - ev; - if ((rpl_event->event.rows.row_data_size= len)) + + len= ev_end - ev - (rpl->use_checksum ? 4 : 0); + + if (rpl_event->event.rows.compressed) { + uint8_t algorithm= 0, header_size= 0; + uint32_t uncompressed_len= get_compression_info(ev, &algorithm, &header_size); + + if (!(rpl_event->event.rows.row_data = ma_calloc_root(&rpl_event->memroot, uncompressed_len))) + goto mem_error; + + if ((uncompress((Bytef*)rpl_event->event.rows.row_data, (uLong *)&uncompressed_len, + (Bytef*)ev + header_size, (uLongf )len) != Z_OK)) + { + my_set_error(rpl->mysql, CR_ERR_NET_UNCOMPRESS, SQLSTATE_UNKNOWN, 0, rpl->filename_length, + rpl->filename, rpl->start_position); + mariadb_free_rpl_event(rpl_event); + return 0; + } + rpl_event->event.rows.row_data_size= uncompressed_len; + ev+= header_size + len; + } else { + rpl_event->event.rows.row_data_size= ev_end - ev - (rpl->use_checksum ? 4 : 0); if (!(rpl_event->event.rows.row_data = - (char *)ma_alloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size))) + (char *)ma_calloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size))) goto mem_error; memcpy(rpl_event->event.rows.row_data, ev, rpl_event->event.rows.row_data_size); } break; + } default: + /* We need to report an error if this event can't be ignored */ + if (!(rpl_event->flags & LOG_EVENT_IGNORABLE_F)) + { + mariadb_free_rpl_event(rpl_event); + my_set_error(rpl->mysql, CR_UNKNOWN_BINLOG_EVENT, SQLSTATE_UNKNOWN, 0, + rpl->filename_length, rpl->filename, rpl->start_position, rpl_event->event_type); + return 0; + } return rpl_event; break; } @@ -540,6 +1161,26 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN goto net_error; } + if (rpl->use_checksum && !rpl_event->checksum) + { + rpl_event->checksum= uint4korr(ev_end -4); + + if (rpl_event->checksum && rpl->verify_checksum) + { + unsigned long crc= crc32_z(0L, Z_NULL, 0); + crc= crc32_z(crc, checksum_start, ev_end - checksum_start - 4); + if (rpl_event->checksum != (uint32_t)crc) + { + my_set_error(rpl->mysql, CR_ERR_CHECKSUM_VERIFICATION_ERROR, SQLSTATE_UNKNOWN, 0, + rpl->filename_length, rpl->filename, rpl->start_position, + rpl_event->checksum, (uint32_t)crc); + mariadb_free_rpl_event(rpl_event); + return 0; + } + } + } + + //rpl->start_position= rpl_event->next_event_pos; return rpl_event; } mem_error: @@ -558,6 +1199,11 @@ void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl) return; if (rpl->filename) free((void *)rpl->filename); + if (rpl->fp) + { + free(rpl->buffer); + fclose(rpl->fp); + } free(rpl); return; } @@ -608,6 +1254,11 @@ int mariadb_rpl_optionsv(MARIADB_RPL *rpl, rpl->start_position= va_arg(ap, unsigned long); break; } + case MARIADB_RPL_VERIFY_CHECKSUM: + { + rpl->verify_checksum= va_arg(ap, uint32_t); + break; + } default: rc= -1; goto end;