mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
Galera GTID support
Support for galera GTID consistency thru cluster. All nodes in cluster should have same GTID for replicated events which are originating from cluster. Cluster originating commands need to contain sequential WSREP GTID seqno Ignore manual setting of gtid_seq_no=X. In master-slave scenario where master is non galera node replicated GTID is replicated and is preserved in all nodes. To have this - domain_id, server_id and seqnos should be same on all nodes. Node which bootstraps the cluster, to achieve this, sends domain_id and server_id to other nodes and this combination is used to write GTID for events that are replicated inside cluster. Cluster nodes that are executing non replicated events are going to have different GTID than replicated ones, difference will be visible in domain part of gtid. With wsrep_gtid_domain_id you can set domain_id for WSREP cluster. Functions WSREP_LAST_WRITTEN_GTID, WSREP_LAST_SEEN_GTID and WSREP_SYNC_WAIT_UPTO_GTID now works with "native" GTID format. Fixed galera tests to reflect this chances. Add variable to manually update WSREP GTID seqno in cluster Add variable to manipulate and change WSREP GTID seqno. Next command originating from cluster and on same thread will have set seqno and cluster should change their internal counter to it's value. Behavior is same as using @@gtid_seq_no for non WSREP transaction.
This commit is contained in:
129
sql/log.cc
129
sql/log.cc
@@ -5048,55 +5048,6 @@ MYSQL_BIN_LOG::is_xidlist_idle_nolock()
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
inline bool
|
||||
is_gtid_cached_internal(IO_CACHE *file)
|
||||
{
|
||||
uchar data[EVENT_TYPE_OFFSET+1];
|
||||
bool result= false;
|
||||
my_off_t write_pos= my_b_tell(file);
|
||||
if (reinit_io_cache(file, READ_CACHE, 0, 0, 0))
|
||||
return false;
|
||||
/*
|
||||
In the cache we have gtid event if , below condition is true,
|
||||
*/
|
||||
my_b_read(file, data, sizeof(data));
|
||||
uint event_type= (uchar)data[EVENT_TYPE_OFFSET];
|
||||
if (event_type == GTID_LOG_EVENT)
|
||||
result= true;
|
||||
/*
|
||||
Cleanup , Why because we have not read the full buffer
|
||||
and this will cause next to next reinit_io_cache(called in write_cache)
|
||||
to make cache empty.
|
||||
*/
|
||||
file->read_pos= file->read_end;
|
||||
if (reinit_io_cache(file, WRITE_CACHE, write_pos, 0, 0))
|
||||
return false;
|
||||
return result;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
inline bool
|
||||
MYSQL_BIN_LOG::is_gtid_cached(THD *thd)
|
||||
{
|
||||
binlog_cache_mngr *mngr= (binlog_cache_mngr *) thd_get_ha_data(
|
||||
thd, binlog_hton);
|
||||
if (!mngr)
|
||||
return false;
|
||||
binlog_cache_data *cache_trans= mngr->get_binlog_cache_data(
|
||||
use_trans_cache(thd, true));
|
||||
binlog_cache_data *cache_stmt= mngr->get_binlog_cache_data(
|
||||
use_trans_cache(thd, false));
|
||||
if (cache_trans && !cache_trans->empty() &&
|
||||
is_gtid_cached_internal(&cache_trans->cache_log))
|
||||
return true;
|
||||
if (cache_stmt && !cache_stmt->empty() &&
|
||||
is_gtid_cached_internal(&cache_stmt->cache_log))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
/**
|
||||
Create a new log file name.
|
||||
|
||||
@@ -5719,31 +5670,47 @@ THD::binlog_start_trans_and_stmt()
|
||||
{
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
/* Write Gtid
|
||||
Get domain id only when gtid mode is set
|
||||
If this event is replicate through a master then ,
|
||||
we will forward the same gtid another nodes
|
||||
We have to do this only one time in mysql transaction.
|
||||
Since this function is called multiple times , We will check for
|
||||
ha_info->is_started()
|
||||
*/
|
||||
/* If this event replicates through a master-slave then we need to
|
||||
inject manually GTID so it is preserved in the cluster. We are writing
|
||||
directly to WSREP buffer and not in IO cache because in case of IO cache
|
||||
GTID event will be duplicated in binlog.
|
||||
We have to do this only one time in mysql transaction.
|
||||
Since this function is called multiple times , We will check for
|
||||
ha_info->is_started().
|
||||
*/
|
||||
Ha_trx_info *ha_info;
|
||||
ha_info= this->ha_data[binlog_hton->slot].ha_info + (mstmt_mode ? 1 : 0);
|
||||
|
||||
if (!ha_info->is_started() && wsrep_gtid_mode
|
||||
&& this->variables.gtid_seq_no)
|
||||
if (!ha_info->is_started() &&
|
||||
(this->variables.gtid_seq_no || this->variables.wsrep_gtid_seq_no) &&
|
||||
wsrep_on(this) &&
|
||||
(this->wsrep_cs().mode() == wsrep::client_state::m_local))
|
||||
{
|
||||
binlog_cache_mngr *const cache_mngr=
|
||||
(binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
|
||||
binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(1);
|
||||
IO_CACHE *file= &cache_data->cache_log;
|
||||
Log_event_writer writer(file, cache_data);
|
||||
Gtid_log_event gtid_event(this, this->variables.gtid_seq_no,
|
||||
this->variables.gtid_domain_id,
|
||||
true, LOG_EVENT_SUPPRESS_USE_F,
|
||||
true, 0);
|
||||
gtid_event.server_id= this->variables.server_id;
|
||||
writer.write(>id_event);
|
||||
uchar *buf= 0;
|
||||
size_t len= 0;
|
||||
IO_CACHE tmp_io_cache;
|
||||
Log_event_writer writer(&tmp_io_cache, 0);
|
||||
if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
|
||||
128, MYF(MY_WME)))
|
||||
{
|
||||
uint64 seqno= this->variables.gtid_seq_no;
|
||||
uint32 domain_id= this->variables.gtid_domain_id;
|
||||
uint32 server_id= this->variables.server_id;
|
||||
if (!this->variables.gtid_seq_no && this->variables.wsrep_gtid_seq_no)
|
||||
{
|
||||
seqno= this->variables.wsrep_gtid_seq_no;
|
||||
domain_id= wsrep_gtid_server.domain_id;
|
||||
server_id= wsrep_gtid_server.server_id;
|
||||
}
|
||||
Gtid_log_event gtid_event(this, seqno, domain_id, true,
|
||||
LOG_EVENT_SUPPRESS_USE_F, true, 0);
|
||||
gtid_event.server_id= server_id;
|
||||
writer.write(>id_event);
|
||||
wsrep_write_cache_buf(&tmp_io_cache, &buf, &len);
|
||||
if (len > 0) this->wsrep_cs().append_data(wsrep::const_buffer(buf, len));
|
||||
if (buf) my_free(buf);
|
||||
close_cached_file(&tmp_io_cache);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (mstmt_mode)
|
||||
@@ -6024,20 +5991,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
|
||||
DBUG_ENTER("write_gtid_event");
|
||||
DBUG_PRINT("enter", ("standalone: %d", standalone));
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
if (WSREP(thd) &&
|
||||
(wsrep_thd_trx_seqno(thd) > 0) &&
|
||||
wsrep_gtid_mode && !thd->variables.gtid_seq_no)
|
||||
{
|
||||
domain_id= wsrep_gtid_domain_id;
|
||||
} else {
|
||||
#endif /* WITH_WSREP */
|
||||
domain_id= thd->variables.gtid_domain_id;
|
||||
#ifdef WITH_WSREP
|
||||
}
|
||||
#endif /* WITH_WSREP */
|
||||
local_server_id= thd->variables.server_id;
|
||||
seq_no= thd->variables.gtid_seq_no;
|
||||
domain_id= thd->variables.gtid_domain_id;
|
||||
local_server_id= thd->variables.server_id;
|
||||
|
||||
DBUG_ASSERT(local_server_id != 0);
|
||||
|
||||
@@ -6084,8 +6040,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
|
||||
DBUG_ASSERT(this == &mysql_bin_log);
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
if (wsrep_gtid_mode && is_gtid_cached(thd))
|
||||
DBUG_RETURN(false);
|
||||
if (wsrep_gtid_mode)
|
||||
{
|
||||
thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;
|
||||
thd->variables.server_id= global_system_variables.server_id;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (write_event(>id_event))
|
||||
|
Reference in New Issue
Block a user