diff --git a/sql/log.cc b/sql/log.cc index 0444499c72b..aca71915ddf 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -5278,6 +5278,67 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, DBUG_RETURN(error); } + +/* Generate a new global transaction ID, and write it to the binlog */ +bool +MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, + bool is_transactional) +{ + rpl_gtid gtid; + uint64 seq_no; + + seq_no= thd->variables.gtid_seq_no; + /* + Reset the session variable gtid_seq_no, to reduce the risk of accidentally + producing a duplicate GTID. + */ + thd->variables.gtid_seq_no= 0; + if (seq_no != 0) + { + /* + If we see a higher sequence number, use that one as the basis of any + later generated sequence numbers. + + This way, in simple tree replication topologies with just one master + generating events at any point in time, sequence number will always be + monotonic irrespectively of server_id. Only if events are produced in + parallel on multiple master servers will sequence id be non-monotonic + and server id needed to distinguish. + + We will not rely on this in the server code, but it makes things + conceptually easier to understand for the DBA. + */ + mysql_mutex_lock(&LOCK_gtid_counter); + if (global_gtid_counter < seq_no) + global_gtid_counter= seq_no; + mysql_mutex_unlock(&LOCK_gtid_counter); + } + else + { + mysql_mutex_lock(&LOCK_gtid_counter); + seq_no= ++global_gtid_counter; + mysql_mutex_unlock(&LOCK_gtid_counter); + } + gtid.seq_no= seq_no; + gtid.domain_id= thd->variables.gtid_domain_id; + + Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone, + LOG_EVENT_SUPPRESS_USE_F, is_transactional); + gtid.server_id= gtid_event.server_id; + + /* Write the event to the binary log. */ + if (gtid_event.write(&mysql_bin_log.log_file)) + return true; + status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); + + /* Update the replication state (last GTID in each replication domain). 
*/ + mysql_mutex_lock(&LOCK_rpl_gtid_state); + global_rpl_gtid_state.update(>id); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + return false; +} + + /** Write an event to the binary log. If with_annotate != NULL and *with_annotate = TRUE write also Annotate_rows before the event @@ -5347,6 +5408,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; + write_gtid_event(thd, true, using_trans); } else { @@ -6219,19 +6281,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, break; } - /* - Log "BEGIN" at the beginning of every transaction. Here, a transaction is - either a BEGIN..COMMIT block or a single statement in autocommit mode. - - Create the necessary events here, where we have the correct THD (and - thread context). - - Due to group commit the actual writing to binlog may happen in a different - thread. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE, - TRUE, 0); - entry.begin_event= &qinfo; entry.end_event= end_ev; if (cache_mngr->stmt_cache.has_incident() || cache_mngr->trx_cache.has_incident()) @@ -6607,10 +6656,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) { binlog_cache_mngr *mngr= entry->cache_mngr; - if (entry->begin_event->write(&log_file)) + if (write_gtid_event(entry->thd, false, entry->using_trx_cache)) return ER_ERROR_ON_WRITE; - status_var_add(entry->thd->status_var.binlog_bytes_written, - entry->begin_event->data_written); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) diff --git a/sql/log.h b/sql/log.h index 80fe34b5ff2..417d0528801 100644 --- a/sql/log.h +++ b/sql/log.h @@ -420,11 +420,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG bool using_stmt_cache; bool using_trx_cache; /* - Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + Extra events 
(COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be written during group commit. The incident_event is only valid if trx_data->has_incident() is true. */ - Log_event *begin_event; Log_event *end_event; Log_event *incident_event; /* Set during group commit to record any per-thread error. */ @@ -771,6 +770,7 @@ public: inline IO_CACHE *get_index_file() { return &index_file;} inline uint32 get_open_count() { return open_count; } void set_status_variables(THD *thd); + bool write_gtid_event(THD *thd, bool standalone, bool is_transactional); }; class Log_event_handler diff --git a/sql/log_event.cc b/sql/log_event.cc index 10f0fe1e931..fb2ddb2b012 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -749,6 +749,8 @@ const char* Log_event::get_type_str(Log_event_type type) case INCIDENT_EVENT: return "Incident"; case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint"; + case GTID_EVENT: return "Gtid"; + case GTID_LIST_EVENT: return "Gtid_list"; default: return "Unknown"; /* impossible */ } } @@ -1560,6 +1562,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case BINLOG_CHECKPOINT_EVENT: ev = new Binlog_checkpoint_log_event(buf, event_len, description_event); break; + case GTID_EVENT: + ev = new Gtid_log_event(buf, event_len, description_event); + break; + case GTID_LIST_EVENT: + ev = new Gtid_list_log_event(buf, event_len, description_event); + break; #ifdef HAVE_REPLICATION case SLAVE_EVENT: /* can never happen (unused event) */ ev = new Slave_log_event(buf, event_len, description_event); @@ -3432,6 +3440,53 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset, return 0; } +/* + Replace an event (GTID event) with a BEGIN query event, to be compatible + with an old slave. 
+*/ +int +Query_log_event::begin_event(String *packet, ulong ev_offset, + uint8 checksum_alg) +{ + uchar *p= (uchar *)packet->ptr() + ev_offset; + uchar *q= p + LOG_EVENT_HEADER_LEN; + size_t data_len= packet->length() - ev_offset; + uint16 flags; + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + data_len-= BINLOG_CHECKSUM_LEN; + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + + /* Currently we only need to replace GTID event. */ + DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN); + if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return 1; + + flags= uint2korr(p + FLAGS_OFFSET); + flags&= ~LOG_EVENT_THREAD_SPECIFIC_F; + flags|= LOG_EVENT_SUPPRESS_USE_F; + int2store(p + FLAGS_OFFSET, flags); + + p[EVENT_TYPE_OFFSET]= QUERY_EVENT; + int4store(q + Q_THREAD_ID_OFFSET, 0); + int4store(q + Q_EXEC_TIME_OFFSET, 0); + q[Q_DB_LEN_OFFSET]= 0; + int2store(q + Q_ERR_CODE_OFFSET, 0); + int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0); + q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */ + q+= Q_DATA_OFFSET + 1; + memcpy(q, "BEGIN", 5); + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + ha_checksum crc= my_checksum(0L, p, data_len); + int4store(p + data_len, crc); + } + return 0; +} + #ifdef MYSQL_CLIENT /** @@ -4454,6 +4509,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; post_header_len[BINLOG_CHECKPOINT_EVENT-1]= BINLOG_CHECKPOINT_HEADER_LEN; + post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN; + post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN; // Sanity-check that all post header lengths are initialized. 
int i; @@ -5992,6 +6049,406 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) #endif /* MYSQL_CLIENT */ +/************************************************************************** + Global transaction ID stuff +**************************************************************************/ + +/** + Current replication state (hash of last GTID executed, per replication + domain). +*/ +rpl_state global_rpl_gtid_state; + + +rpl_state::rpl_state() +{ + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(rpl_gtid, domain_id), sizeof(uint32), + NULL, my_free, HASH_UNIQUE); +} + + +rpl_state::~rpl_state() +{ + my_hash_free(&hash); +} + + +#ifdef MYSQL_SERVER +/* + Update replication state with a new GTID. + + If the replication domain id already exists, then the new GTID replaces the + old one for that domain id. Else a new entry is inserted. + + Returns 0 for ok, 1 for error. +*/ +int +rpl_state::update(const struct rpl_gtid *gtid) +{ + uchar *rec; + + rec= my_hash_search(&hash, (const uchar *)gtid, 0); + if (rec) + { + const rpl_gtid *old_gtid= (const rpl_gtid *)rec; + if (old_gtid->server_id == gtid->server_id && + old_gtid->seq_no > gtid->seq_no) + sql_print_warning("Out-of-order GTIDs detected for server_id=%u. 
" + "Please ensure that independent replication streams " + "use different replication domain_id to avoid " + "inconsistencies.", gtid->server_id); + else + memcpy(rec, gtid, sizeof(*gtid)); + return 0; + } + + if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME)))) + return 1; + memcpy(rec, gtid, sizeof(*gtid)); + return my_hash_insert(&hash, rec); +} +#endif /* MYSQL_SERVER */ + + +Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + : Log_event(buf, description_event), seq_no(0) +{ + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < GTID_HEADER_LEN) + return; + + buf+= header_size; + seq_no= uint8korr(buf); + buf+= 8; + domain_id= uint4korr(buf); + buf+= 4; + flags2= *buf; +} + + +#ifdef MYSQL_SERVER + +Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, + uint32 domain_id_arg, bool standalone, + uint16 flags_arg, bool is_transactional) + : Log_event(thd_arg, flags_arg, is_transactional), + seq_no(seq_no_arg), domain_id(domain_id_arg), + flags2(standalone ? FL_STANDALONE : 0) +{ +} + +bool +Gtid_log_event::write(IO_CACHE *file) +{ + uchar buf[GTID_HEADER_LEN]; + int8store(buf, seq_no); + int4store(buf+8, domain_id); + buf[12]= flags2; + bzero(buf+13, GTID_HEADER_LEN-13); + return write_header(file, GTID_HEADER_LEN) || + wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) || + write_footer(file); +} + + +/* + Replace a GTID event with either a BEGIN event, dummy event, or nothing, as + appropriate to work with old slave that does not know global transaction id. + + The need_dummy_event argument is an IN/OUT argument. It is passed as TRUE + if slave has capability lower than MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES. 
+ It is returned TRUE if we return a BEGIN (or dummy) event to be sent to the + slave, FALSE if event should be skipped completely. +*/ +int +Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, + ulong ev_offset, uint8 checksum_alg) +{ + uchar flags2; + if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return 1; + flags2= (*packet)[ev_offset + LOG_EVENT_HEADER_LEN + 12]; + if (flags2 & FL_STANDALONE) + { + if (need_dummy_event) + return Query_log_event::dummy_event(packet, ev_offset, checksum_alg); + else + return 0; + } + + *need_dummy_event= true; + return Query_log_event::begin_event(packet, ev_offset, checksum_alg); +} + + +#ifdef HAVE_REPLICATION +void +Gtid_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf[6+5+10+1+10+1+20+1]; + char *p; + p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID ")); + if (domain_id) + { + p= longlong10_to_str(domain_id, p, 10); + *p++= '-'; + } + p= longlong10_to_str(server_id, p, 10); + *p++= '-'; + p= longlong10_to_str(seq_no, p, 10); + + protocol->store(buf, p-buf, &my_charset_bin); +} + +static char gtid_begin_string[5] = {'B','E','G','I','N'}; + +int +Gtid_log_event::do_apply_event(Relay_log_info const *rli) +{ + const_cast(rli)->slave_close_thread_tables(thd); + + /* ToDo: record the new GTID. */ + + if (flags2 & FL_STANDALONE) + return 0; + + /* Execute this like a BEGIN query event. */ + thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string), + &my_charset_bin, next_query_id()); + Parser_state parser_state; + if (!parser_state.init(thd, thd->query(), thd->query_length())) + { + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + /* Finalize server status flags after executing a statement. 
*/ + thd->update_server_status(); + log_slow_statement(thd); + general_log_write(thd, COM_QUERY, thd->query(), thd->query_length()); + } + + thd->reset_query(); + free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); + return 0; +} + + +int +Gtid_log_event::do_update_pos(Relay_log_info *rli) +{ + rli->inc_event_relay_log_pos(); + return 0; +} + + +Log_event::enum_skip_reason +Gtid_log_event::do_shall_skip(Relay_log_info *rli) +{ + /* + An event skipped due to @@skip_replication must not be counted towards the + number of events to be skipped due to @@sql_slave_skip_counter. + */ + if (flags & LOG_EVENT_SKIP_REPLICATION_F && + opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE) + return Log_event::EVENT_SKIP_IGNORE; + + if (rli->slave_skip_counter > 0) + { + if (!(flags2 & FL_STANDALONE)) + thd->variables.option_bits|= OPTION_BEGIN; + return Log_event::continue_group(rli); + } + return Log_event::do_shall_skip(rli); +} + + +#endif /* HAVE_REPLICATION */ + +#else /* !MYSQL_SERVER */ + +void +Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char buf[21]; + + print_header(&cache, print_event_info, FALSE); + longlong10_to_str(seq_no, buf, 10); + if (!print_event_info->short_form) + { + my_b_printf(&cache, "\tGTID "); + if (domain_id) + my_b_printf(&cache, "%u-", domain_id); + my_b_printf(&cache, "%u-%s", server_id, buf); + } + my_b_printf(&cache, "\n"); + + if (!print_event_info->domain_id_printed || + print_event_info->domain_id != domain_id) + { + my_b_printf(&cache, "/*!100001 SET @@session.gtid_domain_id=%u*/%s\n", + domain_id, print_event_info->delimiter); + print_event_info->domain_id= domain_id; + print_event_info->domain_id_printed= true; + } + + if (!print_event_info->server_id_printed || + print_event_info->server_id != server_id) + { + my_b_printf(&cache, "/*!100001 SET @@session.server_id=%u*/%s\n", + server_id, 
print_event_info->delimiter); + print_event_info->server_id= server_id; + print_event_info->server_id_printed= true; + } + + my_b_printf(&cache, "/*!100001 SET @@session.gtid_seq_no=%s*/%s\n", + buf, print_event_info->delimiter); + if (!(flags2 & FL_STANDALONE)) + my_b_printf(&cache, "BEGIN%s\n", print_event_info->delimiter); +} + +#endif /* MYSQL_SERVER */ + + +/* GTID list. */ + +Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + : Log_event(buf, description_event), count(0), list(0) +{ + uint32 i; + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < GTID_LIST_HEADER_LEN) + return; + + buf+= header_size; + count= uint4korr(buf) & ((1<<28)-1); + buf+= 4; + if (count == 0 || + event_len - (header_size + post_header_len) < count*element_size || + (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list), MYF(MY_WME))))) + return; + + for (i= 0; i < count; ++i) + { + list[i].domain_id= uint4korr(buf); + buf+= 4; + list[i].server_id= uint4korr(buf); + buf+= 4; + list[i].seq_no= uint8korr(buf); + buf+= 8; + } +} + + +#ifdef MYSQL_SERVER + +Gtid_list_log_event::Gtid_list_log_event(rpl_state *gtid_set) + : count(gtid_set->count()), list(0) +{ + DBUG_ASSERT(count != 0); + + /* Failure to allocate memory will be caught by is_valid() returning false. 
*/ + if (count != 0 && count < (1<<28) && + (list = (rpl_gtid *)my_malloc(count * sizeof(*list), MYF(MY_WME)))) + { + uint32 i; + + for (i= 0; i < count; ++i) + list[i]= *(rpl_gtid *)my_hash_element(>id_set->hash, i); + } +} + +bool +Gtid_list_log_event::write(IO_CACHE *file) +{ + uint32 i; + uchar buf[element_size]; + + DBUG_ASSERT(count < 1<<28); + + if (write_header(file, get_data_size())) + return 1; + int4store(buf, count & ((1<<28)-1)); + if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN)) + return 1; + for (i= 0; i < count; ++i) + { + int4store(buf, list[i].domain_id); + int4store(buf+4, list[i].server_id); + int8store(buf+8, list[i].seq_no); + if (wrapper_my_b_safe_write(file, buf, element_size)) + return 1; + } + return write_footer(file); +} + + +#ifdef HAVE_REPLICATION +void +Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf_mem[1024]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + uint32 i; + + buf.length(0); + for (i= 0; i < count; ++i) + { + if (i) + buf.append(STRING_WITH_LEN(", ")); + else + buf.append(STRING_WITH_LEN("[")); + if (list[i].domain_id) + { + buf.append_ulonglong((ulonglong)list[i].domain_id); + buf.append(STRING_WITH_LEN("-")); + } + buf.append_ulonglong((ulonglong)list[i].server_id); + buf.append(STRING_WITH_LEN("-")); + buf.append_ulonglong(list[i].seq_no); + } + buf.append(STRING_WITH_LEN("]")); + + protocol->store(&buf); +} +#endif /* HAVE_REPLICATION */ + +#else /* !MYSQL_SERVER */ + +void +Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + if (!print_event_info->short_form) + { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char buf[21]; + uint32 i; + + print_header(&cache, print_event_info, FALSE); + for (i= 0; i < count; ++i) + { + if (list[i].domain_id) + my_b_printf(&cache, "%u-", list[i].domain_id); + longlong10_to_str(list[i].seq_no, buf, 10); + my_b_printf(&cache, "%u-%s", 
list[i].server_id, buf); + if (i < count-1) + my_b_printf(&cache, "\n# "); + else + my_b_printf(&cache, "\n"); + } + } +} + +#endif /* MYSQL_SERVER */ + + /************************************************************************** Intvar_log_event methods **************************************************************************/ @@ -11236,7 +11693,9 @@ st_print_event_info::st_print_event_info() auto_increment_increment(0),auto_increment_offset(0), charset_inited(0), lc_time_names_number(~0), charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), - thread_id(0), thread_id_printed(false), skip_replication(0), + thread_id(0), thread_id_printed(false), server_id(0), + server_id_printed(false), domain_id(0), domain_id_printed(false), + skip_replication(0), base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) { /* diff --git a/sql/log_event.h b/sql/log_event.h index ff13cab9cd5..210b175db3a 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -260,6 +260,8 @@ struct sql_ex_info #define HEARTBEAT_HEADER_LEN 0 #define ANNOTATE_ROWS_HEADER_LEN 0 #define BINLOG_CHECKPOINT_HEADER_LEN 4 +#define GTID_HEADER_LEN 19 +#define GTID_LIST_HEADER_LEN 4 /* Max number of possible extra bytes in a replication event compared to a @@ -599,16 +601,13 @@ enum enum_binlog_checksum_alg { because they mis-compute the offsets into the master's binlog). */ #define MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES 2 -/* MariaDB > 5.5, which knows about binlog_checkpoint_log_event. */ +/* MariaDB >= 10.0, which knows about binlog_checkpoint_log_event. */ #define MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT 3 -/* - MariaDB server which understands MySQL 5.6 ignorable events. This server - can tolerate receiving any event with the LOG_EVENT_IGNORABLE_F flag set. -*/ -#define MARIA_SLAVE_CAPABILITY_IGNORABLE 4 +/* MariaDB >= 10.0.1, which knows about global transaction id events. */ +#define MARIA_SLAVE_CAPABILITY_GTID 4 /* Our capability. 
*/ -#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT +#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID /** @@ -694,6 +693,18 @@ enum Log_event_type that are prepared in storage engines but not yet committed. */ BINLOG_CHECKPOINT_EVENT= 161, + /* + Gtid event. For global transaction ID, used to start a new event group, + instead of the old BEGIN query event, and also to mark stand-alone + events. + */ + GTID_EVENT= 162, + /* + Gtid list event. Logged at the start of every binlog, to record the + current replication state. This consists of the last GTID seen for + each replication domain. + */ + GTID_LIST_EVENT= 163, /* Add new MariaDB events here - right above this comment! */ @@ -766,6 +777,11 @@ typedef struct st_print_event_info uint charset_database_number; uint thread_id; bool thread_id_printed; + uint32 server_id; + bool server_id_printed; + uint32 domain_id; + bool domain_id_printed; + /* Track when @@skip_replication changes so we need to output a SET statement for it. @@ -1874,6 +1890,7 @@ public: } Log_event_type get_type_code() { return QUERY_EVENT; } static int dummy_event(String *packet, ulong ev_offset, uint8 checksum_alg); + static int begin_event(String *packet, ulong ev_offset, uint8 checksum_alg); #ifdef MYSQL_SERVER bool write(IO_CACHE* file); virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; } @@ -2927,6 +2944,210 @@ public: #endif }; + +struct rpl_gtid +{ + uint32 domain_id; + uint32 server_id; + uint64 seq_no; +}; + + +struct rpl_state +{ + HASH hash; + + rpl_state(); + ~rpl_state(); + + ulong count() const { return hash.records; } + int update(const struct rpl_gtid *gtid); +}; + +extern rpl_state global_rpl_gtid_state; + +/** + @class Gtid_log_event + + This event is logged as part of every event group to give the global + transaction id (GTID) of that group. 
+ + It replaces the BEGIN query event used in earlier versions to begin most + event groups, but is also used for events that used to be stand-alone. + + @section Gtid_log_event_binary_format Binary Format + + The binary format for Gtid_log_event has 6 extra reserved bytes to make the + length a total of 19 byte (+ 19 bytes of header in common with all events). + This is just the minimal size for a BEGIN query event, which makes it easy + to replace this event with such BEGIN event to remain compatible with old + slave servers. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Post-Header
NameFormatDescription
seq_no8 byte unsigned integerincreasing id within one server_id. Starts at 1, holes in the sequence + may occur
domain_id4 byte unsigned integerReplication domain id, identifying independent replication streams
flags1 byte bitfieldBit 0 set indicates stand-alone event (no terminating COMMIT)
Reserved6 bytesReserved bytes, set to 0. May be used for future expansion.
+ + The Body of Gtid_log_event is empty. The total event size is 19 bytes + + the normal 19 bytes common-header. +*/ + +class Gtid_log_event: public Log_event +{ +public: + uint64 seq_no; + uint32 domain_id; + uchar flags2; + + /* Flags2. */ + + /* FL_STANDALONE is set when there is no terminating COMMIT event. */ + static const uchar FL_STANDALONE= 1; + +#ifdef MYSQL_SERVER + Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, + uint16 flags, bool is_transactional); +#ifdef HAVE_REPLICATION + void pack_info(THD *thd, Protocol *protocol); + virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_update_pos(Relay_log_info *rli); + virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); +#endif +#else + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + Gtid_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); + ~Gtid_log_event() { } + Log_event_type get_type_code() { return GTID_EVENT; } + int get_data_size() { return GTID_HEADER_LEN; } + bool is_valid() const { return seq_no != 0; } +#ifdef MYSQL_SERVER + bool write(IO_CACHE *file); + static int make_compatible_event(String *packet, bool *need_dummy_event, + ulong ev_offset, uint8 checksum_alg); +#endif +}; + + +/** + @class Gtid_list_log_event + + This event is logged at the start of every binlog file to record the + current replication state: the last global transaction id (GTID) applied + on the server within each replication domain. + + It consists of a list of GTIDs, one for each replication domain ever seen + on the server. + + @section Gtid_list_log_event_binary_format Binary Format + + + + + + + + + + + + + + + +
Post-Header
NameFormatDescription
count4 byte unsigned integerThe lower 28 bits are the number of GTIDs. The upper 4 bits are + reserved for flags bits for future expansion
+ + + + + + + + + + + + + + + + + + + + + + + + + + + +
Body
NameFormatDescription
domain_id4 byte unsigned integerReplication domain id of one GTID
server_id4 byte unsigned integerServer id of one GTID
seq_no8 byte unsigned integersequence number of one GTID
+ + The three elements in the body repeat COUNT times to form the GTID list. +*/ + +class Gtid_list_log_event: public Log_event +{ +public: + uint32 count; + struct rpl_gtid *list; + + static const uint element_size= 4+4+8; + +#ifdef MYSQL_SERVER + Gtid_list_log_event(rpl_state *gtid_set); +#ifdef HAVE_REPLICATION + void pack_info(THD *thd, Protocol *protocol); +#endif +#else + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + Gtid_list_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); + ~Gtid_list_log_event() { my_free(list); } + Log_event_type get_type_code() { return GTID_LIST_EVENT; } + int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; } + bool is_valid() const { return list != NULL; } +#ifdef MYSQL_SERVER + bool write(IO_CACHE *file); +#endif +}; + + /* the classes below are for the new LOAD DATA INFILE logging */ /** diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c662b3279ef..6532ecb8733 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -675,6 +675,8 @@ mysql_mutex_t mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, LOCK_global_table_stats, LOCK_global_index_stats; +mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; + /** The below lock protects access to two global server variables: max_prepared_stmt_count and prepared_stmt_count. 
These variables @@ -770,6 +772,8 @@ PSI_mutex_key key_LOCK_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; +PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; + PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; static PSI_mutex_info all_server_mutexes[]= @@ -813,6 +817,8 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, + { &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL}, + { &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL}, { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL}, @@ -1279,6 +1285,12 @@ struct st_VioSSLFd *ssl_acceptor_fd; */ uint connection_count= 0, extra_connection_count= 0; +/** + Running counter for generating new GTIDs locally. 
+*/ +uint64 global_gtid_counter= 0; + + /* Function declarations */ pthread_handler_t signal_hand(void *arg); @@ -1940,6 +1952,8 @@ static void clean_up_mutexes() mysql_mutex_destroy(&LOCK_global_user_client_stats); mysql_mutex_destroy(&LOCK_global_table_stats); mysql_mutex_destroy(&LOCK_global_index_stats); + mysql_mutex_destroy(&LOCK_gtid_counter); + mysql_mutex_destroy(&LOCK_rpl_gtid_state); #ifdef HAVE_OPENSSL mysql_mutex_destroy(&LOCK_des_key_file); #ifndef HAVE_YASSL @@ -4002,6 +4016,10 @@ static int init_thread_environment() &LOCK_global_table_stats, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_global_index_stats, &LOCK_global_index_stats, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_gtid_counter, + &LOCK_gtid_counter, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_rpl_gtid_state, + &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, diff --git a/sql/mysqld.h b/sql/mysqld.h index 6bde25f08fb..bf4957dba69 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -252,6 +252,8 @@ extern PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; +extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; + extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock; @@ -341,6 +343,7 @@ extern mysql_mutex_t LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_global_system_variables, LOCK_user_conn, LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count; +extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; #ifdef HAVE_OPENSSL extern mysql_mutex_t LOCK_des_key_file; @@ -546,6 +549,7 @@ inline int 
set_current_thd(THD *thd) extern handlerton *maria_hton; extern uint extra_connection_count; +extern uint64 global_gtid_counter; extern my_bool opt_userstat_running, debug_assert_if_crashed_table; extern uint mysqld_extra_port; extern ulong opt_progress_report_time; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 5348a94b35b..65366becc41 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -639,7 +639,7 @@ bool check_master_connection_name(LEX_STRING *name) file names without a prefix. */ -void create_logfile_name_with_suffix(char *res_file_name, uint length, +void create_logfile_name_with_suffix(char *res_file_name, size_t length, const char *info_file, bool append, LEX_STRING *suffix) { diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index d52b2992afd..adede7935c6 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -170,7 +170,7 @@ public: }; bool check_master_connection_name(LEX_STRING *name); -void create_logfile_name_with_suffix(char *res_file_name, uint length, +void create_logfile_name_with_suffix(char *res_file_name, size_t length, const char *info_file, bool append, LEX_STRING *suffix); diff --git a/sql/sql_class.h b/sql/sql_class.h index 87b9783c1ed..2e1a12cdedb 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -534,6 +534,12 @@ typedef struct system_variables thread the query is being run to replicate temp tables properly */ my_thread_id pseudo_thread_id; + /** + When replicating an event group with GTID, keep these values around so + slave binlog can receive the same GTID as the original. + */ + uint32 gtid_domain_id; + uint64 gtid_seq_no; /** Place holders to store Multi-source variables in sys_var.cc during update and show of variables. diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 6b8d0f5153f..5698b4679b6 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -616,10 +616,34 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, } /* - Do not send binlog checkpoint events to a slave that does not understand it. 
+ Replace GTID events with old-style BEGIN events for slaves that do not + understand global transaction IDs. For stand-alone events, where there is + no terminating COMMIT query event, omit the GTID event or replace it with + a dummy event, as appropriate. */ - if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && - mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) + if (event_type == GTID_EVENT && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID) + { + bool need_dummy= + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES; + bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy, + ev_offset, + current_checksum_alg); + if (err) + return "Failed to replace GTID event with backwards-compatible event: " + "currupt event."; + if (!need_dummy) + return NULL; + } + + /* + Do not send binlog checkpoint or gtid list events to a slave that does not + understand it. + */ + if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) || + (unlikely(event_type == GTID_LIST_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)) { if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) { diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index fcd2df7b338..0ad7c787782 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1201,6 +1201,29 @@ static Sys_var_ulong Sys_pseudo_thread_id( BLOCK_SIZE(1), NO_MUTEX_GUARD, IN_BINLOG, ON_CHECK(check_has_super)); +static Sys_var_uint Sys_gtid_domain_id( + "gtid_domain_id", + "Used with global transaction ID to identify logically independent " + "replication streams. When events can propagate through multiple " + "parallel paths (for example multiple masters), each independent " + "source server must use a distinct domain_id. 
For simple tree-shaped " + "replication topologies, it can be left at its default, 0.", + SESSION_VAR(gtid_domain_id), + CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, UINT_MAX32), DEFAULT(0), + BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(check_has_super)); + +static Sys_var_ulonglong Sys_gtid_seq_no( + "gtid_seq_no", + "Internal server usage, for replication with global transaction id. " + "When set, next event group logged to the binary log will use this " + "sequence number, not generate a new one, thus allowing to preserve " + "master's GTID in slave's binlog.", + SESSION_ONLY(gtid_seq_no), + NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0), + BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(check_has_super)); + static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type) { SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables;