1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-08 14:02:17 +03:00

Manual merge from CONC-325 branch: Initial implementation for binlog/replication API

This commit is contained in:
Georg Richter
2018-12-02 18:28:38 +01:00
parent b87845b18d
commit 1888c141f7
7 changed files with 887 additions and 2 deletions

View File

@@ -100,6 +100,7 @@ extern const char *mariadb_client_errors[]; /* Error messages */
#define CR_FILE_READ 5005
#define CR_BULK_WITHOUT_PARAMETERS 5006
#define CR_INVALID_STMT 5007
#define CR_VERSION_MISMATCH 5008
/* Always last, if you add new error codes please update the
value for CR_MARIADB_LAST_ERROR */
#define CR_MARIADB_LAST_ERROR CR_INVALID_STMT

304
include/mariadb_rpl.h Normal file
View File

@@ -0,0 +1,304 @@
/* Copyright (C) 2018 MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the Free
Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02111-1301, USA */
#ifndef _mariadb_rpl_h_
#define _mariadb_rpl_h_
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#define MARIADB_RPL_VERSION 0x0001
#define MARIADB_RPL_REQUIRED_VERSION 0x0001
/* Protocol flags */
#define MARIADB_RPL_BINLOG_DUMP_NON_BLOCK 1
#define MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS 2
#define MARIADB_RPL_IGNORE_HEARTBEAT (1 << 17)
#define EVENT_HEADER_OFS 20
#define FL_GROUP_COMMIT_ID 2
#define FL_STMT_END 1
#define LOG_EVENT_ARTIFICIAL_F 0x20
/* Options */
enum mariadb_rpl_option {
MARIADB_RPL_FILENAME, /* Filename and length */
MARIADB_RPL_START, /* Start position */
MARIADB_RPL_SERVER_ID, /* Server ID */
MARIADB_RPL_FLAGS, /* Protocol flags */
MARIADB_RPL_GTID_CALLBACK, /* GTID callback function */
MARIADB_RPL_GTID_DATA, /* GTID data */
MARIADB_RPL_BUFFER
};
/* Event types: From MariaDB Server sql/log_event.h */
enum mariadb_rpl_event {
UNKNOWN_EVENT= 0,
START_EVENT_V3= 1,
QUERY_EVENT= 2,
STOP_EVENT= 3,
ROTATE_EVENT= 4,
INTVAR_EVENT= 5,
LOAD_EVENT= 6,
SLAVE_EVENT= 7,
CREATE_FILE_EVENT= 8,
APPEND_BLOCK_EVENT= 9,
EXEC_LOAD_EVENT= 10,
DELETE_FILE_EVENT= 11,
NEW_LOAD_EVENT= 12,
RAND_EVENT= 13,
USER_VAR_EVENT= 14,
FORMAT_DESCRIPTION_EVENT= 15,
XID_EVENT= 16,
BEGIN_LOAD_QUERY_EVENT= 17,
EXECUTE_LOAD_QUERY_EVENT= 18,
TABLE_MAP_EVENT = 19,
PRE_GA_WRITE_ROWS_EVENT = 20, /* deprecated */
PRE_GA_UPDATE_ROWS_EVENT = 21, /* deprecated */
PRE_GA_DELETE_ROWS_EVENT = 22, /* deprecated */
WRITE_ROWS_EVENT_V1 = 23,
UPDATE_ROWS_EVENT_V1 = 24,
DELETE_ROWS_EVENT_V1 = 25,
INCIDENT_EVENT= 26,
HEARTBEAT_LOG_EVENT= 27,
IGNORABLE_LOG_EVENT= 28,
ROWS_QUERY_LOG_EVENT= 29,
WRITE_ROWS_EVENT = 30,
UPDATE_ROWS_EVENT = 31,
DELETE_ROWS_EVENT = 32,
GTID_LOG_EVENT= 33,
ANONYMOUS_GTID_LOG_EVENT= 34,
PREVIOUS_GTIDS_LOG_EVENT= 35,
TRANSACTION_CONTEXT_EVENT= 36,
VIEW_CHANGE_EVENT= 37,
XA_PREPARE_LOG_EVENT= 38,
/*
Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
*/
/* New MySQL/Sun events are to be added right above this comment */
MYSQL_EVENTS_END,
MARIA_EVENTS_BEGIN= 160,
ANNOTATE_ROWS_EVENT= 160,
BINLOG_CHECKPOINT_EVENT= 161,
GTID_EVENT= 162,
GTID_LIST_EVENT= 163,
START_ENCRYPTION_EVENT= 164,
QUERY_COMPRESSED_EVENT = 165,
WRITE_ROWS_COMPRESSED_EVENT_V1 = 166,
UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167,
DELETE_ROWS_COMPRESSED_EVENT_V1 = 168,
WRITE_ROWS_COMPRESSED_EVENT = 169,
UPDATE_ROWS_COMPRESSED_EVENT = 170,
DELETE_ROWS_COMPRESSED_EVENT = 171,
/* Add new MariaDB events here - right above this comment! */
ENUM_END_EVENT /* end marker */
};
typedef struct {
char *str;
size_t length;
} MARIADB_STRING;
enum mariadb_row_event_type {
WRITE_ROWS= 0,
UPDATE_ROWS= 1,
DELETE_ROWS= 2
};
/* Global transaction id */
typedef struct st_mariadb_gtid {
unsigned int domain_id;
unsigned int server_id;
unsigned long long sequence_nr;
} MARIADB_GTID;
/* Generic replication handle */
typedef struct st_mariadb_rpl {
unsigned int version;
MYSQL *mysql;
char *filename;
uint32_t filename_length;
unsigned char *buffer;
unsigned long buffer_size;
uint32_t server_id;
unsigned long start_position;
uint32_t flags;
uint8_t fd_header_len; /* header len from last format description event */
} MARIADB_RPL;
/* Event header */
struct st_mariadb_rpl_rotate_event {
unsigned long long position;
MARIADB_STRING filename;
};
struct st_mariadb_rpl_query_event {
uint32_t thread_id;
uint32_t seconds;
MARIADB_STRING database;
uint32_t errornr;
MARIADB_STRING status;
MARIADB_STRING statement;
};
struct st_mariadb_rpl_gtid_list_event {
uint32_t gtid_cnt;
MARIADB_GTID *gtid;
};
struct st_mariadb_rpl_format_description_event
{
uint16_t format;
char *server_version;
uint32_t timestamp;
uint8_t header_len;
};
struct st_mariadb_rpl_checkpoint_event {
MARIADB_STRING filename;
};
struct st_mariadb_rpl_xid_event {
uint64_t transaction_nr;
};
struct st_mariadb_rpl_gtid_event {
uint64_t sequence_nr;
uint32_t domain_id;
uint8_t flags;
uint64_t commit_id;
};
struct st_mariadb_rpl_annotate_rows_event {
MARIADB_STRING statement;
};
struct st_mariadb_rpl_table_map_event {
unsigned long long table_id;
MARIADB_STRING database;
MARIADB_STRING table;
unsigned int column_count;
char *column_types;
MARIADB_STRING metadata;
char *null_indicator;
};
struct st_mariadb_rpl_rand_event {
unsigned long long first_seed;
unsigned long long second_seed;
};
struct st_mariadb_rpl_encryption_event {
char scheme;
unsigned int key_version;
char *nonce;
};
struct st_mariadb_rpl_intvar_event {
char type;
unsigned long long value;
};
struct st_mariadb_rpl_uservar_event {
MARIADB_STRING name;
uint8_t is_null;
uint8_t type;
uint32_t charset_nr;
MARIADB_STRING value;
uint8_t flags;
};
struct st_mariadb_rpl_rows_event {
enum mariadb_row_event_type type;
uint64_t table_id;
uint16_t flags;
uint32_t column_count;
char *column_bitmap;
char *column_update_bitmap;
size_t row_data_size;
void *row_data;
};
struct st_mariadb_rpl_heartbeat_event {
uint32_t timestamp;
uint32_t next_position;
uint8_t type;
uint16_t flags;
};
typedef struct st_mariadb_rpl_event
{
/* common header */
MA_MEM_ROOT memroot;
unsigned int checksum;
char ok;
enum mariadb_rpl_event event_type;
unsigned int timestamp;
unsigned int server_id;
unsigned int event_length;
unsigned int next_event_pos;
unsigned short flags;
/****************/
union {
struct st_mariadb_rpl_rotate_event rotate;
struct st_mariadb_rpl_query_event query;
struct st_mariadb_rpl_format_description_event format_description;
struct st_mariadb_rpl_gtid_list_event gtid_list;
struct st_mariadb_rpl_checkpoint_event checkpoint;
struct st_mariadb_rpl_xid_event xid;
struct st_mariadb_rpl_gtid_event gtid;
struct st_mariadb_rpl_annotate_rows_event annotate_rows;
struct st_mariadb_rpl_table_map_event table_map;
struct st_mariadb_rpl_rand_event rand;
struct st_mariadb_rpl_encryption_event encryption;
struct st_mariadb_rpl_intvar_event intvar;
struct st_mariadb_rpl_uservar_event uservar;
struct st_mariadb_rpl_rows_event rows;
struct st_mariadb_rpl_heartbeat_event heartbeat;
} event;
} MARIADB_RPL_EVENT;
#define mariadb_rpl_init(a) mariadb_rpl_init_ex((a), MARIADB_RPL_VERSION)
/* Function prototypes */
MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version);
int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl, enum mariadb_rpl_option, ...);
int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl, enum mariadb_rpl_option, ...);
int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl);
void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl);
MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event);
void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -23,7 +23,13 @@ SET(MARIADB_LIB_SYMBOLS
mysql_optionsv
mysql_ps_fetch_functions
mariadb_reconnect
mysql_stmt_warning_count)
mysql_stmt_warning_count
mariadb_rpl_open
mariadb_rpl_close
mariadb_rpl_fetch
mariadb_rpl_optionsv
mariadb_rpl_get_optionsv
mariadb_free_rpl_event)
IF(WITH_SSL)
SET(MARIADB_LIB_SYMBOLS ${MARIADB_LIB_SYMBOLS} mariadb_deinitialize_ssl)
ENDIF()
@@ -289,6 +295,7 @@ ma_loaddata.c
ma_stmt_codec.c
ma_string.c
ma_dtoa.c
mariadb_rpl.c
${CC_BINARY_DIR}/libmariadb/ma_client_plugin.c
ma_io.c
${SSL_SOURCES}

