mirror of
https://github.com/MariaDB/server.git
synced 2025-07-30 16:24:05 +03:00
Merge romeo.(none):/home/bkroot/mysql-5.1-new-rpl
into romeo.(none):/home/bk/b20265-mysql-5.1-new-rpl
This commit is contained in:
@ -359,15 +359,6 @@ show binlog events from 102;
|
|||||||
Log_name Pos Event_type Server_id End_log_pos Info
|
Log_name Pos Event_type Server_id End_log_pos Info
|
||||||
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
||||||
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
||||||
master-bin.000001 # Query 1 # use `test`; BEGIN
|
|
||||||
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
|
|
||||||
`a` int(11) NOT NULL DEFAULT '0',
|
|
||||||
`b` int(11) DEFAULT NULL,
|
|
||||||
PRIMARY KEY (`a`)
|
|
||||||
) ENGINE=InnoDB
|
|
||||||
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
|
|
||||||
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
|
||||||
master-bin.000001 # Xid 1 # COMMIT /* xid= */
|
|
||||||
master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2
|
master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2
|
||||||
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
||||||
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
||||||
@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2
|
|||||||
master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb
|
master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb
|
||||||
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
||||||
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
||||||
master-bin.000001 # Query 1 # use `test`; BEGIN
|
|
||||||
master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` (
|
|
||||||
`a` int(11) NOT NULL DEFAULT '0',
|
|
||||||
`b` int(11) DEFAULT NULL,
|
|
||||||
PRIMARY KEY (`a`)
|
|
||||||
) ENGINE=InnoDB
|
|
||||||
master-bin.000001 # Table_map 1 # table_id: # (test.t2)
|
|
||||||
master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F
|
|
||||||
master-bin.000001 # Xid 1 # COMMIT /* xid= */
|
|
||||||
master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2
|
master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2
|
||||||
master-bin.000001 # Xid 1 # COMMIT /* xid= */
|
master-bin.000001 # Xid 1 # COMMIT /* xid= */
|
||||||
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
master-bin.000001 # Table_map 1 # table_id: # (test.t1)
|
||||||
|
538
sql/log.cc
538
sql/log.cc
@ -32,11 +32,22 @@
|
|||||||
|
|
||||||
#include <mysql/plugin.h>
|
#include <mysql/plugin.h>
|
||||||
|
|
||||||
|
/*
|
||||||
|
Define placement versions of operator new and operator delete since
|
||||||
|
we cannot be sure that the <new> include exists.
|
||||||
|
*/
|
||||||
|
inline void *operator new(size_t, void *ptr) { return ptr; }
|
||||||
|
inline void *operator new[](size_t, void *ptr) { return ptr; }
|
||||||
|
inline void operator delete(void*, void*) { /* Do nothing */ }
|
||||||
|
inline void operator delete[](void*, void*) { /* Do nothing */ }
|
||||||
|
|
||||||
/* max size of the log message */
|
/* max size of the log message */
|
||||||
#define MAX_LOG_BUFFER_SIZE 1024
|
#define MAX_LOG_BUFFER_SIZE 1024
|
||||||
#define MAX_USER_HOST_SIZE 512
|
#define MAX_USER_HOST_SIZE 512
|
||||||
#define MAX_TIME_SIZE 32
|
#define MAX_TIME_SIZE 32
|
||||||
|
|
||||||
|
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
|
||||||
|
|
||||||
LOGGER logger;
|
LOGGER logger;
|
||||||
|
|
||||||
MYSQL_BIN_LOG mysql_bin_log;
|
MYSQL_BIN_LOG mysql_bin_log;
|
||||||
@ -70,23 +81,96 @@ char *make_default_log_name(char *buff,const char* log_ext)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
This is a POD. Please keep it that way!
|
Helper class to store binary log transaction data.
|
||||||
|
|
||||||
Don't add constructors, destructors, or virtual functions.
|
|
||||||
*/
|
*/
|
||||||
struct binlog_trx_data {
|
class binlog_trx_data {
|
||||||
|
public:
|
||||||
|
enum {
|
||||||
|
UNDEF_POS = ~ (my_off_t) 0
|
||||||
|
};
|
||||||
|
|
||||||
|
binlog_trx_data()
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
: m_pending(0), before_stmt_pos(UNDEF_POS)
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
trans_log.end_of_file= max_binlog_cache_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
~binlog_trx_data()
|
||||||
|
{
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
DBUG_ASSERT(pending() == NULL);
|
||||||
|
#endif
|
||||||
|
close_cached_file(&trans_log);
|
||||||
|
}
|
||||||
|
|
||||||
|
my_off_t position() const {
|
||||||
|
return my_b_tell(&trans_log);
|
||||||
|
}
|
||||||
|
|
||||||
bool empty() const
|
bool empty() const
|
||||||
{
|
{
|
||||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
return pending == NULL && my_b_tell(&trans_log) == 0;
|
return pending() == NULL && my_b_tell(&trans_log) == 0;
|
||||||
#else
|
#else
|
||||||
return my_b_tell(&trans_log) == 0;
|
return my_b_tell(&trans_log) == 0;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
binlog_trx_data() {}
|
|
||||||
IO_CACHE trans_log; // The transaction cache
|
/*
|
||||||
|
Truncate the transaction cache to a certain position. This
|
||||||
|
includes deleting the pending event.
|
||||||
|
*/
|
||||||
|
void truncate(my_off_t pos)
|
||||||
|
{
|
||||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
Rows_log_event *pending; // The pending binrows event
|
delete pending();
|
||||||
|
set_pending(0);
|
||||||
|
#endif
|
||||||
|
reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Reset the entire contents of the transaction cache, emptying it
|
||||||
|
completely.
|
||||||
|
*/
|
||||||
|
void reset() {
|
||||||
|
if (!empty())
|
||||||
|
truncate(0);
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
before_stmt_pos= UNDEF_POS;
|
||||||
|
#endif
|
||||||
|
trans_log.end_of_file= max_binlog_cache_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
Rows_log_event *pending() const
|
||||||
|
{
|
||||||
|
return m_pending;
|
||||||
|
}
|
||||||
|
|
||||||
|
void set_pending(Rows_log_event *const pending)
|
||||||
|
{
|
||||||
|
m_pending= pending;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
IO_CACHE trans_log; // The transaction cache
|
||||||
|
|
||||||
|
private:
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
/*
|
||||||
|
Pending binrows event. This event is the event where the rows are
|
||||||
|
currently written.
|
||||||
|
*/
|
||||||
|
Rows_log_event *m_pending;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/*
|
||||||
|
Binlog position before the start of the current statement.
|
||||||
|
*/
|
||||||
|
my_off_t before_stmt_pos;
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1149,6 +1233,69 @@ void Log_to_csv_event_handler::
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Save position of binary log transaction cache.
|
||||||
|
|
||||||
|
SYNPOSIS
|
||||||
|
binlog_trans_log_savepos()
|
||||||
|
|
||||||
|
thd The thread to take the binlog data from
|
||||||
|
pos Pointer to variable where the position will be stored
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
|
||||||
|
Save the current position in the binary log transaction cache into
|
||||||
|
the variable pointed to by 'pos'
|
||||||
|
*/
|
||||||
|
|
||||||
|
static void
|
||||||
|
binlog_trans_log_savepos(THD *thd, my_off_t *pos)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("binlog_trans_log_savepos");
|
||||||
|
DBUG_ASSERT(pos != NULL);
|
||||||
|
if (thd->ha_data[binlog_hton.slot] == NULL)
|
||||||
|
thd->binlog_setup_trx_data();
|
||||||
|
binlog_trx_data *const trx_data=
|
||||||
|
(binlog_trx_data*) thd->ha_data[binlog_hton.slot];
|
||||||
|
DBUG_ASSERT(mysql_bin_log.is_open());
|
||||||
|
*pos= trx_data->position();
|
||||||
|
DBUG_PRINT("return", ("*pos=%u", *pos));
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Truncate the binary log transaction cache.
|
||||||
|
|
||||||
|
SYNPOSIS
|
||||||
|
binlog_trans_log_truncate()
|
||||||
|
|
||||||
|
thd The thread to take the binlog data from
|
||||||
|
pos Position to truncate to
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
|
||||||
|
Truncate the binary log to the given position. Will not change
|
||||||
|
anything else.
|
||||||
|
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
binlog_trans_log_truncate(THD *thd, my_off_t pos)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("binlog_trans_log_truncate");
|
||||||
|
DBUG_PRINT("enter", ("pos=%u", pos));
|
||||||
|
|
||||||
|
DBUG_ASSERT(thd->ha_data[binlog_hton.slot] != NULL);
|
||||||
|
/* Only true if binlog_trans_log_savepos() wasn't called before */
|
||||||
|
DBUG_ASSERT(pos != ~(my_off_t) 0);
|
||||||
|
|
||||||
|
binlog_trx_data *const trx_data=
|
||||||
|
(binlog_trx_data*) thd->ha_data[binlog_hton.slot];
|
||||||
|
trx_data->truncate(pos);
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
this function is mostly a placeholder.
|
this function is mostly a placeholder.
|
||||||
conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open)
|
conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open)
|
||||||
@ -1175,27 +1322,62 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
|
|||||||
{
|
{
|
||||||
binlog_trx_data *const trx_data=
|
binlog_trx_data *const trx_data=
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
||||||
IO_CACHE *trans_log= &trx_data->trans_log;
|
|
||||||
DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty());
|
DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty());
|
||||||
close_cached_file(trans_log);
|
thd->ha_data[binlog_hton.slot]= 0;
|
||||||
thd->ha_data[binlog_hton->slot]= 0;
|
trx_data->~binlog_trx_data();
|
||||||
my_free((gptr)trx_data, MYF(0));
|
my_free((gptr)trx_data, MYF(0));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
End a transaction.
|
||||||
|
|
||||||
|
SYNOPSIS
|
||||||
|
binlog_end_trans()
|
||||||
|
|
||||||
|
thd The thread whose transaction should be ended
|
||||||
|
trx_data Pointer to the transaction data to use
|
||||||
|
end_ev The end event to use, or NULL
|
||||||
|
all True if the entire transaction should be ended, false if
|
||||||
|
only the statement transaction should be ended.
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
|
||||||
|
End the currently open transaction. The transaction can be either
|
||||||
|
a real transaction (if 'all' is true) or a statement transaction
|
||||||
|
(if 'all' is false).
|
||||||
|
|
||||||
|
If 'end_ev' is NULL, the transaction is a rollback of only
|
||||||
|
transactional tables, so the transaction cache will be truncated
|
||||||
|
to either just before the last opened statement transaction (if
|
||||||
|
'all' is false), or reset completely (if 'all' is true).
|
||||||
|
*/
|
||||||
static int
|
static int
|
||||||
binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
|
binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
|
||||||
Log_event *end_ev)
|
Log_event *end_ev, bool all)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("binlog_end_trans");
|
DBUG_ENTER("binlog_end_trans");
|
||||||
int error=0;
|
int error=0;
|
||||||
IO_CACHE *trans_log= &trx_data->trans_log;
|
IO_CACHE *trans_log= &trx_data->trans_log;
|
||||||
|
DBUG_PRINT("enter", ("transaction: %s, end_ev=%p",
|
||||||
|
all ? "all" : "stmt", end_ev));
|
||||||
|
DBUG_PRINT("info", ("thd->options={ %s%s}",
|
||||||
|
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
|
||||||
|
FLAGSTR(thd->options, OPTION_BEGIN)));
|
||||||
|
|
||||||
|
/*
|
||||||
/* NULL denotes ROLLBACK with nothing to replicate */
|
NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
|
||||||
|
only transactional tables. If the transaction contain changes to
|
||||||
|
any non-transactiona tables, we need write the transaction and log
|
||||||
|
a ROLLBACK last.
|
||||||
|
*/
|
||||||
if (end_ev != NULL)
|
if (end_ev != NULL)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
Doing a commit or a rollback including non-transactional tables,
|
||||||
|
i.e., ending a transaction where we might write the transaction
|
||||||
|
cache to the binary log.
|
||||||
|
|
||||||
We can always end the statement when ending a transaction since
|
We can always end the statement when ending a transaction since
|
||||||
transactions are not allowed inside stored functions. If they
|
transactions are not allowed inside stored functions. If they
|
||||||
were, we would have to ensure that we're not ending a statement
|
were, we would have to ensure that we're not ending a statement
|
||||||
@ -1204,38 +1386,55 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
|
|||||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
thd->binlog_flush_pending_rows_event(TRUE);
|
thd->binlog_flush_pending_rows_event(TRUE);
|
||||||
#endif
|
#endif
|
||||||
error= mysql_bin_log.write(thd, trans_log, end_ev);
|
/*
|
||||||
|
We write the transaction cache to the binary log if either we're
|
||||||
|
committing the entire transaction, or if we are doing an
|
||||||
|
autocommit outside a transaction.
|
||||||
|
*/
|
||||||
|
if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
|
||||||
|
{
|
||||||
|
error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev);
|
||||||
|
trx_data->reset();
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
/*
|
||||||
|
We need to step the table map version after writing the
|
||||||
|
transaction cache to disk.
|
||||||
|
*/
|
||||||
|
mysql_bin_log.update_table_map_version();
|
||||||
|
#endif
|
||||||
|
statistic_increment(binlog_cache_use, &LOCK_status);
|
||||||
|
if (trans_log->disk_writes != 0)
|
||||||
|
{
|
||||||
|
statistic_increment(binlog_cache_disk_use, &LOCK_status);
|
||||||
|
trans_log->disk_writes= 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
/*
|
||||||
thd->binlog_delete_pending_rows_event();
|
If rolling back an entire transaction or a single statement not
|
||||||
#endif
|
inside a transaction, we reset the transaction cache.
|
||||||
|
|
||||||
|
If rolling back a statement in a transaction, we truncate the
|
||||||
|
transaction cache to remove the statement.
|
||||||
|
|
||||||
|
*/
|
||||||
|
if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
|
||||||
|
trx_data->reset();
|
||||||
|
else
|
||||||
|
trx_data->truncate(trx_data->before_stmt_pos); // ...statement
|
||||||
|
|
||||||
|
/*
|
||||||
|
We need to step the table map version on a rollback to ensure
|
||||||
|
that a new table map event is generated instead of the one that
|
||||||
|
was written to the thrown-away transaction cache.
|
||||||
|
*/
|
||||||
|
mysql_bin_log.update_table_map_version();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
We need to step the table map version both after writing the
|
|
||||||
entire transaction to the log file and after rolling back the
|
|
||||||
transaction.
|
|
||||||
|
|
||||||
We need to step the table map version after writing the
|
|
||||||
transaction cache to disk. In addition, we need to step the table
|
|
||||||
map version on a rollback to ensure that a new table map event is
|
|
||||||
generated instead of the one that was written to the thrown-away
|
|
||||||
transaction cache.
|
|
||||||
*/
|
|
||||||
mysql_bin_log.update_table_map_version();
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
statistic_increment(binlog_cache_use, &LOCK_status);
|
|
||||||
if (trans_log->disk_writes != 0)
|
|
||||||
{
|
|
||||||
statistic_increment(binlog_cache_disk_use, &LOCK_status);
|
|
||||||
trans_log->disk_writes= 0;
|
|
||||||
}
|
|
||||||
reinit_io_cache(trans_log, WRITE_CACHE, (my_off_t) 0, 0, 1); // cannot fail
|
|
||||||
trans_log->end_of_file= max_binlog_cache_size;
|
|
||||||
DBUG_RETURN(error);
|
DBUG_RETURN(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1252,26 +1451,31 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
|
|||||||
|
|
||||||
static int binlog_commit(handlerton *hton, THD *thd, bool all)
|
static int binlog_commit(handlerton *hton, THD *thd, bool all)
|
||||||
{
|
{
|
||||||
|
int error= 0;
|
||||||
DBUG_ENTER("binlog_commit");
|
DBUG_ENTER("binlog_commit");
|
||||||
binlog_trx_data *const trx_data=
|
binlog_trx_data *const trx_data=
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
||||||
IO_CACHE *trans_log= &trx_data->trans_log;
|
IO_CACHE *trans_log= &trx_data->trans_log;
|
||||||
DBUG_ASSERT(mysql_bin_log.is_open() &&
|
DBUG_ASSERT(mysql_bin_log.is_open());
|
||||||
(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))));
|
|
||||||
|
|
||||||
if (trx_data->empty())
|
if (all && trx_data->empty())
|
||||||
{
|
{
|
||||||
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log()
|
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log()
|
||||||
|
trx_data->reset();
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
if (all)
|
if (all)
|
||||||
{
|
{
|
||||||
Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE);
|
Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE);
|
||||||
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
|
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
|
||||||
DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev));
|
int error= binlog_end_trans(thd, trx_data, &qev, all);
|
||||||
|
DBUG_RETURN(error);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
DBUG_RETURN(binlog_end_trans(thd, trx_data, &invisible_commit));
|
{
|
||||||
|
int error= binlog_end_trans(thd, trx_data, &invisible_commit, all);
|
||||||
|
DBUG_RETURN(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
||||||
@ -1281,13 +1485,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
|||||||
binlog_trx_data *const trx_data=
|
binlog_trx_data *const trx_data=
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
||||||
IO_CACHE *trans_log= &trx_data->trans_log;
|
IO_CACHE *trans_log= &trx_data->trans_log;
|
||||||
/*
|
DBUG_ASSERT(mysql_bin_log.is_open());
|
||||||
First assert is guaranteed - see trans_register_ha() call below.
|
|
||||||
The second must be true. If it is not, we're registering
|
if (trx_data->empty()) {
|
||||||
unnecessary, doing extra work. The cause should be found and eliminated
|
trx_data->reset();
|
||||||
*/
|
DBUG_RETURN(0);
|
||||||
DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
|
}
|
||||||
DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty());
|
|
||||||
/*
|
/*
|
||||||
Update the binary log with a BEGIN/ROLLBACK block if we have
|
Update the binary log with a BEGIN/ROLLBACK block if we have
|
||||||
cached some queries and we updated some non-transactional
|
cached some queries and we updated some non-transactional
|
||||||
@ -1299,10 +1503,10 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
|||||||
{
|
{
|
||||||
Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE);
|
Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE);
|
||||||
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
|
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
|
||||||
error= binlog_end_trans(thd, trx_data, &qev);
|
error= binlog_end_trans(thd, trx_data, &qev, all);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
error= binlog_end_trans(thd, trx_data, 0);
|
error= binlog_end_trans(thd, trx_data, 0, all);
|
||||||
DBUG_RETURN(error);
|
DBUG_RETURN(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1330,11 +1534,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
|||||||
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
|
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("binlog_savepoint_set");
|
DBUG_ENTER("binlog_savepoint_set");
|
||||||
binlog_trx_data *const trx_data=
|
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
|
||||||
DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(&trx_data->trans_log));
|
|
||||||
|
|
||||||
*(my_off_t *)sv= my_b_tell(&trx_data->trans_log);
|
binlog_trans_log_savepos(thd, (my_off_t*) sv);
|
||||||
/* Write it to the binary log */
|
/* Write it to the binary log */
|
||||||
|
|
||||||
int const error=
|
int const error=
|
||||||
@ -1349,7 +1550,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
|
|||||||
binlog_trx_data *const trx_data=
|
binlog_trx_data *const trx_data=
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
||||||
IO_CACHE *trans_log= &trx_data->trans_log;
|
IO_CACHE *trans_log= &trx_data->trans_log;
|
||||||
DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log));
|
DBUG_ASSERT(mysql_bin_log.is_open());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
|
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
|
||||||
@ -1364,7 +1565,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
|
|||||||
thd->query, thd->query_length, TRUE, FALSE);
|
thd->query, thd->query_length, TRUE, FALSE);
|
||||||
DBUG_RETURN(error);
|
DBUG_RETURN(error);
|
||||||
}
|
}
|
||||||
reinit_io_cache(trans_log, WRITE_CACHE, *(my_off_t *)sv, 0, 0);
|
binlog_trans_log_truncate(thd, *(my_off_t*)sv);
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2494,7 +2695,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
|
|||||||
thread. If the transaction involved MyISAM tables, it should go
|
thread. If the transaction involved MyISAM tables, it should go
|
||||||
into binlog even on rollback.
|
into binlog even on rollback.
|
||||||
*/
|
*/
|
||||||
(void) pthread_mutex_lock(&LOCK_thread_count);
|
VOID(pthread_mutex_lock(&LOCK_thread_count));
|
||||||
|
|
||||||
/* Save variables so that we can reopen the log */
|
/* Save variables so that we can reopen the log */
|
||||||
save_name=name;
|
save_name=name;
|
||||||
@ -2526,7 +2727,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
|
|||||||
my_free((gptr) save_name, MYF(0));
|
my_free((gptr) save_name, MYF(0));
|
||||||
|
|
||||||
err:
|
err:
|
||||||
(void) pthread_mutex_unlock(&LOCK_thread_count);
|
VOID(pthread_mutex_unlock(&LOCK_thread_count));
|
||||||
pthread_mutex_unlock(&LOCK_index);
|
pthread_mutex_unlock(&LOCK_index);
|
||||||
pthread_mutex_unlock(&LOCK_log);
|
pthread_mutex_unlock(&LOCK_log);
|
||||||
DBUG_RETURN(error);
|
DBUG_RETURN(error);
|
||||||
@ -3092,18 +3293,76 @@ int THD::binlog_setup_trx_data()
|
|||||||
ha_data[binlog_hton->slot]= 0;
|
ha_data[binlog_hton->slot]= 0;
|
||||||
DBUG_RETURN(1); // Didn't manage to set it up
|
DBUG_RETURN(1); // Didn't manage to set it up
|
||||||
}
|
}
|
||||||
trx_data->trans_log.end_of_file= max_binlog_cache_size;
|
|
||||||
|
trx_data= new (ha_data[binlog_hton.slot]) binlog_trx_data;
|
||||||
|
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
/*
|
||||||
|
Function to start a statement and optionally a transaction for the
|
||||||
|
binary log.
|
||||||
|
|
||||||
|
SYNOPSIS
|
||||||
|
binlog_start_trans_and_stmt()
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
|
||||||
|
This function does three things:
|
||||||
|
- Start a transaction if not in autocommit mode or if a BEGIN
|
||||||
|
statement has been seen.
|
||||||
|
|
||||||
|
- Start a statement transaction to allow us to truncate the binary
|
||||||
|
log.
|
||||||
|
|
||||||
|
- Save the currrent binlog position so that we can roll back the
|
||||||
|
statement by truncating the transaction log.
|
||||||
|
|
||||||
|
We only update the saved position if the old one was undefined,
|
||||||
|
the reason is that there are some cases (e.g., for CREATE-SELECT)
|
||||||
|
where the position is saved twice (e.g., both in
|
||||||
|
select_create::prepare() and THD::binlog_write_table_map()) , but
|
||||||
|
we should use the first. This means that calls to this function
|
||||||
|
can be used to start the statement before the first table map
|
||||||
|
event, to include some extra events.
|
||||||
|
*/
|
||||||
|
|
||||||
|
void
|
||||||
|
THD::binlog_start_trans_and_stmt()
|
||||||
|
{
|
||||||
|
DBUG_ENTER("binlog_start_trans_and_stmt");
|
||||||
|
binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot];
|
||||||
|
DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data));
|
||||||
|
if (trx_data)
|
||||||
|
DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u",
|
||||||
|
trx_data->before_stmt_pos));
|
||||||
|
if (trx_data == NULL ||
|
||||||
|
trx_data->before_stmt_pos == binlog_trx_data::UNDEF_POS)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
The call to binlog_trans_log_savepos() might create the trx_data
|
||||||
|
structure, if it didn't exist before, so we save the position
|
||||||
|
into an auto variable and then write it into the transaction
|
||||||
|
data for the binary log (i.e., trx_data).
|
||||||
|
*/
|
||||||
|
my_off_t pos= 0;
|
||||||
|
binlog_trans_log_savepos(this, &pos);
|
||||||
|
trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot];
|
||||||
|
|
||||||
|
trx_data->before_stmt_pos= pos;
|
||||||
|
|
||||||
|
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
|
||||||
|
trans_register_ha(this, TRUE, &binlog_hton);
|
||||||
|
trans_register_ha(this, FALSE, &binlog_hton);
|
||||||
|
}
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Write a table map to the binary log.
|
Write a table map to the binary log.
|
||||||
|
|
||||||
This function is called from ha_external_lock() after the storage
|
|
||||||
engine has registered for the transaction.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifdef HAVE_ROW_BASED_REPLICATION
|
|
||||||
int THD::binlog_write_table_map(TABLE *table, bool is_trans)
|
int THD::binlog_write_table_map(TABLE *table, bool is_trans)
|
||||||
{
|
{
|
||||||
int error;
|
int error;
|
||||||
@ -3122,10 +3381,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans)
|
|||||||
Table_map_log_event
|
Table_map_log_event
|
||||||
the_event(this, table, table->s->table_map_id, is_trans, flags);
|
the_event(this, table, table->s->table_map_id, is_trans, flags);
|
||||||
|
|
||||||
if (is_trans)
|
if (is_trans && binlog_table_maps == 0)
|
||||||
trans_register_ha(this,
|
binlog_start_trans_and_stmt();
|
||||||
(options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0,
|
|
||||||
binlog_hton);
|
|
||||||
|
|
||||||
if ((error= mysql_bin_log.write(&the_event)))
|
if ((error= mysql_bin_log.write(&the_event)))
|
||||||
DBUG_RETURN(error);
|
DBUG_RETURN(error);
|
||||||
@ -3146,7 +3403,7 @@ THD::binlog_get_pending_rows_event() const
|
|||||||
(since the trx_data is set up there). In that case, we just return
|
(since the trx_data is set up there). In that case, we just return
|
||||||
NULL.
|
NULL.
|
||||||
*/
|
*/
|
||||||
return trx_data ? trx_data->pending : NULL;
|
return trx_data ? trx_data->pending() : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -3159,7 +3416,7 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev)
|
|||||||
(binlog_trx_data*) ha_data[binlog_hton->slot];
|
(binlog_trx_data*) ha_data[binlog_hton->slot];
|
||||||
|
|
||||||
DBUG_ASSERT(trx_data);
|
DBUG_ASSERT(trx_data);
|
||||||
trx_data->pending= ev;
|
trx_data->set_pending(ev);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -3168,8 +3425,9 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev)
|
|||||||
(either cached binlog if transaction, or disk binlog). Sets a new pending
|
(either cached binlog if transaction, or disk binlog). Sets a new pending
|
||||||
event.
|
event.
|
||||||
*/
|
*/
|
||||||
int MYSQL_BIN_LOG::
|
int
|
||||||
flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event)
|
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
|
||||||
|
Rows_log_event* event)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
|
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
|
||||||
DBUG_ASSERT(mysql_bin_log.is_open());
|
DBUG_ASSERT(mysql_bin_log.is_open());
|
||||||
@ -3182,9 +3440,9 @@ int MYSQL_BIN_LOG::
|
|||||||
|
|
||||||
DBUG_ASSERT(trx_data);
|
DBUG_ASSERT(trx_data);
|
||||||
|
|
||||||
DBUG_PRINT("info", ("trx_data->pending=%p", trx_data->pending));
|
DBUG_PRINT("info", ("trx_data->pending()=%p", trx_data->pending()));
|
||||||
|
|
||||||
if (Rows_log_event* pending= trx_data->pending)
|
if (Rows_log_event* pending= trx_data->pending())
|
||||||
{
|
{
|
||||||
IO_CACHE *file= &log_file;
|
IO_CACHE *file= &log_file;
|
||||||
|
|
||||||
@ -3334,15 +3592,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
|
|||||||
binlog_trx_data *const trx_data=
|
binlog_trx_data *const trx_data=
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
||||||
IO_CACHE *trans_log= &trx_data->trans_log;
|
IO_CACHE *trans_log= &trx_data->trans_log;
|
||||||
bool trans_log_in_use= my_b_tell(trans_log) != 0;
|
my_off_t trans_log_pos= my_b_tell(trans_log);
|
||||||
if (event_info->get_cache_stmt() && !trans_log_in_use)
|
if (event_info->get_cache_stmt() || trans_log_pos != 0)
|
||||||
trans_register_ha(thd,
|
|
||||||
(thd->options &
|
|
||||||
(OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0,
|
|
||||||
binlog_hton);
|
|
||||||
if (event_info->get_cache_stmt() || trans_log_in_use)
|
|
||||||
{
|
{
|
||||||
DBUG_PRINT("info", ("Using trans_log"));
|
DBUG_PRINT("info", ("Using trans_log: cache=%d, trans_log_pos=%u",
|
||||||
|
event_info->get_cache_stmt(),
|
||||||
|
trans_log_pos));
|
||||||
|
if (trans_log_pos == 0)
|
||||||
|
thd->binlog_start_trans_and_stmt();
|
||||||
file= trans_log;
|
file= trans_log;
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
@ -3546,61 +3803,69 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
|
|||||||
uint length;
|
uint length;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Log "BEGIN" at the beginning of the transaction.
|
We only bother to write to the binary log if there is anything
|
||||||
which may contain more than 1 SQL statement.
|
to write.
|
||||||
*/
|
*/
|
||||||
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
|
if (my_b_tell(cache) > 0)
|
||||||
{
|
{
|
||||||
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE);
|
|
||||||
/*
|
/*
|
||||||
Imagine this is rollback due to net timeout, after all statements of
|
Log "BEGIN" at the beginning of the transaction.
|
||||||
the transaction succeeded. Then we want a zero-error code in BEGIN.
|
which may contain more than 1 SQL statement.
|
||||||
In other words, if there was a really serious error code it's already
|
|
||||||
in the statement's events, there is no need to put it also in this
|
|
||||||
internally generated event, and as this event is generated late it
|
|
||||||
would lead to false alarms.
|
|
||||||
This is safer than thd->clear_error() against kills at shutdown.
|
|
||||||
*/
|
*/
|
||||||
qinfo.error_code= 0;
|
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
|
||||||
/*
|
{
|
||||||
Now this Query_log_event has artificial log_pos 0. It must be adjusted
|
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE);
|
||||||
to reflect the real position in the log. Not doing it would confuse the
|
/*
|
||||||
slave: it would prevent this one from knowing where he is in the
|
Imagine this is rollback due to net timeout, after all statements of
|
||||||
master's binlog, which would result in wrong positions being shown to
|
the transaction succeeded. Then we want a zero-error code in BEGIN.
|
||||||
the user, MASTER_POS_WAIT undue waiting etc.
|
In other words, if there was a really serious error code it's already
|
||||||
*/
|
in the statement's events, there is no need to put it also in this
|
||||||
if (qinfo.write(&log_file))
|
internally generated event, and as this event is generated late it
|
||||||
goto err;
|
would lead to false alarms.
|
||||||
}
|
This is safer than thd->clear_error() against kills at shutdown.
|
||||||
/* Read from the file used to cache the queries .*/
|
*/
|
||||||
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
qinfo.error_code= 0;
|
||||||
goto err;
|
/*
|
||||||
length=my_b_bytes_in_cache(cache);
|
Now this Query_log_event has artificial log_pos 0. It must be adjusted
|
||||||
DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;);
|
to reflect the real position in the log. Not doing it would confuse the
|
||||||
do
|
slave: it would prevent this one from knowing where he is in the
|
||||||
{
|
master's binlog, which would result in wrong positions being shown to
|
||||||
/* Write data to the binary log file */
|
the user, MASTER_POS_WAIT undue waiting etc.
|
||||||
if (my_b_write(&log_file, cache->read_pos, length))
|
*/
|
||||||
goto err;
|
if (qinfo.write(&log_file))
|
||||||
cache->read_pos=cache->read_end; // Mark buffer used up
|
goto err;
|
||||||
DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;);
|
}
|
||||||
} while ((length=my_b_fill(cache)));
|
/* Read from the file used to cache the queries .*/
|
||||||
|
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
||||||
|
goto err;
|
||||||
|
length=my_b_bytes_in_cache(cache);
|
||||||
|
DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;);
|
||||||
|
do
|
||||||
|
{
|
||||||
|
/* Write data to the binary log file */
|
||||||
|
if (my_b_write(&log_file, cache->read_pos, length))
|
||||||
|
goto err;
|
||||||
|
cache->read_pos=cache->read_end; // Mark buffer used up
|
||||||
|
DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;);
|
||||||
|
} while ((length=my_b_fill(cache)));
|
||||||
|
|
||||||
if (commit_event->write(&log_file))
|
if (commit_event && commit_event->write(&log_file))
|
||||||
goto err;
|
goto err;
|
||||||
#ifndef DBUG_OFF
|
#ifndef DBUG_OFF
|
||||||
DBUG_skip_commit:
|
DBUG_skip_commit:
|
||||||
#endif
|
#endif
|
||||||
if (flush_and_sync())
|
if (flush_and_sync())
|
||||||
goto err;
|
goto err;
|
||||||
DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
|
DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
|
||||||
if (cache->error) // Error on read
|
if (cache->error) // Error on read
|
||||||
{
|
{
|
||||||
sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
|
sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
|
||||||
write_error=1; // Don't give more errors
|
write_error=1; // Don't give more errors
|
||||||
goto err;
|
goto err;
|
||||||
|
}
|
||||||
|
signal_update();
|
||||||
}
|
}
|
||||||
signal_update();
|
|
||||||
/*
|
/*
|
||||||
if commit_event is Xid_log_event, increase the number of
|
if commit_event is Xid_log_event, increase the number of
|
||||||
prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
|
prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
|
||||||
@ -3609,7 +3874,7 @@ DBUG_skip_commit:
|
|||||||
If the commit_event is not Xid_log_event (then it's a Query_log_event)
|
If the commit_event is not Xid_log_event (then it's a Query_log_event)
|
||||||
rotate binlog, if necessary.
|
rotate binlog, if necessary.
|
||||||
*/
|
*/
|
||||||
if (commit_event->get_type_code() == XID_EVENT)
|
if (commit_event && commit_event->get_type_code() == XID_EVENT)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&LOCK_prep_xids);
|
pthread_mutex_lock(&LOCK_prep_xids);
|
||||||
prepared_xids++;
|
prepared_xids++;
|
||||||
@ -4619,12 +4884,17 @@ int TC_LOG_BINLOG::log(THD *thd, my_xid xid)
|
|||||||
Xid_log_event xle(thd, xid);
|
Xid_log_event xle(thd, xid);
|
||||||
binlog_trx_data *trx_data=
|
binlog_trx_data *trx_data=
|
||||||
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
|
||||||
DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value
|
/*
|
||||||
|
We always commit the entire transaction when writing an XID. Also
|
||||||
|
note that the return value is inverted.
|
||||||
|
*/
|
||||||
|
DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE));
|
||||||
}
|
}
|
||||||
|
|
||||||
void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
|
void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&LOCK_prep_xids);
|
pthread_mutex_lock(&LOCK_prep_xids);
|
||||||
|
DBUG_ASSERT(prepared_xids > 0);
|
||||||
if (--prepared_xids == 0)
|
if (--prepared_xids == 0)
|
||||||
pthread_cond_signal(&COND_prep_xids);
|
pthread_cond_signal(&COND_prep_xids);
|
||||||
pthread_mutex_unlock(&LOCK_prep_xids);
|
pthread_mutex_unlock(&LOCK_prep_xids);
|
||||||
|
@ -930,6 +930,7 @@ public:
|
|||||||
/*
|
/*
|
||||||
Public interface to write RBR events to the binlog
|
Public interface to write RBR events to the binlog
|
||||||
*/
|
*/
|
||||||
|
void binlog_start_trans_and_stmt();
|
||||||
int binlog_write_table_map(TABLE *table, bool is_transactional);
|
int binlog_write_table_map(TABLE *table, bool is_transactional);
|
||||||
int binlog_write_row(TABLE* table, bool is_transactional,
|
int binlog_write_row(TABLE* table, bool is_transactional,
|
||||||
MY_BITMAP const* cols, my_size_t colcnt,
|
MY_BITMAP const* cols, my_size_t colcnt,
|
||||||
|
@ -2014,6 +2014,10 @@ err:
|
|||||||
rolled back. We only need to roll back a potential statement
|
rolled back. We only need to roll back a potential statement
|
||||||
transaction, since real transactions are rolled back in
|
transaction, since real transactions are rolled back in
|
||||||
close_thread_tables().
|
close_thread_tables().
|
||||||
|
|
||||||
|
TODO: This is not true any more, table maps are generated on the
|
||||||
|
first call to ha_*_row() instead. Remove code that are used to
|
||||||
|
cover for the case outlined above.
|
||||||
*/
|
*/
|
||||||
ha_rollback_stmt(thd);
|
ha_rollback_stmt(thd);
|
||||||
|
|
||||||
@ -2357,6 +2361,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
|
|||||||
DBUG_ENTER("select_insert::prepare");
|
DBUG_ENTER("select_insert::prepare");
|
||||||
|
|
||||||
unit= u;
|
unit= u;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Since table in which we are going to insert is added to the first
|
Since table in which we are going to insert is added to the first
|
||||||
select, LEX::current_select should point to the first select while
|
select, LEX::current_select should point to the first select while
|
||||||
@ -2586,56 +2591,54 @@ void select_insert::send_error(uint errcode,const char *err)
|
|||||||
if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
|
if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
|
||||||
my_message(errcode, err, MYF(0));
|
my_message(errcode, err, MYF(0));
|
||||||
|
|
||||||
if (!table)
|
/*
|
||||||
|
If the creation of the table failed (due to a syntax error, for
|
||||||
|
example), no table will have been opened and therefore 'table'
|
||||||
|
will be NULL. In that case, we still need to execute the rollback
|
||||||
|
and the end of the function to truncate the binary log, but we can
|
||||||
|
skip all the intermediate steps.
|
||||||
|
*/
|
||||||
|
if (table)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
This can only happen when using CREATE ... SELECT and the table was not
|
If we are not in prelocked mode, we end the bulk insert started
|
||||||
created becasue of an syntax error
|
before.
|
||||||
*/
|
*/
|
||||||
DBUG_VOID_RETURN;
|
if (!thd->prelocked_mode)
|
||||||
}
|
table->file->ha_end_bulk_insert();
|
||||||
if (!thd->prelocked_mode)
|
|
||||||
table->file->ha_end_bulk_insert();
|
|
||||||
/*
|
|
||||||
If at least one row has been inserted/modified and will stay in the table
|
|
||||||
(the table doesn't have transactions) we must write to the binlog (and
|
|
||||||
the error code will make the slave stop).
|
|
||||||
|
|
||||||
For many errors (example: we got a duplicate key error while
|
/*
|
||||||
inserting into a MyISAM table), no row will be added to the table,
|
If at least one row has been inserted/modified and will stay in
|
||||||
so passing the error to the slave will not help since there will
|
the table (the table doesn't have transactions) we must write to
|
||||||
be an error code mismatch (the inserts will succeed on the slave
|
the binlog (and the error code will make the slave stop).
|
||||||
with no error).
|
|
||||||
|
|
||||||
If we are using row-based replication we have two cases where this
|
For many errors (example: we got a duplicate key error while
|
||||||
code is executed: replication of CREATE-SELECT and replication of
|
inserting into a MyISAM table), no row will be added to the table,
|
||||||
INSERT-SELECT.
|
so passing the error to the slave will not help since there will
|
||||||
|
be an error code mismatch (the inserts will succeed on the slave
|
||||||
|
with no error).
|
||||||
|
|
||||||
When replicating a CREATE-SELECT statement, we shall not write the
|
If table creation failed, the number of rows modified will also be
|
||||||
events to the binary log and should thus not set
|
zero, so no check for that is made.
|
||||||
OPTION_STATUS_NO_TRANS_UPDATE.
|
*/
|
||||||
|
if (info.copied || info.deleted || info.updated)
|
||||||
When replicating INSERT-SELECT, we shall not write the events to
|
|
||||||
the binary log for transactional table, but shall write all events
|
|
||||||
if there is one or more writes to non-transactional tables. In
|
|
||||||
this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
|
|
||||||
write to a non-transactional table, otherwise it is cleared.
|
|
||||||
*/
|
|
||||||
if (info.copied || info.deleted || info.updated)
|
|
||||||
{
|
|
||||||
if (!table->file->has_transactions())
|
|
||||||
{
|
{
|
||||||
if (mysql_bin_log.is_open())
|
DBUG_ASSERT(table != NULL);
|
||||||
|
if (!table->file->has_transactions())
|
||||||
{
|
{
|
||||||
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
|
if (mysql_bin_log.is_open())
|
||||||
table->file->has_transactions(), FALSE);
|
{
|
||||||
|
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
|
||||||
|
table->file->has_transactions(), FALSE);
|
||||||
|
}
|
||||||
|
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
|
||||||
|
!can_rollback_data())
|
||||||
|
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
|
||||||
|
query_cache_invalidate3(thd, table, 1);
|
||||||
}
|
}
|
||||||
if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
|
|
||||||
!can_rollback_data())
|
|
||||||
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
|
|
||||||
query_cache_invalidate3(thd, table, 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ha_rollback_stmt(thd);
|
ha_rollback_stmt(thd);
|
||||||
table->file->ha_release_auto_increment();
|
table->file->ha_release_auto_increment();
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
@ -2645,8 +2648,11 @@ void select_insert::send_error(uint errcode,const char *err)
|
|||||||
bool select_insert::send_eof()
|
bool select_insert::send_eof()
|
||||||
{
|
{
|
||||||
int error,error2;
|
int error,error2;
|
||||||
|
bool const trans_table= table->file->has_transactions();
|
||||||
ulonglong id;
|
ulonglong id;
|
||||||
DBUG_ENTER("select_insert::send_eof");
|
DBUG_ENTER("select_insert::send_eof");
|
||||||
|
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
|
||||||
|
trans_table, table->file->table_type()));
|
||||||
|
|
||||||
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
|
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
|
||||||
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
|
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
|
||||||
@ -2666,9 +2672,8 @@ bool select_insert::send_eof()
|
|||||||
are not logged in RBR)
|
are not logged in RBR)
|
||||||
- We are using statement based replication
|
- We are using statement based replication
|
||||||
*/
|
*/
|
||||||
if (!table->file->has_transactions() &&
|
if (!trans_table &&
|
||||||
(!table->s->tmp_table ||
|
(!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
|
||||||
!thd->current_stmt_binlog_row_based))
|
|
||||||
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
|
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2684,11 +2689,22 @@ bool select_insert::send_eof()
|
|||||||
thd->clear_error();
|
thd->clear_error();
|
||||||
thd->binlog_query(THD::ROW_QUERY_TYPE,
|
thd->binlog_query(THD::ROW_QUERY_TYPE,
|
||||||
thd->query, thd->query_length,
|
thd->query, thd->query_length,
|
||||||
table->file->has_transactions(), FALSE);
|
trans_table, FALSE);
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
We will call ha_autocommit_or_rollback() also for
|
||||||
|
non-transactional tables under row-based replication: there might
|
||||||
|
be events in the binary logs transaction, and we need to write
|
||||||
|
them to the binary log.
|
||||||
|
*/
|
||||||
|
if (trans_table || thd->current_stmt_binlog_row_based)
|
||||||
|
{
|
||||||
|
int const error2= ha_autocommit_or_rollback(thd, error);
|
||||||
|
if (error2 && !error)
|
||||||
|
error=error2;
|
||||||
}
|
}
|
||||||
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
|
|
||||||
error=error2;
|
|
||||||
table->file->ha_release_auto_increment();
|
table->file->ha_release_auto_increment();
|
||||||
|
|
||||||
if (error)
|
if (error)
|
||||||
{
|
{
|
||||||
table->file->print_error(error,MYF(0));
|
table->file->print_error(error,MYF(0));
|
||||||
@ -2885,14 +2901,19 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
|
|||||||
class MY_HOOKS : public TABLEOP_HOOKS {
|
class MY_HOOKS : public TABLEOP_HOOKS {
|
||||||
public:
|
public:
|
||||||
MY_HOOKS(select_create *x) : ptr(x) { }
|
MY_HOOKS(select_create *x) : ptr(x) { }
|
||||||
virtual void do_prelock(TABLE **tables, uint count)
|
|
||||||
{
|
|
||||||
if (ptr->get_thd()->current_stmt_binlog_row_based &&
|
|
||||||
!(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE))
|
|
||||||
ptr->binlog_show_create_table(tables, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
virtual void do_prelock(TABLE **tables, uint count)
|
||||||
|
{
|
||||||
|
TABLE const *const table = *tables;
|
||||||
|
if (ptr->get_thd()->current_stmt_binlog_row_based &&
|
||||||
|
table->s->tmp_table == NO_TMP_TABLE &&
|
||||||
|
!ptr->get_create_info()->table_existed)
|
||||||
|
{
|
||||||
|
ptr->binlog_show_create_table(tables, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
select_create *ptr;
|
select_create *ptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -2901,6 +2922,20 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
unit= u;
|
unit= u;
|
||||||
|
|
||||||
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
|
/*
|
||||||
|
Start a statement transaction before the create if we are creating
|
||||||
|
a non-temporary table and are using row-based replication for the
|
||||||
|
statement.
|
||||||
|
*/
|
||||||
|
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
|
||||||
|
thd->current_stmt_binlog_row_based)
|
||||||
|
{
|
||||||
|
thd->binlog_start_trans_and_stmt();
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (!(table= create_table_from_items(thd, create_info, create_table,
|
if (!(table= create_table_from_items(thd, create_info, create_table,
|
||||||
extra_fields, keys, &values,
|
extra_fields, keys, &values,
|
||||||
&thd->extra_lock, hook_ptr)))
|
&thd->extra_lock, hook_ptr)))
|
||||||
@ -3048,8 +3083,17 @@ void select_create::abort()
|
|||||||
table->s->version= 0;
|
table->s->version= 0;
|
||||||
hash_delete(&open_cache,(byte*) table);
|
hash_delete(&open_cache,(byte*) table);
|
||||||
if (!create_info->table_existed)
|
if (!create_info->table_existed)
|
||||||
|
{
|
||||||
quick_rm_table(table_type, create_table->db,
|
quick_rm_table(table_type, create_table->db,
|
||||||
create_table->table_name, 0);
|
create_table->table_name, 0);
|
||||||
|
/*
|
||||||
|
We roll back the statement, including truncating the
|
||||||
|
transaction cache of the binary log, if the statement
|
||||||
|
failed.
|
||||||
|
*/
|
||||||
|
if (thd->current_stmt_binlog_row_based)
|
||||||
|
ha_rollback_stmt(thd);
|
||||||
|
}
|
||||||
/* Tell threads waiting for refresh that something has happened */
|
/* Tell threads waiting for refresh that something has happened */
|
||||||
if (version != refresh_version)
|
if (version != refresh_version)
|
||||||
broadcast_refresh();
|
broadcast_refresh();
|
||||||
|
Reference in New Issue
Block a user