mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-03 14:33:32 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			519 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			519 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2008 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 | 
						|
 | 
						|
#include "mysql_priv.h"
 | 
						|
 | 
						|
#include "rpl_mi.h"
 | 
						|
#include "sql_repl.h"
 | 
						|
#include "log_event.h"
 | 
						|
#include "rpl_filter.h"
 | 
						|
#include <my_dir.h>
 | 
						|
#include "rpl_handler.h"
 | 
						|
 | 
						|
Trans_delegate *transaction_delegate;
 | 
						|
Binlog_storage_delegate *binlog_storage_delegate;
 | 
						|
#ifdef HAVE_REPLICATION
 | 
						|
Binlog_transmit_delegate *binlog_transmit_delegate;
 | 
						|
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
 | 
						|
#endif /* HAVE_REPLICATION */
 | 
						|
 | 
						|
/*
 | 
						|
  structure to save transaction log filename and position
 | 
						|
*/
 | 
						|
typedef struct Trans_binlog_info {
 | 
						|
  my_off_t log_pos;
 | 
						|
  char log_file[FN_REFLEN];
 | 
						|
} Trans_binlog_info;
 | 
						|
 | 
						|
static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
 | 
						|
 | 
						|
int get_user_var_int(const char *name,
 | 
						|
                     long long int *value, int *null_value)
 | 
						|
{
 | 
						|
  my_bool null_val;
 | 
						|
  user_var_entry *entry= 
 | 
						|
    (user_var_entry*) my_hash_search(¤t_thd->user_vars,
 | 
						|
                                  (uchar*) name, strlen(name));
 | 
						|
  if (!entry)
 | 
						|
    return 1;
 | 
						|
  *value= entry->val_int(&null_val);
 | 
						|
  if (null_value)
 | 
						|
    *null_value= null_val;
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int get_user_var_real(const char *name,
 | 
						|
                      double *value, int *null_value)
 | 
						|
{
 | 
						|
  my_bool null_val;
 | 
						|
  user_var_entry *entry= 
 | 
						|
    (user_var_entry*) my_hash_search(¤t_thd->user_vars,
 | 
						|
                                  (uchar*) name, strlen(name));
 | 
						|
  if (!entry)
 | 
						|
    return 1;
 | 
						|
  *value= entry->val_real(&null_val);
 | 
						|
  if (null_value)
 | 
						|
    *null_value= null_val;
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int get_user_var_str(const char *name, char *value,
 | 
						|
                     size_t len, unsigned int precision, int *null_value)
 | 
						|
{
 | 
						|
  String str;
 | 
						|
  my_bool null_val;
 | 
						|
  user_var_entry *entry= 
 | 
						|
    (user_var_entry*) my_hash_search(¤t_thd->user_vars,
 | 
						|
                                  (uchar*) name, strlen(name));
 | 
						|
  if (!entry)
 | 
						|
    return 1;
 | 
						|
  entry->val_str(&null_val, &str, precision);
 | 
						|
  strncpy(value, str.c_ptr(), len);
 | 
						|
  if (null_value)
 | 
						|
    *null_value= null_val;
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int delegates_init()
 | 
						|
{
 | 
						|
  static unsigned long trans_mem[sizeof(Trans_delegate) / sizeof(unsigned long) + 1];
 | 
						|
  static unsigned long storage_mem[sizeof(Binlog_storage_delegate) / sizeof(unsigned long) + 1];
 | 
						|
#ifdef HAVE_REPLICATION
 | 
						|
  static unsigned long transmit_mem[sizeof(Binlog_transmit_delegate) / sizeof(unsigned long) + 1];
 | 
						|
  static unsigned long relay_io_mem[sizeof(Binlog_relay_IO_delegate)/ sizeof(unsigned long) + 1];
 | 
						|
#endif
 | 
						|
  
 | 
						|
  if (!(transaction_delegate= new (trans_mem) Trans_delegate)
 | 
						|
      || (!transaction_delegate->is_inited())
 | 
						|
      || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
 | 
						|
      || (!binlog_storage_delegate->is_inited())
 | 
						|
#ifdef HAVE_REPLICATION
 | 
						|
      || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
 | 
						|
      || (!binlog_transmit_delegate->is_inited())
 | 
						|
      || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
 | 
						|
      || (!binlog_relay_io_delegate->is_inited())
 | 
						|
#endif /* HAVE_REPLICATION */
 | 
						|
      )
 | 
						|
    return 1;
 | 
						|
 | 
						|
  if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
 | 
						|
    return 1;
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
void delegates_destroy()
 | 
						|
{
 | 
						|
  if (transaction_delegate)
 | 
						|
    transaction_delegate->~Trans_delegate();
 | 
						|
  if (binlog_storage_delegate)
 | 
						|
    binlog_storage_delegate->~Binlog_storage_delegate();
 | 
						|
#ifdef HAVE_REPLICATION
 | 
						|
  if (binlog_transmit_delegate)
 | 
						|
    binlog_transmit_delegate->~Binlog_transmit_delegate();
 | 
						|
  if (binlog_relay_io_delegate)
 | 
						|
    binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
 | 
						|
#endif /* HAVE_REPLICATION */
 | 
						|
}
 | 
						|
 | 
						|
/*
  This macro is used by almost all the Delegate methods to iterate
  over all the observers, running the given callback function of the
  delegate.

  It expands to a sequence of statements, not a single expression. It
  declares locals named s, plugins, plugins_buffer, iter and info in the
  enclosing scope, and it expects a variable named `param` with a
  server_id member to be visible there.

    r     lvalue receiving the result: set to 1 when a callback returns
          non-zero, set to 0 when a plugin cannot be locked, otherwise
          left untouched
    f     name of the Observer member (function pointer) to invoke;
          observers where it is NULL are skipped
    thd   THD pointer; only its server_id is read
    args  parenthesized argument list passed to the callback

  Every observer's plugin is locked with my_plugin_lock(0, ...) and
  remembered in a dynamic array. Iteration stops at the first plugin
  that cannot be locked or the first callback that fails. All collected
  plugin references are unlocked after the Delegate lock is released.
 */
#define FOREACH_OBSERVER(r, f, thd, args)                               \
  param.server_id= (thd)->server_id;                                    \
  /*                                                                    \
     Use a struct to make sure that they are allocated adjacent, check  \
     delete_dynamic().                                                  \
  */                                                                    \
  struct {                                                              \
    DYNAMIC_ARRAY plugins;                                              \
    /* preallocate 8 slots */                                           \
    plugin_ref plugins_buffer[8];                                       \
  } s;                                                                  \
  DYNAMIC_ARRAY *plugins= &s.plugins;                                   \
  plugin_ref *plugins_buffer= s.plugins_buffer;                         \
  my_init_dynamic_array2(plugins, sizeof(plugin_ref),                   \
                         plugins_buffer, 8, 8);                         \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  Observer_info *info= iter++;                                          \
  for (; info; info= iter++)                                            \
  {                                                                     \
    plugin_ref plugin=                                                  \
      my_plugin_lock(0, &info->plugin);                                 \
    if (!plugin)                                                        \
    {                                                                   \
      /* plugin is not initialized or deleted, this is not an error */  \
      r= 0;                                                             \
      break;                                                            \
    }                                                                   \
    insert_dynamic(plugins, (uchar *)&plugin);                          \
    if (((Observer *)info->observer)->f                                 \
        && ((Observer *)info->observer)->f args)                        \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();                                                             \
  /*                                                                    \
     Unlock plugins should be done after we released the Delegate lock  \
     to avoid possible deadlock when this is the last user of the       \
     plugin, and when we unlock the plugin, it will try to              \
     deinitialize the plugin, which will try to lock the Delegate in    \
     order to remove the observers.                                     \
  */                                                                    \
  plugin_unlock_list(0, (plugin_ref*)plugins->buffer,                   \
                     plugins->elements);                                \
  delete_dynamic(plugins)
 | 
						|
 | 
						|
 | 
						|
/**
  Notify all transaction observers that a commit has finished.

  The binlog file name and position stored for this thread (if any) are
  passed to the observers through the Trans_param. When this is the end
  of a real transaction the thread-specific record is cleared and freed.

  @param thd  current thread; thd->transaction.all.ha_list and
              thd->server_id are read
  @param all  when true the commit is treated as a real transaction; it
              is also treated as real when thd->transaction.all.ha_list
              is 0

  @return 0 unless an observer callback reported failure, then 1
*/
int Trans_delegate::after_commit(THD *thd, bool all)
{
  Trans_param param;
  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);

  param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  param.log_file= log_info ? log_info->log_file : 0;
  param.log_pos= log_info ? log_info->log_pos : 0;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_commit, thd, (&param));

  /*
    This is the end of a real transaction or autocommit statement, we
    can free the memory allocated for binlog file and position.
  */
  if (is_real_trans && log_info)
  {
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
    my_free(log_info, MYF(0));
  }
  return ret;
}
 | 
						|
 | 
						|
/**
  Notify all transaction observers that a rollback has finished.

  The binlog file name and position stored for this thread (if any) are
  passed to the observers through the Trans_param. When this is the end
  of a real transaction the thread-specific record is cleared and freed.

  @param thd  current thread; thd->transaction.all.ha_list and
              thd->server_id are read
  @param all  when true the rollback is treated as a real transaction;
              it is also treated as real when
              thd->transaction.all.ha_list is 0

  @return 0 unless an observer callback reported failure, then 1
*/
int Trans_delegate::after_rollback(THD *thd, bool all)
{
  Trans_param param;
  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);

  param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  param.log_file= log_info ? log_info->log_file : 0;
  param.log_pos= log_info ? log_info->log_pos : 0;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_rollback, thd, (&param));

  /*
    This is the end of a real transaction or autocommit statement, we
    can free the memory allocated for binlog file and position.
  */
  if (is_real_trans && log_info)
  {
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
    my_free(log_info, MYF(0));
  }
  return ret;
}
 | 
						|
 | 
						|
/**
  Record the flushed binlog position for this thread and notify all
  binlog storage observers.

  The thread-specific Trans_binlog_info is allocated on first use. The
  file name is stored without its directory part.

  @param thd       current thread; thd->server_id is read
  @param log_file  binlog file name, possibly with a directory prefix
  @param log_pos   position in log_file
  @param synced    when true BINLOG_STORAGE_IS_SYNCED is set in the
                   flags passed to the observers

  @retval 0  success
  @retval 1  allocation failed or an observer callback reported failure
*/
int Binlog_storage_delegate::after_flush(THD *thd,
                                         const char *log_file,
                                         my_off_t log_pos,
                                         bool synced)
{
  Binlog_storage_param param;
  uint32 flags=0;
  if (synced)
    flags |= BINLOG_STORAGE_IS_SYNCED;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  if (!log_info)
  {
    if(!(log_info=
         (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
      return 1;
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
  }

  /*
    Bounded copy: log_file is a fixed-size buffer, so never write past
    it and always leave it NUL-terminated.
  */
  strncpy(log_info->log_file, log_file+dirname_length(log_file),
          sizeof(log_info->log_file) - 1);
  log_info->log_file[sizeof(log_info->log_file) - 1]= '\0';
  log_info->log_pos = log_pos;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_flush, thd,
                   (&param, log_info->log_file, log_info->log_pos, flags));
  return ret;
}
 | 
						|
 | 
						|
#ifdef HAVE_REPLICATION
 | 
						|
/**
  Call the transmit_start callback of all binlog transmit observers.

  @param thd       current thread; thd->server_id is read
  @param flags     stored in the Binlog_transmit_param given to observers
  @param log_file  passed unchanged to the observers
  @param log_pos   passed unchanged to the observers

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
                                             const char *log_file,
                                             my_off_t log_pos)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the transmit_stop callback of all binlog transmit observers.

  @param thd    current thread; thd->server_id is read
  @param flags  stored in the Binlog_transmit_param given to observers

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
  return ret;
}
 | 
						|
 | 
						|
/**
  Let every binlog transmit observer reserve extra header bytes and
  append those bytes to the packet.

  Each observer writes into a local buffer of RESERVE_HEADER_SIZE bytes
  and reports the number of bytes used; non-empty headers are appended
  to the packet in observer order. Unlike FOREACH_OBSERVER, a plugin
  that cannot be locked is reported as an error here.

  @param thd     current thread; used for plugin locking and server_id
  @param flags   stored in the Binlog_transmit_param given to observers
  @param packet  [in,out] packet the reserved headers are appended to

  @retval 0  success
  @retval 1  a plugin could not be locked, a callback failed, an
             observer reported more than RESERVE_HEADER_SIZE bytes, or
             appending to the packet failed
*/
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
                                             String *packet)
{
  /* NOTE2ME: Maximum extra header size for each observer, I hope 32
     bytes should be enough for each Observer to reserve their extra
     header. If later found this is not enough, we can increase this
     /HEZX
  */
#define RESERVE_HEADER_SIZE 32
  unsigned char header[RESERVE_HEADER_SIZE];
  ulong hlen;
  Binlog_transmit_param param;
  param.flags= flags;
  param.server_id= thd->server_id;

  int ret= 0;
  read_lock();
  Observer_info_iterator iter= observer_info_iter();
  Observer_info *info= iter++;
  for (; info; info= iter++)
  {
    plugin_ref plugin=
      my_plugin_lock(thd, &info->plugin);
    if (!plugin)
    {
      ret= 1;
      break;
    }
    /* An observer without a reserve_header callback contributes nothing. */
    hlen= 0;
    if (((Observer *)info->observer)->reserve_header
        && ((Observer *)info->observer)->reserve_header(&param,
                                                        header,
                                                        RESERVE_HEADER_SIZE,
                                                        &hlen))
    {
      ret= 1;
      plugin_unlock(thd, plugin);
      break;
    }
    plugin_unlock(thd, plugin);
    if (hlen == 0)
      continue;
    /* Reject a length larger than the buffer the observer was given. */
    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
    {
      ret= 1;
      break;
    }
  }
  unlock();
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the before_send_event callback of all binlog transmit observers.

  Observers receive the packet buffer and length, the log file name
  without its directory part, and the log position.

  @param thd       current thread; thd->server_id is read
  @param flags     stored in the Binlog_transmit_param given to observers
  @param packet    event packet about to be sent
  @param log_file  binlog file name, possibly with a directory prefix
  @param log_pos   passed unchanged to the observers

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
                                                String *packet,
                                                const char *log_file,
                                                my_off_t log_pos)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, before_send_event, thd,
                   (&param, (uchar *)packet->c_ptr(),
                    packet->length(),
                    log_file+dirname_length(log_file), log_pos));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the after_send_event callback of all binlog transmit observers.

  @param thd     current thread; thd->server_id is read
  @param flags   stored in the Binlog_transmit_param given to observers
  @param packet  event packet; its buffer and length are passed on

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
                                               String *packet)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_send_event, thd,
                   (&param, packet->c_ptr(), packet->length()));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the after_reset_master callback of all binlog transmit observers.

  @param thd    current thread; thd->server_id is read
  @param flags  stored in the Binlog_transmit_param given to observers

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
  return ret;
}
 | 
						|
 | 
						|
