1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-01 03:47:19 +03:00

WL#1012: All changes as one single changeset.

This includes both code and test cases.
This commit is contained in:
lars@mysql.com
2005-12-22 06:39:02 +01:00
parent 0f8f444b8b
commit ad126d90e0
447 changed files with 34742 additions and 3216 deletions

View File

@ -27,6 +27,8 @@
#endif
#include "mysql_priv.h"
#include <my_bitmap.h>
#include "log_event.h"
#include <m_ctype.h>
#include <sys/stat.h>
#include <thr_alarm.h>
@ -174,7 +176,7 @@ Open_tables_state::Open_tables_state(ulong version_arg)
THD::THD()
:Statement(CONVENTIONAL_EXECUTION, 0, ALLOC_ROOT_MIN_BLOCK_SIZE, 0),
Open_tables_state(refresh_version),
Open_tables_state(refresh_version), rli_fake(0),
lock_id(&main_lock_id),
user_time(0), in_sub_stmt(0), global_read_lock(0), is_fatal_error(0),
rand_used(0), time_zone_used(0),
@ -227,6 +229,9 @@ THD::THD()
ull=0;
system_thread= cleanup_done= abort_on_warning= no_warnings_for_error= 0;
peer_port= 0; // For SHOW PROCESSLIST
#ifdef HAVE_ROW_BASED_REPLICATION
transaction.m_pending_rows_event= 0;
#endif
#ifdef __WIN__
real_id = 0;
#endif
@ -440,6 +445,11 @@ THD::~THD()
#ifndef DBUG_OFF
dbug_sentry= THD_SENTRY_GONE;
#endif
#ifndef EMBEDDED_LIBRARY
if (rli_fake)
delete rli_fake;
#endif
DBUG_VOID_RETURN;
}
@ -1959,7 +1969,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
backup->client_capabilities= client_capabilities;
backup->savepoints= transaction.savepoints;
if (!lex->requires_prelocking() || is_update_query(lex->sql_command))
if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
!binlog_row_based)
options&= ~OPTION_BIN_LOG;
/* Disable result sets */
client_capabilities &= ~CLIENT_MULTI_RESULTS;
@ -2101,3 +2112,439 @@ void xid_cache_delete(XID_STATE *xid_state)
pthread_mutex_unlock(&LOCK_xid_cache);
}
/*
Implementation of interface to write rows to the binary log through the
thread. The thread is responsible for writing the rows it has
inserted/updated/deleted.
*/
#ifndef MYSQL_CLIENT
#ifdef HAVE_ROW_BASED_REPLICATION
/*
Template member function for ensuring that there is an rows log
event of the apropriate type before proceeding.
PRE CONDITION:
- Events of type 'RowEventT' have the type code 'type_code'.
POST CONDITION:
If a non-NULL pointer is returned, the pending event for thread 'thd' will
be an event of type 'RowEventT' (which have the type code 'type_code')
will either empty or have enough space to hold 'needed' bytes. In
addition, the columns bitmap will be correct for the row, meaning that
the pending event will be flushed if the columns in the event differ from
the columns suppled to the function.
RETURNS
If no error, a non-NULL pending event (either one which already existed or
the newly created one).
If error, NULL.
*/
template <class RowsEventT> Rows_log_event*
THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
MY_BITMAP const* cols,
my_size_t colcnt,
my_size_t needed,
bool is_transactional)
{
/* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
/* Fetch the type code for the RowsEventT template parameter */
int const type_code= RowsEventT::TYPE_CODE;
/*
There is no good place to set up the transactional data, so we
have to do it here.
*/
if (binlog_setup_trx_data())
return NULL;
Rows_log_event* pending= binlog_get_pending_rows_event();
if (unlikely(pending && !pending->is_valid()))
return NULL;
/*
Check if the current event is non-NULL and a write-rows
event. Also check if the table provided is mapped: if it is not,
then we have switched to writing to a new table.
If there is no pending event, we need to create one. If there is a pending
event, but it's not about the same table id, or not of the same type
(between Write, Update and Delete), or not the same affected columns, or
going to be too big, flush this event to disk and create a new pending
event.
*/
if (!pending ||
pending->server_id != serv_id ||
pending->get_table_id() != table->s->table_map_id ||
pending->get_type_code() != type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->get_width() != colcnt ||
!bitmap_cmp(pending->get_cols(), cols))
{
/* Create a new RowsEventT... */
Rows_log_event* const
ev= new RowsEventT(this, table, table->s->table_map_id, cols,
is_transactional);
if (unlikely(!ev))
return NULL;
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
/*
flush the pending event and replace it with the newly created
event...
*/
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
{
delete ev;
return NULL;
}
return ev; /* This is the new pending event */
}
return pending; /* This is the current pending event */
}
/*
Instansiate the versions we need, we have -fno-implicit-template as
compiling option.
*/
template Rows_log_event*
THD::binlog_prepare_pending_rows_event<Write_rows_log_event>
(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool);
template Rows_log_event*
THD::binlog_prepare_pending_rows_event<Delete_rows_log_event>
(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool);
template Rows_log_event*
THD::binlog_prepare_pending_rows_event<Update_rows_log_event>
(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool);
static char const*
field_type_name(enum_field_types type)
{
switch (type)
{
case MYSQL_TYPE_DECIMAL:
return "MYSQL_TYPE_DECIMAL";
case MYSQL_TYPE_TINY:
return "MYSQL_TYPE_TINY";
case MYSQL_TYPE_SHORT:
return "MYSQL_TYPE_SHORT";
case MYSQL_TYPE_LONG:
return "MYSQL_TYPE_LONG";
case MYSQL_TYPE_FLOAT:
return "MYSQL_TYPE_FLOAT";
case MYSQL_TYPE_DOUBLE:
return "MYSQL_TYPE_DOUBLE";
case MYSQL_TYPE_NULL:
return "MYSQL_TYPE_NULL";
case MYSQL_TYPE_TIMESTAMP:
return "MYSQL_TYPE_TIMESTAMP";
case MYSQL_TYPE_LONGLONG:
return "MYSQL_TYPE_LONGLONG";
case MYSQL_TYPE_INT24:
return "MYSQL_TYPE_INT24";
case MYSQL_TYPE_DATE:
return "MYSQL_TYPE_DATE";
case MYSQL_TYPE_TIME:
return "MYSQL_TYPE_TIME";
case MYSQL_TYPE_DATETIME:
return "MYSQL_TYPE_DATETIME";
case MYSQL_TYPE_YEAR:
return "MYSQL_TYPE_YEAR";
case MYSQL_TYPE_NEWDATE:
return "MYSQL_TYPE_NEWDATE";
case MYSQL_TYPE_VARCHAR:
return "MYSQL_TYPE_VARCHAR";
case MYSQL_TYPE_BIT:
return "MYSQL_TYPE_BIT";
case MYSQL_TYPE_NEWDECIMAL:
return "MYSQL_TYPE_NEWDECIMAL";
case MYSQL_TYPE_ENUM:
return "MYSQL_TYPE_ENUM";
case MYSQL_TYPE_SET:
return "MYSQL_TYPE_SET";
case MYSQL_TYPE_TINY_BLOB:
return "MYSQL_TYPE_TINY_BLOB";
case MYSQL_TYPE_MEDIUM_BLOB:
return "MYSQL_TYPE_MEDIUM_BLOB";
case MYSQL_TYPE_LONG_BLOB:
return "MYSQL_TYPE_LONG_BLOB";
case MYSQL_TYPE_BLOB:
return "MYSQL_TYPE_BLOB";
case MYSQL_TYPE_VAR_STRING:
return "MYSQL_TYPE_VAR_STRING";
case MYSQL_TYPE_STRING:
return "MYSQL_TYPE_STRING";
case MYSQL_TYPE_GEOMETRY:
return "MYSQL_TYPE_GEOMETRY";
}
return "Unknown";
}
my_size_t THD::max_row_length_blob(TABLE *table, const byte *data) const
{
my_size_t length= 0;
TABLE_SHARE *table_s= table->s;
uint* const beg= table_s->blob_field;
uint* const end= beg + table_s->blob_fields;
for (uint *ptr= beg ; ptr != end ; ++ptr)
{
Field_blob* const blob= (Field_blob*) table->field[*ptr];
length+= blob->get_length(data + blob->offset()) + 2;
}
return length;
}
my_size_t THD::pack_row(TABLE *table, MY_BITMAP const* cols, byte *row_data,
const byte *record) const
{
Field **p_field= table->field, *field= *p_field;
int n_null_bytes= table->s->null_bytes;
my_ptrdiff_t const offset= record - (byte*) table->record[0];
memcpy(row_data, record, n_null_bytes);
byte *ptr= row_data+n_null_bytes;
for (int i= 0 ; field ; i++, p_field++, field= *p_field)
{
if (bitmap_is_set(cols,i))
ptr= field->pack(ptr, field->ptr + offset);
}
/*
my_ptrdiff_t is signed, size_t is unsigned. Assert that the
conversion will work correctly.
*/
DBUG_ASSERT(ptr - row_data >= 0);
return (static_cast<size_t>(ptr - row_data));
}
int THD::binlog_write_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, my_size_t colcnt,
byte const *record)
{
DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
*/
bool error= 0;
byte *row_data= table->write_row_record;
my_size_t const max_len= max_row_length(table, record);
/*
* Allocate room for a row (if needed)
*/
if (!row_data)
{
if (!table->s->blob_fields)
{
/* multiply max_len by 2 so it can be used for update_row as well */
table->write_row_record= alloc_root(&table->mem_root, 2*max_len);
if (!table->write_row_record)
return HA_ERR_OUT_OF_MEM;
row_data= table->write_row_record;
}
else if (unlikely(!(row_data= my_malloc(max_len, MYF(MY_WME)))))
return HA_ERR_OUT_OF_MEM;
}
my_size_t const len= pack_row(table, cols, row_data, record);
Rows_log_event* const
ev= binlog_prepare_pending_rows_event<Write_rows_log_event>
(table, server_id, cols, colcnt, len, is_trans);
/* add_row_data copies row_data to internal buffer */
error= likely(ev != 0) ? ev->add_row_data(row_data,len) : HA_ERR_OUT_OF_MEM ;
if (table->write_row_record == 0)
my_free(row_data, MYF(MY_WME));
return error;
}
int THD::binlog_update_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, my_size_t colcnt,
const byte *before_record,
const byte *after_record)
{
DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
bool error= 0;
my_size_t const before_maxlen = max_row_length(table, before_record);
my_size_t const after_maxlen = max_row_length(table, after_record);
byte *row_data= table->write_row_record;
byte *before_row, *after_row;
if (row_data != 0)
{
before_row= row_data;
after_row= before_row + before_maxlen;
}
else
{
if (unlikely(!(row_data= my_multi_malloc(MYF(MY_WME),
&before_row, before_maxlen,
&after_row, after_maxlen,
NULL))))
return HA_ERR_OUT_OF_MEM;
}
my_size_t const before_size= pack_row(table, cols, before_row,
before_record);
my_size_t const after_size= pack_row(table, cols, after_row,
after_record);
Rows_log_event* const
ev= binlog_prepare_pending_rows_event<Update_rows_log_event>
(table, server_id, cols, colcnt, before_size + after_size, is_trans);
error= (unlikely(!ev)) || ev->add_row_data(before_row, before_size) ||
ev->add_row_data(after_row, after_size);
if (!table->write_row_record)
{
/* add_row_data copies row_data to internal buffer */
my_free(row_data, MYF(MY_WME));
}
return error;
}
int THD::binlog_delete_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, my_size_t colcnt,
byte const *record)
{
DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
*/
bool error= 0;
my_size_t const max_len= max_row_length(table, record);
byte *row_data= table->write_row_record;
if (!row_data && unlikely(!(row_data= my_malloc(max_len, MYF(MY_WME)))))
return HA_ERR_OUT_OF_MEM;
my_size_t const len= pack_row(table, cols, row_data, record);
Rows_log_event* const
ev= binlog_prepare_pending_rows_event<Delete_rows_log_event>
(table, server_id, cols, colcnt, len, is_trans);
error= (unlikely(!ev)) || ev->add_row_data(row_data, len);
/* add_row_data copies row_data */
if (table->write_row_record == 0)
my_free(row_data, MYF(MY_WME));
return error;
}
int THD::binlog_flush_pending_rows_event(bool stmt_end)
{
DBUG_ENTER("THD::binlog_flush_pending_rows_event");
if (!binlog_row_based || !mysql_bin_log.is_open())
DBUG_RETURN(0);
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
if (Rows_log_event *pending= binlog_get_pending_rows_event())
{
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
}
/*
We only bother to set the pending event if it is non-NULL. This
is essential for correctness, since there is not necessarily a
trx_data created for the thread if the pending event is NULL.
*/
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
}
DBUG_RETURN(error);
}
void THD::binlog_delete_pending_rows_event()
{
if (Rows_log_event *pending= binlog_get_pending_rows_event())
{
delete pending;
binlog_set_pending_rows_event(0);
}
}
#endif /* HAVE_ROW_BASED_REPLICATION */
/*
Member function that will log query, either row-based or
statement-based depending on the value of the 'binlog_row_based'
variable and the value of the 'qtype' flag.
This function should be called after the all calls to ha_*_row()
functions have been issued, but before tables are unlocked and
closed.
RETURN VALUE
Error code, or 0 if no error.
*/
int THD::binlog_query(THD::enum_binlog_query_type qtype,
char const *query, ulong query_len,
bool is_trans, bool suppress_use)
{
DBUG_ENTER("THD::binlog_query");
DBUG_ASSERT(query && mysql_bin_log.is_open());
int error= binlog_flush_pending_rows_event(true);
switch (qtype)
{
case THD::MYSQL_QUERY_TYPE:
/*
Using this query type is a conveniece hack, since we have been
moving back and forth between using RBR for replication of
system tables and not using it.
Make sure to change in check_table_binlog_row_based() according
to how you treat this.
*/
case THD::ROW_QUERY_TYPE:
if (binlog_row_based)
DBUG_RETURN(binlog_flush_pending_rows_event(true));
/* Otherwise, we fall through */
case THD::STMT_QUERY_TYPE:
/*
Most callers of binlog_query() ignore the error code, assuming
that the statement will always be written to the binlog. In
case of error above, we therefore just continue and write the
statement to the binary log.
*/
{
Query_log_event qinfo(this, query, query_len, is_trans, suppress_use);
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
break;
case THD::QUERY_TYPE_COUNT:
default:
DBUG_ASSERT(0 <= qtype && qtype < QUERY_TYPE_COUNT);
}
DBUG_RETURN(0);
}
#endif /* !defined(MYSQL_CLIENT) */