mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Backporting WL#4398 WL#1720
Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673 Backporting BUG#45819 BUG#45973 BUG#39012
This commit is contained in:
493
sql/rpl_handler.cc
Normal file
493
sql/rpl_handler.cc
Normal file
@ -0,0 +1,493 @@
|
||||
/* Copyright (C) 2008 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include "mysql_priv.h"
|
||||
|
||||
#include "rpl_mi.h"
|
||||
#include "sql_repl.h"
|
||||
#include "log_event.h"
|
||||
#include "rpl_filter.h"
|
||||
#include <my_dir.h>
|
||||
#include "rpl_handler.h"
|
||||
|
||||
Trans_delegate *transaction_delegate;
|
||||
Binlog_storage_delegate *binlog_storage_delegate;
|
||||
#ifdef HAVE_REPLICATION
|
||||
Binlog_transmit_delegate *binlog_transmit_delegate;
|
||||
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
|
||||
#endif /* HAVE_REPLICATION */
|
||||
|
||||
/*
|
||||
structure to save transaction log filename and position
|
||||
*/
|
||||
typedef struct Trans_binlog_info {
|
||||
my_off_t log_pos;
|
||||
char log_file[FN_REFLEN];
|
||||
} Trans_binlog_info;
|
||||
|
||||
static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
|
||||
|
||||
int get_user_var_int(const char *name,
|
||||
long long int *value, int *null_value)
|
||||
{
|
||||
my_bool null_val;
|
||||
user_var_entry *entry=
|
||||
(user_var_entry*) hash_search(¤t_thd->user_vars,
|
||||
(uchar*) name, strlen(name));
|
||||
if (!entry)
|
||||
return 1;
|
||||
*value= entry->val_int(&null_val);
|
||||
if (null_value)
|
||||
*null_value= null_val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_user_var_real(const char *name,
|
||||
double *value, int *null_value)
|
||||
{
|
||||
my_bool null_val;
|
||||
user_var_entry *entry=
|
||||
(user_var_entry*) hash_search(¤t_thd->user_vars,
|
||||
(uchar*) name, strlen(name));
|
||||
if (!entry)
|
||||
return 1;
|
||||
*value= entry->val_real(&null_val);
|
||||
if (null_value)
|
||||
*null_value= null_val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_user_var_str(const char *name, char *value,
|
||||
size_t len, unsigned int precision, int *null_value)
|
||||
{
|
||||
String str;
|
||||
my_bool null_val;
|
||||
user_var_entry *entry=
|
||||
(user_var_entry*) hash_search(¤t_thd->user_vars,
|
||||
(uchar*) name, strlen(name));
|
||||
if (!entry)
|
||||
return 1;
|
||||
entry->val_str(&null_val, &str, precision);
|
||||
strncpy(value, str.c_ptr(), len);
|
||||
if (null_value)
|
||||
*null_value= null_val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int delegates_init()
|
||||
{
|
||||
static unsigned char trans_mem[sizeof(Trans_delegate)];
|
||||
static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
|
||||
#ifdef HAVE_REPLICATION
|
||||
static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
|
||||
static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
|
||||
#endif
|
||||
|
||||
if (!(transaction_delegate= new (trans_mem) Trans_delegate)
|
||||
|| (!transaction_delegate->is_inited())
|
||||
|| !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
|
||||
|| (!binlog_storage_delegate->is_inited())
|
||||
#ifdef HAVE_REPLICATION
|
||||
|| !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
|
||||
|| (!binlog_transmit_delegate->is_inited())
|
||||
|| !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
|
||||
|| (!binlog_relay_io_delegate->is_inited())
|
||||
#endif /* HAVE_REPLICATION */
|
||||
)
|
||||
return 1;
|
||||
|
||||
if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void delegates_destroy()
|
||||
{
|
||||
if (transaction_delegate)
|
||||
transaction_delegate->~Trans_delegate();
|
||||
if (binlog_storage_delegate)
|
||||
binlog_storage_delegate->~Binlog_storage_delegate();
|
||||
#ifdef HAVE_REPLICATION
|
||||
if (binlog_transmit_delegate)
|
||||
binlog_transmit_delegate->~Binlog_transmit_delegate();
|
||||
if (binlog_relay_io_delegate)
|
||||
binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
|
||||
#endif /* HAVE_REPLICATION */
|
||||
}
|
||||
|
||||
/*
|
||||
This macro is used by almost all the Delegate methods to iterate
|
||||
over all the observers running given callback function of the
|
||||
delegate .
|
||||
|
||||
Add observer plugins to the thd->lex list, after each statement, all
|
||||
plugins add to thd->lex will be automatically unlocked.
|
||||
*/
|
||||
#define FOREACH_OBSERVER(r, f, thd, args) \
|
||||
param.server_id= thd->server_id; \
|
||||
read_lock(); \
|
||||
Observer_info_iterator iter= observer_info_iter(); \
|
||||
Observer_info *info= iter++; \
|
||||
for (; info; info= iter++) \
|
||||
{ \
|
||||
plugin_ref plugin= \
|
||||
my_plugin_lock(thd, &info->plugin); \
|
||||
if (!plugin) \
|
||||
{ \
|
||||
r= 1; \
|
||||
break; \
|
||||
} \
|
||||
if (((Observer *)info->observer)->f \
|
||||
&& ((Observer *)info->observer)->f args) \
|
||||
{ \
|
||||
r= 1; \
|
||||
plugin_unlock(thd, plugin); \
|
||||
break; \
|
||||
} \
|
||||
plugin_unlock(thd, plugin); \
|
||||
} \
|
||||
unlock()
|
||||
|
||||
|
||||
int Trans_delegate::after_commit(THD *thd, bool all)
|
||||
{
|
||||
Trans_param param;
|
||||
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
|
||||
if (is_real_trans)
|
||||
param.flags |= TRANS_IS_REAL_TRANS;
|
||||
|
||||
Trans_binlog_info *log_info=
|
||||
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
|
||||
|
||||
param.log_file= log_info ? log_info->log_file : 0;
|
||||
param.log_pos= log_info ? log_info->log_pos : 0;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
|
||||
|
||||
/*
|
||||
This is the end of a real transaction or autocommit statement, we
|
||||
can free the memory allocated for binlog file and position.
|
||||
*/
|
||||
if (is_real_trans && log_info)
|
||||
{
|
||||
my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
|
||||
my_free(log_info, MYF(0));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Trans_delegate::after_rollback(THD *thd, bool all)
|
||||
{
|
||||
Trans_param param;
|
||||
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
|
||||
if (is_real_trans)
|
||||
param.flags |= TRANS_IS_REAL_TRANS;
|
||||
|
||||
Trans_binlog_info *log_info=
|
||||
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
|
||||
|
||||
param.log_file= log_info ? log_info->log_file : 0;
|
||||
param.log_pos= log_info ? log_info->log_pos : 0;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
|
||||
|
||||
/*
|
||||
This is the end of a real transaction or autocommit statement, we
|
||||
can free the memory allocated for binlog file and position.
|
||||
*/
|
||||
if (is_real_trans && log_info)
|
||||
{
|
||||
my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
|
||||
my_free(log_info, MYF(0));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_storage_delegate::after_flush(THD *thd,
|
||||
const char *log_file,
|
||||
my_off_t log_pos,
|
||||
bool synced)
|
||||
{
|
||||
Binlog_storage_param param;
|
||||
uint32 flags=0;
|
||||
if (synced)
|
||||
flags |= BINLOG_STORAGE_IS_SYNCED;
|
||||
|
||||
Trans_binlog_info *log_info=
|
||||
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
|
||||
|
||||
if (!log_info)
|
||||
{
|
||||
if(!(log_info=
|
||||
(Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
|
||||
return 1;
|
||||
my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
|
||||
}
|
||||
|
||||
strcpy(log_info->log_file, log_file+dirname_length(log_file));
|
||||
log_info->log_pos = log_pos;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_flush, thd,
|
||||
(¶m, log_info->log_file, log_info->log_pos, flags));
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef HAVE_REPLICATION
|
||||
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
|
||||
const char *log_file,
|
||||
my_off_t log_pos)
|
||||
{
|
||||
Binlog_transmit_param param;
|
||||
param.flags= flags;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
|
||||
{
|
||||
Binlog_transmit_param param;
|
||||
param.flags= flags;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
|
||||
String *packet)
|
||||
{
|
||||
/* NOTE2ME: Maximum extra header size for each observer, I hope 32
|
||||
bytes should be enough for each Observer to reserve their extra
|
||||
header. If later found this is not enough, we can increase this
|
||||
/HEZX
|
||||
*/
|
||||
#define RESERVE_HEADER_SIZE 32
|
||||
unsigned char header[RESERVE_HEADER_SIZE];
|
||||
ulong hlen;
|
||||
Binlog_transmit_param param;
|
||||
param.flags= flags;
|
||||
param.server_id= thd->server_id;
|
||||
|
||||
int ret= 0;
|
||||
read_lock();
|
||||
Observer_info_iterator iter= observer_info_iter();
|
||||
Observer_info *info= iter++;
|
||||
for (; info; info= iter++)
|
||||
{
|
||||
plugin_ref plugin=
|
||||
my_plugin_lock(thd, &info->plugin);
|
||||
if (!plugin)
|
||||
{
|
||||
ret= 1;
|
||||
break;
|
||||
}
|
||||
hlen= 0;
|
||||
if (((Observer *)info->observer)->reserve_header
|
||||
&& ((Observer *)info->observer)->reserve_header(¶m,
|
||||
header,
|
||||
RESERVE_HEADER_SIZE,
|
||||
&hlen))
|
||||
{
|
||||
ret= 1;
|
||||
plugin_unlock(thd, plugin);
|
||||
break;
|
||||
}
|
||||
plugin_unlock(thd, plugin);
|
||||
if (hlen == 0)
|
||||
continue;
|
||||
if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
|
||||
{
|
||||
ret= 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
unlock();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
|
||||
String *packet,
|
||||
const char *log_file,
|
||||
my_off_t log_pos)
|
||||
{
|
||||
Binlog_transmit_param param;
|
||||
param.flags= flags;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, before_send_event, thd,
|
||||
(¶m, (uchar *)packet->c_ptr(),
|
||||
packet->length(),
|
||||
log_file+dirname_length(log_file), log_pos));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
|
||||
String *packet)
|
||||
{
|
||||
Binlog_transmit_param param;
|
||||
param.flags= flags;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_send_event, thd,
|
||||
(¶m, packet->c_ptr(), packet->length()));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
|
||||
|
||||
{
|
||||
Binlog_transmit_param param;
|
||||
param.flags= flags;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m));
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
|
||||
Master_info *mi)
|
||||
{
|
||||
param->mysql= mi->mysql;
|
||||
param->user= mi->user;
|
||||
param->host= mi->host;
|
||||
param->port= mi->port;
|
||||
param->master_log_name= mi->master_log_name;
|
||||
param->master_log_pos= mi->master_log_pos;
|
||||
}
|
||||
|
||||
int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
|
||||
{
|
||||
Binlog_relay_IO_param param;
|
||||
init_param(¶m, mi);
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, thread_start, thd, (¶m));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
|
||||
{
|
||||
|
||||
Binlog_relay_IO_param param;
|
||||
init_param(¶m, mi);
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, thread_stop, thd, (¶m));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
|
||||
Master_info *mi,
|
||||
ushort flags)
|
||||
{
|
||||
Binlog_relay_IO_param param;
|
||||
init_param(¶m, mi);
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
|
||||
const char *packet, ulong len,
|
||||
const char **event_buf,
|
||||
ulong *event_len)
|
||||
{
|
||||
Binlog_relay_IO_param param;
|
||||
init_param(¶m, mi);
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_read_event, thd,
|
||||
(¶m, packet, len, event_buf, event_len));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
|
||||
const char *event_buf,
|
||||
ulong event_len,
|
||||
bool synced)
|
||||
{
|
||||
Binlog_relay_IO_param param;
|
||||
init_param(¶m, mi);
|
||||
|
||||
uint32 flags=0;
|
||||
if (synced)
|
||||
flags |= BINLOG_STORAGE_IS_SYNCED;
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_queue_event, thd,
|
||||
(¶m, event_buf, event_len, flags));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
|
||||
|
||||
{
|
||||
Binlog_relay_IO_param param;
|
||||
init_param(¶m, mi);
|
||||
|
||||
int ret= 0;
|
||||
FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m));
|
||||
return ret;
|
||||
}
|
||||
#endif /* HAVE_REPLICATION */
|
||||
|
||||
int register_trans_observer(Trans_observer *observer, void *p)
|
||||
{
|
||||
return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
int unregister_trans_observer(Trans_observer *observer, void *p)
|
||||
{
|
||||
return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
|
||||
{
|
||||
return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
|
||||
{
|
||||
return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
#ifdef HAVE_REPLICATION
|
||||
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
|
||||
{
|
||||
return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
|
||||
{
|
||||
return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
|
||||
{
|
||||
return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
|
||||
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
|
||||
{
|
||||
return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
|
||||
}
|
||||
#endif /* HAVE_REPLICATION */
|
Reference in New Issue
Block a user