mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-25 18:38:00 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			519 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			519 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2008 MySQL AB
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 | |
| 
 | |
| #include "mysql_priv.h"
 | |
| 
 | |
| #include "rpl_mi.h"
 | |
| #include "sql_repl.h"
 | |
| #include "log_event.h"
 | |
| #include "rpl_filter.h"
 | |
| #include <my_dir.h>
 | |
| #include "rpl_handler.h"
 | |
| 
 | |
| Trans_delegate *transaction_delegate;
 | |
| Binlog_storage_delegate *binlog_storage_delegate;
 | |
| #ifdef HAVE_REPLICATION
 | |
| Binlog_transmit_delegate *binlog_transmit_delegate;
 | |
| Binlog_relay_IO_delegate *binlog_relay_io_delegate;
 | |
| #endif /* HAVE_REPLICATION */
 | |
| 
 | |
| /*
 | |
|   structure to save transaction log filename and position
 | |
| */
 | |
| typedef struct Trans_binlog_info {
 | |
|   my_off_t log_pos;
 | |
|   char log_file[FN_REFLEN];
 | |
| } Trans_binlog_info;
 | |
| 
 | |
| static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
 | |
| 
 | |
| int get_user_var_int(const char *name,
 | |
|                      long long int *value, int *null_value)
 | |
