1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-08 14:02:17 +03:00

CONC-470: Support for semi synchronous replication

Beside already supported asynchronous replication
the replication/binlog API now supports semi
synchronous replication:

If an event contains a semi synchronous indicator (0xEF)
behind status byte and acknowledgement flag is set,
mariadb_rpl_fetch() automatically sends an acknowledge
message to the connected primary server.
This commit is contained in:
Georg Richter
2021-10-09 08:26:15 +02:00
parent 52934a1c08
commit 004f9d4217
3 changed files with 159 additions and 8 deletions

View File

@@ -1,4 +1,4 @@
/* Copyright (C) 2018 MariaDB Corporation AB
/* Copyright (C) 2018-2021 MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
@@ -38,6 +38,9 @@ extern "C" {
#define LOG_EVENT_ARTIFICIAL_F 0x20
/* SEMI SYNCHRONOUS REPLICATION */
#define SEMI_SYNC_INDICATOR 0xEF
#define SEMI_SYNC_ACK_REQ 0x01
/* Options */
enum mariadb_rpl_option {
@@ -266,6 +269,9 @@ typedef struct st_mariadb_rpl_event
unsigned int event_length;
unsigned int next_event_pos;
unsigned short flags;
/* Added in C/C 3.3.0 */
uint8_t is_semi_sync;
uint8_t semi_sync_flags;
/****************/
union {
struct st_mariadb_rpl_rotate_event rotate;

View File

@@ -1,5 +1,5 @@
/************************************************************************************
Copyright (C) 2018 MariaDB Corpoeation AB
Copyright (C) 2018-2021 MariaDB Corpoeation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
@@ -93,7 +93,7 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
*/
ptr= buf=
#ifdef WIN32
(unsigned char *)_alloca(rpl->filename_length + 11);
(unsigned char *)_alloca(rpl->filename_length + 11);
#else
(unsigned char *)alloca(rpl->filename_length + 11);
#endif
@@ -171,6 +171,14 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4);
rpl_event->ok= rpl->buffer[0];
/* CONC-470: add support for semi snychronous replication */
if ((rpl_event->is_semi_sync= (rpl->buffer[1] == SEMI_SYNC_INDICATOR)))
{
rpl_event->semi_sync_flags= rpl->buffer[2];
rpl->buffer+= 2;
}
rpl_event->timestamp= uint4korr(rpl->buffer + 1);
rpl_event->event_type= (unsigned char)*(rpl->buffer + 5);
rpl_event->server_id= uint4korr(rpl->buffer + 6);
@@ -201,6 +209,11 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
ev+= 4;
if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len))
goto mem_error;
free(rpl->filename);
if (!(rpl->filename= (char *)malloc(len)))
goto mem_error;
memcpy(rpl->filename, ev, len);
rpl->filename_length= len;
break;
case FORMAT_DESCRIPTION_EVENT:
rpl_event->event.format_description.format = uint2korr(ev);
@@ -391,12 +404,35 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
return NULL;
break;
}
/* check if we have to send acknoledgement to primary
when semi sync replication is used */
if (rpl_event->is_semi_sync &&
rpl_event->semi_sync_flags == SEMI_SYNC_ACK_REQ)
{
size_t buf_size= rpl->filename_length + 1 + 9;
uchar *buffer= alloca(buf_size);
buffer[0]= SEMI_SYNC_INDICATOR;
int8store(buffer + 1, (int64_t)rpl_event->next_event_pos);
memcpy(buffer + 9, rpl->filename, rpl->filename_length);
buffer[buf_size - 1]= 0;
if (ma_net_write(&rpl->mysql->net, buffer, buf_size) ||
(ma_net_flush(&rpl->mysql->net)))
goto net_error;
}
return rpl_event;
}
mem_error:
free(rpl_event);
SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
return 0;
net_error:
free(rpl_event);
SET_CLIENT_ERROR(rpl->mysql, CR_CONNECTION_ERROR, SQLSTATE_UNKNOWN, 0);
return 0;
}
void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl)

View File

