1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-20119: Implement the --do-domain-ids, --ignore-domain-ids, and --ignore-server-ids options for mysqlbinlog

New Feature:
============
Extend mariadb-binlog command-line tool to allow for filtering
events using GTID domain and server ids. The functionality mimics
that of a replica server’s DO_DOMAIN_IDS, IGNORE_DOMAIN_IDS, and
IGNORE_SERVER_IDS from CHANGE MASTER TO. For completeness, this
patch additionally adds the option --do-server-ids as an alias for
--server-id, which now accepts a list of server ids instead of a
single one.

Example usage:
  mariadb-binlog --do-domain-ids=2,3,4 --do-server-ids=1,3
  master-bin.000001

Functional Notes:
 1. --do-domain-ids cannot be combined with --ignore-domain-ids
 2. --do-server-ids cannot be combined with --ignore-server-ids
 3. A domain id filter can be combined with a server id filter
 4. When any new filter options are combined with the
--gtid-strict-mode option, events from excluded domains/servers are
not validated.
 5. Domain/server id filters can be combined with GTID ranges (i.e.
specifications of --start-position and --stop-position). However,
because the --stop-position option implicitly undertakes filtering
to only output events within its range of domains, when combined
with --do-domain-ids or --ignore-domain-ids, output will consist of
the intersection between the filters. Specifically, with
--do-domain-ids and --stop-position, only events with domain ids
present in both argument lists will be output. Conversely, with
--ignore-domain-ids and --stop-position, only events with domain ids
present in the --stop-position and absent from the
--ignore-domain-ids options will be output.

Reviewed By
============
Andrei Elkin <andrei.elkin@mariadb.com>
This commit is contained in:
Brandon Nesterenko
2022-02-03 08:31:05 -07:00
parent f326b43cb9
commit c132bce1a1
13 changed files with 3521 additions and 182 deletions

View File