/**
  Fill a Binlog_relay_IO_param from a Master_info.

  Only the members listed below are copied; the values are assigned
  directly, so pointer members keep referring to data owned by mi.

  @param param  [out] parameter block to fill
  @param mi     source of the connection and master log values
*/
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
                                          Master_info *mi)
{
  /* master binlog coordinates */
  param->master_log_pos= mi->master_log_pos;
  param->master_log_name= mi->master_log_name;

  /* connection details */
  param->port= mi->port;
  param->host= mi->host;
  param->user= mi->user;
  param->mysql= mi->mysql;
}
 | 
						|
 | 
						|
/**
  Call the thread_start callback of all binlog relay IO observers.

  @param thd  current thread; thd->server_id is read
  @param mi   master info used to fill the observer parameter block

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, thread_start, thd, (&param));
  return ret;
}
 | 
						|
 | 
						|
 | 
						|
/**
  Call the thread_stop callback of all binlog relay IO observers.

  @param thd  current thread; thd->server_id is read
  @param mi   master info used to fill the observer parameter block

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the before_request_transmit callback of all binlog relay IO
  observers.

  @param thd    current thread; thd->server_id is read
  @param mi     master info used to fill the observer parameter block
  @param flags  passed to the observers widened to uint32

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
                                                      Master_info *mi,
                                                      ushort flags)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the after_read_event callback of all binlog relay IO observers.

  @param thd        current thread; thd->server_id is read
  @param mi         master info used to fill the observer parameter block
  @param packet     passed unchanged to the observers
  @param len        passed unchanged to the observers
  @param event_buf  passed unchanged to the observers (pointer to a
                    buffer pointer)
  @param event_len  passed unchanged to the observers (pointer to a
                    length)

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
                                               const char *packet, ulong len,
                                               const char **event_buf,
                                               ulong *event_len)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, after_read_event, thd,
                   (&param, packet, len, event_buf, event_len));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the after_queue_event callback of all binlog relay IO observers.

  @param thd        current thread; thd->server_id is read
  @param mi         master info used to fill the observer parameter block
  @param event_buf  passed unchanged to the observers
  @param event_len  passed unchanged to the observers
  @param synced     when true BINLOG_STORAGE_IS_SYNCED is set in the
                    flags passed to the observers

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
                                                const char *event_buf,
                                                ulong event_len,
                                                bool synced)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  /*
    NOTE(review): the binlog storage flag constant is reused for the
    relay log here; confirm this is the value relay observers expect.
  */
  uint32 flags=0;
  if (synced)
    flags |= BINLOG_STORAGE_IS_SYNCED;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_queue_event, thd,
                   (&param, event_buf, event_len, flags));
  return ret;
}
 | 
						|
 | 
						|