| {
 | |
|   my_bool null_val;
 | |
|   user_var_entry *entry= 
 | |
|     (user_var_entry*) my_hash_search(¤t_thd->user_vars,
 | |
|                                   (uchar*) name, strlen(name));
 | |
|   if (!entry)
 | |
|     return 1;
 | |
|   *value= entry->val_int(&null_val);
 | |
|   if (null_value)
 | |
|     *null_value= null_val;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int get_user_var_real(const char *name,
 | |
|                       double *value, int *null_value)
 | |
| {
 | |
|   my_bool null_val;
 | |
|   user_var_entry *entry= 
 | |
|     (user_var_entry*) my_hash_search(¤t_thd->user_vars,
 | |
|                                   (uchar*) name, strlen(name));
 | |
|   if (!entry)
 | |
|     return 1;
 | |
|   *value= entry->val_real(&null_val);
 | |
|   if (null_value)
 | |
|     *null_value= null_val;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int get_user_var_str(const char *name, char *value,
 | |
|                      size_t len, unsigned int precision, int *null_value)
 | |
| {
 | |
|   String str;
 | |
|   my_bool null_val;
 | |
|   user_var_entry *entry= 
 | |
|     (user_var_entry*) my_hash_search(¤t_thd->user_vars,
 | |
|                                   (uchar*) name, strlen(name));
 | |
|   if (!entry)
 | |
|     return 1;
 | |
|   entry->val_str(&null_val, &str, precision);
 | |
|   strncpy(value, str.c_ptr(), len);
 | |
|   if (null_value)
 | |
|     *null_value= null_val;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int delegates_init()
 | |
| {
 | |
|   static unsigned long trans_mem[sizeof(Trans_delegate) / sizeof(unsigned long) + 1];
 | |
|   static unsigned long storage_mem[sizeof(Binlog_storage_delegate) / sizeof(unsigned long) + 1];
 | |
| #ifdef HAVE_REPLICATION
 | |
|   static unsigned long transmit_mem[sizeof(Binlog_transmit_delegate) / sizeof(unsigned long) + 1];
 | |
|   static unsigned long relay_io_mem[sizeof(Binlog_relay_IO_delegate)/ sizeof(unsigned long) + 1];
 | |
| #endif
 | |
|   
 | |
|   if (!(transaction_delegate= new (trans_mem) Trans_delegate)
 | |
|       || (!transaction_delegate->is_inited())
 | |
|       || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
 | |
|       || (!binlog_storage_delegate->is_inited())
 | |
| #ifdef HAVE_REPLICATION
 | |
|       || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
 | |
|       || (!binlog_transmit_delegate->is_inited())
 | |
|       || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
 | |
|       || (!binlog_relay_io_delegate->is_inited())
 | |
| #endif /* HAVE_REPLICATION */
 | |
|       )
 | |
|     return 1;
 | |
| 
 | |
|   if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
 | |
|     return 1;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| void delegates_destroy()
 | |
| {
 | |
|   if (transaction_delegate)
 | |
|     transaction_delegate->~Trans_delegate();
 | |
|   if (binlog_storage_delegate)
 | |
|     binlog_storage_delegate->~Binlog_storage_delegate();
 | |
| #ifdef HAVE_REPLICATION
 | |
|   if (binlog_transmit_delegate)
 | |
|     binlog_transmit_delegate->~Binlog_transmit_delegate();
 | |
|   if (binlog_relay_io_delegate)
 | |
|     binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
 | |
| #endif /* HAVE_REPLICATION */
 | |
| }
 | |
| 
 | |
| /*
 | |
|   This macro is used by almost all the Delegate methods to iterate
 | |
|   over all the observers running given callback function of the
 | |
|   delegate .
 | |
|   
 | |
|   Add observer plugins to the thd->lex list, after each statement, all
 | |
|   plugins add to thd->lex will be automatically unlocked.
 | |
|  */
 | |
| #define FOREACH_OBSERVER(r, f, thd, args)                               \
 | |
|   param.server_id= thd->server_id;                                      \
 | |
|   /*
 | |
|      Use a struct to make sure that they are allocated adjacent, check
 | |
|      delete_dynamic().
 | |
|   */                                                                    \
 | |
|   struct {                                                              \
 | |
|     DYNAMIC_ARRAY plugins;                                              \
 | |
|     /* preallocate 8 slots */                                           \
 | |
|     plugin_ref plugins_buffer[8];                                       \
 | |
|   } s;                                                                  \
 | |
|   DYNAMIC_ARRAY *plugins= &s.plugins;                                   \
 | |
|   plugin_ref *plugins_buffer= s.plugins_buffer;                         \
 | |
|   my_init_dynamic_array2(plugins, sizeof(plugin_ref),                   \
 | |
|                          plugins_buffer, 8, 8);                         \
 | |
|   read_lock();                                                          \
 | |
|   Observer_info_iterator iter= observer_info_iter();                    \
 | |
|   Observer_info *info= iter++;                                          \
 | |
|   for (; info; info= iter++)                                            \
 | |
|   {                                                                     \
 | |
|     plugin_ref plugin=                                                  \
 | |
|       my_plugin_lock(0, &info->plugin);                                 \
 | |
|     if (!plugin)                                                        \
 | |
|     {                                                                   \
 | |
|       /* plugin is not intialized or deleted, this is not an error */   \
 | |
|       r= 0;                                                             \
 | |
|       break;                                                            \
 | |
|     }                                                                   \
 | |
|     insert_dynamic(plugins, (uchar *)&plugin);                          \
 | |
|     if (((Observer *)info->observer)->f                                 \
 | |
|         && ((Observer *)info->observer)->f args)                        \
 | |
|     {                                                                   \
 | |
|       r= 1;                                                             \
 | |
|       sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
 | |
|                       info->plugin_int->name.str);                      \
 | |
|       break;                                                            \
 | |
|     }                                                                   \
 | |
|   }                                                                     \
 | |
|   unlock();                                                             \
 | |
|   /* 
 | |
|      Unlock plugins should be done after we released the Delegate lock
 | |
|      to avoid possible deadlock when this is the last user of the
 | |
|      plugin, and when we unlock the plugin, it will try to
 | |
|      deinitialize the plugin, which will try to lock the Delegate in
 | |
|      order to remove the observers.
 | |
|   */                                                                    \
 | |
|   plugin_unlock_list(0, (plugin_ref*)plugins->buffer,                   \
 | |
|                      plugins->elements);                                \
 | |
|   delete_dynamic(plugins)
 | |
| 
 | |
| 
 | |
| int Trans_delegate::after_commit(THD *thd, bool all)
 | |
| {
 | |
|   Trans_param param;
 | |
|   bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
 | |
|   if (is_real_trans)
 | |
|     param.flags |= TRANS_IS_REAL_TRANS;
 | |
| 
 | |
|   Trans_binlog_info *log_info=
 | |
|     my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
 | |
| 
 | |
|   param.log_file= log_info ? log_info->log_file : 0;
 | |
|   param.log_pos= log_info ? log_info->log_pos : 0;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
 | |
| 
 | |
|   /*
 | |
|     This is the end of a real transaction or autocommit statement, we
 | |
|     can free the memory allocated for binlog file and position.
 | |
|   */
 | |
|   if (is_real_trans && log_info)
 | |
|   {
 | |
|     my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
 | |
|     my_free(log_info, MYF(0));
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Trans_delegate::after_rollback(THD *thd, bool all)
 | |
| {
 | |
|   Trans_param param;
 | |
|   bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
 | |
|   if (is_real_trans)
 | |
|     param.flags |= TRANS_IS_REAL_TRANS;
 | |
| 
 | |
|   Trans_binlog_info *log_info=
 | |
|     my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
 | |
|     
 | |
|   param.log_file= log_info ? log_info->log_file : 0;
 | |
|   param.log_pos= log_info ? log_info->log_pos : 0;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
 | |
| 
 | |
|   /*
 | |
|     This is the end of a real transaction or autocommit statement, we
 | |
|     can free the memory allocated for binlog file and position.
 | |
|   */
 | |
|   if (is_real_trans && log_info)
 | |
|   {
 | |
|     my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
 | |
|     my_free(log_info, MYF(0));
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_storage_delegate::after_flush(THD *thd,
 | |
|                                          const char *log_file,
 | |
|                                          my_off_t log_pos,
 | |
|                                          bool synced)
 | |
| {
 | |
|   Binlog_storage_param param;
 | |
|   uint32 flags=0;
 | |
|   if (synced)
 | |
|     flags |= BINLOG_STORAGE_IS_SYNCED;
 | |
| 
 | |
|   Trans_binlog_info *log_info=
 | |
|     my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
 | |
|     
 | |
|   if (!log_info)
 | |
|   {
 | |
|     if(!(log_info=
 | |
|          (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
 | |
|       return 1;
 | |
|     my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
 | |
|   }
 | |
|     
 | |
|   strcpy(log_info->log_file, log_file+dirname_length(log_file));
 | |
|   log_info->log_pos = log_pos;
 | |
|   
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_flush, thd,
 | |
|                    (¶m, log_info->log_file, log_info->log_pos, flags));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| #ifdef HAVE_REPLICATION
 | |
| int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
 | |
|                                              const char *log_file,
 | |
|                                              my_off_t log_pos)
 | |
| {
 | |
|   Binlog_transmit_param param;
 | |
|   param.flags= flags;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
 | |
| {
 | |
|   Binlog_transmit_param param;
 | |
|   param.flags= flags;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
 | |
|                                              String *packet)
 | |
| {
 | |
|   /* NOTE2ME: Maximum extra header size for each observer, I hope 32
 | |
|      bytes should be enough for each Observer to reserve their extra
 | |
|      header. If later found this is not enough, we can increase this
 | |
|      /HEZX
 | |
|   */
 | |
| #define RESERVE_HEADER_SIZE 32
 | |
|   unsigned char header[RESERVE_HEADER_SIZE];
 | |
|   ulong hlen;
 | |
|   Binlog_transmit_param param;
 | |
|   param.flags= flags;
 | |
|   param.server_id= thd->server_id;
 | |
| 
 | |
|   int ret= 0;
 | |
|   read_lock();
 | |
|   Observer_info_iterator iter= observer_info_iter();
 | |
|   Observer_info *info= iter++;
 | |
|   for (; info; info= iter++)
 | |
|   {
 | |
|     plugin_ref plugin=
 | |
|       my_plugin_lock(thd, &info->plugin);
 | |
|     if (!plugin)
 | |
|     {
 | |
|       ret= 1;
 | |
|       break;
 | |
|     }
 | |
|     hlen= 0;
 | |
|     if (((Observer *)info->observer)->reserve_header
 | |
|         && ((Observer *)info->observer)->reserve_header(¶m,
 | |
|                                                         header,
 | |
|                                                         RESERVE_HEADER_SIZE,
 | |
|                                                         &hlen))
 | |
|     {
 | |
|       ret= 1;
 | |
|       plugin_unlock(thd, plugin);
 | |
|       break;
 | |
|     }
 | |
|     plugin_unlock(thd, plugin);
 | |
|     if (hlen == 0)
 | |
|       continue;
 | |
|     if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
 | |
|     {
 | |
|       ret= 1;
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
|   unlock();
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
 | |
|                                                 String *packet,
 | |
|                                                 const char *log_file,
 | |
|                                                 my_off_t log_pos)
 | |
| {
 | |
|   Binlog_transmit_param param;
 | |
|   param.flags= flags;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, before_send_event, thd,
 | |
|                    (¶m, (uchar *)packet->c_ptr(),
 | |
|                     packet->length(),
 | |
|                     log_file+dirname_length(log_file), log_pos));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
 | |
|                                                String *packet)
 | |
| {
 | |
|   Binlog_transmit_param param;
 | |
|   param.flags= flags;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_send_event, thd,
 | |
|                    (¶m, packet->c_ptr(), packet->length()));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
 | |
| 
 | |
| {
 | |
|   Binlog_transmit_param param;
 | |
|   param.flags= flags;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
 | |
|                                           Master_info *mi)
 | |
| {
 | |
|   param->mysql= mi->mysql;
 | |
|   param->user= mi->user;
 | |
|   param->host= mi->host;
 | |
|   param->port= mi->port;
 | |
|   param->master_log_name= mi->master_log_name;
 | |
|   param->master_log_pos= mi->master_log_pos;
 | |
| }
 | |
| 
 | |
| int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
 | |
| {
 | |
|   Binlog_relay_IO_param param;
 | |
|   init_param(¶m, mi);
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, thread_start, thd, (¶m));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| 
 | |
| int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
 | |
| {
 | |
| 
 | |
|   Binlog_relay_IO_param param;
 | |
|   init_param(¶m, mi);
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, thread_stop, thd, (¶m));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
 | |
|                                                       Master_info *mi,
 | |
|                                                       ushort flags)
 | |
| {
 | |
|   Binlog_relay_IO_param param;
 | |
|   init_param(¶m, mi);
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
 | |
|                                                const char *packet, ulong len,
 | |
|                                                const char **event_buf,
 | |
|                                                ulong *event_len)
 | |
| {
 | |
|   Binlog_relay_IO_param param;
 | |
|   init_param(¶m, mi);
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_read_event, thd,
 | |
|                    (¶m, packet, len, event_buf, event_len));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
 | |
|                                                 const char *event_buf,
 | |
|                                                 ulong event_len,
 | |
|                                                 bool synced)
 | |
| {
 | |
|   Binlog_relay_IO_param param;
 | |
|   init_param(¶m, mi);
 | |
| 
 | |
|   uint32 flags=0;
 | |
|   if (synced)
 | |
|     flags |= BINLOG_STORAGE_IS_SYNCED;
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_queue_event, thd,
 | |
|                    (¶m, event_buf, event_len, flags));
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
 | |
| 
 | |
| {
 | |
|   Binlog_relay_IO_param param;
 | |
|   init_param(¶m, mi);
 | |
| 
 | |
|   int ret= 0;
 | |
|   FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m));
 | |
|   return ret;
 | |
| }
 | |
| #endif /* HAVE_REPLICATION */
 | |
| 
 | |
| int register_trans_observer(Trans_observer *observer, void *p)
 | |
| {
 | |
|   return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| int unregister_trans_observer(Trans_observer *observer, void *p)
 | |
| {
 | |
|   return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
 | |
| {
 | |
|   return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
 | |
| {
 | |
|   return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| #ifdef HAVE_REPLICATION
 | |
| int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
 | |
| {
 | |
|   return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
 | |
| {
 | |
|   return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
 | |
| {
 | |
|   return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| 
 | |
| int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
 | |
| {
 | |
|   return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
 | |
| }
 | |
| #endif /* HAVE_REPLICATION */
 |