View File

@@ -158,6 +158,7 @@ const char *mariadb_client_errors[] =
/* 5005 */ "Error reading file '%s' (Errcode: %d)",
/* 5006 */ "Bulk operation without parameters is not supported",
/* 5007 */ "Invalid statement handle",
/* 5008 */ "Unsupported version %d. Supported versions are in the range %d - %d",
""
};

497
libmariadb/mariadb_rpl.c Normal file
View File

@@ -0,0 +1,497 @@
/************************************************************************************
Copyright (C) 2018 MariaDB Corpoeation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not see <http://www.gnu.org/licenses>
or write to the Free Software Foundation, Inc.,
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*************************************************************************************/
#include <ma_global.h>
#include <ma_sys.h>
#include <mysql.h>
#include <errmsg.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <zlib.h>
#include <mariadb_rpl.h>
static int rpl_alloc_string(MARIADB_RPL_EVENT *event,
MARIADB_STRING *s,
unsigned char *buffer,
size_t len)
{
if (!(s->str= ma_alloc_root(&event->memroot, len)))
return 1;
memcpy(s->str, buffer, len);
s->length= len;
return 0;
}
MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version)
{
MARIADB_RPL *rpl;
if (version < MARIADB_RPL_REQUIRED_VERSION ||
version > MARIADB_RPL_VERSION)
{
my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version,
MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION);
return 0;
}
if (!mysql)
return NULL;
if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL))))
{
SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
return 0;
}
rpl->version= version;
rpl->mysql= mysql;
return rpl;
}
void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event)
{
if (event)
{
ma_free_root(&event->memroot, MYF(0));
free(event);
}
}
int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
{
unsigned char *ptr, *buf;
if (!rpl || !rpl->mysql)
return 1;
/* COM_BINLOG_DUMP:
Ofs Len Data
0 1 COM_BINLOG_DUMP
1 4 position
5 2 flags
7 4 server id
11 * filename
* = filename length
*/
ptr= buf=
#ifdef WIN32
(unsigned char *)_alloca(rpl->filename_length + 11);
#else
(unsigned char *)alloca(rpl->filename_length + 11);
#endif
int4store(ptr, (unsigned int)rpl->start_position);
ptr+= 4;
int2store(ptr, rpl->flags);
ptr+= 2;
int4store(ptr, rpl->server_id);
ptr+= 4;
memcpy(ptr, rpl->filename, rpl->filename_length);
ptr+= rpl->filename_length;
if (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0))
return 1;
return 0;
}
MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event)
{
unsigned char *ev;
size_t len;
MARIADB_RPL_EVENT *rpl_event= 0;
if (!rpl || !rpl->mysql)
return 0;
while (1) {
unsigned long pkt_len= ma_net_safe_read(rpl->mysql);
if (pkt_len == packet_error)
{
rpl->buffer_size= 0;
return 0;
}
/* EOF packet:
see https://mariadb.com/kb/en/library/eof_packet/
Packet length must be less than 9 bytes, EOF header
is 0xFE.
*/
if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE)
{
rpl->buffer_size= 0;
return 0;
}
/* if ignore heartbeat flag was set, we ignore this
record and continue to fetch next record.
The first byte is always status byte (0x00)
For event header description see
https://mariadb.com/kb/en/library/2-binlog-event-header/ */
if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT)
{
if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT)
continue;
}
rpl->buffer_size= pkt_len;
rpl->buffer= rpl->mysql->net.read_pos;
if (event)
{
MA_MEM_ROOT memroot= event->memroot;
rpl_event= event;
ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC));
memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
rpl_event->memroot= memroot;
} else {
if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT))))
goto mem_error;
memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
ma_init_alloc_root(&rpl_event->memroot, 8192, 0);
}
rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4);
rpl_event->ok= rpl->buffer[0];
rpl_event->timestamp= uint4korr(rpl->buffer + 1);
rpl_event->event_type= (unsigned char)*(rpl->buffer + 5);
rpl_event->server_id= uint4korr(rpl->buffer + 6);
rpl_event->event_length= uint4korr(rpl->buffer + 10);
rpl_event->next_event_pos= uint4korr(rpl->buffer + 14);
rpl_event->flags= uint2korr(rpl->buffer + 18);
ev= rpl->buffer + EVENT_HEADER_OFS;
switch(rpl_event->event_type) {
case HEARTBEAT_LOG_EVENT:
rpl_event->event.heartbeat.timestamp= uint4korr(ev);
ev+= 4;
rpl_event->event.heartbeat.next_position= uint4korr(ev);
ev+= 4;
rpl_event->event.heartbeat.type= (uint8_t)*ev;
ev+= 1;
rpl_event->event.heartbeat.flags= uint2korr(ev);
break;
case BINLOG_CHECKPOINT_EVENT:
len= uint4korr(ev);
ev+= 4;
if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len))
goto mem_error;
break;
case FORMAT_DESCRIPTION_EVENT:
rpl_event->event.format_description.format = uint2korr(ev);
ev+= 2;
rpl_event->event.format_description.server_version = (char *)(ev);
ev+= 50;
rpl_event->event.format_description.timestamp= uint4korr(ev);
ev+= 2;
rpl->fd_header_len= rpl_event->event.format_description.header_len= (uint8_t)*ev;
break;
case QUERY_EVENT:
{
size_t db_len, status_len;
rpl_event->event.query.thread_id= uint4korr(ev);
ev+= 4;
rpl_event->event.query.seconds= uint4korr(ev);
ev+= 4;
db_len= *ev;
ev++;
rpl_event->event.query.errornr= uint2korr(ev);
ev+= 2;
status_len= uint2korr(ev);
ev+= 2;
if (rpl_alloc_string(rpl_event, &rpl_event->event.query.status, ev, status_len))
goto mem_error;
ev+= status_len;
if (rpl_alloc_string(rpl_event, &rpl_event->event.query.database, ev, db_len))
goto mem_error;
ev+= db_len + 1; /* zero terminated */
/* calculate statement size: buffer + buffer_size - current_ofs (ev) - crc_size */
len= (size_t)(rpl->buffer + rpl->buffer_size - ev - 4);
if (rpl_alloc_string(rpl_event, &rpl_event->event.query.statement, ev, len))
goto mem_error;
break;
}
case TABLE_MAP_EVENT:
rpl_event->event.table_map.table_id= uint6korr(ev);
ev+= 8;
len= *ev;
ev++;
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.database, ev, len))
goto mem_error;
ev+= len + 1;
len= *ev;
ev++;
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.table, ev, len))
goto mem_error;
ev+= len + 1;
rpl_event->event.table_map.column_count= mysql_net_field_length(&ev);
rpl_event->event.table_map.column_types= (char *)ev;
ev+= rpl_event->event.table_map.column_count;
len= mysql_net_field_length(&ev);
if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.metadata, ev, len))
goto mem_error;
break;
case RAND_EVENT:
rpl_event->event.rand.first_seed= uint8korr(ev);
ev+= 8;
rpl_event->event.rand.second_seed= uint8korr(ev);
break;
case INTVAR_EVENT:
rpl_event->event.intvar.type= *ev;
ev++;
rpl_event->event.intvar.value= uint8korr(ev);
break;
case USER_VAR_EVENT:
len= uint4korr(ev);
ev+= 4;
if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.name, ev, len))
goto mem_error;
ev+= len;
if (!(rpl_event->event.uservar.is_null= (uint8)*ev))
{
ev++;
rpl_event->event.uservar.type= *ev;
ev++;
rpl_event->event.uservar.charset_nr= uint4korr(ev);
ev+= 4;
len= uint4korr(ev);
ev+= 4;
if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.value, ev, len))
goto mem_error;
ev+= len;
if ((unsigned long)(ev - rpl->buffer) < rpl->buffer_size)
rpl_event->event.uservar.flags= *ev;
}
break;
case START_ENCRYPTION_EVENT:
rpl_event->event.encryption.scheme= *ev;
ev++;
rpl_event->event.encryption.key_version= uint4korr(ev);
ev+= 4;
rpl_event->event.encryption.nonce= (char *)ev;
break;
case ANNOTATE_ROWS_EVENT:
len= (uint32)(rpl->buffer + rpl->buffer_size - (unsigned char *)ev - 4);
if (rpl_alloc_string(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len))
goto mem_error;
break;
case ROTATE_EVENT:
rpl_event->event.rotate.position= uint8korr(ev);
ev+= 8;
len= rpl->buffer + rpl->buffer_size - ev;
if (rpl_alloc_string(rpl_event, &rpl_event->event.rotate.filename, ev, len))
goto mem_error;
break;
case XID_EVENT:
rpl_event->event.xid.transaction_nr= uint8korr(ev);
break;
case STOP_EVENT:
/* nothing to do here */
break;
case GTID_EVENT:
rpl_event->event.gtid.sequence_nr= uint8korr(ev);
ev+= 8;
rpl_event->event.gtid.domain_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid.flags= *ev;
ev++;
if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID)
rpl_event->event.gtid.commit_id= uint8korr(ev);
break;
case GTID_LIST_EVENT:
{
uint32 i;
rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev);
ev++;
if (!(rpl_event->event.gtid_list.gtid= (MARIADB_GTID *)ma_alloc_root(&rpl_event->memroot, sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt)))
goto mem_error;
for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++)
{
rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev);
ev+= 8;
}
break;
}
case WRITE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V1:
rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT_V1;
if (rpl->fd_header_len == 6)
{
rpl_event->event.rows.table_id= uint4korr(ev);
ev+= 4;
} else {
rpl_event->event.rows.table_id= uint6korr(ev);
ev+= 6;
}
rpl_event->event.rows.flags= uint2korr(ev);
ev+= 2;
len= rpl_event->event.rows.column_count= mysql_net_field_length(&ev);
if (!len)
break;
if (!(rpl_event->event.rows.column_bitmap =
(char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8)))
goto mem_error;
memcpy(rpl_event->event.rows.column_bitmap, ev, (len + 7) / 8);
ev+= (len + 7) / 8;
if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1)
{
if (!(rpl_event->event.rows.column_update_bitmap =
(char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8)))
goto mem_error;
memcpy(rpl_event->event.rows.column_update_bitmap, ev, (len + 7) / 8);
ev+= (len + 7) / 8;
}
if ((rpl_event->event.rows.row_data_size= rpl->buffer + rpl->buffer_size - ev))
{
if (!(rpl_event->event.rows.row_data =
(char *)ma_alloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size)))
goto mem_error;
memcpy(rpl_event->event.rows.row_data, ev, rpl_event->event.rows.row_data_size);
}
break;
default:
return NULL;
break;
}
return rpl_event;
}
mem_error:
SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
return 0;
}
void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl)
{
if (!rpl)
return;
if (rpl->filename)
free((void *)rpl->filename);
free(rpl);
return;
}
int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl,
enum mariadb_rpl_option option,
...)
{
va_list ap;
int rc= 0;
if (!rpl)
return 1;
va_start(ap, option);
switch (option) {
case MARIADB_RPL_FILENAME:
{
char *arg1= va_arg(ap, char *);
rpl->filename_length= va_arg(ap, size_t);
free((void *)rpl->filename);
rpl->filename= NULL;
if (rpl->filename_length)
{
rpl->filename= (char *)malloc(rpl->filename_length);
memcpy((void *)rpl->filename, arg1, rpl->filename_length);
}
else if (arg1)
{
rpl->filename= strdup((const char *)arg1);
rpl->filename_length= strlen(rpl->filename);
}
break;
}
case MARIADB_RPL_SERVER_ID:
{
rpl->server_id= va_arg(ap, unsigned int);
break;
}
case MARIADB_RPL_FLAGS:
{
rpl->flags= va_arg(ap, unsigned int);
break;
}
case MARIADB_RPL_START:
{
rpl->start_position= va_arg(ap, unsigned long);
break;
}
default:
rc= -1;
goto end;
}
end:
return rc;
}
int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl,
enum mariadb_rpl_option option,
...)
{
va_list ap;
if (!rpl)
return 1;
va_start(ap, option);
switch (option) {
case MARIADB_RPL_FILENAME:
{
const char **name= (const char **)va_arg(ap, char **);
size_t *len= (size_t*)va_arg(ap, size_t *);
*name= rpl->filename;
*len= rpl->filename_length;
break;
}
case MARIADB_RPL_SERVER_ID:
{
unsigned int *id= va_arg(ap, unsigned int *);
*id= rpl->server_id;
break;
}
case MARIADB_RPL_FLAGS:
{
unsigned int *flags= va_arg(ap, unsigned int *);
*flags= rpl->flags;
break;
}
case MARIADB_RPL_START:
{
unsigned long *start= va_arg(ap, unsigned long *);
*start= rpl->start_position;
break;
}
default:
return 1;
break;
}
return 0;
}

