diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..e04c0846 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "external/crypto_wrapper"] + path = external/crypto_wrapper + url = https://github.com/9EOR9/crypto_wrapper diff --git a/CMakeLists.txt b/CMakeLists.txt index 69036045..27346ba2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,8 @@ MACRO(ADD_OPTION _name _text _default) ENDIF() ENDMACRO() +ADD_OPTION(WITH_CRYPTO "build with cryptograpy support" OFF) + ### Options ### IF(NOT WIN32) ADD_OPTION(WITH_MYSQLCOMPAT "creates libmysql* symbolic links" OFF) @@ -314,6 +316,9 @@ IF(NOT WITH_SSL STREQUAL "OFF") IF(WIN32) CHECK_INCLUDE_FILES (${OPENSSL_INCLUDE_DIR}/openssl/applink.c HAVE_OPENSSL_APPLINK_C) ENDIF() + IF(WITH_CRYPTO) + SET(WITH_CRYPTO=openssl) + ENDIF() INCLUDE_DIRECTORIES(BEFORE ${OPENSSL_INCLUDE_DIR}) @@ -340,6 +345,9 @@ IF(NOT WITH_SSL STREQUAL "OFF") SET(SSL_LIBRARIES ${GNUTLS_LIBRARY}) SET(TLS_LIBRARY_VERSION "GnuTLS ${GNUTLS_VERSION_STRING}") INCLUDE_DIRECTORIES(${GNUTLS_INCLUDE_DIR}) + IF(WITH_CRYPTO) + SET(WITH_CRYPTO=nettle) + ENDIF() ELSE() MESSAGE(FATAL_ERROR "GnuTLS not found") ENDIF() @@ -353,6 +361,9 @@ IF(NOT WITH_SSL STREQUAL "OFF") INCLUDE_DIRECTORIES("${CC_SOURCE_DIR}/plugins/pvio/") SET(SSL_LIBRARIES secur32) SET(TLS_LIBRARY_VERSION "Schannel ${CMAKE_SYSTEM_VERSION}") + IF(WITH_CRYPTO) + SET(WITH_CRYPTO=schannel) + ENDIF() ENDIF() ENDIF() MESSAGE1(TLS_LIBRARY_VERSION "TLS library/version: ${TLS_LIBRARY_VERSION}") @@ -360,6 +371,12 @@ IF(NOT WITH_SSL STREQUAL "OFF") MARK_AS_ADVANCED(SSL_SOURCES) ENDIF() +IF(WITH_CRYPTO) + ADD_DEFINITIONS(-DHAVE_CRYPTO=1) + ADD_SUBDIRECTORY(${CC_SOURCE_DIR}/external/crypto_wrapper) + INCLUDE_DIRECTORIES(${CC_SOURCE_DIR}/external/crypto_wrapper/include) +ENDIF() + SET(ENABLED_LOCAL_INFILE "AUTO" CACHE STRING "If we should should enable LOAD DATA LOCAL by default (OFF/ON/AUTO)") MARK_AS_ADVANCED(ENABLED_LOCAL_INFILE) IF (ENABLED_LOCAL_INFILE MATCHES "^(0|FALSE)$") diff --git a/external/crypto_wrapper b/external/crypto_wrapper new file mode 160000 index 00000000..006ee66c --- /dev/null +++ b/external/crypto_wrapper @@ -0,0 +1 @@ +Subproject commit 006ee66ca7e41115a0c3042cbdfddb8ee78369f3 diff --git a/include/ma_decimal.h b/include/ma_decimal.h new file mode 100644 index 00000000..e1f4fbf1 --- /dev/null +++ b/include/ma_decimal.h @@ -0,0 +1,64 @@ +/* Copyright (C) 2000 Sergei Golubchik +*/ + +#ifndef _decimal_h +#define _decimal_h + +typedef enum {TRUNCATE=0, HALF_EVEN, HALF_UP, CEILING, FLOOR} decimal_round_mode; +typedef int32 decimal_digit; + +typedef struct st_decimal { + int intg, frac, len; + my_bool sign; + decimal_digit *buf; +} decimal; + +int decimal2string(decimal *from, char *to, int *to_len); +int bin2decimal(const char *from, decimal *to, int precision, int scale); + +int decimal_size(int precision, int scale); +int decimal_bin_size(int precision, int scale); +int decimal_result_size(decimal *from1, decimal *from2, char op, int param); + + +/* set a decimal to zero */ + +#define decimal_make_zero(dec) do { \ + (dec)->buf[0]=0; \ + (dec)->intg=1; \ + (dec)->frac=0; \ + (dec)->sign=0; \ + } while(0) + +/* + returns the length of the buffer to hold string representation + of the decimal (including decimal dot, possible sign and \0) +*/ + +#define decimal_string_size(dec) ((dec)->intg + (dec)->frac + ((dec)->frac > 0) + 2) + +/* negate a decimal */ +#define decimal_neg(dec) do { (dec)->sign^=1; } while(0) + +/* + conventions: + + decimal_smth() == 0 -- everything's ok + decimal_smth() <= 1 -- result is usable, but precision loss is possible + decimal_smth() <= 2 -- result can be unusable, most significant digits + could've been lost + decimal_smth() > 2 -- no result was generated +*/ + +#define E_DEC_OK 0 +#define E_DEC_TRUNCATED 1 +#define E_DEC_OVERFLOW 2 +#define E_DEC_DIV_ZERO 4 +#define E_DEC_BAD_NUM 8 +#define E_DEC_OOM 16 + +#define E_DEC_ERROR 31 +#define E_DEC_FATAL_ERROR 30 + +#endif + diff --git a/include/ma_global.h b/include/ma_global.h index b8ef7e16..71bd3ea7 100644 --- a/include/ma_global.h +++ b/include/ma_global.h @@ -749,6 +749,7 @@ typedef char bool; /* Ordinary boolean values 0 1 */ /* Optimized store functions for Intel x86 */ #define int1store(T,A) *((int8*) (T)) = (A) #define uint1korr(A) (*(((uint8*)(A)))) +#define sint1korr(A) (*(((int8*)(A)))) #if defined(__i386__) || defined(_WIN32) #define sint2korr(A) (*((int16 *) (A))) #define sint3korr(A) ((int32) ((((uchar) (A)[2]) & 128) ? \ diff --git a/include/mariadb_rpl.h b/include/mariadb_rpl.h index 5534dca7..21f3adf4 100644 --- a/include/mariadb_rpl.h +++ b/include/mariadb_rpl.h @@ -81,7 +81,12 @@ enum mariadb_rpl_option { MARIADB_RPL_GTID_CALLBACK, /* GTID callback function */ MARIADB_RPL_GTID_DATA, /* GTID data */ MARIADB_RPL_BUFFER, - MARIADB_RPL_VERIFY_CHECKSUM + MARIADB_RPL_VERIFY_CHECKSUM, + MARIADB_RPL_UNCOMPRESS, + MARIADB_RPL_HOST, + MARIADB_RPL_PORT, + MARIADB_RPL_EXTRACT_VALUES, + MARIADB_RPL_DECRYPTION_KEY }; /* Event types: From MariaDB Server sql/log_event.h */ @@ -164,6 +169,7 @@ enum mariadb_rpl_event { #define COMPLETE_ROWS_F 0x08 #define NO_CHECK_CONSTRAINT_CHECKS_F 0x80 + enum mariadb_rpl_status_code { Q_FLAGS2_CODE= 0x00, Q_SQL_MODE_CODE= 0x01, @@ -190,6 +196,21 @@ enum mariadb_rpl_status_code { Q_XID= 129 /* xid: 8 bytes */ }; +enum opt_metadata_field_type +{ + SIGNEDNESS = 1, + DEFAULT_CHARSET, + COLUMN_CHARSET, + COLUMN_NAME, + SET_STR_VALUE, + ENUM_STR_VALUE, + GEOMETRY_TYPE, + SIMPLE_PRIMARY_KEY, + PRIMARY_KEY_WITH_PREFIX, + ENUM_AND_SET_DEFAULT_CHARSET, + ENUM_AND_SET_COLUMN_CHARSET +}; + /* Log Event flags */ /* used in FOMRAT_DESCRIPTION_EVENT. Indicates if it @@ -251,14 +272,13 @@ typedef struct st_mariadb_gtid { unsigned long long sequence_nr; } MARIADB_GTID; + /* Generic replication handle */ typedef struct st_mariadb_rpl { unsigned int version; MYSQL *mysql; char *filename; uint32_t filename_length; - unsigned char *buffer; - unsigned long buffer_size; uint32_t server_id; unsigned long start_position; uint32_t flags; @@ -270,8 +290,35 @@ typedef struct st_mariadb_rpl { FILE *fp; uint32_t error_no; char error_msg[MYSQL_ERRMSG_SIZE]; + uint8_t uncompress; + char *host; + uint32_t port; + uint8_t extract_values; + char nonce[12]; + uint8_t encrypted; + char *decryption_key; } MARIADB_RPL; +typedef struct st_mariadb_rpl_value { + enum enum_field_types field_type; + uint8_t is_null; + uint8_t is_signed; + union { + int64_t ll; + uint64_t ull; + float f; + double d; + MYSQL_TIME tm; + MARIADB_STRING str; + } val; +} MARIADB_RPL_VALUE; + +typedef struct st_rpl_mariadb_row { + uint32_t column_count; + MARIADB_RPL_VALUE *columns; + struct st_rpl_mariadb_row *next; +} MARIADB_RPL_ROW; + /* Event header */ struct st_mariadb_rpl_rotate_event { unsigned long long position; @@ -324,10 +371,21 @@ struct st_mariadb_rpl_table_map_event { unsigned long long table_id; MARIADB_STRING database; MARIADB_STRING table; - unsigned int column_count; + uint32_t column_count; MARIADB_STRING column_types; MARIADB_STRING metadata; unsigned char *null_indicator; + unsigned char *signed_indicator; + MARIADB_CONST_DATA column_names; + MARIADB_CONST_DATA geometry_types; + uint32_t default_charset; + MARIADB_CONST_DATA column_charsets; + MARIADB_CONST_DATA simple_primary_keys; + MARIADB_CONST_DATA prefixed_primary_keys; + MARIADB_CONST_DATA set_values; + MARIADB_CONST_DATA enum_values; + uint8_t enum_set_default_charset; + MARIADB_CONST_DATA enum_set_column_charsets; }; struct st_mariadb_rpl_rand_event { @@ -386,6 +444,7 @@ struct st_mariadb_rpl_rows_event { size_t extra_data_size; void *extra_data; uint8_t compressed; + uint32_t row_count; }; struct st_mariadb_rpl_heartbeat_event { @@ -415,6 +474,7 @@ typedef struct st_mariadb_rpl_event MA_MEM_ROOT memroot; unsigned char *raw_data; size_t raw_data_size; + size_t raw_data_ofs; unsigned int checksum; char ok; enum mariadb_rpl_event event_type; @@ -423,7 +483,6 @@ typedef struct st_mariadb_rpl_event unsigned int event_length; unsigned int next_event_pos; unsigned short flags; - void *raw_data; /****************/ union { struct st_mariadb_rpl_rotate_event rotate; @@ -449,21 +508,41 @@ typedef struct st_mariadb_rpl_event /* Added in C/C 3.3.0 */ uint8_t is_semi_sync; uint8_t semi_sync_flags; + /* Added in C/C 3.3.5 */ + MARIADB_RPL *rpl; } MARIADB_RPL_EVENT; /* compression uses myisampack format */ #define myisam_uint1korr(B) ((uint8_t)(*B)) +#define myisam_sint1korr(B) ((int8_t)(*B)) #define myisam_uint2korr(B)\ ((uint16_t)(((uint16_t)(((const uchar*)(B))[1])) | ((uint16_t) (((const uchar*) (B))[0]) << 8))) +#define myisam_sint2korr(B)\ +((int16_t)(((int16_t)(((const uchar*)(B))[1])) | ((int16_t) (((const uchar*) (B))[0]) << 8))) #define myisam_uint3korr(B)\ ((uint32_t)(((uint32_t)(((const uchar*)(B))[2])) |\ (((uint32_t)(((const uchar*)(B))[1])) << 8) |\ (((uint32_t)(((const uchar*)(B))[0])) << 16))) +#define myisam_sint3korr(B)\ +((int32_t)(((int32_t)(((const uchar*)(B))[2])) |\ +(((int32_t)(((const uchar*)(B))[1])) << 8) |\ +(((int32_t)(((const uchar*)(B))[0])) << 16))) #define myisam_uint4korr(B)\ ((uint32_t)(((uint32_t)(((const uchar*)(B))[3])) |\ (((uint32_t)(((const uchar*)(B))[2])) << 8) |\ (((uint32_t)(((const uchar*) (B))[1])) << 16) |\ (((uint32_t)(((const uchar*) (B))[0])) << 24))) +#define myisam_sint4korr(B)\ +((int32_t)(((int32_t)(((const uchar*)(B))[3])) |\ +(((int32_t)(((const uchar*)(B))[2])) << 8) |\ +(((int32_t)(((const uchar*) (B))[1])) << 16) |\ +(((int32_t)(((const uchar*) (B))[0])) << 24))) +#define mi_uint5korr(B)\ +((uint64_t)(((uint32_t) (((const uchar*) (B))[4])) |\ +(((uint32_t) (((const uchar*) (B))[3])) << 8) |\ +(((uint32_t) (((const uchar*) (B))[2])) << 16) |\ +(((uint32_t) (((const uchar*) (B))[1])) << 24)) |\ +(((uint64_t) (((const uchar*) (B))[0])) << 32)) #define RPL_SAFEGUARD(rpl, event, condition) \ if (!(condition))\ @@ -485,8 +564,40 @@ if (!(condition))\ (a) == DELETE_ROWS_EVENT || (a) == WRITE_ROWS_COMPRESSED_EVENT ||\ (a) == UPDATE_ROWS_COMPRESSED_EVENT || (a) == DELETE_ROWS_COMPRESSED_EVENT) +#define IS_ROW_EVENT(a)\ +((a)->event_type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||\ +(a)->event_type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||\ +(a)->event_type == DELETE_ROWS_COMPRESSED_EVENT_V1 ||\ +(a)->event_type == WRITE_ROWS_EVENT_V1 ||\ +(a)->event_type == UPDATE_ROWS_EVENT_V1 ||\ +(a)->event_type == DELETE_ROWS_EVENT_V1 ||\ +(a)->event_type == WRITE_ROWS_EVENT ||\ +(a)->event_type == UPDATE_ROWS_EVENT ||\ +(a)->event_type == DELETE_ROWS_EVENT) + +static inline uint64_t uintNkorr(uint8_t len, u_char *p) +{ + switch (len) { + case 1: + return *p; + case 2: + return uint2korr(p); + case 3: + return uint3korr(p); + case 4: + return uint4korr(p); + case 8: + return uint8korr(p); + default: + return 0; + } +} + + /* Function prototypes */ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version); +const char * STDCALL mariadb_rpl_error(MARIADB_RPL *rpl); +uint32_t STDCALL mariadb_rpl_errno(MARIADB_RPL *rpl); int mariadb_rpl_optionsv(MARIADB_RPL *rpl, enum mariadb_rpl_option, ...); int mariadb_rpl_get_optionsv(MARIADB_RPL *rpl, enum mariadb_rpl_option, ...); @@ -496,6 +607,111 @@ void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl); MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event); void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event); +MARIADB_RPL_ROW * STDCALL +mariadb_rpl_extract_rows(MARIADB_RPL *rpl, + MARIADB_RPL_EVENT *tm_event, + MARIADB_RPL_EVENT *row_event); + +/* Returned from get_latest_key_version() */ +#define ENCRYPTION_KEY_VERSION_INVALID (~(unsigned int)0) +#define ENCRYPTION_KEY_NOT_ENCRYPTED (0) + +#define ENCRYPTION_KEY_SYSTEM_DATA 1 +#define ENCRYPTION_KEY_TEMPORARY_DATA 2 + +/* Returned from get_key() */ +#define ENCRYPTION_KEY_BUFFER_TOO_SMALL (100) + +#define ENCRYPTION_FLAG_DECRYPT 0 +#define ENCRYPTION_FLAG_ENCRYPT 1 +#define ENCRYPTION_FLAG_NOPAD 2 + +struct st_mariadb_encryption { + int interface_version; /**< version plugin uses */ + + /********************* KEY MANAGEMENT ***********************************/ + + /** + Function returning latest key version for a given key id. + + @return A version or ENCRYPTION_KEY_VERSION_INVALID to indicate an error. + */ + unsigned int (*get_latest_key_version)(unsigned int key_id); + + /** + Function returning a key for a key version + + @param key_id The requested key id + @param version The requested key version + @param key The key will be stored there. Can be NULL - + in which case no key will be returned + @param key_length in: key buffer size + out: the actual length of the key + + This method can be used to query the key length - the required + buffer size - by passing key==NULL. + + If the buffer size is less than the key length the content of the + key buffer is undefined (the plugin is free to partially fill it with + the key data or leave it untouched). + + @return 0 on success, or + ENCRYPTION_KEY_VERSION_INVALID, ENCRYPTION_KEY_BUFFER_TOO_SMALL + or any other non-zero number for errors + */ + unsigned int (*get_key)(unsigned int key_id, unsigned int version, + unsigned char *key, unsigned int *key_length); + + /********************* ENCRYPTION **************************************/ + /* + The caller uses encryption as follows: + 1. Create the encryption context object of the crypt_ctx_size() bytes. + 2. Initialize it with crypt_ctx_init(). + 3. Repeat crypt_ctx_update() until there are no more data to encrypt. + 4. Write the remaining output bytes and destroy the context object + with crypt_ctx_finish(). + */ + + /** + Returns the size of the encryption context object in bytes + */ + unsigned int (*crypt_ctx_size)(unsigned int key_id, unsigned int key_version); + /** + Initializes the encryption context object. + */ + int (*crypt_ctx_init)(void *ctx, const unsigned char *key, unsigned int klen, + const unsigned char *iv, unsigned int ivlen, int flags, + unsigned int key_id, unsigned int key_version); + /** + Processes (encrypts or decrypts) a chunk of data + + Writes the output to th dst buffer. note that it might write + more bytes that were in the input. or less. or none at all. + */ + int (*crypt_ctx_update)(void *ctx, const unsigned char *src, + unsigned int slen, unsigned char *dst, + unsigned int *dlen); + /** + Writes the remaining output bytes and destroys the encryption context + + crypt_ctx_update might've cached part of the output in the context, + this method will flush these data out. + */ + int (*crypt_ctx_finish)(void *ctx, unsigned char *dst, unsigned int *dlen); + /** + Returns the length of the encrypted data + + It returns the exact length, given only the source length. + Which means, this API only supports encryption algorithms where + the length of the encrypted data only depends on the length of the + input (a.k.a. compression is not supported). + */ + unsigned int (*encrypted_length)(unsigned int slen, unsigned int key_id, + unsigned int key_version); +}; + + + #ifdef __cplusplus } #endif diff --git a/include/mysql.h b/include/mysql.h index ba3063d4..94a651b2 100644 --- a/include/mysql.h +++ b/include/mysql.h @@ -68,6 +68,12 @@ typedef struct st_ma_const_string size_t length; } MARIADB_CONST_STRING; +typedef struct st_ma_const_data +{ + const unsigned char *data; + size_t length; +} MARIADB_CONST_DATA; + #ifndef ST_MA_USED_MEM_DEFINED #define ST_MA_USED_MEM_DEFINED diff --git a/libmariadb/CMakeLists.txt b/libmariadb/CMakeLists.txt index 070fdc95..b3a090bd 100644 --- a/libmariadb/CMakeLists.txt +++ b/libmariadb/CMakeLists.txt @@ -33,6 +33,7 @@ SET(MARIADB_LIB_SYMBOLS mariadb_rpl_fetch mariadb_rpl_optionsv mariadb_rpl_get_optionsv + mariadb_rpl_extract_rows mariadb_rpl_init_ex mariadb_free_rpl_event mariadb_field_attr @@ -303,6 +304,7 @@ SET(LIBMARIADB_SOURCES ${LIBMARIADB_SOURCES} ${CC_SOURCE_DIR}/plugins/auth/my_auth.c ma_array.c ma_charset.c +ma_decimal.c ma_hashtbl.c ma_net.c mariadb_charset.c @@ -433,9 +435,12 @@ IF(WIN32) "FILE_DESCRIPTION:Dynamic lib for client/server communication") ENDIF() +IF(WITH_CRYPTO) + SET(CRYPTO_LIBS cw_crypt) +ENDIF() ADD_LIBRARY(mariadbclient STATIC ${MARIADB_OBJECTS} ${EMPTY_FILE}) -TARGET_LINK_LIBRARIES(mariadbclient ${SYSTEM_LIBS}) +TARGET_LINK_LIBRARIES(mariadbclient ${SYSTEM_LIBS} ${CRYPTO_LIBS}) IF(UNIX) ADD_LIBRARY(libmariadb SHARED ${libmariadb_RC} ${MARIADB_OBJECTS} ${EMPTY_FILE}) @@ -445,7 +450,7 @@ ELSE() SET_TARGET_PROPERTIES(libmariadb PROPERTIES LINKER_LANGUAGE C) ENDIF() -TARGET_LINK_LIBRARIES(libmariadb LINK_PRIVATE ${SYSTEM_LIBS}) +TARGET_LINK_LIBRARIES(libmariadb LINK_PRIVATE ${SYSTEM_LIBS} ${CRYPTO_LIBS}) SIGN_TARGET(libmariadb) diff --git a/libmariadb/ma_decimal.c b/libmariadb/ma_decimal.c new file mode 100644 index 00000000..daf4b797 --- /dev/null +++ b/libmariadb/ma_decimal.c @@ -0,0 +1,479 @@ +/* Copyright (C) 2004 Sergei Golubchik + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Library General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. + + You should have received a copy of the GNU Library General Public + License along with this library; if not see + or write to the Free Software Foundation, Inc., + 51 Franklin St., Fifth Floor, Boston, MA 02110, USA +*/ + +/* +======================================================================= + NOTE: this library implements SQL standard "exact numeric" type + and is not at all generic, but rather intentinally crippled to + follow the standard :) +======================================================================= + Quoting the standard + (SQL:2003, Part 2 Foundations, aka ISO/IEC 9075-2:2003) + +4.4.2 Characteristics of numbers, page 27: + + An exact numeric type has a precision P and a scale S. P is a positive + integer that determines the number of significant digits in a + particular radix R, where R is either 2 or 10. S is a non-negative + integer. Every value of an exact numeric type of scale S is of the + form n*10^{-S}, where n is an integer such that ?-R^P <= n <= R^P. + + [...] + + If an assignment of some number would result in a loss of its most + significant digit, an exception condition is raised. If least + significant digits are lost, implementation-defined rounding or + truncating occurs, with no exception condition being raised. + + [...] + + Whenever an exact or approximate numeric value is assigned to an exact + numeric value site, an approximation of its value that preserves + leading significant digits after rounding or truncating is represented + in the declared type of the target. The value is converted to have the + precision and scale of the target. The choice of whether to truncate + or round is implementation-defined. + + [...] + + All numeric values between the smallest and the largest value, + inclusive, in a given exact numeric type have an approximation + obtained by rounding or truncation for that type; it is + implementation-defined which other numeric values have such + approximations. + +5.3 , page 143 + + ::= + [ [ ] ] + | + +6.1 , page 165: + + 19) The of an shall not be greater than + the of the . + + 20) For the s DECIMAL and NUMERIC: + + a) The maximum value of is implementation-defined. + shall not be greater than this value. + b) The maximum value of is implementation-defined. + shall not be greater than this maximum value. + + 21) NUMERIC specifies the data type exact numeric, with the decimal + precision and scale specified by the and . + + 22) DECIMAL specifies the data type exact numeric, with the decimal + scale specified by the and the implementation-defined + decimal precision equal to or greater than the value of the + specified . + +6.26 , page 241: + + 1) If the declared type of both operands of a dyadic arithmetic + operator is exact numeric, then the declared type of the result is + an implementation-defined exact numeric type, with precision and + scale determined as follows: + + a) Let S1 and S2 be the scale of the first and second operands + respectively. + b) The precision of the result of addition and subtraction is + implementation-defined, and the scale is the maximum of S1 and S2. + c) The precision of the result of multiplication is + implementation-defined, and the scale is S1 + S2. + d) The precision and scale of the result of division are + implementation-defined. +*/ + +#include +#include /* for my_alloca */ +#include +#include +#include +#include + +#ifdef WIN32 +#define alloca _malloca +#endif + +typedef decimal_digit dec1; +typedef longlong dec2; + +#define unlikely(A) (A) +#define DIG_PER_DEC1 9 +#define DIG_MASK 100000000 +#define DIG_BASE 1000000000 +#define DIG_BASE2 LL(1000000000000000000) +#define ROUND_UP(X) (((X)+DIG_PER_DEC1-1)/DIG_PER_DEC1) +static const dec1 powers10[DIG_PER_DEC1+1]={ + 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000}; +static const int dig2bytes[DIG_PER_DEC1+1]={0, 1, 1, 2, 2, 3, 3, 4, 4, 4}; + +#define sanity(d) DBUG_ASSERT((d)->len >0 && ((d)->buf[0] | \ + (d)->buf[(d)->len-1] | 1)) + +#define FIX_INTG_FRAC_ERROR(len, intg1, frac1, error) \ + do \ + { \ + if (unlikely(intg1+frac1 > (len))) \ + { \ + if (unlikely(intg1 > (len))) \ + { \ + intg1=(len); \ + frac1=0; \ + error=E_DEC_OVERFLOW; \ + } \ + else \ + { \ + frac1=(len)-intg1; \ + error=E_DEC_TRUNCATED; \ + } \ + } \ + else \ + error=E_DEC_OK; \ + } while(0) + +#define ADD(to, from1, from2, carry) /* assume carry <= 1 */ \ + do \ + { \ + dec1 a=(from1)+(from2)+(carry); \ + if (((carry)= a >= DIG_BASE)) /* no division here! */ \ + a-=DIG_BASE; \ + (to)=a; \ + } while(0) + +#define ADD2(to, from1, from2, carry) \ + do \ + { \ + dec1 a=(from1)+(from2)+(carry); \ + if (((carry)= a >= DIG_BASE)) \ + a-=DIG_BASE; \ + if (unlikely(a >= DIG_BASE)) \ + { \ + a-=DIG_BASE; \ + carry++; \ + } \ + (to)=a; \ + } while(0) + +#define SUB(to, from1, from2, carry) /* to=from1-from2 */ \ + do \ + { \ + dec1 a=(from1)-(from2)-(carry); \ + if (((carry)= a < 0)) \ + a+=DIG_BASE; \ + (to)=a; \ + } while(0) + +#define SUB2(to, from1, from2, carry) /* to=from1-from2 */ \ + do \ + { \ + dec1 a=(from1)-(from2)-(carry); \ + if (((carry)= a < 0)) \ + a+=DIG_BASE; \ + if (unlikely(a < 0)) \ + { \ + a+=DIG_BASE; \ + carry++; \ + } \ + (to)=a; \ + } while(0) + +/* + Convert decimal to its printable string representation + + SYNOPSIS + decimal2string() + from - value to convert + to - points to buffer where string representation should be stored + *to_len - in: size of to buffer + out: length of the actually written string + + RETURN VALUE + E_DEC_OK/E_DEC_TRUNCATED/E_DEC_OVERFLOW +*/ + +int decimal2string(decimal *from, char *to, int *to_len) +{ + int len, intg=from->intg, frac=from->frac, i; + int error=E_DEC_OK; + char *s=to; + dec1 *buf, *buf0=from->buf, tmp; + + DBUG_ASSERT(*to_len >= 2+from->sign); + + /* removing leading zeroes */ + i=((intg-1) % DIG_PER_DEC1)+1; + while (intg > 0 && *buf0 == 0) + { + intg-=i; + i=DIG_PER_DEC1; + buf0++; + } + if (intg > 0) + { + for (i=(intg-1) % DIG_PER_DEC1; *buf0 < powers10[i--]; intg--) ; + DBUG_ASSERT(intg > 0); + } + else + intg=0; + if (unlikely(intg+frac==0)) + { + intg=1; + tmp=0; + buf0=&tmp; + } + + len= from->sign + intg + test(frac) + frac; + if (unlikely(len > --*to_len)) /* reserve one byte for \0 */ + { + int i=len-*to_len; + error= (frac && i <= frac + 1) ? E_DEC_TRUNCATED : E_DEC_OVERFLOW; + if (frac && i >= frac + 1) i--; + if (i > frac) + { + intg-= i-frac; + frac= 0; + } + else + frac-=i; + len= from->sign + intg + test(frac) + frac; + } + *to_len=len; + s[len]=0; + + if (from->sign) + *s++='-'; + + if (frac) + { + char *s1=s+intg; + buf=buf0+ROUND_UP(intg); + *s1++='.'; + for (; frac>0; frac-=DIG_PER_DEC1) + { + dec1 x=*buf++; + for (i=min(frac, DIG_PER_DEC1); i; i--) + { + dec1 y=x/DIG_MASK; + *s1++='0'+(uchar)y; + x-=y*DIG_MASK; + x*=10; + } + } + } + + s+=intg; + for (buf=buf0+ROUND_UP(intg); intg>0; intg-=DIG_PER_DEC1) + { + dec1 x=*--buf; + for (i=min(intg, DIG_PER_DEC1); i; i--) + { + dec1 y=x/10; + *--s='0'+(uchar)(x-y*10); + x=y; + } + } + return error; +} + +/* + Convert string to decimal + + SYNOPSIS + str2decl() + from - value to convert + to - decimal where where the result will be stored + to->buf and to->len must be set. + end - if not NULL, *end will be set to the char where + conversion ended + fixed - use to->intg, to->frac as limits for input number + + NOTE + to->intg and to->frac can be modified even when fixed=1 + (but only decreased, in this case) + + RETURN VALUE + E_DEC_OK/E_DEC_TRUNCATED/E_DEC_OVERFLOW/E_DEC_BAD_NUM/E_DEC_OOM +*/ + +/* + Convert decimal to its binary fixed-length representation + two representations of the same length can be compared with memcmp + with the correct -1/0/+1 result + + SYNOPSIS + decimal2bin() + from - value to convert + to - points to buffer where string representation should be stored + precision/scale - see decimal_bin_size() below + + NOTE + the buffer is assumed to be of the size decimal_bin_size(precision, scale) + + RETURN VALUE + E_DEC_OK/E_DEC_TRUNCATED/E_DEC_OVERFLOW +*/ + +/* + Restores decimal from its binary fixed-length representation + + SYNOPSIS + bin2decimal() + from - value to convert + to - result + precision/scale - see decimal_bin_size() below + + NOTE + see decimal2bin() + the buffer is assumed to be of the size decimal_bin_size(precision, scale) + + RETURN VALUE + E_DEC_OK/E_DEC_TRUNCATED/E_DEC_OVERFLOW +*/ + +int bin2decimal(const char *from, decimal *to, int precision, int scale) +{ + int error=E_DEC_OK, + intg= precision - scale, + intg0= intg / DIG_PER_DEC1, + frac0= scale / DIG_PER_DEC1, + intg0x= intg - intg0 * DIG_PER_DEC1, + frac0x= scale - frac0*DIG_PER_DEC1, + intg1= intg0 + (intg0x > 0), + frac1= frac0 + (frac0x > 0), + tmp_size= decimal_bin_size(precision, scale); + char *tmp; + dec1 *buf= to->buf, + mask=(*from & 0x80) ? 0 : -1; + char *stop; + + /* Initial implementation from Sergei modified "from" buffer, (which errored + in binlog api when verifying checksum), so we declare from as read only and use + a stack buffer instead */ + tmp= (char *)alloca(tmp_size); + memcpy(tmp, from, tmp_size); + *tmp^= 0x80; /* remove sign bit */ + from= tmp; + + sanity(to); + + FIX_INTG_FRAC_ERROR(to->len, intg1, frac1, error); + if (unlikely(error)) + { + if (intg1 < intg0+(intg0x>0)) + { + from+= dig2bytes[intg0x] + sizeof(dec1)*(intg0 - intg1); + frac0= frac0x= intg0x= 0; + intg0= intg1; + } + else + { + frac0x= 0; + frac0= frac1; + } + } + + to->sign= (mask != 0); + to->intg= intg0 * DIG_PER_DEC1 + intg0x; + to->frac= frac0 * DIG_PER_DEC1 + frac0x; + + if (intg0x) + { + int i= dig2bytes[intg0x]; + dec1 x; + switch (i) + { + case 1: x=myisam_sint1korr(from); break; + case 2: x=myisam_sint2korr(from); break; + case 3: x=myisam_sint3korr(from); break; + case 4: x=myisam_sint4korr(from); break; + default: DBUG_ASSERT(0); + } + from+=i; + *buf=x ^ mask; + if (buf > to->buf || *buf != 0) + buf++; + else + to->intg-=intg0x; + } + for (stop=(char *)from+intg0*sizeof(dec1); from < stop; from+=sizeof(dec1)) + { + DBUG_ASSERT(sizeof(dec1) == 4); + *buf=myisam_sint4korr(from) ^ mask; + if (buf > to->buf || *buf != 0) + buf++; + else + to->intg-=DIG_PER_DEC1; + } + DBUG_ASSERT(to->intg >=0); + for (stop=(char *)from+frac0*sizeof(dec1); from < stop; from+=sizeof(dec1)) + { + DBUG_ASSERT(sizeof(dec1) == 4); + *buf=myisam_sint4korr(from) ^ mask; + buf++; + } + if (frac0x) + { + int i=dig2bytes[frac0x]; + dec1 x; + switch (i) + { + case 1: x=myisam_sint1korr(from); break; + case 2: x=myisam_sint2korr(from); break; + case 3: x=myisam_sint3korr(from); break; + case 4: x=myisam_sint4korr(from); break; + default: DBUG_ASSERT(0); + } + *buf=(x ^ mask) * powers10[DIG_PER_DEC1 - frac0x]; + buf++; + } + return error; +} + +/* + Returns the size of array to hold a decimal with given precision and scale + + RETURN VALUE + size in dec1 + (multiply by sizeof(dec1) to get the size if bytes) +*/ + +int decimal_size(int precision, int scale) +{ + DBUG_ASSERT(scale >= 0 && precision > 0 && scale <= precision); + return ROUND_UP(precision-scale)+ROUND_UP(scale); +} + +/* + Returns the size of array to hold a binary representation of a decimal + + RETURN VALUE + size in bytes +*/ + +int decimal_bin_size(int precision, int scale) +{ + int intg=precision-scale, + intg0=intg/DIG_PER_DEC1, frac0=scale/DIG_PER_DEC1, + intg0x=intg-intg0*DIG_PER_DEC1, frac0x=scale-frac0*DIG_PER_DEC1; + + DBUG_ASSERT(scale >= 0 && precision > 0 && scale <= precision); + return intg0*sizeof(dec1)+dig2bytes[intg0x]+ + frac0*sizeof(dec1)+dig2bytes[frac0x]; +} diff --git a/libmariadb/mariadb_rpl.c b/libmariadb/mariadb_rpl.c index e314ac8d..b4d8ac02 100644 --- a/libmariadb/mariadb_rpl.c +++ b/libmariadb/mariadb_rpl.c @@ -27,13 +27,72 @@ #include #include #include +#include #include +#ifdef HAVE_CRYPTO +#include +#endif #ifdef WIN32 -#define alloca _alloca +#define alloca _malloca #endif +#define RPL_EVENT_HEADER_SIZE 19 +#define RPL_ERR_POS(r) (r)->filename_length, (r)->filename, (r)->start_position +#define RPL_CHECK_NULL_POS(position, end)\ +{\ + uchar *tmp= (position);\ + while (*tmp && tmp < (end))\ + tmp++;\ + if (tmp > (end))\ + goto malformed_packet;\ +} + +#define RPL_CHECK_POS(position, end, bytes)\ +if ((end)-(position) < (ssize_t)(bytes))\ + goto malformed_packet; + +#define RPL_CHECK_FIELD_LENGTH(position, end)\ +{\ + RPL_CHECK_POS((position), (end), 1);\ + RPL_CHECK_POS((position), (end), net_field_size((position)));\ +} + +#define RPL_CHECK_POST_HEADER_LEN(position, end, type)\ +if (rpl->post_header_len[(type) - 1])\ +{\ + RPL_CHECK_POS((position), (end), rpl->post_header_len[(type)-1])\ +} + +static inline int net_field_size(uchar *p) +{ + if (*p <= 251) + return 1; + if (*p == 252) + return 3; + if (*p == 253) + return 4; + return 9; +} + +static inline int rpl_bit_size(uint32_t x) +{ + int bits= 1; + + while (x >>= 1) + bits++; + + return bits; +} + +static inline int rpl_byte_size(uint32_t x) +{ + int bits= rpl_bit_size(x); + + return (bits + 7) / 8; +} + void rpl_set_error(MARIADB_RPL *rpl, unsigned int error_nr, const char *format, @@ -43,6 +102,7 @@ void rpl_set_error(MARIADB_RPL *rpl, const char *errmsg; + return; if (!format) { if (error_nr >= CR_MIN_ERROR && error_nr <= CR_MYSQL_LAST_ERROR) @@ -58,9 +118,109 @@ void rpl_set_error(MARIADB_RPL *rpl, vsnprintf(rpl->error_msg, MYSQL_ERRMSG_SIZE - 1, format ? format : errmsg, ap); va_end(ap); - return; + + /* For backward compatibility we also need to set a connection + error, if we read from primary instead of file */ + if (rpl->mysql) + { + my_set_error(rpl->mysql, error_nr, SQLSTATE_UNKNOWN, rpl->error_msg); + } } +const char * STDCALL mariadb_rpl_error(MARIADB_RPL *rpl) +{ + return rpl->error_msg; +} + +uint32_t STDCALL mariadb_rpl_errno(MARIADB_RPL *rpl) +{ + return rpl->error_no; +} + +uint8_t rpl_parse_opt_metadata(MARIADB_RPL_EVENT *event, const uchar *buffer, size_t length) +{ + const uchar *pos= buffer, *end= buffer + length; + struct st_mariadb_rpl_table_map_event *tm_event= (struct st_mariadb_rpl_table_map_event *)&event->event; + + if (event->event_type != TABLE_MAP_EVENT) + return 1; + + while (pos < end) + { + uint8_t meta_type= *pos++; + uint32_t len; + + RPL_CHECK_FIELD_LENGTH((uchar *)pos, end); + len= net_field_length((uchar **)&pos); + RPL_CHECK_POS(pos, end,len); + + switch(meta_type) + { + case SIGNEDNESS: + tm_event->signed_indicator= (uchar *)pos; + pos+= len; + break; + case DEFAULT_CHARSET: + tm_event->default_charset= *pos; + pos+= len; + break; + case COLUMN_CHARSET: + tm_event->column_charsets.data= pos; + tm_event->column_charsets.length= len; + pos+= len; + break; + case COLUMN_NAME: + tm_event->column_names.data= pos; + tm_event->column_names.length= len; + pos+= len; + break; + case SIMPLE_PRIMARY_KEY: + tm_event->simple_primary_keys.data= pos; + tm_event->simple_primary_keys.length= len; + pos+= len; + break; + case PRIMARY_KEY_WITH_PREFIX: + tm_event->prefixed_primary_keys.data= pos; + tm_event->prefixed_primary_keys.length= len; + pos+= len; + break; + case GEOMETRY_TYPE: + { + tm_event->geometry_types.data= pos; + tm_event->geometry_types.length= len; + pos+= len; + break; + } + /* Default character set used by all columns */ + case ENUM_AND_SET_DEFAULT_CHARSET: + tm_event->enum_set_default_charset= *pos; + pos+= len; + break; + case ENUM_AND_SET_COLUMN_CHARSET: + tm_event->enum_set_column_charsets.data= pos; + tm_event->enum_set_column_charsets.length= len; + pos+= len; + break; + case SET_STR_VALUE: + tm_event->set_values.data= pos; + tm_event->set_values.length= len; + pos+= len; + break; + case ENUM_STR_VALUE: + tm_event->enum_values.data= pos; + tm_event->enum_values.length= len; + pos+= len; + break; + default: + rpl_set_error(event->rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(event->rpl), "Unknown/unsupported event type"); + pos+= len; + break; + } + } + return 0; +malformed_packet: + return 1; +} static void *ma_calloc_root(void *memroot, size_t len) { @@ -71,51 +231,426 @@ static void *ma_calloc_root(void *memroot, size_t len) return p; } -static int rpl_set_string_and_len(MARIADB_RPL_EVENT *event, - MARIADB_STRING *s, +static void rpl_set_string_and_len(MARIADB_STRING *s, unsigned char *buffer, size_t len) { if (!buffer || !len) { s->length= 0; - return 0; + return; } - if (!(s->str= ma_calloc_root(&event->memroot, len))) - return 1; - memcpy(s->str, buffer, len); + s->str= (char *)buffer; s->length= len; - return 0; } -static int rpl_set_string0(MARIADB_RPL_EVENT *event, - MARIADB_STRING *s, - const char *buffer) +static uint8_t rpl_alloc_set_string_and_len(MARIADB_RPL_EVENT *event, + MARIADB_STRING *s, + void *buffer, + size_t len) { - size_t len; - if (!buffer || !buffer[0]) + if (!buffer || !len) { s->length= 0; return 0; } - len= strlen(buffer); - - if (!(s->str= ma_calloc_root(&event->memroot, len))) + if (!(s->str = (char *)ma_alloc_root(&event->memroot, len))) return 1; - strcpy(s->str, buffer); + + memcpy(s->str, buffer, len); s->length= len; return 0; } -static int rpl_set_data(MARIADB_RPL_EVENT *event, unsigned char **buf, void *val, size_t len) +static uint8_t rpl_metadata_size(enum enum_field_types field_type) { - if (!val || !len) - return 0; - if (!(*buf= ma_calloc_root(&event->memroot, len))) - return 1; - memcpy(*buf, val, len); - return 0; + switch (field_type) { + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_DATETIME2: + case MYSQL_TYPE_TIMESTAMP2: + case MYSQL_TYPE_TIME2: + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + return 1; + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_NEWDECIMAL: + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_BIT: + return 2; + default: + return 0; + } +} + +static uint8_t ma_rpl_get_second_part(MYSQL_TIME *tm, uchar *ptr, uchar *metadata) +{ + switch(metadata[0]) + { + case 0: + tm->second_part= 0; + return 0; + case 1: + case 2: + tm->second_part= (uint32_t)ptr[0] * 10000; + return 1; + case 3: + case 4: + tm->second_part= myisam_sint2korr(ptr) * 100; + return 2; + case 5: + case 6: + tm->second_part= myisam_sint3korr(ptr); + return 3; + default: + return 0; + } +} + +MARIADB_RPL_ROW * STDCALL +mariadb_rpl_extract_rows(MARIADB_RPL *rpl, + MARIADB_RPL_EVENT *tm_event, + MARIADB_RPL_EVENT *row_event) +{ + uchar *start, *pos, *end; + MARIADB_RPL_ROW *f_row= NULL, *p_row= NULL, *c_row= NULL; + uint32_t column_count; + + if (!rpl || !tm_event || !row_event) + return NULL; + + if (tm_event->event_type != TABLE_MAP_EVENT || !(IS_ROW_EVENT(row_event))) + { + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl), "Event with wrong event type passed."); + return NULL; + } + + if (row_event->event.rows.table_id != tm_event->event.table_map.table_id) + { + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl), "table_id in table_map event differs."); + return NULL; + } + + if (!row_event->event.rows.row_data_size || + !row_event->event.rows.row_data) + { + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Row event has no data."); + return NULL; + } + + column_count= tm_event->event.table_map.column_count; + + start= pos = row_event->event.rows.row_data; + end= start + row_event->event.rows.row_data_size; + + while (pos < end) + { + uchar *n_bitmap; + + uchar *metadata= (uchar *)tm_event->event.table_map.metadata.str; + + if (!(c_row = (MARIADB_RPL_ROW *)ma_calloc_root(&row_event->memroot, sizeof(MARIADB_RPL_ROW))) || + !(c_row->columns= (MARIADB_RPL_VALUE *)ma_calloc_root(&row_event->memroot, + sizeof(MARIADB_RPL_VALUE) * column_count))) + { + rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0); + return NULL; + } + + if (!f_row) + f_row= c_row; + if (p_row) + p_row->next= c_row; + + c_row->column_count= column_count; + n_bitmap= pos; + pos+= (column_count + 7) / 8; + + for (uint32_t i= 0; i < column_count; i++) + { + MARIADB_RPL_VALUE *column= &c_row->columns[i]; + column->field_type= (uchar)tm_event->event.table_map.column_types.str[i]; + /* enum, set and string types are stored as string - first metadata + byte contains real_type, second byte contains the length */ + if (column->field_type == MYSQL_TYPE_STRING) + { + if (metadata[0] == MYSQL_TYPE_ENUM || metadata[0] == MYSQL_TYPE_SET) + column->field_type = metadata[0]; + } + + if ((n_bitmap[i / 8] >> (i % 8)) & 1) + { + column->is_null= 1; + metadata+= rpl_metadata_size(column->field_type); + continue; + } + if (column->field_type == MYSQL_TYPE_BLOB) + { + switch(metadata[0]) + { + case 1: + column->field_type= MYSQL_TYPE_TINY_BLOB; + break; + case 3: + column->field_type= MYSQL_TYPE_MEDIUM_BLOB; + break; + case 4: + column->field_type= MYSQL_TYPE_LONG_BLOB; + break; + default: + break; + } + } + switch (column->field_type) { + case MYSQL_TYPE_TINY: + column->val.ll= sint1korr(pos); + column->val.ull= uint1korr(pos); + pos++; + break; + case MYSQL_TYPE_YEAR: + column->val.ull= uint1korr(pos++) + 1900; + break; + case MYSQL_TYPE_SHORT: + column->val.ll= sint2korr(pos); + column->val.ull= uint2korr(pos); + pos+= 2; + break; + case MYSQL_TYPE_INT24: + column->val.ll= sint3korr(pos); + column->val.ull= uint3korr(pos); + pos+= 3; + break; + case MYSQL_TYPE_LONG: + column->val.ll= sint4korr(pos); + column->val.ull= uint4korr(pos); + pos+= 4; + break; + case MYSQL_TYPE_LONGLONG: + column->val.ll= sint8korr(pos); + column->val.ull= uint8korr(pos); + pos+= 8; + break; + case MYSQL_TYPE_NEWDECIMAL: + { + uint8_t precision= *metadata++; + uint8_t scale= *metadata++; + uint32_t bin_size; + decimal dec; + char str[200]; + char buf[100]; + int s_len= sizeof(str) - 1; + + dec.buf= (void *)buf; + dec.len= sizeof(buf) / sizeof(decimal_digit); + + bin_size= decimal_bin_size(precision, scale); + bin2decimal((char *)pos, &dec, precision, scale); + decimal2string(&dec, str, &s_len); + pos+= bin_size; + + if (rpl_alloc_set_string_and_len(row_event, &column->val.str, str, s_len)) + goto mem_error; + + break; + } + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + { + uint8_t flen= *metadata++; + if (flen == 4) + { + float4get(column->val.f, pos); + } + if (flen == 8) + { + float8get(column->val.d, pos); + } + pos+= flen; + break; + } + case MYSQL_TYPE_BIT: + { + uint8_t num_bits= (metadata[0] & 0xFF) + metadata[1] * 8; + uint8_t b_len= (num_bits + 7) / 8; + metadata+= 2; + if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, b_len)) + goto mem_error; + pos+= b_len; + break; + } + case MYSQL_TYPE_TIMESTAMP: + { + column->val.ull= uint4korr(pos); + pos+= 4; + break; + } + case MYSQL_TYPE_TIMESTAMP2: + { + char tmp[20]; + uint32_t p1= uint4korr(pos); + uint8_t f_len= *metadata++; + uint32_t p2; + pos+= 4; + p2= uintNkorr(f_len, pos); + pos+= f_len; + sprintf(tmp, "%d.%d", p1, p2); + if (rpl_alloc_set_string_and_len(row_event, &column->val.str, tmp, strlen(tmp))) + goto mem_error; + break; + } + case MYSQL_TYPE_DATE: + { + MYSQL_TIME *tm= &column->val.tm; + uint32_t d_val= uint3korr(pos); + pos+= 3; + tm->year= (int)(d_val / (16 * 32)); + tm->month= (int)(d_val / 32 % 16); + tm->day= d_val % 32; + tm->time_type= MYSQL_TIMESTAMP_DATE; + break; + } + case MYSQL_TYPE_TIME2: + { + MYSQL_TIME *tm= &column->val.tm; + int64_t t_val= myisam_uint3korr(pos) - 0x800000LL; + + if ((tm->neg = t_val < 0)) + t_val= -t_val; + + pos+= 3; + tm->hour= (t_val >> 12) % (1 << 10); + tm->minute= (t_val >> 6) % (1 << 6); + tm->second= t_val % (1 << 6); + + pos+= ma_rpl_get_second_part(tm, pos, metadata); + metadata++; + tm->time_type= MYSQL_TIMESTAMP_TIME; + column->field_type= MYSQL_TYPE_TIME; + break; + } + case MYSQL_TYPE_DATETIME2: + { + MYSQL_TIME *tm= &column->val.tm; + uint64_t dt_val= mi_uint5korr(pos) - 0x8000000000LL, + date_part, time_part; + pos+= 5; + + date_part= dt_val >> 17; + time_part= dt_val % (1 << 17); + + tm->day= date_part % (1 << 5); + tm->month= (date_part >> 5) % 13; + tm->year= (date_part >> 5) / 13; + + tm->second= time_part % (1 << 6); + tm->minute= (time_part >> 6) % (1 << 6); + tm->hour= (uint32_t)(time_part >> 12); + + tm->time_type= MYSQL_TIMESTAMP_DATETIME; + column->field_type= MYSQL_TYPE_DATETIME; + + pos+= ma_rpl_get_second_part(tm, pos, metadata); + metadata++; + break; + } + case MYSQL_TYPE_STRING: + { + uint8_t s_len= metadata[2]; + metadata+= 2; + if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, s_len)) + goto mem_error; + pos+= s_len; + break; + } + case MYSQL_TYPE_ENUM: + { + uint8_t e_len= metadata[2]; + metadata+= 2; + column->val.ull= uintNkorr(e_len, pos); + pos+= e_len; + break; + } + case MYSQL_TYPE_SET: + { + uint8_t e_len= metadata[2]; + metadata+= 2; + column->val.ull= uintNkorr(e_len, pos); + pos+= e_len; + break; + } + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_GEOMETRY: + { + uint8_t h_len= *metadata++; + uint64_t b_len= uintNkorr(h_len, pos); + pos+= h_len; + if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, b_len)) + goto mem_error; + pos+= b_len; + break; + } + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: + { + uint32_t s_len= uint2korr(metadata); + uint8_t byte_len= rpl_byte_size(s_len); + metadata+= 2; + s_len= uintNkorr(byte_len, pos); + pos+= byte_len; + if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, s_len)) + goto mem_error; + pos+= s_len; + break; + } + case MYSQL_TYPE_TIME: + { + MYSQL_TIME *tm= &column->val.tm; + uint32_t t= uint8korr(pos); + pos+= 8; + tm->hour= (t/100)/100; + tm->minute= (t/100) % 100; + tm->second= t % 100; + tm->time_type= MYSQL_TIMESTAMP_TIME; + break; + } + case MYSQL_TYPE_DATETIME: + { + MYSQL_TIME *tm= &column->val.tm; + uint64_t t= uint8korr(pos); + uint32_t d_val= t / 1000000, + t_val= t % 1000000; + pos+= 8; + tm->year= (d_val / 100) / 100; + tm->month= (d_val / 100) % 100; + tm->day= d_val % 100; + tm->hour= (t_val/100)/100; + tm->minute= (t_val/100) % 100; + tm->second= t_val % 100; + tm->time_type= MYSQL_TIMESTAMP_DATETIME; + break; + } + + + default: + break; + } + } + p_row= c_row; + } + return f_row; + +mem_error: + rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0); + return NULL; } MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) @@ -125,15 +660,12 @@ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) if (version < MARIADB_RPL_REQUIRED_VERSION || version > MARIADB_RPL_VERSION) { - my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version, - MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION); + if (mysql) + my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version, + MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION); return 0; } - /* if there is no connection, we read a file - if (!mysql) - return NULL; */ - if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL)))) { SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0); @@ -157,6 +689,14 @@ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) } } } + + /* recommended way to set replica host and port is via rpl_optionsv(), however if + hostname and port was set via mysql_optionsv, we set it here to rpl */ + if (rpl->mysql && rpl->mysql->options.extension && rpl->mysql->options.extension->rpl_host) + { + mariadb_rpl_optionsv(rpl, MARIADB_RPL_HOST, rpl->mysql->options.extension->rpl_host); + mariadb_rpl_optionsv(rpl, MARIADB_RPL_PORT, rpl->mysql->options.extension->rpl_port); + } return rpl; } @@ -191,7 +731,7 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) /* if replica was specified, we will register replica via COM_REGISTER_SLAVE */ - if (rpl->mysql && rpl->mysql->options.extension && rpl->mysql->options.extension->rpl_host) + if (rpl->host) { /* Protocol: Ofs Len Data @@ -220,7 +760,7 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) /* Don't send user, password, rank and server_id */ *p++= 0; *p++= 0; - int2store(p, rpl->mysql->options.extension->rpl_port); + int2store(p, rpl->port); p+= 2; int4store(p, 0); @@ -234,12 +774,7 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) if (rpl->mysql) { - ptr= buf= - #ifdef WIN32 - (unsigned char *)malloca(rpl->filename_length + 11); - #else - (unsigned char *)alloca(rpl->filename_length + 11); - #endif + ptr= buf= (unsigned char *)alloca(rpl->filename_length + 11); int4store(ptr, (unsigned int)rpl->start_position); ptr+= 4; @@ -253,7 +788,7 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) return (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0)); } else { - char *buf[RPL_BINLOG_FILEHEADER_SIZE]; + char *buf[RPL_BINLOG_MAGIC_SIZE]; if (rpl->fp) fclose(rpl->fp); @@ -294,9 +829,12 @@ static int ma_set_rpl_filename(MARIADB_RPL *rpl, const unsigned char *filename, } /* - * - * - * + * Returns compression info: + * Ofs Len + * 0 1 header: + * ofs & 0x07 >> 4: algorithm, always 0=zlib + * ofs & 0x07: header size + * 1 header size uncompressed length in MyISAM format. */ static uint32_t get_compression_info(const unsigned char *buf, uint8_t *algorithm, @@ -348,7 +886,7 @@ static uint32_t get_compression_info(const unsigned char *buf, *header_size += 1; return len; } - + MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event) { unsigned char *ev= 0; @@ -361,6 +899,22 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN if (!rpl || (!rpl->mysql && !rpl->fp)) return 0; + if (event) + { + MA_MEM_ROOT memroot= event->memroot; + rpl_event= event; + ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC)); + memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); + rpl_event->memroot= memroot; + } else { + if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT)))) + goto mem_error; + memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); + ma_init_alloc_root(&rpl_event->memroot, 8192, 0); + } + + rpl_event->rpl= rpl; + while (1) { unsigned long pkt_len; @@ -370,7 +924,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN if (pkt_len == packet_error) { - rpl->buffer_size= 0; + mariadb_free_rpl_event(rpl_event); return 0; } @@ -381,7 +935,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN */ if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE) { - rpl->buffer_size= 0; + mariadb_free_rpl_event(rpl_event); return 0; } @@ -395,70 +949,107 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT) continue; } + + if (!(rpl_event->raw_data= ma_alloc_root(&rpl_event->memroot, pkt_len))) + goto mem_error; - rpl->buffer_size= pkt_len; - ev= rpl->buffer= rpl->mysql->net.read_pos; + rpl_event->raw_data_size= pkt_len; + memcpy(rpl_event->raw_data, rpl->mysql->net.read_pos, pkt_len); + ev= rpl_event->raw_data; } else if (rpl->fp) { char buf[EVENT_HEADER_OFS]; /* header */ size_t rc; uint32_t len= 0; char *p= buf; + uint32_t start_pos= ftell(rpl->fp); + + if (feof(rpl->fp)) + { + return NULL; + } memset(buf, 0, EVENT_HEADER_OFS); - if (fread(buf, 1, EVENT_HEADER_OFS, rpl->fp) != EVENT_HEADER_OFS) + if ((rc= fread(buf, 1, EVENT_HEADER_OFS - 1, rpl->fp)) != EVENT_HEADER_OFS - 1) { rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Can't read event header"); + mariadb_free_rpl_event(rpl_event); return NULL; } len= uint4korr(p + 9); - if (rpl->buffer_size < len) + if (!(rpl_event->raw_data= ma_alloc_root(&rpl_event->memroot, len))) { - if (!(rpl->buffer= realloc(rpl->buffer, len))) - { - rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0); - return NULL; - } + rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0); + mariadb_free_rpl_event(rpl_event); + return NULL; } - rpl->buffer_size= len; - memcpy(rpl->buffer, buf, EVENT_HEADER_OFS); - len-= EVENT_HEADER_OFS; - rc= fread(rpl->buffer + EVENT_HEADER_OFS, 1, len, rpl->fp); + rpl_event->raw_data_size= len; + memcpy(rpl_event->raw_data, buf, EVENT_HEADER_OFS - 1); + len-= (EVENT_HEADER_OFS - 1); + rc= fread(rpl_event->raw_data + EVENT_HEADER_OFS - 1, 1, len, rpl->fp); if (rc != len) { rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Error while reading post header"); + mariadb_free_rpl_event(rpl_event); return NULL; } - ev= rpl->buffer; - } - ev_end= rpl->buffer + rpl->buffer_size; + ev= rpl_event->raw_data; - if (event) - { - MA_MEM_ROOT memroot= event->memroot; - rpl_event= event; - ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC)); - memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); - rpl_event->memroot= memroot; - } else { - if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT)))) - goto mem_error; - memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); - ma_init_alloc_root(&rpl_event->memroot, 8192, 0); + /* We don't decrypt yet */ + if (rpl->encrypted) { + uchar iv[16]; + int rc; + uint32 event_len= uint4korr(ev + 9); + uint32 dlen= event_len; + uchar *dst; + + #ifndef HAVE_CRYPTO + return rpl_event; + #endif + /* Application needs to set key after START_ENCRYPTION_EVENT, if not done + we return the encrypted event */ + if (!rpl->decryption_key) + return rpl_event; + + memcpy(iv, rpl->nonce, 12); + int4store(iv + 12, start_pos); + + /* move timestamp */ + memcpy(ev + 9, ev, 4); + dst= (uchar *)malloc(event_len); + if ((rc= cw_crypt(CW_AES_CTR, CW_CRYPT_DECRYPT, ev + 4, event_len - 4, dst + 4, &dlen, + (uchar*)rpl->decryption_key, strlen(rpl->decryption_key), iv, 16))) + { + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl), "Decryption failed"); + free(dst); + return rpl_event; + } + + memcpy(dst, dst + 9, 4); + int4store(dst + 9, event_len); + memcpy(ev, dst, event_len); + free(dst); + } } + ev_end= rpl_event->raw_data + rpl_event->raw_data_size; + + if (rpl->mysql) { + RPL_CHECK_POS(ev, ev_end, 1); rpl_event->ok= *ev++; /* CONC-470: add support for semi snychronous replication */ if ((rpl_event->is_semi_sync= (*ev == SEMI_SYNC_INDICATOR))) { + RPL_CHECK_POS(ev, ev_end, 1); ev++; rpl_event->semi_sync_flags= *ev++; } } + rpl_event->raw_data_ofs= ev - rpl_event->raw_data; /* check sum verification: check sum will be calculated from begin of binlog header @@ -475,12 +1066,15 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN - uint32_t event_len: length of the event. If checksum is enabled, the length also include 4 bytes of checksum + ------------- if START_ENCRYPTION_EVENT was sent, --------------- + encrypted part starts here: - uint32_t next_pos: Position of next binary log event - uint16_t flags: flags The size of binlog event header must match the header size returned by FORMAT_DESCIPTION_EVENT. In version 4 it is always 19. ********************************************************************/ + RPL_CHECK_POS(ev, ev_end, RPL_EVENT_HEADER_SIZE); rpl_event->timestamp= uint4korr(ev); ev+= 4; rpl_event->event_type= (unsigned char)*ev++; @@ -497,13 +1091,16 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN /* start of post_header */ ev_start= ev; + DBUG_ASSERT(rpl_event->event_type < ENUM_END_EVENT); + switch(rpl_event->event_type) { case UNKNOWN_EVENT: case SLAVE_EVENT: return rpl_event; break; case HEARTBEAT_LOG_EVENT: - /* post header */ + /* no post header size */ + RPL_CHECK_POS(ev, ev_end, 11); rpl_event->event.heartbeat.timestamp= uint4korr(ev); ev+= 4; rpl_event->event.heartbeat.next_position= uint4korr(ev); @@ -516,29 +1113,31 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN break; case BEGIN_LOAD_QUERY_EVENT: - /* Post header */ + /* check post header size */ + RPL_CHECK_POST_HEADER_LEN(ev, ev_end, BEGIN_LOAD_QUERY_EVENT); rpl_event->event.begin_load_query.file_id= uint4korr(ev); ev+= 4; - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type]); - /* Payload: query_data (zero terminated) */ - if (rpl_set_data(event, &rpl_event->event.begin_load_query.data, ev, strlen((char *)ev))) - goto mem_error; - ev+= strlen((char *)ev) + 1; + RPL_CHECK_NULL_POS(ev, ev_end); + rpl_event->event.begin_load_query.data= ev; + RPL_CHECK_NULL_POS(ev, ev_end); + ev+= strlen((char *)ev); + /* terminating zero */ + RPL_CHECK_POS(ev, ev_end, 1); + ev++; break; case START_ENCRYPTION_EVENT: /* Post header */ + RPL_CHECK_POS(ev, ev_end, 17); rpl_event->event.start_encryption.scheme= *ev++; rpl_event->event.start_encryption.key_version= uint4korr(ev); ev+= 4; memcpy(rpl_event->event.start_encryption.nonce, ev, 12); + memcpy(rpl->nonce, ev, 12); ev+= 12; - - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); + rpl->encrypted= 1; break; case EXECUTE_LOAD_QUERY_EVENT: @@ -547,6 +1146,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN uint8_t schema_len; /* Post header */ + RPL_CHECK_POS(ev, ev_end, rpl->post_header_len[EXECUTE_LOAD_QUERY_EVENT - 1]); rpl_event->event.execute_load_query.thread_id= uint4korr(ev); ev+= 4; rpl_event->event.execute_load_query.execution_time= uint4korr(ev); @@ -564,36 +1164,35 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN ev+= 4; rpl_event->event.execute_load_query.duplicate_flag= *ev++; - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); - /* Payload: - status variables - query schema - statement */ - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.execute_load_query.status_vars, ev, status_len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, status_len); + rpl_set_string_and_len(&rpl_event->event.execute_load_query.status_vars, ev, status_len); ev+= status_len; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.execute_load_query.schema, ev, schema_len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, schema_len); + rpl_set_string_and_len(&rpl_event->event.execute_load_query.schema, ev, schema_len); + /* terminating zero */ + RPL_CHECK_POS(ev, ev_end, 1); ev+= (schema_len + 1); + len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1); - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.execute_load_query.statement, ev, len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.execute_load_query.statement, ev, len); ev+= len; break; } case BINLOG_CHECKPOINT_EVENT: /* Post header */ + RPL_CHECK_POS(ev, ev_end, rpl->post_header_len[BINLOG_CHECKPOINT_EVENT - 1]); len= uint4korr(ev); ev+= 4; - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); - - /* payload: filname */ - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.checkpoint.filename, ev, len) || - ma_set_rpl_filename(rpl, ev, len)) + /* payload: filename */ + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.checkpoint.filename, ev, len); + if (ma_set_rpl_filename(rpl, ev, len)) goto mem_error; ev+= len; break; @@ -617,11 +1216,12 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN /* We don't speak bing log protocol version < 4, in case it's an older protocol version an error will be returned. */ + RPL_CHECK_POS(ev, ev_end, 57); if ((rpl_event->event.format_description.format = uint2korr(ev)) < 4) { mariadb_free_rpl_event(rpl_event); - my_set_error(rpl->mysql, CR_ERR_UNSUPPORTED_BINLOG_FORMAT, SQLSTATE_UNKNOWN, 0, - rpl->filename_length, rpl->filename, rpl->start_position, uint2korr(ev)); + rpl_set_error(rpl, CR_ERR_UNSUPPORTED_BINLOG_FORMAT, SQLSTATE_UNKNOWN, 0, + RPL_ERR_POS(rpl), uint2korr(ev)); return 0; } @@ -635,12 +1235,13 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN /*Post header lengths: 1 byte for each event, non used events/gaps in enum should have a zero value */ len= ev_end - ev - 5; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.format_description.post_header_lengths, ev, len)) - goto mem_error; + rpl_set_string_and_len(&rpl_event->event.format_description.post_header_lengths, ev, len); memset(rpl->post_header_len, 0, ENUM_END_EVENT); memcpy(rpl->post_header_len, rpl_event->event.format_description.post_header_lengths.str, MIN(len, ENUM_END_EVENT)); + ev+= len; + RPL_CHECK_POS(ev, ev_end, 5); if ((rpl->use_checksum= *ev++)) { rpl_event->checksum= uint4korr(ev); @@ -656,6 +1257,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN /*********** post_header ***********/ + RPL_CHECK_POS(ev, ev_end, rpl->post_header_len[rpl_event->event_type - 1]); rpl_event->event.query.thread_id= uint4korr(ev); ev+= 4; rpl_event->event.query.seconds= uint4korr(ev); @@ -667,28 +1269,23 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN status_len= uint2korr(ev); ev+= 2; - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); - /******* payload ******/ - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.query.status, ev, status_len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, status_len + db_len + 1); + rpl_set_string_and_len(&rpl_event->event.query.status, ev, status_len); ev+= status_len; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.query.database, ev, db_len)) - goto mem_error; + rpl_set_string_and_len(&rpl_event->event.query.database, ev, db_len); ev+= db_len + 1; /* zero terminated */ len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1); + RPL_CHECK_POS(ev, ev_end, len); - if (rpl_event->event_type == QUERY_EVENT) { - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.query.statement, ev, len)) - goto mem_error; + if (rpl_event->event_type == QUERY_EVENT || !rpl->uncompress) { + rpl_set_string_and_len(&rpl_event->event.query.statement, ev, len); } - - if (rpl_event->event_type == QUERY_COMPRESSED_EVENT) + else if (rpl_event->event_type == QUERY_COMPRESSED_EVENT) { uint8_t header_size= 0, algorithm= 0; @@ -731,48 +1328,60 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN byte netadata{metadata_size] byte bit fields, indicating which column can be null n= (column_count + 7) / 8; + + if (remaining_bytes) + byte optional metadata */ + RPL_CHECK_POST_HEADER_LEN(ev, ev_end, TABLE_MAP_EVENT); - /* Post header length */ + /* Post header */ rpl_event->event.table_map.table_id= uint6korr(ev); ev+= 8; /* 2 byte in header ignored */ - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); - /* Payload */ - len= *ev; - ev++; + RPL_CHECK_POS(ev, ev_end, 1); + len= *ev++; + RPL_CHECK_POS(ev, ev_end, len + 1); - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.database, ev, len)) - goto mem_error; + rpl_set_string_and_len(&rpl_event->event.table_map.database, ev, len); ev+= len + 1; /* Zero terminated */ - len= *ev; - ev++; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.table, ev, len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, 1); + len= *ev++; + RPL_CHECK_POS(ev, ev_end, len + 1); + rpl_set_string_and_len(&rpl_event->event.table_map.table, ev, len); ev+= len + 1; /* Zero terminated */ - rpl_event->event.table_map.column_count= mysql_net_field_length(&ev); - len= rpl_event->event.table_map.column_count; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.column_types, ev, len)) - goto mem_error; - + RPL_CHECK_FIELD_LENGTH(ev, ev_end); + len= rpl_event->event.table_map.column_count= mysql_net_field_length(&ev); + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.table_map.column_types, ev, len); ev+= len; + + RPL_CHECK_FIELD_LENGTH(ev, ev_end); len= mysql_net_field_length(&ev); - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.table_map.metadata, ev, len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.table_map.metadata, ev, len); + ev+= len; + + len= (rpl_event->event.table_map.column_count + 7) / 8; + RPL_CHECK_POS(ev, ev_end, len); + rpl_event->event.table_map.null_indicator= ev; ev+= len; len= ev_end - ev - (rpl->use_checksum ? 4 : 0); - if (rpl_set_data(rpl_event, &rpl_event->event.table_map.null_indicator, ev, len)) - goto mem_error; - ev+= len; + RPL_CHECK_POS(ev, ev_end, len); + + if (len > 0) /* optional metadata */ + { + rpl_parse_opt_metadata(rpl_event, ev, len); + ev+= len; + } break; case RAND_EVENT: + RPL_CHECK_POS(ev, ev_end, 16); rpl_event->event.rand.first_seed= uint8korr(ev); ev+= 8; rpl_event->event.rand.second_seed= uint8korr(ev); @@ -782,6 +1391,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN } case INTVAR_EVENT: + RPL_CHECK_POS(ev, ev_end, 9); rpl_event->event.intvar.type= *ev; ev++; rpl_event->event.intvar.value= uint8korr(ev); @@ -789,44 +1399,90 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN break; case USER_VAR_EVENT: + RPL_CHECK_POS(ev, ev_end, 4); len= uint4korr(ev); ev+= 4; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.uservar.name, ev, len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.uservar.name, ev, len); ev+= len; + RPL_CHECK_POS(ev, ev_end, 1); if (!(rpl_event->event.uservar.is_null= (uint8)*ev)) { ev++; + RPL_CHECK_POS(ev, ev_end, 9); rpl_event->event.uservar.type= *ev; ev++; rpl_event->event.uservar.charset_nr= uint4korr(ev); ev+= 4; len= uint4korr(ev); ev+= 4; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.uservar.value, ev, len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, len); + + if (rpl_event->event.uservar.type == DECIMAL_RESULT) + { + char str[200]; + int s_len= sizeof(str) - 1; + int precision= (int)ev[0], + scale= (int)ev[1]; + decimal d; + decimal_digit buf[10]; + d.len= 10; + d.buf= buf; + bin2decimal((char *)(ev+2), &d, precision, scale); + decimal2string(&d, str, &s_len); + if (!(rpl_event->event.uservar.value.str = + (char *)ma_calloc_root(&rpl_event->memroot, s_len))) + goto mem_error; + memcpy(rpl_event->event.uservar.value.str, str, s_len); + rpl_event->event.uservar.value.length= s_len; + } else if (rpl_event->event.uservar.type == INT_RESULT) + { + uint64_t val64; + if (!(rpl_event->event.uservar.value.str = + (char *)ma_calloc_root(&rpl_event->memroot, sizeof(longlong)))) + goto mem_error; + val64= uint8korr(ev); + memcpy(rpl_event->event.uservar.value.str, &val64, sizeof(uint64_t)); + rpl_event->event.uservar.value.length= sizeof(uint64_t); + } else if (rpl_event->event.uservar.type == REAL_RESULT) + { + double d; + float8get(d, ev); + ev+= 8; + if (!(rpl_event->event.uservar.value.str = + (char *)ma_calloc_root(&rpl_event->memroot, 24))) + goto mem_error; + memset(rpl_event->event.uservar.value.str, 0, 24); + sprintf(rpl_event->event.uservar.value.str, "%.14g", d); + rpl_event->event.uservar.value.length= strlen(rpl_event->event.uservar.value.str); + } + else + rpl_set_string_and_len(&rpl_event->event.uservar.value, ev, len); ev+= len; - if ((unsigned long)(ev - rpl->buffer) < rpl->buffer_size) + if ((unsigned long)(ev - rpl_event->raw_data) < rpl_event->raw_data_size) rpl_event->event.uservar.flags= *ev; + ev++; } break; case ANNOTATE_ROWS_EVENT: - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); - /* Payload */ - len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1); - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len)) - goto mem_error; + len= ev_end - ev - (rpl->use_checksum ? 4 : 0); + if (len > 0) + rpl_set_string_and_len(&rpl_event->event.annotate_rows.statement, ev, len); break; case ROTATE_EVENT: - /* Payload */ + RPL_CHECK_POST_HEADER_LEN(ev, ev_end, ROTATE_EVENT); + rpl_event->event.rotate.position= uint8korr(ev); ev+= 8; + /* Payload */ len= ev_end - ev - 4; + if (!len) + goto malformed_packet; + if (rpl_event->timestamp == 0 && rpl_event->flags & LOG_EVENT_ARTIFICIAL_F) { @@ -837,8 +1493,9 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN rpl->artificial_checksun= 0; } } - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.rotate.filename, ev, len) || - ma_set_rpl_filename(rpl, ev, len)) + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.rotate.filename, ev, len); + if (ma_set_rpl_filename(rpl, ev, len)) goto mem_error; ev+= len; @@ -852,6 +1509,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN Header: - uint64_t transaction number */ + RPL_CHECK_POS(ev, ev_end, 8); rpl_event->event.xid.transaction_nr= uint8korr(ev); break; @@ -869,6 +1527,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN Payload: char xid, where n is sum of gtrid and bqual lengths */ + RPL_CHECK_POS(ev, ev_end, 13); rpl_event->event.xa_prepare_log.one_phase= *ev; ev++; @@ -878,8 +1537,8 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN ev+= 4; len+= rpl_event->event.xa_prepare_log.bqual_len= uint4korr(ev); ev+= 4; - if (rpl_set_string_and_len(rpl_event, &rpl_event->event.xa_prepare_log.xid, ev, len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, len); + rpl_set_string_and_len(&rpl_event->event.xa_prepare_log.xid, ev, len); break; case STOP_EVENT: @@ -902,14 +1561,16 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN Header: uint8_t flag: commit flag - uint64_t source_id: numerical representation of server's UUID + byte<16> source_id: numerical representation of server's UUID uint64_t sequence_nr: sequence number */ + RPL_CHECK_POS(ev, ev_end, 25); rpl_event->event.gtid_log.commit_flag= *ev; ev++; memcpy(rpl_event->event.gtid_log.source_id, ev, 16); ev+= 16; rpl_event->event.gtid_log.sequence_nr= uint8korr(ev); + ev+= 8; break; case GTID_EVENT: @@ -930,6 +1591,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN else char[6] unused */ + RPL_CHECK_POST_HEADER_LEN(ev, ev_end, GTID_EVENT); rpl_event->event.gtid.sequence_nr= uint8korr(ev); ev+= 8; rpl_event->event.gtid.domain_id= uint4korr(ev); @@ -938,6 +1600,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN ev++; if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID) { + RPL_CHECK_POS(ev, ev_end, 8); rpl_event->event.gtid.commit_id= uint8korr(ev); ev+= 8; } @@ -965,21 +1628,19 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN uint64_t sequence_nr */ + RPL_CHECK_POST_HEADER_LEN(ev, ev_end, GTID_LIST_EVENT); rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev); ev+=4; - /* check post_header_length */ - DBUG_ASSERT(ev - ev_start == rpl->post_header_len[rpl_event->event_type - 1]); - + RPL_CHECK_POS(ev, ev_end, rpl_event->event.gtid_list.gtid_cnt * 16); /* Payload */ if (rpl_event->event.gtid_list.gtid_cnt) { - uint32 i; if (!(rpl_event->event.gtid_list.gtid= (MARIADB_GTID *)ma_calloc_root(&rpl_event->memroot, sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt))) goto mem_error; - for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++) + for (uint32_t i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++) { rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev); ev+= 4; @@ -989,8 +1650,6 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN ev+= 8; } } - else - ev+=4; break; case WRITE_ROWS_COMPRESSED_EVENT_V1: @@ -1042,6 +1701,8 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN uint32_t bitmap_len= 0; + RPL_CHECK_POST_HEADER_LEN(ev, ev_end, rpl_event->event_type); + if (rpl_event->event_type >= WRITE_ROWS_COMPRESSED_EVENT) { return rpl_event; rpl_event->event.rows.compressed= 1; @@ -1057,47 +1718,44 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN rpl_event->event.rows.table_id= uint6korr(ev); ev+= 6; - /* TODO: Flags not defined yet in rpl.h */ rpl_event->event.rows.flags= uint2korr(ev); ev+= 2; + /* payload */ + /* ROWS_EVENT V2 has the extra-data field. See also: https://dev.mysql.com/doc/internals/en/rows-event.html */ if (IS_ROW_VERSION2(rpl_event->event_type)) { + RPL_CHECK_POS(ev, ev_end, 2); rpl_event->event.rows.extra_data_size= uint2korr(ev); ev+= 2; + RPL_CHECK_POS(ev, ev_end, rpl_event->event.rows.extra_data_size); if (rpl_event->event.rows.extra_data_size - 2 > 0) { - if (!(rpl_event->event.rows.extra_data = - (char *)ma_calloc_root(&rpl_event->memroot, - rpl_event->event.rows.extra_data_size - 2))) - goto mem_error; - memcpy(rpl_event->event.rows.extra_data, - ev, - rpl_event->event.rows.extra_data_size -2); + rpl_set_string_and_len(rpl_event->event.rows.extra_data, ev, rpl_event->event.rows.extra_data_size - 2); ev+= rpl_event->event.rows.extra_data_size; } } /* END_ROWS_EVENT_V2 */ /* number of columns */ + RPL_CHECK_FIELD_LENGTH(ev, ev_end); rpl_event->event.rows.column_count= mysql_net_field_length(&ev); bitmap_len= (rpl_event->event.rows.column_count + 7) / 8; DBUG_ASSERT(rpl_event->event.rows.column_count > 0); /* columns updated bitmap */ - if (rpl_set_data(rpl_event, &rpl_event->event.rows.column_bitmap, ev, bitmap_len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, bitmap_len); + rpl_event->event.rows.column_bitmap= ev; ev+= bitmap_len; if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1 || rpl_event->event_type == UPDATE_ROWS_COMPRESSED_EVENT_V1) { - if (rpl_set_data(rpl_event, &rpl_event->event.rows.column_update_bitmap, - ev, bitmap_len)) - goto mem_error; + RPL_CHECK_POS(ev, ev_end, bitmap_len); + rpl_event->event.rows.column_update_bitmap= ev; ev+= bitmap_len; } @@ -1114,12 +1772,12 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN if ((uncompress((Bytef*)rpl_event->event.rows.row_data, (uLong *)&uncompressed_len, (Bytef*)ev + header_size, (uLongf )len) != Z_OK)) { - my_set_error(rpl->mysql, CR_ERR_NET_UNCOMPRESS, SQLSTATE_UNKNOWN, 0, rpl->filename_length, - rpl->filename, rpl->start_position); + rpl_set_error(rpl, CR_ERR_NET_UNCOMPRESS, SQLSTATE_UNKNOWN, 0, RPL_ERR_POS(rpl)); mariadb_free_rpl_event(rpl_event); return 0; } rpl_event->event.rows.row_data_size= uncompressed_len; + RPL_CHECK_POS(ev, ev_end, header_size + len); ev+= header_size + len; } else { rpl_event->event.rows.row_data_size= ev_end - ev - (rpl->use_checksum ? 4 : 0); @@ -1135,8 +1793,8 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN if (!(rpl_event->flags & LOG_EVENT_IGNORABLE_F)) { mariadb_free_rpl_event(rpl_event); - my_set_error(rpl->mysql, CR_UNKNOWN_BINLOG_EVENT, SQLSTATE_UNKNOWN, 0, - rpl->filename_length, rpl->filename, rpl->start_position, rpl_event->event_type); + rpl_set_error(rpl, CR_UNKNOWN_BINLOG_EVENT, 0, RPL_ERR_POS(rpl), + rpl_event->event_type); return 0; } return rpl_event; @@ -1163,33 +1821,36 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN if (rpl->use_checksum && !rpl_event->checksum) { - rpl_event->checksum= uint4korr(ev_end -4); + rpl_event->checksum= uint4korr(ev_end - 4); - if (rpl_event->checksum && rpl->verify_checksum) + if (rpl_event->checksum && rpl->verify_checksum) { unsigned long crc= crc32_z(0L, Z_NULL, 0); crc= crc32_z(crc, checksum_start, ev_end - checksum_start - 4); if (rpl_event->checksum != (uint32_t)crc) { - my_set_error(rpl->mysql, CR_ERR_CHECKSUM_VERIFICATION_ERROR, SQLSTATE_UNKNOWN, 0, - rpl->filename_length, rpl->filename, rpl->start_position, + rpl_set_error(rpl, CR_ERR_CHECKSUM_VERIFICATION_ERROR, SQLSTATE_UNKNOWN, 0, + RPL_ERR_POS(rpl), rpl_event->checksum, (uint32_t)crc); mariadb_free_rpl_event(rpl_event); return 0; } } } - - //rpl->start_position= rpl_event->next_event_pos; return rpl_event; } mem_error: mariadb_free_rpl_event(rpl_event); - SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0); + rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0); return 0; net_error: mariadb_free_rpl_event(rpl_event); - SET_CLIENT_ERROR(rpl->mysql, CR_CONNECTION_ERROR, SQLSTATE_UNKNOWN, 0); + rpl_set_error(rpl, CR_CONNECTION_ERROR, 0); + return 0; +malformed_packet: + rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl), + "Malformed packet"); + mariadb_free_rpl_event(rpl_event); return 0; } @@ -1197,13 +1858,15 @@ void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl) { if (!rpl) return; - if (rpl->filename) - free((void *)rpl->filename); + free((void *)rpl->filename); if (rpl->fp) { - free(rpl->buffer); fclose(rpl->fp); } + free(rpl->host); +#ifdef HAVE_CRYPTO + free(rpl->decryption_key); +#endif free(rpl); return; } @@ -1259,6 +1922,33 @@ int mariadb_rpl_optionsv(MARIADB_RPL *rpl, rpl->verify_checksum= va_arg(ap, uint32_t); break; } + case MARIADB_RPL_UNCOMPRESS: + { + rpl->uncompress= (uint8_t)va_arg(ap, uint32_t); + break; + } + case MARIADB_RPL_PORT: + { + rpl->port= va_arg(ap, uint32_t); + break; + } + case MARIADB_RPL_HOST: + { + rpl->host= strdup(va_arg(ap, char *)); + break; + } + case MARIADB_RPL_EXTRACT_VALUES: + { + rpl->extract_values= (uint8_t)va_arg(ap, uint32_t); + break; + } +#ifdef HAVE_CRYPTO + case MARIADB_RPL_DECRYPTION_KEY: + { + rpl->decryption_key=strdup(va_arg(ap, char *)); + break; + } +#endif default: rc= -1; goto end;