1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-04-18 21:24:07 +03:00
Georg Richter 03195a2f30 Fix for CENTOS7:
Since CentOS7 builder still uses gcc4.8, we need to move c99
declarations out of the loop.
2023-08-16 20:11:45 +02:00

2100 lines
58 KiB
C

/************************************************************************************
Copyright (C) 2018,2022 MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not see <http://www.gnu.org/licenses>
or write to the Free Software Foundation, Inc.,
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*************************************************************************************/
#include <ma_global.h>
#include <ma_sys.h>
#include <ma_common.h>
#include <mysql.h>
#include <errmsg.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <zlib.h>
#include <ma_decimal.h>
#include <mariadb_rpl.h>
#ifdef WIN32
#include <malloc.h>
#undef alloca
#define alloca _malloca
#endif
#define RPL_EVENT_HEADER_SIZE 19
#define RPL_ERR_POS(r) (r)->filename_length, (r)->filename, (r)->start_position
#define RPL_CHECK_NULL_POS(position, end)\
{\
uchar *tmp= (position);\
while (*tmp && tmp < (end))\
tmp++;\
if (tmp > (end))\
goto malformed_packet;\
}
#define RPL_CHECK_POS(position, end, bytes)\
if ((end)-(position) < (ssize_t)(bytes))\
goto malformed_packet;
#define RPL_CHECK_FIELD_LENGTH(position, end)\
{\
RPL_CHECK_POS((position), (end), 1);\
RPL_CHECK_POS((position), (end), net_field_size((position)));\
}
#define RPL_CHECK_POST_HEADER_LEN(position, end, type)\
if (rpl->post_header_len[(type) - 1])\
{\
RPL_CHECK_POS((position), (end), rpl->post_header_len[(type)-1])\
}
static inline uint64_t uintNkorr(uint8_t len, u_char *p)
{
switch (len) {
case 1:
return *p;
case 2:
return uint2korr(p);
case 3:
return uint3korr(p);
case 4:
return uint4korr(p);
case 8:
return uint8korr(p);
default:
return 0;
}
}
static inline int net_field_size(uchar *p)
{
if (*p <= 251)
return 1;
if (*p == 252)
return 3;
if (*p == 253)
return 4;
return 9;
}
static inline int rpl_bit_size(uint32_t x)
{
int bits= 1;
while (x >>= 1)
bits++;
return bits;
}
static inline int rpl_byte_size(uint32_t x)
{
int bits= rpl_bit_size(x);
return (bits + 7) / 8;
}
void rpl_set_error(MARIADB_RPL *rpl,
unsigned int error_nr,
const char *format,
...)
{
va_list ap;
const char *errmsg;
return;
if (!format)
{
if (error_nr >= CR_MIN_ERROR && error_nr <= CR_MYSQL_LAST_ERROR)
errmsg= ER(error_nr);
else if (error_nr >= CER_MIN_ERROR && error_nr <= CR_MARIADB_LAST_ERROR)
errmsg= CER(error_nr);
else
errmsg= ER(CR_UNKNOWN_ERROR);
}
rpl->error_no= error_nr;
va_start(ap, format);
vsnprintf(rpl->error_msg, MYSQL_ERRMSG_SIZE - 1,
format ? format : errmsg, ap);
va_end(ap);
/* For backward compatibility we also need to set a connection
error, if we read from primary instead of file */
if (rpl->mysql)
{
my_set_error(rpl->mysql, error_nr, SQLSTATE_UNKNOWN, rpl->error_msg);
}
}
const char * STDCALL mariadb_rpl_error(MARIADB_RPL *rpl)
{
return rpl->error_msg;
}
uint32_t STDCALL mariadb_rpl_errno(MARIADB_RPL *rpl)
{
return rpl->error_no;
}
uint8_t rpl_parse_opt_metadata(MARIADB_RPL_EVENT *event, const uchar *buffer, size_t length)
{
const uchar *pos= buffer, *end= buffer + length;
struct st_mariadb_rpl_table_map_event *tm_event= (struct st_mariadb_rpl_table_map_event *)&event->event;
if (event->event_type != TABLE_MAP_EVENT)
return 1;
while (pos < end)
{
uint8_t meta_type= *pos++;
uint32_t len;
RPL_CHECK_FIELD_LENGTH((uchar *)pos, end);
len= net_field_length((uchar **)&pos);
RPL_CHECK_POS(pos, end,len);
switch(meta_type)
{
case SIGNEDNESS:
tm_event->signed_indicator= (uchar *)pos;
pos+= len;
break;
case DEFAULT_CHARSET:
tm_event->default_charset= *pos;
pos+= len;
break;
case COLUMN_CHARSET:
tm_event->column_charsets.data= pos;
tm_event->column_charsets.length= len;
pos+= len;
break;
case COLUMN_NAME:
tm_event->column_names.data= pos;
tm_event->column_names.length= len;
pos+= len;
break;
case SIMPLE_PRIMARY_KEY:
tm_event->simple_primary_keys.data= pos;
tm_event->simple_primary_keys.length= len;
pos+= len;
break;
case PRIMARY_KEY_WITH_PREFIX:
tm_event->prefixed_primary_keys.data= pos;
tm_event->prefixed_primary_keys.length= len;
pos+= len;
break;
case GEOMETRY_TYPE:
{
tm_event->geometry_types.data= pos;
tm_event->geometry_types.length= len;
pos+= len;
break;
}
/* Default character set used by all columns */
case ENUM_AND_SET_DEFAULT_CHARSET:
tm_event->enum_set_default_charset= *pos;
pos+= len;
break;
case ENUM_AND_SET_COLUMN_CHARSET:
tm_event->enum_set_column_charsets.data= pos;
tm_event->enum_set_column_charsets.length= len;
pos+= len;
break;
case SET_STR_VALUE:
tm_event->set_values.data= pos;
tm_event->set_values.length= len;
pos+= len;
break;
case ENUM_STR_VALUE:
tm_event->enum_values.data= pos;
tm_event->enum_values.length= len;
pos+= len;
break;
default:
rpl_set_error(event->rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(event->rpl), "Unknown/unsupported event type");
pos+= len;
break;
}
}
return 0;
malformed_packet:
return 1;
}
static void *ma_calloc_root(void *memroot, size_t len)
{
void *p;
if ((p= ma_alloc_root(memroot, len)))
memset(p, 0, len);
return p;
}
static void rpl_set_string_and_len(MARIADB_STRING *s,
unsigned char *buffer,
size_t len)
{
if (!buffer || !len)
{
s->length= 0;
return;
}
s->str= (char *)buffer;
s->length= len;
}
static uint8_t rpl_alloc_set_string_and_len(MARIADB_RPL_EVENT *event,
MARIADB_STRING *s,
void *buffer,
size_t len)
{
if (!buffer || !len)
{
s->length= 0;
return 0;
}
if (!(s->str = (char *)ma_alloc_root(&event->memroot, len)))
return 1;
memcpy(s->str, buffer, len);
s->length= len;
return 0;
}
static uint8_t rpl_metadata_size(enum enum_field_types field_type)
{
switch (field_type) {
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_DATETIME2:
case MYSQL_TYPE_TIMESTAMP2:
case MYSQL_TYPE_TIME2:
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
return 1;
case MYSQL_TYPE_STRING:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_NEWDECIMAL:
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_BIT:
return 2;
default:
return 0;
}
}
static uint8_t ma_rpl_get_second_part(MYSQL_TIME *tm, uchar *ptr, uchar *metadata)
{
switch(metadata[0])
{
case 0:
tm->second_part= 0;
return 0;
case 1:
case 2:
tm->second_part= (uint32_t)ptr[0] * 10000;
return 1;
case 3:
case 4:
tm->second_part= myisam_sint2korr(ptr) * 100;
return 2;
case 5:
case 6:
tm->second_part= myisam_sint3korr(ptr);
return 3;
default:
return 0;
}
}
MARIADB_RPL_ROW * STDCALL
mariadb_rpl_extract_rows(MARIADB_RPL *rpl,
MARIADB_RPL_EVENT *tm_event,
MARIADB_RPL_EVENT *row_event)
{
uchar *start, *pos, *end;
MARIADB_RPL_ROW *f_row= NULL, *p_row= NULL, *c_row= NULL;
uint32_t column_count;
if (!rpl || !tm_event || !row_event)
return NULL;
if (tm_event->event_type != TABLE_MAP_EVENT || !(IS_ROW_EVENT(row_event)))
{
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl), "Event with wrong event type passed.");
return NULL;
}
if (row_event->event.rows.table_id != tm_event->event.table_map.table_id)
{
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl), "table_id in table_map event differs.");
return NULL;
}
if (!row_event->event.rows.row_data_size ||
!row_event->event.rows.row_data)
{
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Row event has no data.");
return NULL;
}
column_count= tm_event->event.table_map.column_count;
start= pos = row_event->event.rows.row_data;
end= start + row_event->event.rows.row_data_size;
while (pos < end)
{
uchar *n_bitmap;
uint32_t i;
uchar *metadata= (uchar *)tm_event->event.table_map.metadata.str;
if (!(c_row = (MARIADB_RPL_ROW *)ma_calloc_root(&row_event->memroot, sizeof(MARIADB_RPL_ROW))) ||
!(c_row->columns= (MARIADB_RPL_VALUE *)ma_calloc_root(&row_event->memroot,
sizeof(MARIADB_RPL_VALUE) * column_count)))
{
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
return NULL;
}
if (!f_row)
f_row= c_row;
if (p_row)
p_row->next= c_row;
c_row->column_count= column_count;
n_bitmap= pos;
pos+= (column_count + 7) / 8;
for (i= 0; i < column_count; i++)
{
MARIADB_RPL_VALUE *column= &c_row->columns[i];
column->field_type= (uchar)tm_event->event.table_map.column_types.str[i];
/* enum, set and string types are stored as string - first metadata
byte contains real_type, second byte contains the length */
if (column->field_type == MYSQL_TYPE_STRING)
{
if (metadata[0] == MYSQL_TYPE_ENUM || metadata[0] == MYSQL_TYPE_SET)
column->field_type = metadata[0];
}
if ((n_bitmap[i / 8] >> (i % 8)) & 1)
{
column->is_null= 1;
metadata+= rpl_metadata_size(column->field_type);
continue;
}
if (column->field_type == MYSQL_TYPE_BLOB)
{
switch(metadata[0])
{
case 1:
column->field_type= MYSQL_TYPE_TINY_BLOB;
break;
case 3:
column->field_type= MYSQL_TYPE_MEDIUM_BLOB;
break;
case 4:
column->field_type= MYSQL_TYPE_LONG_BLOB;
break;
default:
break;
}
}
switch (column->field_type) {
case MYSQL_TYPE_TINY:
column->val.ll= sint1korr(pos);
column->val.ull= uint1korr(pos);
pos++;
break;
case MYSQL_TYPE_YEAR:
column->val.ull= uint1korr(pos++) + 1900;
break;
case MYSQL_TYPE_SHORT:
column->val.ll= sint2korr(pos);
column->val.ull= uint2korr(pos);
pos+= 2;
break;
case MYSQL_TYPE_INT24:
column->val.ll= sint3korr(pos);
column->val.ull= uint3korr(pos);
pos+= 3;
break;
case MYSQL_TYPE_LONG:
column->val.ll= sint4korr(pos);
column->val.ull= uint4korr(pos);
pos+= 4;
break;
case MYSQL_TYPE_LONGLONG:
column->val.ll= sint8korr(pos);
column->val.ull= uint8korr(pos);
pos+= 8;
break;
case MYSQL_TYPE_NEWDECIMAL:
{
uint8_t precision= *metadata++;
uint8_t scale= *metadata++;
uint32_t bin_size;
decimal dec;
char str[200];
char buf[100];
int s_len= sizeof(str) - 1;
dec.buf= (void *)buf;
dec.len= sizeof(buf) / sizeof(decimal_digit);
bin_size= decimal_bin_size(precision, scale);
bin2decimal((char *)pos, &dec, precision, scale);
decimal2string(&dec, str, &s_len);
pos+= bin_size;
if (rpl_alloc_set_string_and_len(row_event, &column->val.str, str, s_len))
goto mem_error;
break;
}
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
{
uint8_t flen= *metadata++;
if (flen == 4)
{
float4get(column->val.f, pos);
}
if (flen == 8)
{
float8get(column->val.d, pos);
}
pos+= flen;
break;
}
case MYSQL_TYPE_BIT:
{
uint8_t num_bits= (metadata[0] & 0xFF) + metadata[1] * 8;
uint8_t b_len= (num_bits + 7) / 8;
metadata+= 2;
if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, b_len))
goto mem_error;
pos+= b_len;
break;
}
case MYSQL_TYPE_TIMESTAMP:
{
column->val.ull= uint4korr(pos);
pos+= 4;
break;
}
case MYSQL_TYPE_TIMESTAMP2:
{
char tmp[20];
uint32_t p1= uint4korr(pos);
uint8_t f_len= *metadata++;
uint32_t p2;
pos+= 4;
p2= (uint32_t)uintNkorr(f_len, pos);
pos+= f_len;
sprintf(tmp, "%d.%d", p1, p2);
if (rpl_alloc_set_string_and_len(row_event, &column->val.str, tmp, strlen(tmp)))
goto mem_error;
break;
}
case MYSQL_TYPE_DATE:
{
MYSQL_TIME *tm= &column->val.tm;
uint32_t d_val= uint3korr(pos);
pos+= 3;
tm->year= (int)(d_val / (16 * 32));
tm->month= (int)(d_val / 32 % 16);
tm->day= d_val % 32;
tm->time_type= MYSQL_TIMESTAMP_DATE;
break;
}
case MYSQL_TYPE_TIME2:
{
MYSQL_TIME *tm= &column->val.tm;
int64_t t_val= myisam_uint3korr(pos) - 0x800000LL;
if ((tm->neg = t_val < 0))
t_val= -t_val;
pos+= 3;
tm->hour= (t_val >> 12) % (1 << 10);
tm->minute= (t_val >> 6) % (1 << 6);
tm->second= t_val % (1 << 6);
pos+= ma_rpl_get_second_part(tm, pos, metadata);
metadata++;
tm->time_type= MYSQL_TIMESTAMP_TIME;
column->field_type= MYSQL_TYPE_TIME;
break;
}
case MYSQL_TYPE_DATETIME2:
{
MYSQL_TIME *tm= &column->val.tm;
uint64_t dt_val= mi_uint5korr(pos) - 0x8000000000LL,
date_part, time_part;
pos+= 5;
date_part= dt_val >> 17;
time_part= dt_val % (1 << 17);
tm->day= (unsigned int)date_part % (1 << 5);
tm->month= (unsigned int)(date_part >> 5) % 13;
tm->year= (unsigned int)(date_part >> 5) / 13;
tm->second= time_part % (1 << 6);
tm->minute= (time_part >> 6) % (1 << 6);
tm->hour= (uint32_t)(time_part >> 12);
tm->time_type= MYSQL_TIMESTAMP_DATETIME;
column->field_type= MYSQL_TYPE_DATETIME;
pos+= ma_rpl_get_second_part(tm, pos, metadata);
metadata++;
break;
}
case MYSQL_TYPE_STRING:
{
uint8_t s_len= metadata[2];
metadata+= 2;
if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, s_len))
goto mem_error;
pos+= s_len;
break;
}
case MYSQL_TYPE_ENUM:
{
uint8_t e_len= metadata[2];
metadata+= 2;
column->val.ull= uintNkorr(e_len, pos);
pos+= e_len;
break;
}
case MYSQL_TYPE_SET:
{
uint8_t e_len= metadata[2];
metadata+= 2;
column->val.ull= uintNkorr(e_len, pos);
pos+= e_len;
break;
}
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_GEOMETRY:
{
uint8_t h_len= *metadata++;
uint64_t b_len= uintNkorr(h_len, pos);
pos+= h_len;
if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, (size_t)b_len))
goto mem_error;
pos+= b_len;
break;
}
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
{
uint32_t s_len= uint2korr(metadata);
uint8_t byte_len= rpl_byte_size(s_len);
metadata+= 2;
s_len= (uint32_t)uintNkorr(byte_len, pos);
pos+= byte_len;
if (rpl_alloc_set_string_and_len(row_event, &column->val.str, pos, s_len))
goto mem_error;
pos+= s_len;
break;
}
case MYSQL_TYPE_TIME:
{
MYSQL_TIME *tm= &column->val.tm;
uint64_t t= uint8korr(pos);
pos+= 8;
tm->hour= (unsigned int)(t/100)/100;
tm->minute= (unsigned int)(t/100) % 100;
tm->second= (unsigned int)t % 100;
tm->time_type= MYSQL_TIMESTAMP_TIME;
break;
}
case MYSQL_TYPE_DATETIME:
{
MYSQL_TIME *tm= &column->val.tm;
uint64_t t= uint8korr(pos);
uint32_t d_val= (uint32_t)t / 1000000,
t_val= (uint32_t)t % 1000000;
pos+= 8;
tm->year= (unsigned int)(d_val / 100) / 100;
tm->month= (unsigned int)(d_val / 100) % 100;
tm->day= (unsigned int)d_val % 100;
tm->hour= (t_val/100)/100;
tm->minute= (t_val/100) % 100;
tm->second= t_val % 100;
tm->time_type= MYSQL_TIMESTAMP_DATETIME;
break;
}
default:
break;
}
}
p_row= c_row;
}
return f_row;
mem_error:
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
return NULL;
}
MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version)
{
MARIADB_RPL *rpl;
if (version < MARIADB_RPL_REQUIRED_VERSION ||
version > MARIADB_RPL_VERSION)
{
if (mysql)
my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version,
MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION);
return 0;
}
if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL))))
{
SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
return 0;
}
rpl->version= version;
if ((rpl->mysql= mysql))
{
MYSQL_RES *result;
if (!mysql_query(mysql, "select @@binlog_checksum"))
{
if ((result= mysql_store_result(mysql)))
{
MYSQL_ROW row= mysql_fetch_row(result);
if (!strcmp(row[0], "CRC32"))
{
rpl->artificial_checksum= 1;
}
mysql_free_result(result);
}
}
}
/* recommended way to set replica host and port is via rpl_optionsv(), however if
hostname and port was set via mysql_optionsv, we set it here to rpl */
if (rpl->mysql && rpl->mysql->options.extension && rpl->mysql->options.extension->rpl_host)
{
mariadb_rpl_optionsv(rpl, MARIADB_RPL_HOST, rpl->mysql->options.extension->rpl_host);
mariadb_rpl_optionsv(rpl, MARIADB_RPL_PORT, rpl->mysql->options.extension->rpl_port);
}
return rpl;
}
void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event)
{
if (event)
{
ma_free_root(&event->memroot, MYF(0));
free(event);
}
}
int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
{
unsigned char *ptr, *buf;
if (!rpl)
return 1;
/* COM_BINLOG_DUMP:
Ofs Len Data
0 1 COM_BINLOG_DUMP
1 4 position
5 2 flags
7 4 server id
11 * filename
* = filename length
*/
rpl_clear_error(rpl);
/* if replica was specified, we will register replica via
COM_REGISTER_SLAVE */
if (rpl->host)
{
/* Protocol:
Ofs Len Data
0 1 COM_REGISTER_SLAVE
1 4 server id
5 1 replica host name length
6 <n> replica host name
1 user name length
<n> user name
1 password length
<n> password
2 replica port
4 replication rank (unused)
4 source server id (unused)
*/
unsigned char *p, buffer[1024];
size_t len= MIN(strlen(rpl->host), 255);
p= buffer;
int4store(p, rpl->server_id);
p+= 4;
*p++= (unsigned char)len;
memcpy(p, rpl->host, len);
p+= len;
/* Don't send user, password, rank and server_id */
*p++= 0;
*p++= 0;
int2store(p, rpl->port);
p+= 2;
int4store(p, 0);
p+= 4;
int4store(p, 0);
p+= 4;
if (ma_simple_command(rpl->mysql, COM_REGISTER_SLAVE, (const char *)buffer, p - buffer, 0, 0))
{
rpl_set_error(rpl, mysql_errno(rpl->mysql), 0, NULL, 0);
return 1;
}
}
if (rpl->mysql)
{
uint32_t replica_id= rpl->server_id;
ptr= buf= (unsigned char *)alloca(rpl->filename_length + 11);
if (!ptr)
{
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
}
if (rpl->is_semi_sync)
{
if (mysql_query(rpl->mysql, "SET @rpl_semi_sync_slave=1"))
{
rpl_set_error(rpl, mysql_errno(rpl->mysql), 0, mysql_error(rpl->mysql));
return 1;
}
}
else {
MYSQL_RES* result;
MYSQL_ROW row;
if (mysql_query(rpl->mysql, "SELECT @rpl_semi_sync_slave=1"))
{
rpl_set_error(rpl, mysql_errno(rpl->mysql), 0, mysql_error(rpl->mysql));
return 1;
}
if ((result = mysql_store_result(rpl->mysql)))
{
if ((row = mysql_fetch_row(result)))
rpl->is_semi_sync = (row[0] != NULL && row[0][0] == '1');
mysql_free_result(result);
}
}
int4store(ptr, (unsigned int)rpl->start_position);
ptr+= 4;
int2store(ptr, rpl->flags);
ptr+= 2;
if ((rpl->flags & MARIADB_RPL_BINLOG_DUMP_NON_BLOCK) && !replica_id)
replica_id= 1;
int4store(ptr, replica_id);
ptr+= 4;
memcpy(ptr, rpl->filename, rpl->filename_length);
ptr+= rpl->filename_length;
return (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0));
} else
{
char *buf[RPL_BINLOG_MAGIC_SIZE];
MYSQL mysql;
/* Semi sync doesn't work when processing files */
rpl->is_semi_sync = 0;
if (rpl->fp)
ma_close(rpl->fp);
if (!(rpl->fp= ma_open((const char *)rpl->filename, "r", &mysql)))
{
rpl_set_error(rpl, CR_FILE_NOT_FOUND, 0, rpl->filename, errno);
return errno;
}
if (ma_read(buf, 1, RPL_BINLOG_MAGIC_SIZE, rpl->fp) != 4)
{
rpl_set_error(rpl, CR_FILE_READ, 0, rpl->filename, errno);
return errno;
}
/* check if it is a valid binlog file */
if (memcmp(buf, RPL_BINLOG_MAGIC, RPL_BINLOG_MAGIC_SIZE) != 0)
{
rpl_set_error(rpl, CR_BINLOG_INVALID_FILE, 0, rpl->filename, errno);
return errno;
}
return 0;
}
}
static int ma_set_rpl_filename(MARIADB_RPL *rpl, const unsigned char *filename, size_t len)
{
if (!rpl)
return 1;
free(rpl->filename);
if (!(rpl->filename= (char *)malloc(len)))
return 1;
memcpy(rpl->filename, filename, len);
rpl->filename_length= (uint32_t)len;
return 0;
}
/*
* Returns compression info:
* Ofs Len
* 0 1 header:
* ofs & 0x07 >> 4: algorithm, always 0=zlib
* ofs & 0x07: header size
* 1 header size uncompressed length in MyISAM format.
*/
static uint32_t get_compression_info(const unsigned char *buf,
uint8_t *algorithm,
uint8_t *header_size)
{
uint8_t alg, header;
uint32 len= 0;
if (!algorithm)
algorithm= &alg;
if (!header_size)
header_size= &header;
*header_size= 0;
*algorithm= 0;
if (!buf)
return len;
if ((buf[0] & 0xe0) != 0x80)
return len;
*header_size= buf[0] & 0x07;
*algorithm = (buf[0] & 0x07) >> 4;
buf++;
/* Attention: we can't use uint*korr, here, we need myisam macros since
length is stored in high byte first order
*/
switch(*header_size) {
case 1:
len= *buf;
break;
case 2:
len= myisam_uint2korr(buf);
break;
case 3:
len= myisam_uint3korr(buf);
break;
case 4:
len= myisam_uint4korr(buf);
break;
default:
len= 0;
break;
}
*header_size += 1;
return len;
}
static uint8_t mariadb_rpl_send_semisync_ack(MARIADB_RPL* rpl, MARIADB_RPL_EVENT* event)
{
size_t buf_size = 0;
uchar* buf;
if (!rpl)
return 1;
if (!event)
{
rpl_set_error(rpl, CR_BINLOG_SEMI_SYNC_ERROR, 0, "Invalid event");
return 1;
}
if (!rpl->is_semi_sync)
{
rpl_set_error(rpl, CR_BINLOG_SEMI_SYNC_ERROR, 0, "semi synchronous replication is not enabled");
return 1;
}
if (!event->is_semi_sync || (event->semi_sync_flags != SEMI_SYNC_ACK_REQ))
{
rpl_set_error(rpl, CR_BINLOG_SEMI_SYNC_ERROR, 0, "This event doesn't require to send semi synchronous acknoledgement");
return 1;
}
buf_size = rpl->filename_length + 9;
buf = alloca(buf_size);
buf[0] = SEMI_SYNC_INDICATOR;
int8store(buf + 1, (uint64_t)event->next_event_pos);
memcpy(buf + 9, rpl->filename, rpl->filename_length);
ma_net_clear(&rpl->mysql->net);
if (ma_net_write(&rpl->mysql->net, buf, buf_size) ||
(ma_net_flush(&rpl->mysql->net)))
{
rpl_set_error(rpl, CR_CONNECTION_ERROR, 0);
return 1;
}
return 0;
}
MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event)
{
unsigned char *ev= 0;
unsigned char *checksum_start= 0;
unsigned char *ev_start= 0;
unsigned char *ev_end= 0;
size_t len= 0;
MARIADB_RPL_EVENT *rpl_event= 0;
if (!rpl || (!rpl->mysql && !rpl->fp))
return 0;
if (event)
{
MA_MEM_ROOT memroot= event->memroot;
rpl_event= event;
ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC));
memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
rpl_event->memroot= memroot;
} else {
if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT))))
goto mem_error;
memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
ma_init_alloc_root(&rpl_event->memroot, 8192, 0);
}
rpl_event->rpl= rpl;
while (1) {
unsigned long pkt_len;
if (rpl->mysql)
{
pkt_len= ma_net_safe_read(rpl->mysql);
if (pkt_len == packet_error)
{
mariadb_free_rpl_event(rpl_event);
return 0;
}
/* EOF packet:
see https://mariadb.com/kb/en/library/eof_packet/
Packet length must be less than 9 bytes, EOF header
is 0xFE.
*/
if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE)
{
mariadb_free_rpl_event(rpl_event);
return 0;
}
/* if ignore heartbeat flag was set, we ignore this
record and continue to fetch next record.
The first byte is always status byte (0x00)
For event header description see
https://mariadb.com/kb/en/library/2-binlog-event-header/ */
if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT)
{
if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT)
continue;
}
if (!(rpl_event->raw_data= ma_alloc_root(&rpl_event->memroot, pkt_len)))
goto mem_error;
rpl_event->raw_data_size= pkt_len;
memcpy(rpl_event->raw_data, rpl->mysql->net.read_pos, pkt_len);
ev= rpl_event->raw_data;
} else if (rpl->fp) {
char buf[EVENT_HEADER_OFS]; /* header */
size_t rc;
uint32_t len= 0;
char *p= buf;
if (ma_feof(rpl->fp))
{
return NULL;
}
memset(buf, 0, EVENT_HEADER_OFS);
if ((rc= ma_read(buf, 1, EVENT_HEADER_OFS - 1, rpl->fp)) != EVENT_HEADER_OFS - 1)
{
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Can't read event header");
mariadb_free_rpl_event(rpl_event);
return NULL;
}
len= uint4korr(p + 9);
if (!(rpl_event->raw_data= ma_alloc_root(&rpl_event->memroot, len)))
{
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
mariadb_free_rpl_event(rpl_event);
return NULL;
}
rpl_event->raw_data_size= len;
memcpy(rpl_event->raw_data, buf, EVENT_HEADER_OFS - 1);
len-= (EVENT_HEADER_OFS - 1);
rc= ma_read(rpl_event->raw_data + EVENT_HEADER_OFS - 1, 1, len, rpl->fp);
if (rc != len)
{
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, "Error while reading post header");
mariadb_free_rpl_event(rpl_event);
return NULL;
}
ev= rpl_event->raw_data;
/* We don't decrypt yet */
if (rpl->encrypted) {
return rpl_event;
}
}
ev_end= rpl_event->raw_data + rpl_event->raw_data_size;
if (rpl->mysql)
{
RPL_CHECK_POS(ev, ev_end, 1);
rpl_event->ok= *ev++;
/* CONC-470: add support for semi snychronous replication */
if (rpl->is_semi_sync && (rpl_event->is_semi_sync= (*ev == SEMI_SYNC_INDICATOR)))
{
RPL_CHECK_POS(ev, ev_end, 1);
ev++;
rpl_event->semi_sync_flags= *ev++;
}
}
rpl_event->raw_data_ofs= ev - rpl_event->raw_data;
/* check sum verification:
check sum will be calculated from begin of binlog header
*/
checksum_start= ev;
/******************************************************************
Binlog event header:
All binary log events have the same header:
- uint32_t timestamp: creation time
- uint8_t event_type: type code of the event
- uint32_t server_id: server which created the event
- uint32_t event_len: length of the event. If checksum is
enabled, the length also include 4 bytes
of checksum
------------- if START_ENCRYPTION_EVENT was sent, ---------------
encrypted part starts here:
- uint32_t next_pos: Position of next binary log event
- uint16_t flags: flags
The size of binlog event header must match the header size returned
by FORMAT_DESCIPTION_EVENT. In version 4 it is always 19.
********************************************************************/
RPL_CHECK_POS(ev, ev_end, RPL_EVENT_HEADER_SIZE);
rpl_event->timestamp= uint4korr(ev);
ev+= 4;
rpl_event->event_type= (unsigned char)*ev++;
rpl_event->server_id= uint4korr(ev);
ev+= 4;
rpl_event->event_length= uint4korr(ev);
ev+= 4;
rpl_event->next_event_pos= uint4korr(ev);
ev+= 4;
rpl_event->flags= uint2korr(ev);
ev+=2;
rpl_event->checksum= 0;
/* start of post_header */
ev_start= ev;
DBUG_ASSERT(rpl_event->event_type < ENUM_END_EVENT);
switch(rpl_event->event_type) {
case UNKNOWN_EVENT:
case SLAVE_EVENT:
return rpl_event;
break;
case HEARTBEAT_LOG_EVENT:
/* no post header size */
RPL_CHECK_POS(ev, ev_end, 11);
rpl_event->event.heartbeat.timestamp= uint4korr(ev);
ev+= 4;
rpl_event->event.heartbeat.next_position= uint4korr(ev);
ev+= 4;
rpl_event->event.heartbeat.type= (uint8_t)*ev;
ev+= 1;
rpl_event->event.heartbeat.flags= uint2korr(ev);
ev+= 2;
break;
case BEGIN_LOAD_QUERY_EVENT:
/* check post header size */
RPL_CHECK_POST_HEADER_LEN(ev, ev_end, BEGIN_LOAD_QUERY_EVENT);
rpl_event->event.begin_load_query.file_id= uint4korr(ev);
ev+= 4;
/* Payload: query_data (zero terminated) */
RPL_CHECK_NULL_POS(ev, ev_end);
rpl_event->event.begin_load_query.data= ev;
RPL_CHECK_NULL_POS(ev, ev_end);
ev+= strlen((char *)ev);
/* terminating zero */
RPL_CHECK_POS(ev, ev_end, 1);
ev++;
break;
case START_ENCRYPTION_EVENT:
/* Post header */
RPL_CHECK_POS(ev, ev_end, 17);
rpl_event->event.start_encryption.scheme= *ev++;
rpl_event->event.start_encryption.key_version= uint4korr(ev);
ev+= 4;
memcpy(rpl_event->event.start_encryption.nonce, ev, 12);
memcpy(rpl->nonce, ev, 12);
ev+= 12;
rpl->encrypted= 1;
break;
case EXECUTE_LOAD_QUERY_EVENT:
{
uint16_t status_len;
uint8_t schema_len;
/* Post header */
RPL_CHECK_POS(ev, ev_end, rpl->post_header_len[EXECUTE_LOAD_QUERY_EVENT - 1]);
rpl_event->event.execute_load_query.thread_id= uint4korr(ev);
ev+= 4;
rpl_event->event.execute_load_query.execution_time= uint4korr(ev);
ev+= 4;
schema_len= *ev++;
rpl_event->event.execute_load_query.error_code= uint2korr(ev);
ev+= 2;
status_len= uint2korr(ev);
ev+= 2;
rpl_event->event.execute_load_query.file_id= uint4korr(ev);
ev+= 4;
rpl_event->event.execute_load_query.ofs1= uint4korr(ev);
ev+= 4;
rpl_event->event.execute_load_query.ofs2= uint4korr(ev);
ev+= 4;
rpl_event->event.execute_load_query.duplicate_flag= *ev++;
/* Payload:
- status variables
- query schema
- statement */
RPL_CHECK_POS(ev, ev_end, status_len);
rpl_set_string_and_len(&rpl_event->event.execute_load_query.status_vars, ev, status_len);
ev+= status_len;
RPL_CHECK_POS(ev, ev_end, schema_len);
rpl_set_string_and_len(&rpl_event->event.execute_load_query.schema, ev, schema_len);
/* terminating zero */
RPL_CHECK_POS(ev, ev_end, 1);
ev+= (schema_len + 1);
len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1);
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.execute_load_query.statement, ev, len);
ev+= len;
break;
}
case BINLOG_CHECKPOINT_EVENT:
/* Post header */
RPL_CHECK_POS(ev, ev_end, rpl->post_header_len[BINLOG_CHECKPOINT_EVENT - 1]);
len= uint4korr(ev);
ev+= 4;
/* payload: filename */
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.checkpoint.filename, ev, len);
if (ma_set_rpl_filename(rpl, ev, len))
goto mem_error;
ev+= len;
break;
case FORMAT_DESCRIPTION_EVENT:
/*
FORMAT_DESCRIPTION_EVENT:
Header:
uint<2> binary log version
(we support only version 4)
str<50> server version, right padded with \0
uint<4> timestamp <redundant>
uint<1> header length
byte<n> post header lengths. Length can be calculated by
ev_end - end - 1 - 4
uint<1> check sum algorithm byte
uint<4> CRC32 checksum
*/
/* We don't speak bing log protocol version < 4, in case it's an older
protocol version an error will be returned. */
RPL_CHECK_POS(ev, ev_end, 57);
if ((rpl_event->event.format_description.format = uint2korr(ev)) < 4)
{
mariadb_free_rpl_event(rpl_event);
rpl_set_error(rpl, CR_ERR_UNSUPPORTED_BINLOG_FORMAT, SQLSTATE_UNKNOWN, 0,
RPL_ERR_POS(rpl), uint2korr(ev));
return 0;
}
ev+= 2;
rpl_event->event.format_description.server_version = (char *)(ev);
ev+= 50;
rpl_event->event.format_description.timestamp= uint4korr(ev);
ev+= 4;
rpl->fd_header_len= rpl_event->event.format_description.header_len= *ev;
ev+= 1;
/*Post header lengths: 1 byte for each event, non used events/gaps in enum should
have a zero value */
len= ev_end - ev - 5;
rpl_set_string_and_len(&rpl_event->event.format_description.post_header_lengths, ev, len);
memset(rpl->post_header_len, 0, ENUM_END_EVENT);
memcpy(rpl->post_header_len, rpl_event->event.format_description.post_header_lengths.str,
MIN(len, ENUM_END_EVENT));
ev+= len;
RPL_CHECK_POS(ev, ev_end, 5);
if ((rpl->use_checksum= *ev++))
{
rpl_event->checksum= uint4korr(ev);
ev+= 4;
}
break;
case QUERY_COMPRESSED_EVENT:
case QUERY_EVENT:
{
size_t db_len, status_len;
/***********
post_header
***********/
RPL_CHECK_POS(ev, ev_end, rpl->post_header_len[rpl_event->event_type - 1]);
rpl_event->event.query.thread_id= uint4korr(ev);
ev+= 4;
rpl_event->event.query.seconds= uint4korr(ev);
ev+= 4;
db_len= *ev;
ev++;
rpl_event->event.query.errornr= uint2korr(ev);
ev+= 2;
status_len= uint2korr(ev);
ev+= 2;
/*******
payload
******/
RPL_CHECK_POS(ev, ev_end, status_len + db_len + 1);
rpl_set_string_and_len(&rpl_event->event.query.status, ev, status_len);
ev+= status_len;
rpl_set_string_and_len(&rpl_event->event.query.database, ev, db_len);
ev+= db_len + 1; /* zero terminated */
len= rpl_event->event_length - (ev - ev_start) - (rpl->use_checksum ? 4 : 0) - (EVENT_HEADER_OFS - 1);
RPL_CHECK_POS(ev, ev_end, len);
if (rpl_event->event_type == QUERY_EVENT || !rpl->uncompress) {
rpl_set_string_and_len(&rpl_event->event.query.statement, ev, len);
}
else if (rpl_event->event_type == QUERY_COMPRESSED_EVENT)
{
uint8_t header_size= 0,
algorithm= 0;
uint32_t uncompressed_len= get_compression_info(ev, &algorithm, &header_size);
len-= header_size;
if (!(rpl_event->event.query.statement.str = ma_calloc_root(&rpl_event->memroot, uncompressed_len)))
goto mem_error;
if ((uncompress((Bytef*)rpl_event->event.query.statement.str, (uLongf *)&uncompressed_len,
(Bytef*)ev + header_size, (uLongf)*&len) != Z_OK))
{
mariadb_free_rpl_event(rpl_event);
rpl_set_error(rpl, CR_ERR_BINLOG_UNCOMPRESS, SQLSTATE_UNKNOWN, RPL_ERR_POS(rpl));
return 0;
}
rpl_event->event.query.statement.length= uncompressed_len;
}
break;
}
case TABLE_MAP_EVENT:
{
/*
TABLE_MAP_EVENT:
Header:
uint<6> table_id
uint<2> unused
Payload:
uint<1> schema_name length
str<NULL> schema_name (zero terminated)
uint<1> table_name length
str<NULL> table_name (zero terminated)
int<lenc> column_count
byte<n> column_types[column_count], 1 byte for
each column
int<lenc> meta_data_size
byte<n> netadata{metadata_size]
byte<n> bit fields, indicating which column can be null
n= (column_count + 7) / 8;
if (remaining_bytes)
byte<n> optional metadata
*/
RPL_CHECK_POST_HEADER_LEN(ev, ev_end, TABLE_MAP_EVENT);
/* Post header */
rpl_event->event.table_map.table_id= uint6korr(ev);
ev+= 8; /* 2 byte in header ignored */
/* Payload */
RPL_CHECK_POS(ev, ev_end, 1);
len= *ev++;
RPL_CHECK_POS(ev, ev_end, len + 1);
rpl_set_string_and_len(&rpl_event->event.table_map.database, ev, len);
ev+= len + 1; /* Zero terminated */
RPL_CHECK_POS(ev, ev_end, 1);
len= *ev++;
RPL_CHECK_POS(ev, ev_end, len + 1);
rpl_set_string_and_len(&rpl_event->event.table_map.table, ev, len);
ev+= len + 1; /* Zero terminated */
RPL_CHECK_FIELD_LENGTH(ev, ev_end);
len= rpl_event->event.table_map.column_count= mysql_net_field_length(&ev);
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.table_map.column_types, ev, len);
ev+= len;
RPL_CHECK_FIELD_LENGTH(ev, ev_end);
len= mysql_net_field_length(&ev);
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.table_map.metadata, ev, len);
ev+= len;
len= (rpl_event->event.table_map.column_count + 7) / 8;
RPL_CHECK_POS(ev, ev_end, len);
rpl_event->event.table_map.null_indicator= ev;
ev+= len;
len= ev_end - ev - (rpl->use_checksum ? 4 : 0);
if (len > 0) /* optional metadata */
{
rpl_parse_opt_metadata(rpl_event, ev, len);
ev+= len;
}
break;
case RAND_EVENT:
RPL_CHECK_POS(ev, ev_end, 16);
rpl_event->event.rand.first_seed= uint8korr(ev);
ev+= 8;
rpl_event->event.rand.second_seed= uint8korr(ev);
ev+= 8;
break;
}
case INTVAR_EVENT:
RPL_CHECK_POS(ev, ev_end, 9);
rpl_event->event.intvar.type= *ev;
ev++;
rpl_event->event.intvar.value= uint8korr(ev);
ev+= 8;
break;
case USER_VAR_EVENT:
RPL_CHECK_POS(ev, ev_end, 4);
len= uint4korr(ev);
ev+= 4;
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.uservar.name, ev, len);
ev+= len;
RPL_CHECK_POS(ev, ev_end, 1);
if (!(rpl_event->event.uservar.is_null= (uint8)*ev))
{
ev++;
RPL_CHECK_POS(ev, ev_end, 9);
rpl_event->event.uservar.type= *ev;
ev++;
rpl_event->event.uservar.charset_nr= uint4korr(ev);
ev+= 4;
len= uint4korr(ev);
ev+= 4;
RPL_CHECK_POS(ev, ev_end, len);
if (rpl_event->event.uservar.type == DECIMAL_RESULT)
{
char str[200];
int s_len= sizeof(str) - 1;
int precision= (int)ev[0],
scale= (int)ev[1];
decimal d;
decimal_digit buf[10];
d.len= 10;
d.buf= buf;
bin2decimal((char *)(ev+2), &d, precision, scale);
decimal2string(&d, str, &s_len);
if (!(rpl_event->event.uservar.value.str =
(char *)ma_calloc_root(&rpl_event->memroot, s_len)))
goto mem_error;
memcpy(rpl_event->event.uservar.value.str, str, s_len);
rpl_event->event.uservar.value.length= s_len;
} else if (rpl_event->event.uservar.type == INT_RESULT)
{
uint64_t val64;
if (!(rpl_event->event.uservar.value.str =
(char *)ma_calloc_root(&rpl_event->memroot, sizeof(longlong))))
goto mem_error;
val64= uint8korr(ev);
memcpy(rpl_event->event.uservar.value.str, &val64, sizeof(uint64_t));
rpl_event->event.uservar.value.length= sizeof(uint64_t);
} else if (rpl_event->event.uservar.type == REAL_RESULT)
{
double d;
float8get(d, ev);
ev+= 8;
if (!(rpl_event->event.uservar.value.str =
(char *)ma_calloc_root(&rpl_event->memroot, 24)))
goto mem_error;
memset(rpl_event->event.uservar.value.str, 0, 24);
sprintf(rpl_event->event.uservar.value.str, "%.14g", d);
rpl_event->event.uservar.value.length= strlen(rpl_event->event.uservar.value.str);
}
else
rpl_set_string_and_len(&rpl_event->event.uservar.value, ev, len);
ev+= len;
if ((unsigned long)(ev - rpl_event->raw_data) < rpl_event->raw_data_size)
rpl_event->event.uservar.flags= *ev;
ev++;
}
break;
case ANNOTATE_ROWS_EVENT:
/* Payload */
len= ev_end - ev - (rpl->use_checksum ? 4 : 0);
if (len > 0)
rpl_set_string_and_len(&rpl_event->event.annotate_rows.statement, ev, len);
break;
case ROTATE_EVENT:
RPL_CHECK_POST_HEADER_LEN(ev, ev_end, ROTATE_EVENT);
rpl_event->event.rotate.position= uint8korr(ev);
ev+= 8;
/* Payload */
len= ev_end - ev - 4;
if (!len)
goto malformed_packet;
if (rpl_event->timestamp == 0 &&
rpl_event->flags & LOG_EVENT_ARTIFICIAL_F)
{
if (rpl->artificial_checksum)
{
unsigned long crc= crc32(0L, Z_NULL, 0);
rpl_event->checksum= (uint32_t) crc32(crc, checksum_start, (uint32_t)(ev_end - checksum_start));
}
}
rpl_set_string_and_len(&rpl_event->event.rotate.filename, ev, len);
if (ma_set_rpl_filename(rpl, ev, len))
goto mem_error;
ev+= len;
break;
case XID_EVENT:
/*
XID_EVENT was generated if a transaction which modified tables was
committed.
Header:
- uint64_t transaction number
*/
RPL_CHECK_POS(ev, ev_end, 8);
rpl_event->event.xid.transaction_nr= uint8korr(ev);
break;
case XA_PREPARE_LOG_EVENT:
/*
MySQL only!
Header:
uint8_t one phase commit
uint32_t format_id
uint32_t length of gtrid
uint32_t length of bqual
Payload:
char<n> xid, where n is sum of gtrid and bqual lengths
*/
RPL_CHECK_POS(ev, ev_end, 13);
rpl_event->event.xa_prepare_log.one_phase= *ev;
ev++;
rpl_event->event.xa_prepare_log.format_id= uint4korr(ev);
ev+= 4;
len= rpl_event->event.xa_prepare_log.gtrid_len= uint4korr(ev);
ev+= 4;
len+= rpl_event->event.xa_prepare_log.bqual_len= uint4korr(ev);
ev+= 4;
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.xa_prepare_log.xid, ev, len);
break;
case STOP_EVENT:
/*
STOP_EVENT - server shutdown or crash. It's always the last written
event after shutdown or after resuming from crash.
After starting the server a new binary log file will be created, additionally
a ROTATE_EVENT will be appended to the old log file.
No data to process.
*/
break;
case PREVIOUS_GTIDS_LOG_EVENT:
{
/*
PREVIOUS_GTID_LOG_EVENT (MySQL only):
8-bytes, always zero ?!
*/
ssize_t len= ev_end - ev - rpl->use_checksum * 4;
if (len)
{
rpl_event->event.previous_gtid.content.data= ev;
rpl_event->event.previous_gtid.content.length= len;
ev+= len;
}
break;
}
case ANONYMOUS_GTID_LOG_EVENT:
case GTID_LOG_EVENT:
/*
ANONYMOUS_GTID_LOG_EVENT
uint32_t thread_id
Header:
uint8_t flag: commit flag
byte<16> source_id: numerical representation of server's UUID
uint64_t sequence_nr: sequence number
*/
RPL_CHECK_POS(ev, ev_end, 25);
rpl_event->event.gtid_log.commit_flag= *ev;
ev++;
memcpy(rpl_event->event.gtid_log.source_id, ev, 16);
ev+= 16;
rpl_event->event.gtid_log.sequence_nr= uint8korr(ev);
ev+= 8;
break;
case GTID_EVENT:
/*
GTID_EVENT (MariaDB Only):
A New transaction (BEGIN) was started, or a single transaction
(ddl) statement was executed. In case a single transaction was
executed, the FL_GROUP_COMMIT id flag is not set.
Header:
uint64_t sequence_nr
uint64_t domain_id
uint8_t flags
if (flags & FL_GROUP_COMMIT_D)
uint64_t commit_id
else
char[6] unused
*/
RPL_CHECK_POST_HEADER_LEN(ev, ev_end, GTID_EVENT);
rpl_event->event.gtid.sequence_nr= uint8korr(ev);
ev+= 8;
rpl_event->event.gtid.domain_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid.flags= *ev;
ev++;
if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID)
{
RPL_CHECK_POS(ev, ev_end, 8);
rpl_event->event.gtid.commit_id= uint8korr(ev);
ev+= 8;
}
else if (rpl_event->event.gtid.flags & (FL_PREPARED_XA | FL_COMPLETED_XA))
{
uint16_t len;
RPL_CHECK_POS(ev, ev_end, 6);
rpl_event->event.gtid.format_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid.gtrid_len= *ev;
ev++;
rpl_event->event.gtid.bqual_len= *ev;
ev++;
len= rpl_event->event.gtid.gtrid_len + rpl_event->event.gtid.bqual_len;
RPL_CHECK_POS(ev, ev_end, len);
rpl_set_string_and_len(&rpl_event->event.gtid.xid, ev, len);
ev+= len;
}
else
ev+= 6;
break;
case GTID_LIST_EVENT:
/*
GTID_LIST_EVENT (MariaDB only)
Logged in every binlog to record the current replication state.
Consists of the last GTID seen for each replication domain.
The Global Transaction ID, GTID for short, consists of three components:
replication domain id, server id and sequence nr
Header:
uint32_t gtid_cnt - number of global transaction id's
Payload:
for i=0; i < gtid_cnt; i++
uint32_t domain_id
uint32_t server_id
uint64_t sequence_nr
*/
RPL_CHECK_POST_HEADER_LEN(ev, ev_end, GTID_LIST_EVENT);
rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev);
ev+=4;
RPL_CHECK_POS(ev, ev_end, rpl_event->event.gtid_list.gtid_cnt * 16);
/* Payload */
if (rpl_event->event.gtid_list.gtid_cnt)
{
uint32_t i;
if (!(rpl_event->event.gtid_list.gtid=
(MARIADB_GTID *)ma_calloc_root(&rpl_event->memroot,
sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt)))
goto mem_error;
for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++)
{
rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev);
ev+= 4;
rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev);
ev+= 8;
}
}
break;
case WRITE_ROWS_COMPRESSED_EVENT_V1:
case UPDATE_ROWS_COMPRESSED_EVENT_V1:
case DELETE_ROWS_COMPRESSED_EVENT_V1:
case WRITE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
{
/*
WRITE/UPDATE/DELETE_ROWS_EVENT_V1 (MariaDB only)
WRITE/UPDATE/DELETE_ROWS_EVENT_COMPRESSED_V1 (MariaDB only)
WRITE/UPDATE/DELETE_ROWS_EVENT (MySQL only)
ROWS events are written for row based replicatoin if data is
inserted, deleted or updated.
Header
uint<6> table_id
uint<2> flags
if MySQL (version 2)
uint<<2> extra_data_length
char[extra_data_length] extra_data
uint<lenenc> number of columns
uint8_t<n> Bitmap of columns used.
n= (number of columns + 7) / 8
if UPDATE_ROWS_v1 (MariaDB)
uint8_t<n> columns updated
n= (number of columns + 7) / 8
uint7_t<n> null bitmap
n= (number of columns + 7) / 8
str<len> Column data. If event is not compressed,
length must be calculated.
if UPDATE_ROWS_v1 (MariaDB)
byte<n> Null bitmap update
n= (number of columns + 7) / 8
str<len> Update column data
*/
uint32_t bitmap_len= 0;
RPL_CHECK_POST_HEADER_LEN(ev, ev_end, rpl_event->event_type);
if (rpl_event->event_type >= WRITE_ROWS_COMPRESSED_EVENT) {
return rpl_event;
rpl_event->event.rows.compressed= 1;
rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_COMPRESSED_EVENT;
} else if (rpl_event->event_type >= WRITE_ROWS_COMPRESSED_EVENT_V1) {
rpl_event->event.rows.compressed= 1;
rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_COMPRESSED_EVENT_V1;
} else if (rpl_event->event_type >= WRITE_ROWS_EVENT)
rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT;
else
rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT_V1;
rpl_event->event.rows.table_id= uint6korr(ev);
ev+= 6;
rpl_event->event.rows.flags= uint2korr(ev);
ev+= 2;
/* payload */
/* ROWS_EVENT V2 has the extra-data field.
See also: https://dev.mysql.com/doc/internals/en/rows-event.html
*/
if (IS_ROW_VERSION2(rpl_event->event_type))
{
RPL_CHECK_POS(ev, ev_end, 2);
rpl_event->event.rows.extra_data_size= uint2korr(ev);
ev+= 2;
RPL_CHECK_POS(ev, ev_end, rpl_event->event.rows.extra_data_size);
if (rpl_event->event.rows.extra_data_size - 2 > 0)
{
rpl_alloc_set_string_and_len(rpl_event, rpl_event->event.rows.extra_data, ev, rpl_event->event.rows.extra_data_size - 2);
ev+= rpl_event->event.rows.extra_data_size;
}
}
/* END_ROWS_EVENT_V2 */
/* number of columns */
RPL_CHECK_FIELD_LENGTH(ev, ev_end);
rpl_event->event.rows.column_count= mysql_net_field_length(&ev);
bitmap_len= (rpl_event->event.rows.column_count + 7) / 8;
DBUG_ASSERT(rpl_event->event.rows.column_count > 0);
/* columns updated bitmap */
RPL_CHECK_POS(ev, ev_end, bitmap_len);
rpl_event->event.rows.column_bitmap= ev;
ev+= bitmap_len;
if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1 ||
rpl_event->event_type == UPDATE_ROWS_COMPRESSED_EVENT_V1)
{
RPL_CHECK_POS(ev, ev_end, bitmap_len);
rpl_event->event.rows.column_update_bitmap= ev;
ev+= bitmap_len;
}
len= ev_end - ev - (rpl->use_checksum ? 4 : 0);
if (rpl_event->event.rows.compressed)
{
uint8_t algorithm= 0, header_size= 0;
uint32_t uncompressed_len= get_compression_info(ev, &algorithm, &header_size);
if (!(rpl_event->event.rows.row_data = ma_calloc_root(&rpl_event->memroot, uncompressed_len)))
goto mem_error;
if ((uncompress((Bytef*)rpl_event->event.rows.row_data, (uLong *)&uncompressed_len,
(Bytef*)ev + header_size, (uLongf )len) != Z_OK))
{
rpl_set_error(rpl, CR_ERR_BINLOG_UNCOMPRESS, SQLSTATE_UNKNOWN, 0, RPL_ERR_POS(rpl));
mariadb_free_rpl_event(rpl_event);
return 0;
}
rpl_event->event.rows.row_data_size= uncompressed_len;
RPL_CHECK_POS(ev, ev_end, header_size + len);
ev+= header_size + len;
} else {
rpl_event->event.rows.row_data_size= ev_end - ev - (rpl->use_checksum ? 4 : 0);
if (!(rpl_event->event.rows.row_data =
(char *)ma_calloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size)))
goto mem_error;
memcpy(rpl_event->event.rows.row_data, ev, rpl_event->event.rows.row_data_size);
}
break;
}
default:
/* We need to report an error if this event can't be ignored */
if (!(rpl_event->flags & LOG_EVENT_IGNORABLE_F))
{
mariadb_free_rpl_event(rpl_event);
rpl_set_error(rpl, CR_UNKNOWN_BINLOG_EVENT, 0, RPL_ERR_POS(rpl),
rpl_event->event_type);
return 0;
}
return rpl_event;
break;
}
/* check if we have to send acknowledgement to primary
when semi sync replication is used */
if (rpl_event->is_semi_sync &&
rpl_event->semi_sync_flags == SEMI_SYNC_ACK_REQ)
{
if (mariadb_rpl_send_semisync_ack(rpl, rpl_event))
{
/* ACK failed and rpl->error was set */
return rpl_event;
}
}
if (rpl->use_checksum && !rpl_event->checksum)
{
rpl_event->checksum= uint4korr(ev_end - 4);
if (rpl_event->checksum && rpl->verify_checksum)
{
unsigned long crc= crc32(0L, Z_NULL, 0);
crc= crc32(crc, checksum_start, (uint32_t)(ev_end - checksum_start - 4));
if (rpl_event->checksum != (uint32_t)crc)
{
rpl_set_error(rpl, CR_ERR_CHECKSUM_VERIFICATION_ERROR, SQLSTATE_UNKNOWN, 0,
RPL_ERR_POS(rpl),
rpl_event->checksum, (uint32_t)crc);
mariadb_free_rpl_event(rpl_event);
return 0;
}
}
}
return rpl_event;
}
mem_error:
mariadb_free_rpl_event(rpl_event);
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
return 0;
malformed_packet:
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl),
"Malformed packet");
mariadb_free_rpl_event(rpl_event);
return 0;
}
void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl)
{
if (!rpl)
return;
free((void *)rpl->filename);
if (rpl->fp)
{
ma_close(rpl->fp);
}
free(rpl->host);
free(rpl);
return;
}
int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl,
enum mariadb_rpl_option option,
...)
{
va_list ap;
int rc= 0;
if (!rpl)
return 1;
va_start(ap, option);
switch (option) {
case MARIADB_RPL_FILENAME:
{
char *arg1= va_arg(ap, char *);
rpl->filename_length= (uint32_t)va_arg(ap, size_t);
free((void *)rpl->filename);
rpl->filename= NULL;
if (rpl->filename_length)
{
rpl->filename= (char *)malloc(rpl->filename_length);
memcpy((void *)rpl->filename, arg1, rpl->filename_length);
}
else if (arg1)
{
rpl->filename= strdup((const char *)arg1);
rpl->filename_length= (uint32_t)strlen(rpl->filename);
}
break;
}
case MARIADB_RPL_SERVER_ID:
{
rpl->server_id= va_arg(ap, unsigned int);
break;
}
case MARIADB_RPL_FLAGS:
{
rpl->flags= va_arg(ap, unsigned int);
break;
}
case MARIADB_RPL_START:
{
rpl->start_position= va_arg(ap, unsigned long);
break;
}
case MARIADB_RPL_VERIFY_CHECKSUM:
{
rpl->verify_checksum= va_arg(ap, uint32_t);
break;
}
case MARIADB_RPL_UNCOMPRESS:
{
rpl->uncompress= (uint8_t)va_arg(ap, uint32_t);
break;
}
case MARIADB_RPL_PORT:
{
rpl->port= va_arg(ap, uint32_t);
break;
}
case MARIADB_RPL_HOST:
{
rpl->host= strdup(va_arg(ap, char *));
break;
}
case MARIADB_RPL_EXTRACT_VALUES:
{
rpl->extract_values= (uint8_t)va_arg(ap, uint32_t);
break;
}
case MARIADB_RPL_SEMI_SYNC:
{
rpl->is_semi_sync = (uint8_t)va_arg(ap, uint32_t);
break;
}
default:
rc= -1;
goto end;
}
end:
va_end(ap);
return rc;
}
int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl,
enum mariadb_rpl_option option,
...)
{
va_list ap;
if (!rpl)
return 1;
va_start(ap, option);
switch (option) {
case MARIADB_RPL_FILENAME:
{
const char **name= (const char **)va_arg(ap, char **);
size_t *len= (size_t*)va_arg(ap, size_t *);
*name= rpl->filename;
*len= rpl->filename_length;
break;
}
case MARIADB_RPL_SERVER_ID:
{
unsigned int *id= va_arg(ap, unsigned int *);
*id= rpl->server_id;
break;
}
case MARIADB_RPL_FLAGS:
{
unsigned int *flags= va_arg(ap, unsigned int *);
*flags= rpl->flags;
break;
}
case MARIADB_RPL_START:
{
unsigned long *start= va_arg(ap, unsigned long *);
*start= rpl->start_position;
break;
}
case MARIADB_RPL_SEMI_SYNC:
{
unsigned int* semi_sync = va_arg(ap, unsigned int*);
*semi_sync = rpl->is_semi_sync;
break;
}
default:
va_end(ap);
return 1;
break;
}
va_end(ap);
return 0;
}