@@ -28,10 +28,47 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "my_test.h"
#include "mariadb_rpl.h"
static int test_rpl_01(MYSQL *mysql)
static int test_rpl_async(MYSQL *my __attribute__((unused)))
{
MYSQL *mysql= mysql_init(NULL);
MYSQL_RES *result;
MYSQL_ROW row;
MARIADB_RPL_EVENT *event= NULL;
MARIADB_RPL *rpl= mariadb_rpl_init(mysql);
MARIADB_RPL *rpl;
int events= 0, rc;
SKIP_SKYSQL;
SKIP_MAXSCALE;
if (!is_mariadb)
return SKIP;
if (!my_test_connect(mysql, hostname, username,
password, schema, port, socketname, 0))
{
diag("Error: %s", mysql_error(mysql));
mysql_close(mysql);
return FAIL;
}
rc= mysql_query(mysql, "SELECT @@log_bin");
check_mysql_rc(rc, mysql);
result= mysql_store_result(mysql);
row= mysql_fetch_row(result);
if (!atoi(row[0]))
rc= SKIP;
mysql_free_result(result);
if (rc == SKIP)
{
diag("binary log disabled -> skip");
mysql_close(mysql);
return SKIP;
}
rpl = mariadb_rpl_init(mysql);
mysql_query(mysql, "SET @mariadb_slave_capability=4");
mysql_query(mysql, "SET NAMES latin1");
mysql_query(mysql, "SET @slave_gtid_strict_mode=1");
@@ -45,18 +82,90 @@ static int test_rpl_01(MYSQL *mysql)
if (mariadb_rpl_open(rpl))
return FAIL;
while((event= mariadb_rpl_fetch(rpl, event)))
/* We run rpl_api as very last test, too make sure
binary log contains > 10000 events.
*/
while((event= mariadb_rpl_fetch(rpl, event)) && events < 10000)
{
diag("event: %d\n", event->event_type);
events++;
}
mariadb_free_rpl_event(event);
mariadb_rpl_close(rpl);
mysql_close(mysql);
return OK;
}
static int test_rpl_semisync(MYSQL *my __attribute__((unused)))
{
MYSQL *mysql= mysql_init(NULL);
MYSQL_RES *result;
MYSQL_ROW row;
MARIADB_RPL_EVENT *event= NULL;
MARIADB_RPL *rpl;
int events= 0, rc;
SKIP_SKYSQL;
SKIP_MAXSCALE;
if (!is_mariadb)
return SKIP;
if (!my_test_connect(mysql, hostname, username,
password, schema, port, socketname, 0))
{
diag("Error: %s", mysql_error(mysql));
mysql_close(mysql);
return FAIL;
}
rc= mysql_query(mysql, "SELECT @@log_bin");
check_mysql_rc(rc, mysql);
result= mysql_store_result(mysql);
row= mysql_fetch_row(result);
if (!atoi(row[0]))
rc= SKIP;
mysql_free_result(result);
if (rc == SKIP)
{
diag("binary log disabled -> skip");
mysql_close(mysql);
return SKIP;
}
rpl = mariadb_rpl_init(mysql);
mysql_query(mysql, "SET @mariadb_slave_capability=4");
mysql_query(mysql, "SET NAMES latin1");
mysql_query(mysql, "SET @slave_gtid_strict_mode=1");
mysql_query(mysql, "SET @slave_gtid_ignore_duplicates=1");
mysql_query(mysql, "SET NAMES utf8");
mysql_query(mysql, "SET @master_binlog_checksum= @@global.binlog_checksum");
mysql_query(mysql, "SET @rpl_semi_sync_slave=1");
rpl->server_id= 12;
rpl->start_position= 4;
rpl->flags= MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS;
if (mariadb_rpl_open(rpl))
return FAIL;
/* We run rpl_api as very last test, too make sure
binary log contains > 10000 events.
*/
while((event= mariadb_rpl_fetch(rpl, event)) && events < 10000)
{
events++;
}
mariadb_free_rpl_event(event);
mariadb_rpl_close(rpl);
mysql_close(mysql);
return OK;
}
struct my_tests_st my_tests[] = {
{"test_rpl_01", test_rpl_01, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"test_rpl_async", test_rpl_async, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_rpl_semisync", test_rpl_semisync, TEST_CONNECTION_NEW, 0, NULL, NULL},
{NULL, NULL, 0, 0, NULL, NULL}
};