@@ -47,6 +47,7 @@
#include "sql_common.h"
#include "my_dir.h"
#include <welcome_copyright_notice.h> // ORACLE_WELCOME_COPYRIGHT_NOTICE
#include "rpl_gtid.h"
#include "sql_string.h" // needed for Rpl_filter
#include "sql_list.h" // needed for Rpl_filter
#include "rpl_filter.h"
@@ -82,7 +83,7 @@ extern "C" {
char server_version[SERVER_VERSION_LENGTH];
}
ulong server_id = 0;
static char *server_id_str;
// needed by net_serv.c
ulong bytes_sent = 0L, bytes_received = 0L;
@@ -144,6 +145,8 @@ static char *charset= 0;
static uint verbose= 0;
static char *ignore_domain_ids_str, *do_domain_ids_str;
static char *ignore_server_ids_str, *do_server_ids_str;
static char *start_pos_str, *stop_pos_str;
static ulonglong start_position= BIN_LOG_HEADER_SIZE,
stop_position= (longlong)(~(my_off_t)0) ;
@@ -151,7 +154,10 @@ static ulonglong start_position= BIN_LOG_HEADER_SIZE,
#define stop_position_mot ((my_off_t)stop_position)
static Binlog_gtid_state_validator *gtid_state_validator= NULL;
static Domain_gtid_event_filter *domain_gtid_filter= NULL;
static Gtid_event_filter *gtid_event_filter= NULL;
static Domain_gtid_event_filter *position_gtid_filter= NULL;
static Domain_gtid_event_filter *domain_id_gtid_filter= NULL;
static Server_gtid_event_filter *server_id_gtid_filter= NULL;
static char *start_datetime_str, *stop_datetime_str;
static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX;
@@ -987,9 +993,16 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
return result;
}
static inline my_bool is_gtid_filtering_enabled()
/*
Check if the server id should be excluded from the output.
*/
static inline my_bool is_server_id_excluded(uint32 server_id)
{
return domain_gtid_filter != NULL;
static rpl_gtid server_tester_gtid;
server_tester_gtid.server_id= server_id;
return server_id_gtid_filter == NULL
? FALSE // No server id filter exists
: server_id_gtid_filter->exclude(&server_tester_gtid);
}
/**
@@ -1061,14 +1074,15 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
glev->count))
goto err;
if (domain_gtid_filter && !domain_gtid_filter->get_num_start_gtids())
if (position_gtid_filter &&
!position_gtid_filter->get_num_start_gtids())
{
/*
We need to validate the GTID list from --stop-position because we
couldn't prove it intrinsically (i.e. using stop > start)
*/
rpl_gtid *stop_gtids= domain_gtid_filter->get_stop_gtids();
size_t n_stop_gtids= domain_gtid_filter->get_num_stop_gtids();
rpl_gtid *stop_gtids= position_gtid_filter->get_stop_gtids();
size_t n_stop_gtids= position_gtid_filter->get_num_stop_gtids();
if (gtid_state_validator->verify_stop_state(stderr, stop_gtids,
n_stop_gtids))
{
@@ -1101,15 +1115,15 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
If the binlog output should be filtered using GTIDs, test the new event
group to see if its events should be ignored.
*/
if (domain_gtid_filter)
if (gtid_event_filter)
{
if (domain_gtid_filter->has_finished())
if (gtid_event_filter->has_finished())
{
retval= OK_STOP;
goto end;
}
if (!domain_gtid_filter->exclude(&ev_gtid))
if (!gtid_event_filter->exclude(&ev_gtid))
print_event_info->activate_current_event_group();
else
print_event_info->deactivate_current_event_group();
@@ -1124,7 +1138,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
If we don't care about ensuring GTID validity, just delete the auditor
object to disable it for future checks.
*/
if (gtid_state_validator)
if (gtid_state_validator && print_event_info->is_event_group_active())
{
if (!(opt_gtid_strict_mode || verbose >= 3))
{
@@ -1180,8 +1194,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
the format_description event so that we can parse subsequent
events.
*/
if (ev_type != ROTATE_EVENT &&
server_id && (server_id != ev->server_id))
if (ev_type != ROTATE_EVENT && is_server_id_excluded(ev->server_id))
goto end;
}
if ((ev->when >= stop_datetime)
@@ -1769,9 +1782,42 @@ static struct my_option my_options[] =
"Print row event positions",
&print_row_event_positions, &print_row_event_positions, 0, GET_BOOL,
NO_ARG, 1, 0, 0, 0, 0, 0},
{"server-id", 0,
"Extract only binlog entries created by the server having the given id.",
&server_id, &server_id, 0, GET_ULONG,
{"ignore-domain-ids", OPT_IGNORE_DOMAIN_IDS,
"A list of positive integers, separated by commas, that form a blacklist "
"of domain ids. Any log event with a GTID that originates from a domain id "
"specified in this list is hidden. Cannot be used with "
"--do-domain-ids. When used with --(ignore|do)-server-ids, the result is the "
"intersection between the two datasets.",
&ignore_domain_ids_str, &ignore_domain_ids_str, 0, GET_STR_ALLOC,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"do-domain-ids", OPT_DO_DOMAIN_IDS,
"A list of positive integers, separated by commas, that form a whitelist "
"of domain ids. Any log event with a GTID that originates from a domain id "
"specified in this list is displayed. Cannot be used with "
"--ignore-domain-ids. When used with --(ignore|do)-server-ids, the result "
"is the intersection between the two datasets.",
&do_domain_ids_str, &do_domain_ids_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0,
0, 0, 0, 0, 0},
{"ignore-server-ids", OPT_IGNORE_SERVER_IDS,
"A list of positive integers, separated by commas, that form a blacklist "
"of server ids. Any log event originating from a server id "
"specified in this list is hidden. Cannot be used with "
"--do-server-ids. When used with --(ignore|do)-domain-ids, the result is "
"the intersection between the two datasets.",
&ignore_server_ids_str, &ignore_server_ids_str, 0, GET_STR_ALLOC,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"do-server-ids", OPT_DO_SERVER_IDS,
"A list of positive integers, separated by commas, that form a whitelist "
"of server ids. Any log event originating from a server id "
"specified in this list is displayed. Cannot be used with "
"--ignore-server-ids. When used with --(ignore|do)-domain-ids, the result "
"is the intersection between the two datasets. Alias for --server-id.",
&do_server_ids_str, &do_server_ids_str, 0, GET_STR_ALLOC,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"server-id", OPT_SERVER_ID,
"Extract only binlog entries created by the server having the given id. "
"Alias for --do-server-ids.",
&server_id_str, &server_id_str, 0, GET_STR_ALLOC,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"set-charset", OPT_SET_CHARSET,
"Add 'SET NAMES character_set' to the output.", &charset,
@@ -1976,9 +2022,31 @@ static void cleanup()
my_free(stop_datetime_str);
my_free(start_pos_str);
my_free(stop_pos_str);
my_free(ignore_domain_ids_str);
my_free(do_domain_ids_str);
my_free(ignore_server_ids_str);
my_free(do_server_ids_str);
my_free(server_id_str);
free_root(&glob_root, MYF(0));
delete domain_gtid_filter;
if (gtid_event_filter)
{
delete gtid_event_filter;
}
else
{
/*
If there was an error during input parsing, gtid_event_filter will not
be set, so we need to ensure the comprising filters are cleaned up
properly.
*/
if (domain_id_gtid_filter)
delete domain_id_gtid_filter;
if (position_gtid_filter)
delete position_gtid_filter;
if (server_id_gtid_filter)
delete server_id_gtid_filter;
}
if (gtid_state_validator)
delete gtid_state_validator;
@@ -1994,6 +2062,89 @@ static void cleanup()
DBUG_VOID_RETURN;
}
/*
Parse a list of positive numbers separated by commas.
Returns a list of numbers on success, NULL on parsing/resource error
*/
static uint32 *parse_u32_list(const char *str, size_t str_len, uint32 *n_vals)
{
const char *str_begin= const_cast<char *>(str);
const char *str_end= str_begin + str_len;
const char *p = str_begin;
uint32 len= 0, alloc_len= (uint32) ceil(str_len/2.0);
uint32 *list= NULL;
int err;
for (;;)
{
uint32 val;
/*
Set it to the end of the string overall, but when parsing, it will be
moved to the end of the element
*/
char *el_end= (char*) str_begin + str_len;
if (len >= (((uint32)1 << 28)-1))
{
my_free(list);
list= NULL;
goto end;
}
val= (uint32)my_strtoll10(p, &el_end, &err);
if (err)
{
my_free(list);
list= NULL;
goto end;
}
p = el_end;
if ((!list || len >= alloc_len) &&
!(list=
(uint32 *)my_realloc(PSI_INSTRUMENT_ME, list,
(alloc_len= alloc_len*2) * sizeof(uint32),
MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
return NULL;
list[len++]= val;
if (el_end == str_end)
break;
if (*p != ',')
{
my_free(list);
return NULL;
}
++p;
}
*n_vals= len;
end:
return list;
}
/*
If multiple different types of Gtid_event_filters are used, the result
should be the intersection between the filter types.
*/
static void extend_main_gtid_event_filter(Gtid_event_filter *new_filter)
{
if (gtid_event_filter == NULL)
{
gtid_event_filter= new_filter;
}
else
{
if (gtid_event_filter->get_filter_type() !=
Gtid_event_filter::INTERSECTING_GTID_FILTER_TYPE)
gtid_event_filter=
new Intersecting_gtid_event_filter(gtid_event_filter, new_filter);
else
((Intersecting_gtid_event_filter *) gtid_event_filter)
->add_filter(new_filter);
}
}
static void die()
{
@@ -2047,6 +2198,110 @@ static my_time_t convert_str_to_timestamp(const char* str)
my_system_gmt_sec(&l_time, &dummy_my_timezone, &dummy_in_dst_time_gap);
}
/**
Parses a start or stop position argument and populates either
start_position/stop_position (if a log offset) or position_gtid_filter
(if a gtid position)
@param[in] option_name : Name of the command line option provided (used for
error message)
@param[in] option_val : The user-provided value of the option_name
@param[out] fallback : Pointer to a global variable to set if using log
offsets
@param[in] add_gtid : Function pointer to a class method to add a GTID to a
Gtid_event_filter
@param[in] add_zero_seqno : If using GTID positions, this boolean specifies
if GTIDs with a sequence number of 0 should be added to the filter
*/
int parse_position_argument(
const char *option_name, char *option_val, ulonglong *fallback,
int (Domain_gtid_event_filter::*add_gtid)(rpl_gtid *),
my_bool add_zero_seqno)
{
uint32 n_gtids= 0;
rpl_gtid *gtid_list=
gtid_parse_string_to_list(option_val, strlen(option_val), &n_gtids);
if (gtid_list == NULL)
{
int err= 0;
char *end_ptr= NULL;
/*
No GTIDs specified in position specification. Treat the value
as a singular index.
*/
*fallback= my_strtoll10(option_val, &end_ptr, &err);
if (err || *end_ptr)
{
// Can't parse the position from the user
sql_print_error("%s argument value is invalid. Should be either a "
"positive integer or GTID.",
option_name);
return 1;
}
}
else if (n_gtids > 0)
{
uint32 gtid_idx;
if (position_gtid_filter == NULL)
position_gtid_filter= new Domain_gtid_event_filter();
for (gtid_idx = 0; gtid_idx < n_gtids; gtid_idx++)
{
rpl_gtid *gtid= &gtid_list[gtid_idx];
if ((gtid->seq_no || add_zero_seqno) &&
(position_gtid_filter->*add_gtid)(gtid))
{
my_free(gtid_list);
return 1;
}
}
my_free(gtid_list);
}
else
{
DBUG_ASSERT(0);
}
return 0;
}
/**
Parses a do/ignore domain/server ids option and populates the corresponding
gtid filter
@param[in] option_name : Name of the command line option provided (used for
error message)
@param[in] option_value : The user-provided list of domain or server ids
@param[in] filter : The filter to update with the provided domain/server id
@param[in] mode : Specifies whether the list should be a blacklist or
whitelist
*/
template <typename T>
int parse_gtid_filter_option(
const char *option_name, char *option_val, T **filter,
Gtid_event_filter::id_restriction_mode mode)
{
uint32 n_ids= 0;
uint32 *id_list= parse_u32_list(option_val, strlen(option_val), &n_ids);
if (id_list == NULL)
{
DBUG_ASSERT(n_ids == 0);
sql_print_error(
"Input for %s is invalid. Should be a list of positive integers",
option_name);
return 1;
}
if (!(*filter))
(*filter)= new T();
int err= (*filter)->set_id_restrictions(id_list, n_ids, mode);
my_free(id_list);
return err;
}
extern "C" my_bool
get_one_option(const struct my_option *opt, const char *argument, const char *filename)
@@ -2234,105 +2489,72 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
case OPT_STOP_POSITION:
{
/* Stop position was already specified, so reset it and use the new list */
if (domain_gtid_filter && domain_gtid_filter->get_num_stop_gtids() > 0)
domain_gtid_filter->clear_stop_gtids();
if (position_gtid_filter &&
position_gtid_filter->get_num_stop_gtids() > 0)
position_gtid_filter->clear_stop_gtids();
uint32 n_stop_gtid_ranges= 0;
rpl_gtid *stop_gtids= gtid_parse_string_to_list(
stop_pos_str, strlen(stop_pos_str), &n_stop_gtid_ranges);
if (stop_gtids == NULL)
{
int err= 0;
char *end_ptr= NULL;
/*
No GTIDs specified in OPT_STOP_POSITION specification. Treat the value
as a singular index.
*/
stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err);
if (err || *end_ptr)
{
// Can't parse the position from the user
sql_print_error("Stop position argument value is invalid. Should be "
"either a positive integer or GTID.");
return 1;
}
}
else if (n_stop_gtid_ranges > 0)
{
uint32 gtid_idx;
if (domain_gtid_filter == NULL)
domain_gtid_filter= new Domain_gtid_event_filter();
for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++)
{
rpl_gtid *stop_gtid= &stop_gtids[gtid_idx];
if (domain_gtid_filter->add_stop_gtid(stop_gtid))
{
my_free(stop_gtids);
return 1;
}
}
my_free(stop_gtids);
}
else
{
DBUG_ASSERT(0);
}
if (parse_position_argument(
"--stop-position", stop_pos_str, &stop_position,
&Domain_gtid_event_filter::add_stop_gtid, TRUE))
return 1;
break;
}
case 'j':
{
/* Start position was already specified, so reset it and use the new list */
if (domain_gtid_filter && domain_gtid_filter->get_num_start_gtids() > 0)
domain_gtid_filter->clear_start_gtids();
if (position_gtid_filter &&
position_gtid_filter->get_num_start_gtids() > 0)
position_gtid_filter->clear_start_gtids();
uint32 n_start_gtid_ranges= 0;
rpl_gtid *start_gtids= gtid_parse_string_to_list(
start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges);
if (start_gtids == NULL)
{
int err= 0;
char *end_ptr= NULL;
/*
No GTIDs specified in OPT_START_POSITION specification. Treat the value
as a singular index.
*/
start_position= my_strtoll10(start_pos_str, &end_ptr, &err);
if (err || *end_ptr)
{
// Can't parse the position from the user
sql_print_error("Start position argument value is invalid. Should be "
"either a positive integer or GTID.");
return 1;
}
}
else if (n_start_gtid_ranges > 0)
{
uint32 gtid_idx;
if (domain_gtid_filter == NULL)
domain_gtid_filter= new Domain_gtid_event_filter();
for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++)
{
rpl_gtid *start_gtid= &start_gtids[gtid_idx];
if (start_gtid->seq_no &&
domain_gtid_filter->add_start_gtid(start_gtid))
{
my_free(start_gtids);
return 1;
}
}
my_free(start_gtids);
}
else
{
DBUG_ASSERT(0);
}
if (parse_position_argument(
"--start-position", start_pos_str, &start_position,
&Domain_gtid_event_filter::add_start_gtid, FALSE))
return 1;
break;
}
case OPT_IGNORE_DOMAIN_IDS:
{
if (parse_gtid_filter_option<Domain_gtid_event_filter>(
"--ignore-domain-ids", ignore_domain_ids_str,
&domain_id_gtid_filter,
Gtid_event_filter::id_restriction_mode::BLACKLIST_MODE))
return 1;
break;
}
case OPT_DO_DOMAIN_IDS:
{
if (parse_gtid_filter_option<Domain_gtid_event_filter>(
"--do-domain-ids", do_domain_ids_str,
&domain_id_gtid_filter,
Gtid_event_filter::id_restriction_mode::WHITELIST_MODE))
return 1;
break;
}
case OPT_IGNORE_SERVER_IDS:
{
if (parse_gtid_filter_option<Server_gtid_event_filter>(
"--ignore-server-ids", ignore_server_ids_str,
&server_id_gtid_filter,
Gtid_event_filter::id_restriction_mode::BLACKLIST_MODE))
return 1;
break;
}
case OPT_DO_SERVER_IDS:
{
if (parse_gtid_filter_option<Server_gtid_event_filter>(
"--do-server-ids", do_server_ids_str,
&server_id_gtid_filter,
Gtid_event_filter::id_restriction_mode::WHITELIST_MODE))
return 1;
break;
}
case OPT_SERVER_ID:
{
if (parse_gtid_filter_option<Server_gtid_event_filter>(
"--server-id", server_id_str,
&server_id_gtid_filter,
Gtid_event_filter::id_restriction_mode::WHITELIST_MODE))
return 1;
break;
}
case '?':
@@ -2346,7 +2568,6 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
return 0;
}
static int parse_args(int *argc, char*** argv)
{
int ho_error;
@@ -2376,9 +2597,10 @@ static int parse_args(int *argc, char*** argv)
*/
gtid_state_validator= new Binlog_gtid_state_validator();
if (domain_gtid_filter)
if (position_gtid_filter)
{
if (opt_gtid_strict_mode && domain_gtid_filter->validate_window_filters())
if (opt_gtid_strict_mode &&
position_gtid_filter->validate_window_filters())
{
/*
In strict mode, if any --start/stop-position GTID ranges are invalid,
@@ -2387,17 +2609,24 @@ static int parse_args(int *argc, char*** argv)
*/
die();
}
extend_main_gtid_event_filter(position_gtid_filter);
/*
GTIDs before a start position shouldn't be validated, so we initialize
the stream auditor to only monitor GTIDs after these positions.
*/
size_t n_start_gtids= domain_gtid_filter->get_num_start_gtids();
rpl_gtid *start_gtids= domain_gtid_filter->get_start_gtids();
size_t n_start_gtids= position_gtid_filter->get_num_start_gtids();
rpl_gtid *start_gtids= position_gtid_filter->get_start_gtids();
gtid_state_validator->initialize_start_gtids(start_gtids, n_start_gtids);
my_free(start_gtids);
}
if(domain_id_gtid_filter)
extend_main_gtid_event_filter(domain_id_gtid_filter);
if(server_id_gtid_filter)
extend_main_gtid_event_filter(server_id_gtid_filter);
return 0;
}
@@ -2479,8 +2708,9 @@ static Exit_status dump_log_entries(const char* logname)
if (!print_event_info.init_ok())
return ERROR_STOP;
if (domain_gtid_filter)
if (position_gtid_filter || domain_id_gtid_filter)
print_event_info.enable_event_group_filtering();
/*
Set safe delimiter, to dump things
like CREATE PROCEDURE safely
@@ -2582,7 +2812,8 @@ static Exit_status check_master_version()
goto err;
}
if (domain_gtid_filter && domain_gtid_filter->get_num_start_gtids() > 0)
if (position_gtid_filter &&
position_gtid_filter->get_num_start_gtids() > 0)
{
char str_buf[256];
String query_str(str_buf, sizeof(str_buf), system_charset_info);
@@ -2590,8 +2821,8 @@ static Exit_status check_master_version()
query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"),
system_charset_info);
size_t n_start_gtids= domain_gtid_filter->get_num_start_gtids();
rpl_gtid *start_gtids= domain_gtid_filter->get_start_gtids();
size_t n_start_gtids= position_gtid_filter->get_num_start_gtids();
rpl_gtid *start_gtids= position_gtid_filter->get_start_gtids();
for (size_t gtid_idx = 0; gtid_idx < n_start_gtids; gtid_idx++)
{
@@ -3539,7 +3770,7 @@ int main(int argc, char** argv)
"/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n");
fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n");
if (is_gtid_filtering_enabled())
if (gtid_event_filter)
{
fprintf(result_file,
"/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n"