diff --git a/include/errmsg.h b/include/errmsg.h index 481b0a62..04c7e5e0 100644 --- a/include/errmsg.h +++ b/include/errmsg.h @@ -100,6 +100,7 @@ extern const char *mariadb_client_errors[]; /* Error messages */ #define CR_FILE_READ 5005 #define CR_BULK_WITHOUT_PARAMETERS 5006 #define CR_INVALID_STMT 5007 +#define CR_VERSION_MISMATCH 5008 /* Always last, if you add new error codes please update the value for CR_MARIADB_LAST_ERROR */ #define CR_MARIADB_LAST_ERROR CR_INVALID_STMT diff --git a/include/mariadb_rpl.h b/include/mariadb_rpl.h new file mode 100644 index 00000000..b78ce08c --- /dev/null +++ b/include/mariadb_rpl.h @@ -0,0 +1,304 @@ +/* Copyright (C) 2018 MariaDB Corporation AB + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Library General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. + + You should have received a copy of the GNU Library General Public + License along with this library; if not, write to the Free + Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02111-1301, USA */ +#ifndef _mariadb_rpl_h_ +#define _mariadb_rpl_h_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +#define MARIADB_RPL_VERSION 0x0001 +#define MARIADB_RPL_REQUIRED_VERSION 0x0001 + +/* Protocol flags */ +#define MARIADB_RPL_BINLOG_DUMP_NON_BLOCK 1 +#define MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS 2 +#define MARIADB_RPL_IGNORE_HEARTBEAT (1 << 17) + +#define EVENT_HEADER_OFS 20 + +#define FL_GROUP_COMMIT_ID 2 +#define FL_STMT_END 1 + +#define LOG_EVENT_ARTIFICIAL_F 0x20 + + +/* Options */ +enum mariadb_rpl_option { + MARIADB_RPL_FILENAME, /* Filename and length */ + MARIADB_RPL_START, /* Start position */ + MARIADB_RPL_SERVER_ID, /* Server ID */ + MARIADB_RPL_FLAGS, /* Protocol flags */ + MARIADB_RPL_GTID_CALLBACK, /* GTID callback function */ + MARIADB_RPL_GTID_DATA, /* GTID data */ + MARIADB_RPL_BUFFER +}; + +/* Event types: From MariaDB Server sql/log_event.h */ +enum mariadb_rpl_event { + UNKNOWN_EVENT= 0, + START_EVENT_V3= 1, + QUERY_EVENT= 2, + STOP_EVENT= 3, + ROTATE_EVENT= 4, + INTVAR_EVENT= 5, + LOAD_EVENT= 6, + SLAVE_EVENT= 7, + CREATE_FILE_EVENT= 8, + APPEND_BLOCK_EVENT= 9, + EXEC_LOAD_EVENT= 10, + DELETE_FILE_EVENT= 11, + NEW_LOAD_EVENT= 12, + RAND_EVENT= 13, + USER_VAR_EVENT= 14, + FORMAT_DESCRIPTION_EVENT= 15, + XID_EVENT= 16, + BEGIN_LOAD_QUERY_EVENT= 17, + EXECUTE_LOAD_QUERY_EVENT= 18, + TABLE_MAP_EVENT = 19, + + PRE_GA_WRITE_ROWS_EVENT = 20, /* deprecated */ + PRE_GA_UPDATE_ROWS_EVENT = 21, /* deprecated */ + PRE_GA_DELETE_ROWS_EVENT = 22, /* deprecated */ + + WRITE_ROWS_EVENT_V1 = 23, + UPDATE_ROWS_EVENT_V1 = 24, + DELETE_ROWS_EVENT_V1 = 25, + INCIDENT_EVENT= 26, + HEARTBEAT_LOG_EVENT= 27, + IGNORABLE_LOG_EVENT= 28, + ROWS_QUERY_LOG_EVENT= 29, + WRITE_ROWS_EVENT = 30, + UPDATE_ROWS_EVENT = 31, + DELETE_ROWS_EVENT = 32, + GTID_LOG_EVENT= 33, + ANONYMOUS_GTID_LOG_EVENT= 34, + PREVIOUS_GTIDS_LOG_EVENT= 35, + TRANSACTION_CONTEXT_EVENT= 36, + VIEW_CHANGE_EVENT= 37, + XA_PREPARE_LOG_EVENT= 38, + + /* + Add new events here - right above this comment! + Existing events (except ENUM_END_EVENT) should never change their numbers + */ + + /* New MySQL/Sun events are to be added right above this comment */ + MYSQL_EVENTS_END, + + MARIA_EVENTS_BEGIN= 160, + ANNOTATE_ROWS_EVENT= 160, + BINLOG_CHECKPOINT_EVENT= 161, + GTID_EVENT= 162, + GTID_LIST_EVENT= 163, + START_ENCRYPTION_EVENT= 164, + QUERY_COMPRESSED_EVENT = 165, + WRITE_ROWS_COMPRESSED_EVENT_V1 = 166, + UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167, + DELETE_ROWS_COMPRESSED_EVENT_V1 = 168, + WRITE_ROWS_COMPRESSED_EVENT = 169, + UPDATE_ROWS_COMPRESSED_EVENT = 170, + DELETE_ROWS_COMPRESSED_EVENT = 171, + + /* Add new MariaDB events here - right above this comment! */ + + ENUM_END_EVENT /* end marker */ +}; + +typedef struct { + char *str; + size_t length; +} MARIADB_STRING; + +enum mariadb_row_event_type { + WRITE_ROWS= 0, + UPDATE_ROWS= 1, + DELETE_ROWS= 2 +}; + +/* Global transaction id */ +typedef struct st_mariadb_gtid { + unsigned int domain_id; + unsigned int server_id; + unsigned long long sequence_nr; +} MARIADB_GTID; + +/* Generic replication handle */ +typedef struct st_mariadb_rpl { + unsigned int version; + MYSQL *mysql; + char *filename; + uint32_t filename_length; + unsigned char *buffer; + unsigned long buffer_size; + uint32_t server_id; + unsigned long start_position; + uint32_t flags; + uint8_t fd_header_len; /* header len from last format description event */ +} MARIADB_RPL; + +/* Event header */ +struct st_mariadb_rpl_rotate_event { + unsigned long long position; + MARIADB_STRING filename; +}; + +struct st_mariadb_rpl_query_event { + uint32_t thread_id; + uint32_t seconds; + MARIADB_STRING database; + uint32_t errornr; + MARIADB_STRING status; + MARIADB_STRING statement; +}; + +struct st_mariadb_rpl_gtid_list_event { + uint32_t gtid_cnt; + MARIADB_GTID *gtid; +}; + +struct st_mariadb_rpl_format_description_event +{ + uint16_t format; + char *server_version; + uint32_t timestamp; + uint8_t header_len; +}; + +struct st_mariadb_rpl_checkpoint_event { + MARIADB_STRING filename; +}; + +struct st_mariadb_rpl_xid_event { + uint64_t transaction_nr; +}; + +struct st_mariadb_rpl_gtid_event { + uint64_t sequence_nr; + uint32_t domain_id; + uint8_t flags; + uint64_t commit_id; +}; + +struct st_mariadb_rpl_annotate_rows_event { + MARIADB_STRING statement; +}; + +struct st_mariadb_rpl_table_map_event { + unsigned long long table_id; + MARIADB_STRING database; + MARIADB_STRING table; + unsigned int column_count; + char *column_types; + MARIADB_STRING metadata; + char *null_indicator; +}; + +struct st_mariadb_rpl_rand_event { + unsigned long long first_seed; + unsigned long long second_seed; +}; + +struct st_mariadb_rpl_encryption_event { + char scheme; + unsigned int key_version; + char *nonce; +}; + +struct st_mariadb_rpl_intvar_event { + char type; + unsigned long long value; +}; + +struct st_mariadb_rpl_uservar_event { + MARIADB_STRING name; + uint8_t is_null; + uint8_t type; + uint32_t charset_nr; + MARIADB_STRING value; + uint8_t flags; +}; + +struct st_mariadb_rpl_rows_event { + enum mariadb_row_event_type type; + uint64_t table_id; + uint16_t flags; + uint32_t column_count; + char *column_bitmap; + char *column_update_bitmap; + size_t row_data_size; + void *row_data; +}; + +struct st_mariadb_rpl_heartbeat_event { + uint32_t timestamp; + uint32_t next_position; + uint8_t type; + uint16_t flags; +}; + +typedef struct st_mariadb_rpl_event +{ + /* common header */ + MA_MEM_ROOT memroot; + unsigned int checksum; + char ok; + enum mariadb_rpl_event event_type; + unsigned int timestamp; + unsigned int server_id; + unsigned int event_length; + unsigned int next_event_pos; + unsigned short flags; + /****************/ + union { + struct st_mariadb_rpl_rotate_event rotate; + struct st_mariadb_rpl_query_event query; + struct st_mariadb_rpl_format_description_event format_description; + struct st_mariadb_rpl_gtid_list_event gtid_list; + struct st_mariadb_rpl_checkpoint_event checkpoint; + struct st_mariadb_rpl_xid_event xid; + struct st_mariadb_rpl_gtid_event gtid; + struct st_mariadb_rpl_annotate_rows_event annotate_rows; + struct st_mariadb_rpl_table_map_event table_map; + struct st_mariadb_rpl_rand_event rand; + struct st_mariadb_rpl_encryption_event encryption; + struct st_mariadb_rpl_intvar_event intvar; + struct st_mariadb_rpl_uservar_event uservar; + struct st_mariadb_rpl_rows_event rows; + struct st_mariadb_rpl_heartbeat_event heartbeat; + } event; +} MARIADB_RPL_EVENT; + +#define mariadb_rpl_init(a) mariadb_rpl_init_ex((a), MARIADB_RPL_VERSION) + +/* Function prototypes */ +MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version); + +int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl, enum mariadb_rpl_option, ...); +int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl, enum mariadb_rpl_option, ...); + +int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl); +void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl); +MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event); +void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/libmariadb/CMakeLists.txt b/libmariadb/CMakeLists.txt index e4d9b6fe..b7886742 100644 --- a/libmariadb/CMakeLists.txt +++ b/libmariadb/CMakeLists.txt @@ -23,7 +23,13 @@ SET(MARIADB_LIB_SYMBOLS mysql_optionsv mysql_ps_fetch_functions mariadb_reconnect - mysql_stmt_warning_count) + mysql_stmt_warning_count + mariadb_rpl_open + mariadb_rpl_close + mariadb_rpl_fetch + mariadb_rpl_optionsv + mariadb_rpl_get_optionsv + mariadb_free_rpl_event) IF(WITH_SSL) SET(MARIADB_LIB_SYMBOLS ${MARIADB_LIB_SYMBOLS} mariadb_deinitialize_ssl) ENDIF() @@ -289,6 +295,7 @@ ma_loaddata.c ma_stmt_codec.c ma_string.c ma_dtoa.c +mariadb_rpl.c ${CC_BINARY_DIR}/libmariadb/ma_client_plugin.c ma_io.c ${SSL_SOURCES} diff --git a/libmariadb/ma_errmsg.c b/libmariadb/ma_errmsg.c index ab0a8a38..6f11f7cc 100644 --- a/libmariadb/ma_errmsg.c +++ b/libmariadb/ma_errmsg.c @@ -158,6 +158,7 @@ const char *mariadb_client_errors[] = /* 5005 */ "Error reading file '%s' (Errcode: %d)", /* 5006 */ "Bulk operation without parameters is not supported", /* 5007 */ "Invalid statement handle", + /* 5008 */ "Unsupported version %d. Supported versions are in the range %d - %d", "" }; diff --git a/libmariadb/mariadb_rpl.c b/libmariadb/mariadb_rpl.c new file mode 100644 index 00000000..0c852a66 --- /dev/null +++ b/libmariadb/mariadb_rpl.c @@ -0,0 +1,497 @@ +/************************************************************************************ + Copyright (C) 2018 MariaDB Corpoeation AB + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Library General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. + + You should have received a copy of the GNU Library General Public + License along with this library; if not see + or write to the Free Software Foundation, Inc., + 51 Franklin St., Fifth Floor, Boston, MA 02110, USA + +*************************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int rpl_alloc_string(MARIADB_RPL_EVENT *event, + MARIADB_STRING *s, + unsigned char *buffer, + size_t len) +{ + if (!(s->str= ma_alloc_root(&event->memroot, len))) + return 1; + memcpy(s->str, buffer, len); + s->length= len; + return 0; +} + +MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version) +{ + MARIADB_RPL *rpl; + + if (version < MARIADB_RPL_REQUIRED_VERSION || + version > MARIADB_RPL_VERSION) + { + my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version, + MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION); + return 0; + } + + if (!mysql) + return NULL; + + if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL)))) + { + SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0); + return 0; + } + rpl->version= version; + rpl->mysql= mysql; + return rpl; +} + +void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event) +{ + if (event) + { + ma_free_root(&event->memroot, MYF(0)); + free(event); + } +} + +int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl) +{ + unsigned char *ptr, *buf; + if (!rpl || !rpl->mysql) + return 1; + + /* COM_BINLOG_DUMP: + Ofs Len Data + 0 1 COM_BINLOG_DUMP + 1 4 position + 5 2 flags + 7 4 server id + 11 * filename + + * = filename length + + */ + ptr= buf= +#ifdef WIN32 + (unsigned char *)_alloca(rpl->filename_length + 11); +#else + (unsigned char *)alloca(rpl->filename_length + 11); +#endif + + int4store(ptr, (unsigned int)rpl->start_position); + ptr+= 4; + int2store(ptr, rpl->flags); + ptr+= 2; + int4store(ptr, rpl->server_id); + ptr+= 4; + memcpy(ptr, rpl->filename, rpl->filename_length); + ptr+= rpl->filename_length; + + if (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0)) + return 1; + return 0; +} + +MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event) +{ + unsigned char *ev; + size_t len; + MARIADB_RPL_EVENT *rpl_event= 0; + + if (!rpl || !rpl->mysql) + return 0; + + while (1) { + unsigned long pkt_len= ma_net_safe_read(rpl->mysql); + + if (pkt_len == packet_error) + { + rpl->buffer_size= 0; + return 0; + } + + /* EOF packet: + see https://mariadb.com/kb/en/library/eof_packet/ + Packet length must be less than 9 bytes, EOF header + is 0xFE. + */ + if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE) + { + rpl->buffer_size= 0; + return 0; + } + + /* if ignore heartbeat flag was set, we ignore this + record and continue to fetch next record. + The first byte is always status byte (0x00) + For event header description see + https://mariadb.com/kb/en/library/2-binlog-event-header/ */ + if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT) + { + if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT) + continue; + } + + rpl->buffer_size= pkt_len; + rpl->buffer= rpl->mysql->net.read_pos; + + if (event) + { + MA_MEM_ROOT memroot= event->memroot; + rpl_event= event; + ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC)); + memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); + rpl_event->memroot= memroot; + } else { + if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT)))) + goto mem_error; + memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT)); + ma_init_alloc_root(&rpl_event->memroot, 8192, 0); + } + rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4); + + rpl_event->ok= rpl->buffer[0]; + rpl_event->timestamp= uint4korr(rpl->buffer + 1); + rpl_event->event_type= (unsigned char)*(rpl->buffer + 5); + rpl_event->server_id= uint4korr(rpl->buffer + 6); + rpl_event->event_length= uint4korr(rpl->buffer + 10); + rpl_event->next_event_pos= uint4korr(rpl->buffer + 14); + rpl_event->flags= uint2korr(rpl->buffer + 18); + + ev= rpl->buffer + EVENT_HEADER_OFS; + + switch(rpl_event->event_type) { + case HEARTBEAT_LOG_EVENT: + rpl_event->event.heartbeat.timestamp= uint4korr(ev); + ev+= 4; + rpl_event->event.heartbeat.next_position= uint4korr(ev); + ev+= 4; + rpl_event->event.heartbeat.type= (uint8_t)*ev; + ev+= 1; + rpl_event->event.heartbeat.flags= uint2korr(ev); + break; + case BINLOG_CHECKPOINT_EVENT: + len= uint4korr(ev); + ev+= 4; + if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len)) + goto mem_error; + break; + case FORMAT_DESCRIPTION_EVENT: + rpl_event->event.format_description.format = uint2korr(ev); + ev+= 2; + rpl_event->event.format_description.server_version = (char *)(ev); + ev+= 50; + rpl_event->event.format_description.timestamp= uint4korr(ev); + ev+= 2; + rpl->fd_header_len= rpl_event->event.format_description.header_len= (uint8_t)*ev; + break; + case QUERY_EVENT: + { + size_t db_len, status_len; + rpl_event->event.query.thread_id= uint4korr(ev); + ev+= 4; + rpl_event->event.query.seconds= uint4korr(ev); + ev+= 4; + db_len= *ev; + ev++; + rpl_event->event.query.errornr= uint2korr(ev); + ev+= 2; + status_len= uint2korr(ev); + ev+= 2; + if (rpl_alloc_string(rpl_event, &rpl_event->event.query.status, ev, status_len)) + goto mem_error; + ev+= status_len; + + if (rpl_alloc_string(rpl_event, &rpl_event->event.query.database, ev, db_len)) + goto mem_error; + ev+= db_len + 1; /* zero terminated */ + + /* calculate statement size: buffer + buffer_size - current_ofs (ev) - crc_size */ + len= (size_t)(rpl->buffer + rpl->buffer_size - ev - 4); + if (rpl_alloc_string(rpl_event, &rpl_event->event.query.statement, ev, len)) + goto mem_error; + break; + } + case TABLE_MAP_EVENT: + rpl_event->event.table_map.table_id= uint6korr(ev); + ev+= 8; + len= *ev; + ev++; + if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.database, ev, len)) + goto mem_error; + ev+= len + 1; + len= *ev; + ev++; + if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.table, ev, len)) + goto mem_error; + ev+= len + 1; + rpl_event->event.table_map.column_count= mysql_net_field_length(&ev); + rpl_event->event.table_map.column_types= (char *)ev; + ev+= rpl_event->event.table_map.column_count; + len= mysql_net_field_length(&ev); + if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.metadata, ev, len)) + goto mem_error; + break; + case RAND_EVENT: + rpl_event->event.rand.first_seed= uint8korr(ev); + ev+= 8; + rpl_event->event.rand.second_seed= uint8korr(ev); + break; + case INTVAR_EVENT: + rpl_event->event.intvar.type= *ev; + ev++; + rpl_event->event.intvar.value= uint8korr(ev); + break; + case USER_VAR_EVENT: + len= uint4korr(ev); + ev+= 4; + if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.name, ev, len)) + goto mem_error; + ev+= len; + if (!(rpl_event->event.uservar.is_null= (uint8)*ev)) + { + ev++; + rpl_event->event.uservar.type= *ev; + ev++; + rpl_event->event.uservar.charset_nr= uint4korr(ev); + ev+= 4; + len= uint4korr(ev); + ev+= 4; + if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.value, ev, len)) + goto mem_error; + ev+= len; + if ((unsigned long)(ev - rpl->buffer) < rpl->buffer_size) + rpl_event->event.uservar.flags= *ev; + } + break; + case START_ENCRYPTION_EVENT: + rpl_event->event.encryption.scheme= *ev; + ev++; + rpl_event->event.encryption.key_version= uint4korr(ev); + ev+= 4; + rpl_event->event.encryption.nonce= (char *)ev; + break; + case ANNOTATE_ROWS_EVENT: + len= (uint32)(rpl->buffer + rpl->buffer_size - (unsigned char *)ev - 4); + if (rpl_alloc_string(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len)) + goto mem_error; + break; + case ROTATE_EVENT: + rpl_event->event.rotate.position= uint8korr(ev); + ev+= 8; + len= rpl->buffer + rpl->buffer_size - ev; + if (rpl_alloc_string(rpl_event, &rpl_event->event.rotate.filename, ev, len)) + goto mem_error; + break; + case XID_EVENT: + rpl_event->event.xid.transaction_nr= uint8korr(ev); + break; + case STOP_EVENT: + /* nothing to do here */ + break; + case GTID_EVENT: + rpl_event->event.gtid.sequence_nr= uint8korr(ev); + ev+= 8; + rpl_event->event.gtid.domain_id= uint4korr(ev); + ev+= 4; + rpl_event->event.gtid.flags= *ev; + ev++; + if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID) + rpl_event->event.gtid.commit_id= uint8korr(ev); + break; + case GTID_LIST_EVENT: + { + uint32 i; + rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev); + ev++; + if (!(rpl_event->event.gtid_list.gtid= (MARIADB_GTID *)ma_alloc_root(&rpl_event->memroot, sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt))) + goto mem_error; + for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++) + { + rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev); + ev+= 4; + rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev); + ev+= 4; + rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev); + ev+= 8; + } + break; + } + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT_V1; + if (rpl->fd_header_len == 6) + { + rpl_event->event.rows.table_id= uint4korr(ev); + ev+= 4; + } else { + rpl_event->event.rows.table_id= uint6korr(ev); + ev+= 6; + } + rpl_event->event.rows.flags= uint2korr(ev); + ev+= 2; + len= rpl_event->event.rows.column_count= mysql_net_field_length(&ev); + if (!len) + break; + if (!(rpl_event->event.rows.column_bitmap = + (char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8))) + goto mem_error; + memcpy(rpl_event->event.rows.column_bitmap, ev, (len + 7) / 8); + ev+= (len + 7) / 8; + if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1) + { + if (!(rpl_event->event.rows.column_update_bitmap = + (char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8))) + goto mem_error; + memcpy(rpl_event->event.rows.column_update_bitmap, ev, (len + 7) / 8); + ev+= (len + 7) / 8; + } + if ((rpl_event->event.rows.row_data_size= rpl->buffer + rpl->buffer_size - ev)) + { + if (!(rpl_event->event.rows.row_data = + (char *)ma_alloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size))) + goto mem_error; + memcpy(rpl_event->event.rows.row_data, ev, rpl_event->event.rows.row_data_size); + } + break; + default: + return NULL; + break; + } + return rpl_event; + } +mem_error: + SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0); + return 0; +} + +void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl) +{ + if (!rpl) + return; + if (rpl->filename) + free((void *)rpl->filename); + free(rpl); + return; +} + +int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl, + enum mariadb_rpl_option option, + ...) +{ + va_list ap; + int rc= 0; + + if (!rpl) + return 1; + + va_start(ap, option); + + switch (option) { + case MARIADB_RPL_FILENAME: + { + char *arg1= va_arg(ap, char *); + rpl->filename_length= va_arg(ap, size_t); + free((void *)rpl->filename); + rpl->filename= NULL; + if (rpl->filename_length) + { + rpl->filename= (char *)malloc(rpl->filename_length); + memcpy((void *)rpl->filename, arg1, rpl->filename_length); + } + else if (arg1) + { + rpl->filename= strdup((const char *)arg1); + rpl->filename_length= strlen(rpl->filename); + } + break; + } + case MARIADB_RPL_SERVER_ID: + { + rpl->server_id= va_arg(ap, unsigned int); + break; + } + case MARIADB_RPL_FLAGS: + { + rpl->flags= va_arg(ap, unsigned int); + break; + } + case MARIADB_RPL_START: + { + rpl->start_position= va_arg(ap, unsigned long); + break; + } + default: + rc= -1; + goto end; + } +end: + return rc; +} + +int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl, + enum mariadb_rpl_option option, + ...) +{ + va_list ap; + + if (!rpl) + return 1; + + va_start(ap, option); + + switch (option) { + case MARIADB_RPL_FILENAME: + { + const char **name= (const char **)va_arg(ap, char **); + size_t *len= (size_t*)va_arg(ap, size_t *); + + *name= rpl->filename; + *len= rpl->filename_length; + break; + } + case MARIADB_RPL_SERVER_ID: + { + unsigned int *id= va_arg(ap, unsigned int *); + *id= rpl->server_id; + break; + } + case MARIADB_RPL_FLAGS: + { + unsigned int *flags= va_arg(ap, unsigned int *); + *flags= rpl->flags; + break; + } + case MARIADB_RPL_START: + { + unsigned long *start= va_arg(ap, unsigned long *); + *start= rpl->start_position; + break; + } + default: + return 1; + break; + } + return 0; +} diff --git a/unittest/libmariadb/CMakeLists.txt b/unittest/libmariadb/CMakeLists.txt index 9cea9163..b04959e6 100644 --- a/unittest/libmariadb/CMakeLists.txt +++ b/unittest/libmariadb/CMakeLists.txt @@ -26,7 +26,7 @@ INCLUDE_DIRECTORIES(${CC_SOURCE_DIR}/include ${CC_SOURCE_DIR}/unittest/libmariadb) ADD_DEFINITIONS(-DLIBMARIADB) -SET(API_TESTS "conc336" "bulk1" "performance" "basic-t" "fetch" "charset" "logs" "cursor" "errors" "view" "ps" "ps_bugs" "sp" "result" "connection" "misc" "ps_new" "thread" "features-10_2" "bulk1") +SET(API_TESTS "conc336" "bulk1" "performance" "basic-t" "fetch" "charset" "logs" "cursor" "errors" "view" "ps" "ps_bugs" "sp" "result" "connection" "misc" "ps_new" "thread" "features-10_2" "bulk1" "rpl_api") IF(WITH_DYNCOL) SET(API_TESTS ${API_TESTS} "dyncol") ENDIF() diff --git a/unittest/libmariadb/rpl_api.c b/unittest/libmariadb/rpl_api.c new file mode 100644 index 00000000..f4c97e0b --- /dev/null +++ b/unittest/libmariadb/rpl_api.c @@ -0,0 +1,75 @@ +/* +Copyright (c) 2018 MariaDB Corporation AB + +The MySQL Connector/C is licensed under the terms of the GPLv2 +, like most +MySQL Connectors. There are special exceptions to the terms and +conditions of the GPLv2 as it is applied to this software, see the +FLOSS License Exception +. + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published +by the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +for more details. + +You should have received a copy of the GNU General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ +/** + Some basic tests of the client API. +*/ + +#include "my_test.h" +#include "mariadb_rpl.h" + +static int test_rpl_01(MYSQL *mysql) +{ + int i; + unsigned int server_id= 0; + MARIADB_RPL_EVENT *event= NULL; + MARIADB_RPL *rpl= mariadb_rpl_init(mysql); + mysql_query(mysql, "SET @mariadb_slave_capability=4"); + mysql_query(mysql, "SET NAMES latin1"); + mysql_query(mysql, "SET @slave_gtid_strict_mode=1"); + mysql_query(mysql, "SET @slave_gtid_ignore_duplicates=1"); + mysql_query(mysql, "SET NAMES utf8"); + mysql_query(mysql, "SET @master_binlog_checksum= @@global.binlog_checksum"); + rpl->server_id= 12; + rpl->start_position= 4; + rpl->flags= MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS; + + if (mariadb_rpl_open(rpl)) + return FAIL; + + while((event= mariadb_rpl_fetch(rpl, event))) + { + printf("event: %d\n", event->event_type); + } + mariadb_free_rpl_event(event); + return OK; +} + + +struct my_tests_st my_tests[] = { + {"test_rpl_01", test_rpl_01, TEST_CONNECTION_DEFAULT, 0, NULL, NULL}, + {NULL, NULL, 0, 0, NULL, NULL} +}; + + +int main(int argc, char **argv) +{ + if (argc > 1) + get_options(argc, argv); + + get_envvars(); + + run_tests(my_tests); + + return(exit_status()); +}