/**
  Call the after_reset_slave callback of all binlog relay IO observers.

  @param thd  current thread; thd->server_id is read
  @param mi   master info used to fill the observer parameter block

  @return 0 unless an observer callback reported failure, then 1
*/
int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
  return ret;
}
 | 
						|
#endif /* HAVE_REPLICATION */
 | 
						|
 | 
						|
/**
  Add a transaction observer to the global transaction delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Trans_delegate::add_observer()
*/
int register_trans_observer(Trans_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return transaction_delegate->add_observer(observer, plugin);
}
 | 
						|
 | 
						|
/**
  Remove a transaction observer from the global transaction delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Trans_delegate::remove_observer()
*/
int unregister_trans_observer(Trans_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return transaction_delegate->remove_observer(observer, plugin);
}
 | 
						|
 | 
						|
/**
  Add a binlog storage observer to the global binlog storage delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_storage_delegate::add_observer()
*/
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return binlog_storage_delegate->add_observer(observer, plugin);
}
 | 
						|
 | 
						|
/**
  Remove a binlog storage observer from the global binlog storage
  delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_storage_delegate::remove_observer()
*/
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return binlog_storage_delegate->remove_observer(observer, plugin);
}
 | 
						|
 | 
						|
#ifdef HAVE_REPLICATION
 | 
						|
/**
  Add a binlog transmit observer to the global binlog transmit delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_transmit_delegate::add_observer()
*/
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return binlog_transmit_delegate->add_observer(observer, plugin);
}
 | 
						|
 | 
						|
/**
  Remove a binlog transmit observer from the global binlog transmit
  delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_transmit_delegate::remove_observer()
*/
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return binlog_transmit_delegate->remove_observer(observer, plugin);
}
 | 
						|
 | 
						|
/**
  Add a binlog relay IO observer to the global binlog relay IO delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_relay_IO_delegate::add_observer()
*/
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return binlog_relay_io_delegate->add_observer(observer, plugin);
}
 | 
						|
 | 
						|
/**
  Remove a binlog relay IO observer from the global binlog relay IO
  delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_relay_IO_delegate::remove_observer()
*/
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
  st_plugin_int *plugin= (st_plugin_int *) p;
  return binlog_relay_io_delegate->remove_observer(observer, plugin);
}
 | 
						|
#endif /* HAVE_REPLICATION */
 |