View File

@@ -26,7 +26,7 @@ INCLUDE_DIRECTORIES(${CC_SOURCE_DIR}/include
${CC_SOURCE_DIR}/unittest/libmariadb)
ADD_DEFINITIONS(-DLIBMARIADB)
SET(API_TESTS "conc336" "bulk1" "performance" "basic-t" "fetch" "charset" "logs" "cursor" "errors" "view" "ps" "ps_bugs" "sp" "result" "connection" "misc" "ps_new" "thread" "features-10_2" "bulk1")
SET(API_TESTS "conc336" "bulk1" "performance" "basic-t" "fetch" "charset" "logs" "cursor" "errors" "view" "ps" "ps_bugs" "sp" "result" "connection" "misc" "ps_new" "thread" "features-10_2" "bulk1" "rpl_api")
IF(WITH_DYNCOL)
SET(API_TESTS ${API_TESTS} "dyncol")
ENDIF()

View File

@@ -0,0 +1,75 @@
/*
Copyright (c) 2018 MariaDB Corporation AB
The MySQL Connector/C is licensed under the terms of the GPLv2
<http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>, like most
MySQL Connectors. There are special exceptions to the terms and
conditions of the GPLv2 as it is applied to this software, see the
FLOSS License Exception
<http://www.mysql.com/about/legal/licensing/foss-exception.html>.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
Some basic tests of the client API.
*/
#include "my_test.h"
#include "mariadb_rpl.h"
static int test_rpl_01(MYSQL *mysql)
{
int i;
unsigned int server_id= 0;
MARIADB_RPL_EVENT *event= NULL;
MARIADB_RPL *rpl= mariadb_rpl_init(mysql);
mysql_query(mysql, "SET @mariadb_slave_capability=4");
mysql_query(mysql, "SET NAMES latin1");
mysql_query(mysql, "SET @slave_gtid_strict_mode=1");
mysql_query(mysql, "SET @slave_gtid_ignore_duplicates=1");
mysql_query(mysql, "SET NAMES utf8");
mysql_query(mysql, "SET @master_binlog_checksum= @@global.binlog_checksum");
rpl->server_id= 12;
rpl->start_position= 4;
rpl->flags= MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS;
if (mariadb_rpl_open(rpl))
return FAIL;
while((event= mariadb_rpl_fetch(rpl, event)))
{
printf("event: %d\n", event->event_type);
}
mariadb_free_rpl_event(event);
return OK;
}
struct my_tests_st my_tests[] = {
{"test_rpl_01", test_rpl_01, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{NULL, NULL, 0, 0, NULL, NULL}
};
int main(int argc, char **argv)
{
if (argc > 1)
get_options(argc, argv);
get_envvars();
run_tests(my_tests);
return(exit_status());
}