1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-01 03:47:19 +03:00

WL#3023 (Use locks in a statement-like manner):

Table maps are now written on aquiring locks to tables and released
  at the end of each logical statement.


mysql-test/extra/binlog_tests/ctype_cp932.test:
  Disabling cleanup code
mysql-test/r/binlog_row_blackhole.result:
  Result change
mysql-test/r/binlog_row_mix_innodb_myisam.result:
  Result change
mysql-test/r/binlog_stm_ctype_cp932.result:
  Result change
mysql-test/r/rpl_row_charset.result:
  Result change
mysql-test/r/rpl_row_create_table.result:
  Result change
mysql-test/t/rpl_row_create_table.test:
  Binlog position change
sql/handler.cc:
  Writing table map after external_lock()
sql/handler.h:
  Adding class for table operation hooks.
sql/log.cc:
  Adding binlog_write_table_map() to THD.
  Removing write_table_map() from MYSQL_LOG.
sql/log.h:
  Minor interface changes to move table map writing.
sql/log_event.cc:
  Removing pre-allocation of memory for buffers.
  Allowing ULONG_MAX as table id denoting an event to ignore (only used to transfer flags).
  Adding code to collect tables while seeing table maps and lock collected tables
  when seeing a binrow event.
  Debriding code as a result of the above changes.
sql/log_event.h:
  Minor interface changes.
sql/mysql_priv.h:
  Adding hooks argument to create_table_from_items().
sql/parse_file.cc:
  Minor fix to avoid crash in debug printout.
sql/rpl_rli.h:
  Adding list of tables to lock to RLI structure.
sql/slave.cc:
  Using list of tables to lock from RLI structure.
sql/sql_acl.cc:
  Removing redundant pending events flush.
sql/sql_base.cc:
  Moving pending event flush.
  Using flag to guard to clear statement transaction only if this is the original
  open tables state.
sql/sql_class.cc:
  Adding flag for open tables state.
  Removing redundant pending events flushes.
  Write a dummy event to indicate that the tables to lock should be emptied
  on the slave.
sql/sql_class.h:
  Adding open tables state flags.
  Adding binlog_write_table_map() function to THD.
  Changes to select_create() to support new locking scheme.
sql/sql_insert.cc:
  Adding rollback of statement transaction on error. It can now contain
  events after locking tables.
sql/sql_load.cc:
  Removing redundant pending event flush.
sql/sql_table.cc:
  Adding hooks argument to create_table_from_items().
  Calling prelock hook before starting to lock tables.
sql/sql_update.cc:
  Removing a compiler warning.
sql/table.h:
  Minor changes.
This commit is contained in:
unknown
2006-02-16 08:30:53 +01:00
parent a89c10fd9b
commit 41f7d13853
26 changed files with 560 additions and 472 deletions

View File

@ -434,4 +434,8 @@ insert into t1 values ('ab');
select * from t1;
insert into t1 values ('abc');
select * from t1;
--disable_query_log
--disable_warnings
drop table t1;
--enable_warnings
--enable_query_log

View File

@ -118,6 +118,12 @@ master-bin.000001 # Query 1 # use `test`; COMMIT
master-bin.000001 # Table_map 1 # test.t1
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # use `test`; COMMIT
master-bin.000001 # Table_map 1 # test.t1
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # use `test`; COMMIT
master-bin.000001 # Table_map 1 # test.t1
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # use `test`; COMMIT
master-bin.000001 # Query 1 # use `test`; create table t2 (a varchar(200)) engine=blackhole
master-bin.000001 # Table_map 1 # test.t2
master-bin.000001 # Write_rows 1 #
@ -125,6 +131,12 @@ master-bin.000001 # Query 1 # use `test`; COMMIT
master-bin.000001 # Query 1 # use `test`; alter table t1 add b int
master-bin.000001 # Query 1 # use `test`; alter table t1 drop b
master-bin.000001 # Query 1 # use `test`; create table t3 like t1
master-bin.000001 # Table_map 1 # test.t1
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # use `test`; COMMIT
master-bin.000001 # Table_map 1 # test.t1
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # use `test`; COMMIT
drop table t1,t2,t3;
reset master;
create table t1 (a int) engine=blackhole;

View File

@ -262,22 +262,26 @@ master-bin.000001 209 Write_rows 1 #
master-bin.000001 243 Table_map 1 # test.t1
master-bin.000001 282 Write_rows 1 #
master-bin.000001 316 Xid 1 # COMMIT /* xid= */
master-bin.000001 343 Query 1 # use `test`; delete from t1
master-bin.000001 420 Xid 1 # COMMIT /* xid= */
master-bin.000001 447 Query 1 # use `test`; delete from t2
master-bin.000001 524 Xid 1 # COMMIT /* xid= */
master-bin.000001 551 Query 1 # use `test`; alter table t2 type=MyISAM
master-bin.000001 640 Table_map 1 # test.t1
master-bin.000001 679 Write_rows 1 #
master-bin.000001 713 Xid 1 # COMMIT /* xid= */
master-bin.000001 740 Table_map 1 # test.t2
master-bin.000001 779 Write_rows 1 #
master-bin.000001 813 Query 1 # use `test`; drop table t1,t2
master-bin.000001 892 Query 1 # use `test`; create table t0 (n int)
master-bin.000001 978 Table_map 1 # test.t0
master-bin.000001 1017 Write_rows 1 #
master-bin.000001 1051 Table_map 1 # test.t0
master-bin.000001 1090 Write_rows 1 #
master-bin.000001 1124 Query 1 # use `test`; create table t2 (n int) engine=innodb
master-bin.000001 343 Table_map 1 # test.t1
master-bin.000001 382 Query 1 # use `test`; delete from t1
master-bin.000001 459 Xid 1 # COMMIT /* xid= */
master-bin.000001 486 Table_map 1 # test.t2
master-bin.000001 525 Query 1 # use `test`; delete from t2
master-bin.000001 602 Xid 1 # COMMIT /* xid= */
master-bin.000001 629 Query 1 # use `test`; alter table t2 type=MyISAM
master-bin.000001 718 Table_map 1 # test.t1
master-bin.000001 757 Write_rows 1 #
master-bin.000001 791 Xid 1 # COMMIT /* xid= */
master-bin.000001 818 Query 1 # use `test`; BEGIN
master-bin.000001 886 Table_map 1 # test.t1
master-bin.000001 925 Write_rows 1 #
master-bin.000001 954 Xid 1 # COMMIT /* xid= */
master-bin.000001 981 Query 1 # use `test`; drop table t1,t2
master-bin.000001 1060 Query 1 # use `test`; create table t0 (n int)
master-bin.000001 1146 Table_map 1 # test.t0
master-bin.000001 1185 Write_rows 1 #
master-bin.000001 1219 Table_map 1 # test.t0
master-bin.000001 1258 Write_rows 1 #
master-bin.000001 1292 Query 1 # use `test`; create table t2 (n int) engine=innodb
do release_lock("lock1");
drop table t0,t2;

View File

@ -11366,4 +11366,3 @@ col1
a
a
a
drop table t1;

View File

@ -112,10 +112,16 @@ drop database mysqltest3;
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query 1 # drop database if exists mysqltest2
master-bin.000001 # Table_map 1 # mysql.proc
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # drop database if exists mysqltest3
master-bin.000001 # Table_map 1 # mysql.proc
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # create database mysqltest2 character set latin2
master-bin.000001 # Query 1 # create database mysqltest3
master-bin.000001 # Query 1 # drop database mysqltest3
master-bin.000001 # Table_map 1 # mysql.proc
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # create database mysqltest3
master-bin.000001 # Query 1 # use `mysqltest2`; create table t1 (a int auto_increment primary key, b varchar(100))
master-bin.000001 # Table_map 1 # mysqltest2.t1
@ -141,7 +147,11 @@ master-bin.000001 # Query 1 # use `mysqltest2`; truncate table t1
master-bin.000001 # Table_map 1 # mysqltest2.t1
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # drop database mysqltest2
master-bin.000001 # Table_map 1 # mysql.proc
master-bin.000001 # Write_rows 1 #
master-bin.000001 # Query 1 # drop database mysqltest3
master-bin.000001 # Table_map 1 # mysql.proc
master-bin.000001 # Write_rows 1 #
select "--- --global--" as "";
--- --global--

View File

@ -127,7 +127,7 @@ NULL 5 10
NULL 6 12
CREATE TABLE t7 (UNIQUE(b)) SELECT a,b FROM tt3;
ERROR 23000: Duplicate entry '2' for key 1
SHOW BINLOG EVENTS FROM 1256;
SHOW BINLOG EVENTS FROM 1326;
Log_name Pos Event_type Server_id End_log_pos Info
CREATE TABLE t7 (a INT, b INT UNIQUE);
INSERT INTO t7 SELECT a,b FROM tt3;
@ -137,11 +137,11 @@ a b
1 2
2 4
3 6
SHOW BINLOG EVENTS FROM 1256;
SHOW BINLOG EVENTS FROM 1326;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 1256 Query 1 1356 use `test`; CREATE TABLE t7 (a INT, b INT UNIQUE)
master-bin.000001 1356 Table_map 1 1396 test.t7
master-bin.000001 1396 Write_rows 1 1452
master-bin.000001 1326 Query 1 1426 use `test`; CREATE TABLE t7 (a INT, b INT UNIQUE)
master-bin.000001 1426 Table_map 1 1466 test.t7
master-bin.000001 1466 Write_rows 1 1522
SELECT * FROM t7 ORDER BY a,b;
a b
1 2
@ -154,10 +154,10 @@ INSERT INTO t7 SELECT a,b FROM tt4;
ROLLBACK;
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
SHOW BINLOG EVENTS FROM 1452;
SHOW BINLOG EVENTS FROM 1522;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 1452 Table_map 1 1492 test.t7
master-bin.000001 1492 Write_rows 1 1548
master-bin.000001 1522 Table_map 1 1562 test.t7
master-bin.000001 1562 Write_rows 1 1618
SELECT * FROM t7 ORDER BY a,b;
a b
1 2
@ -191,10 +191,10 @@ Create Table CREATE TABLE `t9` (
`a` int(11) default NULL,
`b` int(11) default NULL
) ENGINE=MyISAM DEFAULT CHARSET=latin1
SHOW BINLOG EVENTS FROM 1548;
SHOW BINLOG EVENTS FROM 1618;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 1548 Query 1 1634 use `test`; CREATE TABLE t8 LIKE t4
master-bin.000001 1634 Query 1 1773 use `test`; CREATE TABLE `t9` (
master-bin.000001 1618 Query 1 1704 use `test`; CREATE TABLE t8 LIKE t4
master-bin.000001 1704 Query 1 1843 use `test`; CREATE TABLE `t9` (
`a` int(11) default NULL,
`b` int(11) default NULL
)

View File

@ -60,7 +60,7 @@ connection master;
--error 1062
CREATE TABLE t7 (UNIQUE(b)) SELECT a,b FROM tt3;
# Shouldn't be written to the binary log
SHOW BINLOG EVENTS FROM 1256;
SHOW BINLOG EVENTS FROM 1326;
# Test that INSERT-SELECT works the same way as for SBR.
CREATE TABLE t7 (a INT, b INT UNIQUE);
@ -68,7 +68,7 @@ CREATE TABLE t7 (a INT, b INT UNIQUE);
INSERT INTO t7 SELECT a,b FROM tt3;
SELECT * FROM t7 ORDER BY a,b;
# Should be written to the binary log
SHOW BINLOG EVENTS FROM 1256;
SHOW BINLOG EVENTS FROM 1326;
sync_slave_with_master;
SELECT * FROM t7 ORDER BY a,b;
@ -78,7 +78,7 @@ INSERT INTO tt4 VALUES (4,8), (5,10), (6,12);
BEGIN;
INSERT INTO t7 SELECT a,b FROM tt4;
ROLLBACK;
SHOW BINLOG EVENTS FROM 1452;
SHOW BINLOG EVENTS FROM 1522;
SELECT * FROM t7 ORDER BY a,b;
sync_slave_with_master;
SELECT * FROM t7 ORDER BY a,b;
@ -91,7 +91,7 @@ CREATE TEMPORARY TABLE tt6 LIKE tt4;
--echo **** On Master ****
--query_vertical SHOW CREATE TABLE t8
--query_vertical SHOW CREATE TABLE t9
SHOW BINLOG EVENTS FROM 1548;
SHOW BINLOG EVENTS FROM 1618;
sync_slave_with_master;
--echo **** On Slave ****
--query_vertical SHOW CREATE TABLE t8

View File

@ -3016,10 +3016,43 @@ template int binlog_log_row<Update_rows_log_event>(TABLE *, const byte *, const
int handler::ha_external_lock(THD *thd, int lock_type)
{
DBUG_ENTER("handler::ha_external_lock");
int error;
if (unlikely(error= external_lock(thd, lock_type)))
return error;
return 0;
DBUG_RETURN(error);
#ifdef HAVE_ROW_BASED_REPLICATION
if (table->file->is_injective())
DBUG_RETURN(0);
/*
There is a number of statements that are logged statement-based
but call external lock. For these, we do not need to generate a
table map.
TODO: The need for this switch is an indication that the model for
locking combined with row-based replication needs to be looked
over. Ideally, no such special handling should be needed.
*/
switch (thd->lex->sql_command)
{
case SQLCOM_TRUNCATE:
case SQLCOM_ALTER_TABLE:
DBUG_RETURN(0);
}
/*
If we are locking a table for writing, we generate a table map.
For all other kinds of locks, we don't do anything.
*/
if (lock_type == F_WRLCK && check_table_binlog_row_based(thd, table))
{
int const has_trans= table->file->has_transactions();
error= thd->binlog_write_table_map(table, has_trans);
if (unlikely(error))
DBUG_RETURN(error);
}
#endif
DBUG_RETURN(0);
}
int handler::ha_write_row(byte *buf)

View File

@ -871,7 +871,35 @@ typedef struct st_ha_create_information
bool store_on_disk; /* 1 if table stored on disk */
} HA_CREATE_INFO;
/*
Class for maintaining hooks used inside operations on tables such
as: create table functions, delete table functions, and alter table
functions.
Class is using the Template Method pattern to separate the public
usage interface from the private inheritance interface. This
imposes no overhead, since the public non-virtual function is small
enough to be inlined.
The hooks are usually used for functions that does several things,
e.g., create_table_from_items(), which both create a table and lock
it.
*/
class TABLEOP_HOOKS
{
public:
inline void prelock(TABLE **tables, uint count)
{
do_prelock(tables, count);
}
private:
/* Function primitive that is called prior to locking tables */
virtual void do_prelock(TABLE **tables, uint count)
{
/* Default is to do nothing */
}
};
typedef struct st_savepoint SAVEPOINT;
extern ulong savepoint_alloc_size;

View File

@ -125,6 +125,12 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev)
if (end_ev)
{
/*
We can always end the statement when ending a transaction since
transactions are not allowed inside stored functions. If they
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
thd->binlog_flush_pending_rows_event(true);
error= mysql_bin_log.write(thd, trans_log, end_ev);
}
@ -144,7 +150,7 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev)
generated instead of the one that was written to the thrown-away
transaction cache.
*/
++mysql_bin_log.m_table_map_version;
mysql_bin_log.update_table_map_version();
statistic_increment(binlog_cache_use, &LOCK_status);
if (trans_log->disk_writes != 0)
@ -1678,6 +1684,38 @@ int THD::binlog_setup_trx_data()
DBUG_RETURN(0);
}
int THD::binlog_write_table_map(TABLE *table, bool is_trans)
{
DBUG_ENTER("THD::binlog_write_table_map");
DBUG_PRINT("enter", ("table=%p (%s: #%u)",
table, table->s->table_name, table->s->table_map_id));
/* Pre-conditions */
DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event::flag_set const
flags= Table_map_log_event::TM_NO_FLAGS;
Table_map_log_event
the_event(this, table, table->s->table_map_id, is_trans, flags);
/*
This function is called from ha_external_lock() after the storage
engine has registered for the transaction.
*/
if (is_trans)
trans_register_ha(this, options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN),
&binlog_hton);
if (int error= mysql_bin_log.write(&the_event))
DBUG_RETURN(error);
++binlog_table_maps;
table->s->table_map_version= mysql_bin_log.table_map_version();
DBUG_RETURN(0);
}
Rows_log_event*
THD::binlog_get_pending_rows_event() const
{
@ -1695,8 +1733,12 @@ THD::binlog_get_pending_rows_event() const
void
THD::binlog_set_pending_rows_event(Rows_log_event* ev)
{
if (ha_data[binlog_hton.slot] == NULL)
binlog_setup_trx_data();
binlog_trx_data *const trx_data=
(binlog_trx_data*) ha_data[binlog_hton.slot];
DBUG_ASSERT(trx_data);
trx_data->pending= ev;
}
@ -1739,15 +1781,6 @@ int MYSQL_LOG::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event)
*/
pthread_mutex_lock(&LOCK_log);
/*
Write a table map if necessary
*/
if (pending->maybe_write_table_map(thd, file, this))
{
pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(2);
}
/*
Write pending event to log file or transaction cache
*/
@ -1789,18 +1822,8 @@ int MYSQL_LOG::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event)
pthread_mutex_unlock(&LOCK_log);
}
else if (event && event->get_cache_stmt()) /* && pending == 0 */
{
/*
If we are setting a non-null event for a table that is
transactional, we start a transaction here as well.
*/
trans_register_ha(thd,
thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN),
&binlog_hton);
}
trx_data->pending= event;
thd->binlog_set_pending_rows_event(event);
DBUG_RETURN(error);
}
@ -1832,21 +1855,13 @@ bool MYSQL_LOG::write(Log_event *event_info)
mutex, we do this before aquiring the LOCK_log mutex in this
function.
This is not optimal, but necessary in the current implementation
since there is code that writes rows to system tables without
using some way to flush the pending event (e.g., binlog_query()).
TODO: There shall be no writes to any system table after calling
binlog_query(), so these writes has to be moved to before the call
of binlog_query() for correct functioning.
This is necessesary not only for RBR, but the master might crash
after binlogging the query but before changing the system tables.
This means that the slave and the master are not in the same state
(after the master has restarted), so therefore we have to
eliminate this problem.
We only end the statement if we are in a top-level statement. If
we are inside a stored function, we do not end the statement since
this will close all tables on the slave.
*/
thd->binlog_flush_pending_rows_event(true);
bool const end_stmt=
thd->prelocked_mode && thd->lex->requires_prelocking();
thd->binlog_flush_pending_rows_event(end_stmt);
pthread_mutex_lock(&LOCK_log);
@ -1869,8 +1884,9 @@ bool MYSQL_LOG::write(Log_event *event_info)
(!binlog_filter->db_ok(local_db)))
{
VOID(pthread_mutex_unlock(&LOCK_log));
DBUG_PRINT("info",("db_ok('%s')==%d", local_db,
binlog_filter->db_ok(local_db)));
DBUG_PRINT("info",("OPTION_BIN_LOG is %s, db_ok('%s') == %d",
(thd->options & OPTION_BIN_LOG) ? "set" : "clear",
local_db, binlog_filter->db_ok(local_db)));
DBUG_RETURN(0);
}
#endif /* HAVE_REPLICATION */
@ -2544,44 +2560,6 @@ void MYSQL_LOG::signal_update()
DBUG_VOID_RETURN;
}
#ifndef MYSQL_CLIENT
bool MYSQL_LOG::write_table_map(THD *thd, IO_CACHE *file, TABLE* table,
bool is_transactional)
{
DBUG_ENTER("MYSQL_LOG::write_table_map()");
DBUG_PRINT("enter", ("table=%p (%s: %u)",
table, table->s->table_name, table->s->table_map_id));
/* Pre-conditions */
DBUG_ASSERT(binlog_row_based && is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
#ifndef DBUG_OFF
/*
We only need to execute under the LOCK_log mutex if we are writing
to the log file; otherwise, we are writing to a thread-specific
transaction cache and there is no need to serialize this event
with events in other threads.
*/
if (file == &log_file)
safe_mutex_assert_owner(&LOCK_log);
#endif
Table_map_log_event::flag_set const
flags= Table_map_log_event::TM_NO_FLAGS;
Table_map_log_event
the_event(thd, table, table->s->table_map_id, is_transactional, flags);
if (the_event.write(file))
DBUG_RETURN(1);
table->s->table_map_version= m_table_map_version;
DBUG_RETURN(0);
}
#endif /* !defined(MYSQL_CLIENT) */
#ifdef __NT__
void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
uint length, int buffLen)

View File

@ -192,9 +192,11 @@ class MYSQL_LOG: public TC_LOG
bool no_auto_events;
friend class Log_event;
public:
ulonglong m_table_map_version;
int write_to_file(IO_CACHE *cache);
public:
/*
These describe the log's format. This is used only for relay logs.
_for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
@ -221,9 +223,12 @@ public:
#if !defined(MYSQL_CLIENT)
bool is_table_mapped(TABLE *table) const
{
return table->s->table_map_version == m_table_map_version;
return table->s->table_map_version == table_map_version();
}
ulonglong table_map_version() const { return m_table_map_version; }
void update_table_map_version() { ++m_table_map_version; }
int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event);
#endif /* !defined(MYSQL_CLIENT) */
@ -283,8 +288,6 @@ public:
bool write(Log_event* event_info); // binary log write
bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event);
bool write_table_map(THD *thd, IO_CACHE *cache, TABLE *table, bool is_trans);
void start_union_events(THD *thd);
void stop_union_events(THD *thd);
bool is_query_in_union(THD *thd, query_id_t query_id_param);

View File

@ -1681,6 +1681,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli, const char *query
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
clear_all_errors(thd, rli);
rli->clear_tables_to_lock();
/*
Note: We do not need to execute reset_one_shot_variables() if this
@ -5054,17 +5055,19 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
MY_BITMAP const *cols, bool is_transactional)
: Log_event(thd_arg, 0, is_transactional),
m_row_count(0),
m_table(tbl_arg),
m_table_id(tid),
m_width(tbl_arg->s->fields),
m_rows_buf((byte*)my_malloc(opt_binlog_rows_event_max_size *
sizeof(*m_rows_buf), MYF(MY_WME))),
m_rows_cur(m_rows_buf),
m_rows_end(m_rows_buf + opt_binlog_rows_event_max_size),
m_width(tbl_arg ? tbl_arg->s->fields : 1),
m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
m_flags(0)
{
DBUG_ASSERT(m_table && m_table->s);
DBUG_ASSERT(m_table_id != ULONG_MAX);
/*
We allow a special form of dummy event when the table, and cols
are null and the table id is ULONG_MAX.
*/
DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ULONG_MAX ||
!tbl_arg && !cols && tid == ULONG_MAX);
if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
set_flags(NO_FOREIGN_KEY_CHECKS_F);
@ -5075,7 +5078,11 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
(m_width + 7) & ~7UL,
false)))
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
{
/* Cols can be zero if this is a dummy binrows event */
if (likely(cols != NULL))
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
}
else
m_cols.bitmap= 0; // to not free it
}
@ -5086,6 +5093,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
const Format_description_log_event
*description_event)
: Log_event(buf, description_event),
m_row_count(0),
m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
{
DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
@ -5111,8 +5119,6 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
post_start+= RW_FLAGS_OFFSET;
}
DBUG_ASSERT(m_table_id != ULONG_MAX);
m_flags= uint2korr(post_start);
byte const *const var_start= (const byte *)buf + common_header_len +
@ -5168,7 +5174,7 @@ int Rows_log_event::do_add_row_data(byte *const row_data,
DBUG_DUMP("row_data", (const char*)row_data, min(length, 32));
DBUG_ASSERT(m_rows_buf <= m_rows_cur);
DBUG_ASSERT(m_rows_buf < m_rows_end);
DBUG_ASSERT(!m_rows_buf || m_rows_end && m_rows_buf < m_rows_end);
DBUG_ASSERT(m_rows_cur <= m_rows_end);
/* The cast will always work since m_rows_cur <= m_rows_end */
@ -5176,12 +5182,12 @@ int Rows_log_event::do_add_row_data(byte *const row_data,
{
my_size_t const block_size= 1024;
my_ptrdiff_t const old_alloc= m_rows_end - m_rows_buf;
my_ptrdiff_t const new_alloc=
old_alloc + block_size * (length / block_size + block_size - 1);
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
my_ptrdiff_t const new_alloc=
block_size * ((cur_size + length) / block_size + block_size - 1);
byte* const new_buf=
(byte*)my_realloc((gptr)m_rows_buf, new_alloc, MYF(MY_WME));
byte* const new_buf= (byte*)my_realloc((gptr)m_rows_buf, new_alloc,
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
if (unlikely(!new_buf))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
@ -5202,6 +5208,7 @@ int Rows_log_event::do_add_row_data(byte *const row_data,
DBUG_ASSERT(m_rows_cur + length < m_rows_end);
memcpy(m_rows_cur, row_data, length);
m_rows_cur+= length;
m_row_count++;
DBUG_RETURN(0);
}
#endif
@ -5245,10 +5252,28 @@ static char const *unpack_row(TABLE *table,
int Rows_log_event::exec_event(st_relay_log_info *rli)
{
DBUG_ENTER("Rows_log_event::exec_event(st_relay_log_info*)");
DBUG_ASSERT(m_table_id != ULONG_MAX);
int error= 0;
char const *row_start= (char const *)m_rows_buf;
TABLE* table= rli->m_table_map.get_table(m_table_id);
/*
If m_table_id == ULONG_MAX, then we have a dummy event that does
not contain any data. In that case, we just remove all tables in
the tables_to_lock list, step the relay log position, and return
with success.
*/
if (m_table_id == ULONG_MAX)
{
/*
This one is supposed to be set: just an extra check so that
nothing strange has happened.
*/
DBUG_ASSERT(get_flags(STMT_END_F));
rli->clear_tables_to_lock();
thd->clear_error();
rli->inc_event_relay_log_pos();
DBUG_RETURN(0);
}
/*
'thd' has been set by exec_relay_log_event(), just before calling
@ -5257,85 +5282,51 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
DBUG_ASSERT(rli->sql_thd == thd);
/*
lock_tables() reads the contents of thd->lex, so they must be
initialized, so we should call lex_start(); to be even safer, we call
mysql_init_query() which does a more complete set of inits.
If there is no locks taken, this is the first binrow event seen
after the table map events. We should then lock all the tables
used in the transaction and proceed with execution of the actual
event.
*/
mysql_init_query(thd, NULL, 0);
if (table)
if (!thd->lock)
{
bool need_reopen= 1; /* To execute the first lap of the loop below */
/*
table == NULL means that this table should not be
replicated (this was set up by Table_map_log_event::exec_event() which
tested replicate-* rules).
lock_tables() reads the contents of thd->lex, so they must be
initialized, so we should call lex_start(); to be even safer, we
call mysql_init_query() which does a more complete set of inits.
*/
TABLE_LIST table_list;
bool need_reopen;
uint count= 1;
bzero(&table_list, sizeof(table_list));
table_list.lock_type= TL_WRITE;
table_list.next_global= table_list.next_local= 0;
table_list.table= table;
mysql_init_query(thd, NULL, 0);
for ( ; ; )
while ((error= lock_tables(thd, rli->tables_to_lock,
rli->tables_to_lock_count, &need_reopen)))
{
table_list.db= const_cast<char*>(table->s->db.str);
table_list.alias= table_list.table_name=
const_cast<char*>(table->s->table_name.str);
if ((error= lock_tables(thd, &table_list, count, &need_reopen)) == 0)
break;
if (!need_reopen)
{
slave_print_msg(ERROR_LEVEL, rli, error,
"Error in %s event: error during table %s.%s lock",
get_type_str(), table->s->db.str,
table->s->table_name.str);
"Error in %s event: when locking tables",
get_type_str());
DBUG_RETURN(error);
}
/*
we need to store a local copy of the table names since the table object
will become invalid after close_tables_for_reopen
*/
char *db= my_strdup(table->s->db.str, MYF(MY_WME));
char *table_name= my_strdup(table->s->table_name.str, MYF(MY_WME));
if (db == 0 || table_name == 0)
{
/*
Since the lock_tables() failed, the table is not locked, so
we don't need to unlock them.
*/
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
}
/*
We also needs to flush the pending RBR event, since it keeps a
So we need to reopen the tables.
We need to flush the pending RBR event, since it keeps a
pointer to an open table.
ALTERNATIVE SOLUTION: Extract a pointer to the pending RBR
event and reset the table pointer after the tables has been
reopened.
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
the pending RBR event and reset the table pointer after the
tables has been reopened.
NOTE: For this new scheme there should be no pending event:
need to add code to assert that is the case.
*/
thd->binlog_flush_pending_rows_event(false);
close_tables_for_reopen(thd, rli->tables_to_lock);
close_tables_for_reopen(thd, &table_list);
/* open the table again, same as in Table_map_event::exec_event */
table_list.db= const_cast<char*>(db);
table_list.alias= table_list.table_name= const_cast<char*>(table_name);
table_list.updating= 1;
TABLE_LIST *tables= &table_list;
if ((error= open_tables(thd, &tables, &count, 0)) == 0)
{
/* reset some variables for the table list*/
table_list.updating= 0;
/* retrieve the new table reference and update the table map */
table= table_list.table;
error= rli->m_table_map.set_table(m_table_id, table);
}
else /* error in open_tables */
if ((error= open_tables(thd, &rli->tables_to_lock,
&rli->tables_to_lock_count, 0)))
{
if (thd->query_error || thd->is_fatal_error)
{
@ -5345,19 +5336,41 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
*/
uint actual_error= thd->net.last_errno;
slave_print_msg(ERROR_LEVEL, rli, actual_error,
"Error '%s' on reopening table `%s`.`%s`",
"Error '%s' on reopening tables",
(actual_error ? thd->net.last_error :
"unexpected success or fatal error"),
db, table_name);
"unexpected success or fatal error"));
thd->query_error= 1;
}
}
my_free((char*) db, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) table_name, MYF(MY_ALLOW_ZERO_PTR));
if (error)
DBUG_RETURN(error);
}
}
/*
When the open and locking succeeded, we add all the tables to
the table map and remove them from tables to lock.
*/
TABLE_LIST *ptr= rli->tables_to_lock;
while (ptr)
{
rli->m_table_map.set_table(ptr->table_id, ptr->table);
rli->touching_table(ptr->db, ptr->table_name, ptr->table_id);
char *to_free= reinterpret_cast<char*>(ptr);
ptr= ptr->next_global;
my_free(to_free, MYF(MY_WME));
}
rli->tables_to_lock= 0;
rli->tables_to_lock_count= 0;
}
TABLE* table= rli->m_table_map.get_table(m_table_id);
if (table)
{
/*
table == NULL means that this table should not be replicated
(this was set up by Table_map_log_event::exec_event() which
tested replicate-* rules).
*/
/*
It's not needed to set_time() but
@ -5510,52 +5523,25 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
DBUG_RETURN(error);
}
if (table)
if (table && (table->s->primary_key == MAX_KEY) && !cache_stmt)
{
/*
As "table" is not NULL, we did a successful lock_tables(), without any
prior LOCK TABLES and are not in prelocked mode, so this assertion should
be true.
------------ Temporary fix until WL#2975 is implemented ---------
This event is not the last one (no STMT_END_F). If we stop now
(in case of terminate_slave_thread()), how will we restart? We
have to restart from Table_map_log_event, but as this table is
not transactional, the rows already inserted will still be
present, and idempotency is not guaranteed (no PK) so we risk
that repeating leads to double insert. So we desperately try to
continue, hope we'll eventually leave this buggy situation (by
executing the final Rows_log_event). If we are in a hopeless
wait (reached end of last relay log and nothing gets appended
there), we timeout after one minute, and notify DBA about the
problem. When WL#2975 is implemented, just remove the member
st_relay_log_info::unsafe_to_stop_at and all its occurences.
*/
DBUG_ASSERT(thd->lock);
/*
If we are here, there are more events to come which may use our mappings
and our table. So don't clear mappings or close tables, just unlock
tables.
Why don't we lock the table once for all in
Table_map_log_event::exec_event() ? Because we could have in binlog:
BEGIN;
Table_map t1 -> 1
Write_rows to id 1
Table_map t2 -> 2
Write_rows to id 2
Xid_log_event
So we cannot lock t1 when executing the first Table_map, because at that
moment we don't know we'll also have to lock t2, and all tables must be
locked at once in MySQL.
*/
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
if ((table->s->primary_key == MAX_KEY) &&
!cache_stmt)
{
/*
------------ Temporary fix until WL#2975 is implemented ---------
This event is not the last one (no STMT_END_F). If we stop now (in
case of terminate_slave_thread()), how will we restart? We have to
restart from Table_map_log_event, but as this table is not
transactional, the rows already inserted will still be present, and
idempotency is not guaranteed (no PK) so we risk that repeating leads
to double insert. So we desperately try to continue, hope we'll
eventually leave this buggy situation (by executing the final
Rows_log_event). If we are in a hopeless wait (reached end of last
relay log and nothing gets appended there), we timeout after one
minute, and notify DBA about the problem.
When WL#2975 is implemented, just remove the member
st_relay_log_info::unsafe_to_stop_at and all its occurences.
*/
rli->unsafe_to_stop_at= time(0);
}
rli->unsafe_to_stop_at= time(0);
}
DBUG_ASSERT(error == 0);
@ -5569,7 +5555,6 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
#ifndef MYSQL_CLIENT
bool Rows_log_event::write_data_header(IO_CACHE *file)
{
DBUG_ASSERT(m_table_id != ULONG_MAX);
byte buf[ROWS_HEADER_LEN]; // No need to init the buffer
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
{
@ -5601,16 +5586,17 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
}
#endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) && defined(DBUG_RBR)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Rows_log_event::pack_info(Protocol *protocol)
{
char buf[256];
char const *const flagstr= get_flags(STMT_END_F) ? "STMT_END_F" : "";
char const *const dbnam= m_table->s->db.str;
char const *const tblnam= m_table->s->table_name.str;
my_size_t bytes= snprintf(buf, sizeof(buf),
"%s.%s - %s", dbnam, tblnam, flagstr);
protocol->store(buf, bytes, &my_charset_bin);
#ifdef DBUG_RBR
char buf[256];
char const *const flagstr=
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
my_size_t bytes= snprintf(buf, sizeof(buf),
"table_id: %lu%s", m_table_id, flagstr);
protocol->store(buf, bytes, &my_charset_bin);
#endif
}
#endif
@ -5752,59 +5738,6 @@ Table_map_log_event::~Table_map_log_event()
my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
}
/*
Find a table based on database name and table name.
DESCRIPTION
Currently, only the first table of the 'table_list' is located. If the
table is found in the list of open tables for the thread, the 'table'
field of 'table_list' is filled in.
PARAMETERS
thd Thread structure
table_list List of tables to locate in the thd->open_tables list.
count Pointer to a variable that will be set to the number of
tables found. If the pointer is NULL, nothing will be stored.
RETURN VALUE
The number of tables found.
TO DO
Replace the list of table searches with a hash based on the combined
database and table name. The handler_tables_hash is inappropriate since
it hashes on the table alias. At the same time, the function can be
extended to handle a full list of table names, in the same spirit as
open_tables() and lock_tables().
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static uint find_tables(THD *thd, TABLE_LIST *table_list, uint *count)
{
uint result= 0;
/* we verify that the caller knows our limitation */
DBUG_ASSERT(table_list->next_global == 0);
for (TABLE *table= thd->open_tables; table ; table= table->next)
{
if (strcmp(table->s->db.str, table_list->db) == 0
&& strcmp(table->s->table_name.str, table_list->table_name) == 0)
{
/* Copy the table pointer into the table list. */
table_list->table= table;
result= 1;
break;
}
}
if (count)
*count= result;
return result;
}
#endif
/*
Return value is an error code, one of:
@ -5828,20 +5761,37 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
thd->query_id= next_query_id();
pthread_mutex_unlock(&LOCK_thread_count);
TABLE_LIST table_list;
TABLE_LIST *table_list;
char *db_mem, *tname_mem;
void *const memory=
my_multi_malloc(MYF(MY_WME),
&table_list, sizeof(TABLE_LIST),
&db_mem, NAME_LEN + 1,
&tname_mem, NAME_LEN + 1,
NULL);
/*
If memory is allocated, it the pointer to it should be stored in
table_list. If this is not true, the memory will not be correctly
free:ed later.
*/
DBUG_ASSERT(memory == NULL || memory == table_list);
uint32 dummy_len;
bzero(&table_list, sizeof(table_list));
table_list.db= const_cast<char *>
(rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
table_list.alias= table_list.table_name= const_cast<char*>(m_tblnam);
table_list.lock_type= TL_WRITE;
table_list.next_global= table_list.next_local= 0;
table_list.updating= 1;
bzero(table_list, sizeof(*table_list));
table_list->db = db_mem;
table_list->alias= table_list->table_name = tname_mem;
table_list->lock_type= TL_WRITE;
table_list->next_global= table_list->next_local= 0;
table_list->table_id= m_table_id;
table_list->updating= 1;
strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
strmov(table_list->table_name, m_tblnam);
int error= 0;
if (rpl_filter->db_ok(table_list.db) &&
(!rpl_filter->is_on() || rpl_filter->tables_ok("", &table_list)))
if (rpl_filter->db_ok(table_list->db) &&
(!rpl_filter->is_on() || rpl_filter->tables_ok("", table_list)))
{
/*
Check if the slave is set to use SBR. If so, the slave should
@ -5867,36 +5817,32 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
be a no-op.
*/
uint count;
if (find_tables(thd, &table_list, &count) == 0)
/*
open_tables() reads the contents of thd->lex, so they must be
initialized, so we should call lex_start(); to be even safer, we
call mysql_init_query() which does a more complete set of inits.
*/
mysql_init_query(thd, NULL, 0);
if ((error= open_tables(thd, &table_list, &count, 0)))
{
/*
open_tables() reads the contents of thd->lex, so they must be
initialized, so we should call lex_start(); to be even safer, we call
mysql_init_query() which does a more complete set of inits.
*/
mysql_init_query(thd, NULL, 0);
TABLE_LIST *tables= &table_list;
if ((error= open_tables(thd, &tables, &count, 0)))
if (thd->query_error || thd->is_fatal_error)
{
if (thd->query_error || thd->is_fatal_error)
{
/*
Error reporting borrowed from Query_log_event with many excessive
simplifications (we don't honour --slave-skip-errors)
*/
uint actual_error= thd->net.last_errno;
slave_print_msg(ERROR_LEVEL, rli, actual_error,
"Error '%s' on opening table `%s`.`%s`",
(actual_error ? thd->net.last_error :
"unexpected success or fatal error"),
table_list.db, table_list.table_name);
thd->query_error= 1;
}
DBUG_RETURN(error);
/*
Error reporting borrowed from Query_log_event with many excessive
simplifications (we don't honour --slave-skip-errors)
*/
uint actual_error= thd->net.last_errno;
slave_print_msg(ERROR_LEVEL, rli, actual_error,
"Error '%s' on opening table `%s`.`%s`",
(actual_error ? thd->net.last_error :
"unexpected success or fatal error"),
table_list->db, table_list->table_name);
thd->query_error= 1;
}
DBUG_RETURN(error);
}
m_table= table_list.table;
m_table= table_list->table;
/*
This will fail later otherwise, the 'in_use' field should be
@ -5974,19 +5920,16 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
}
/*
We record in the slave's information that the number m_table_id is
mapped to the m_table object
We record in the slave's information that the table should be
locked by linking the table into the list of tables to lock, and
tell the RLI that we are touching a table.
*/
if (!error)
error= rli->m_table_map.set_table(m_table_id, m_table);
/*
Tell the RLI that we are touching a table.
TODO: Maybe we can combine this with the previous operation?
*/
if (!error)
rli->touching_table(m_dbnam, m_tblnam, m_table_id);
{
table_list->next_global= table_list->next_local= rli->tables_to_lock;
rli->tables_to_lock= table_list;
rli->tables_to_lock_count++;
}
}
/*
@ -6053,12 +5996,23 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
field.
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Table_map_log_event::pack_info(Protocol *protocol)
{
#ifdef DBUG_RBR
char buf[256];
my_size_t bytes= snprintf(buf, sizeof(buf),
"table_id: %lu (%s.%s)",
m_table_id, m_dbnam, m_tblnam);
protocol->store(buf, bytes, &my_charset_bin);
#else
char buf[256];
my_size_t bytes= snprintf(buf, sizeof(buf), "%s.%s", m_dbnam, m_tblnam);
protocol->store(buf, bytes, &my_charset_bin);
#endif
}
#endif
#endif

View File

@ -657,11 +657,16 @@ public:
{
return (void*) my_malloc((uint)size, MYF(MY_WME|MY_FAE));
}
static void operator delete(void *ptr, size_t size)
{
my_free((gptr) ptr, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
}
/* Placement version of the above operators */
static void *operator new(size_t, void* ptr) { return ptr; }
static void operator delete(void*, void*) { }
#ifndef MYSQL_CLIENT
bool write_header(IO_CACHE* file, ulong data_length);
virtual bool write(IO_CACHE* file)
@ -1795,10 +1800,8 @@ public:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int exec_event(struct st_relay_log_info *rli);
#ifdef DBUG_RBR
virtual void pack_info(Protocol *protocol);
#endif
#endif
#ifdef MYSQL_CLIENT
/* not for direct call, each derived has its own ::print() */
@ -1850,6 +1853,7 @@ public:
Error code, or zero if write succeeded.
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_ROW_BASED_REPLICATION)
#if 0
int maybe_write_table_map(THD *thd, IO_CACHE *file, MYSQL_LOG *log) const
{
/*
@ -1864,6 +1868,9 @@ public:
return result;
}
#endif
#endif
uint m_row_count; /* The number of rows added to the event */
protected:
/*

View File

@ -742,7 +742,8 @@ TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
List<create_field> *extra_fields,
List<Key> *keys,
List<Item> *items,
MYSQL_LOCK **lock);
MYSQL_LOCK **lock,
TABLEOP_HOOKS *hooks);
bool mysql_alter_table(THD *thd, char *new_db, char *new_name,
HA_CREATE_INFO *create_info,
TABLE_LIST *table_list,

View File

@ -224,7 +224,8 @@ sql_create_definition_file(const LEX_STRING *dir, const LEX_STRING *file_name,
File_option *param;
DBUG_ENTER("sql_create_definition_file");
DBUG_PRINT("enter", ("Dir: %s, file: %s, base 0x%lx",
dir->str, file_name->str, (ulong) base));
dir ? dir->str : "(null)",
file_name->str, (ulong) base));
if (dir)
{

View File

@ -276,7 +276,9 @@ typedef struct st_relay_log_info
group_relay_log_pos);
}
table_mapping m_table_map;
TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
@ -306,6 +308,18 @@ typedef struct st_relay_log_info
void transaction_end(THD*);
void cleanup_context(THD *, bool);
void clear_tables_to_lock() {
TABLE_LIST *ptr= tables_to_lock;
while (ptr)
{
char *to_free= reinterpret_cast<char*>(ptr);
ptr= ptr->next_global;
my_free(to_free, MYF(MY_WME));
}
tables_to_lock= 0;
tables_to_lock_count= 0;
}
time_t unsafe_to_stop_at;
} RELAY_LOG_INFO;

View File

@ -1444,6 +1444,8 @@ static int init_relay_log_info(RELAY_LOG_INFO* rli,
rli->abort_pos_wait=0;
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
rli->tables_to_lock= 0;
rli->tables_to_lock_count= 0;
/*
The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
@ -2301,6 +2303,7 @@ st_relay_log_info::st_relay_log_info()
abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), m_reload_flags(RELOAD_NONE_F),
tables_to_lock(0), tables_to_lock_count(0),
unsafe_to_stop_at(0)
{
group_relay_log_name[0]= event_relay_log_name[0]=
@ -4950,6 +4953,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
}
m_table_map.clear_tables();
close_thread_tables(thd);
clear_tables_to_lock();
unsafe_to_stop_at= 0;
}
#endif

View File

@ -3144,16 +3144,6 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc,
}
grant_option=TRUE;
thd->mem_root= old_root;
/*
This flush is here only becuase there is code that writes rows to
system tables after executing a binlog_query().
TODO: Ensure that no writes are executed after a binlog_query() by
moving the writes to before calling binlog_query(). Then remove
this line (and add an assert inside send_ok() that checks that
everything is in a consistent state).
*/
thd->binlog_flush_pending_rows_event(true);
rw_unlock(&LOCK_grant);
if (!result && !no_error)
send_ok(thd);

View File

@ -1031,21 +1031,18 @@ void close_thread_tables(THD *thd, bool lock_in_use, bool skip_derived)
/* Fallthrough */
}
/*
For RBR: before calling close_thread_tables(), storage engines
should autocommit. Hence if there is a a pending event, it belongs
to a non-transactional engine, which writes directly to the table,
and should therefore be flushed before unlocking and closing the
tables. The test above for locked tables will not be triggered
since RBR locks and unlocks tables on a per-event basis.
TODO (WL#3023): Change the semantics so that RBR does not lock and
unlock tables on a per-event basis.
*/
thd->binlog_flush_pending_rows_event(true);
if (thd->lock)
{
/*
For RBR we flush the pending event just before we unlock all the
tables. This means that we are at the end of a topmost
statement, so we ensure that the STMT_END_F flag is set on the
pending event. For statements that are *inside* stored
functions, the pending event will not be flushed: that will be
handled either before writing a query log event (inside
binlog_query()) or when preparing a pending event.
*/
thd->binlog_flush_pending_rows_event(true);
mysql_unlock_tables(thd, thd->lock);
thd->lock=0;
}
@ -1057,7 +1054,8 @@ void close_thread_tables(THD *thd, bool lock_in_use, bool skip_derived)
saves some work in 2pc too)
see also sql_parse.cc - dispatch_command()
*/
bzero(&thd->transaction.stmt, sizeof(thd->transaction.stmt));
if (!(thd->state_flags & Open_tables_state::BACKUPS_AVAIL))
bzero(&thd->transaction.stmt, sizeof(thd->transaction.stmt));
if (!thd->active_transaction())
thd->transaction.xid_state.xid.null();

View File

@ -162,7 +162,7 @@ bool foreign_key_prefix(Key *a, Key *b)
****************************************************************************/
Open_tables_state::Open_tables_state(ulong version_arg)
:version(version_arg)
:version(version_arg), state_flags(0U)
{
reset_open_tables_state();
}
@ -178,7 +178,8 @@ THD::THD()
:Statement(CONVENTIONAL_EXECUTION, 0, ALLOC_ROOT_MIN_BLOCK_SIZE, 0),
Open_tables_state(refresh_version), rli_fake(0),
lock_id(&main_lock_id),
user_time(0), in_sub_stmt(0), global_read_lock(0), is_fatal_error(0),
user_time(0), in_sub_stmt(0), binlog_table_maps(0),
global_read_lock(0), is_fatal_error(0),
rand_used(0), time_zone_used(0),
last_insert_id_used(0), insert_id_used(0), clear_next_insert_id(0),
in_lock_tables(0), bootstrap(0), derived_tables_processing(FALSE),
@ -1910,6 +1911,7 @@ void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
DBUG_ENTER("reset_n_backup_open_tables_state");
backup->set_open_tables_state(this);
reset_open_tables_state();
state_flags|= Open_tables_state::BACKUPS_AVAIL;
DBUG_VOID_RETURN;
}
@ -1976,25 +1978,6 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
backup->client_capabilities= client_capabilities;
backup->savepoints= transaction.savepoints;
#ifdef HAVE_ROW_BASED_REPLICATION
/*
For row-based replication and before executing a function/trigger,
the pending rows event has to be flushed. The function/trigger
might execute statement that require the pending event to be
flushed. A simple example:
CREATE FUNCTION foo() RETURNS INT
BEGIN
SAVEPOINT x;
RETURN 0;
END
INSERT INTO t1 VALUES (1), (foo()), (2);
*/
if (binlog_row_based)
binlog_flush_pending_rows_event(false);
#endif /* HAVE_ROW_BASED_REPLICATION */
if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
!binlog_row_based)
options&= ~OPTION_BIN_LOG;
@ -2176,6 +2159,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
bool is_transactional,
RowsEventT *hint __attribute__((unused)))
{
DBUG_ENTER("binlog_prepare_pending_rows_event");
/* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
@ -2187,12 +2171,12 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
have to do it here.
*/
if (binlog_setup_trx_data())
return NULL;
DBUG_RETURN(NULL);
Rows_log_event* pending= binlog_get_pending_rows_event();
if (unlikely(pending && !pending->is_valid()))
return NULL;
DBUG_RETURN(NULL);
/*
Check if the current event is non-NULL and a write-rows
@ -2217,7 +2201,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
ev= new RowsEventT(this, table, table->s->table_map_id, cols,
is_transactional);
if (unlikely(!ev))
return NULL;
DBUG_RETURN(NULL);
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
/*
flush the pending event and replace it with the newly created
@ -2226,16 +2210,16 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
{
delete ev;
return NULL;
DBUG_RETURN(NULL);
}
return ev; /* This is the new pending event */
DBUG_RETURN(ev); /* This is the new pending event */
}
return pending; /* This is the current pending event */
DBUG_RETURN(pending); /* This is the current pending event */
}
/*
Instansiate the versions we need, we have -fno-implicit-template as
Instantiate the versions we need, we have -fno-implicit-template as
compiling option.
*/
template Rows_log_event*
@ -2499,15 +2483,34 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
binlog_table_maps= 0;
}
/*
We only bother to set the pending event if it is non-NULL. This
is essential for correctness, since there is not necessarily a
trx_data created for the thread if the pending event is NULL.
*/
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
}
else if (stmt_end && binlog_table_maps > 0)
{ /* there is no pending event at this point */
/*
If pending is null and we are going to end the statement, we
have to write an extra, empty, binrow event so that the slave
knows to discard the tables it has received. Otherwise, the
table maps written this far will be included in the table maps
for the following statement.
See if we can replace this with a dummy, maybe constant, event.
*/
#if 0
static unsigned char memory[sizeof(Write_rows_log_event)];
void *const ptr= &memory;
#endif
Rows_log_event *ev=
new Write_rows_log_event(this, 0, ULONG_MAX, 0, FALSE);
ev->set_flags(Rows_log_event::STMT_END_F);
binlog_set_pending_rows_event(ev);
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
binlog_table_maps= 0;
}
DBUG_RETURN(error);
}
@ -2533,6 +2536,17 @@ void THD::binlog_delete_pending_rows_event()
functions have been issued, but before tables are unlocked and
closed.
OBSERVE
There shall be no writes to any system table after calling
binlog_query(), so these writes has to be moved to before the call
of binlog_query() for correct functioning.
This is necessesary not only for RBR, but the master might crash
after binlogging the query but before changing the system tables.
This means that the slave and the master are not in the same state
(after the master has restarted), so therefore we have to
eliminate this problem.
RETURN VALUE
Error code, or 0 if no error.
*/
@ -2542,7 +2556,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype,
{
DBUG_ENTER("THD::binlog_query");
DBUG_ASSERT(query && mysql_bin_log.is_open());
int error= binlog_flush_pending_rows_event(true);
switch (qtype)
{
case THD::MYSQL_QUERY_TYPE:
@ -2556,19 +2570,41 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype,
*/
case THD::ROW_QUERY_TYPE:
if (binlog_row_based)
DBUG_RETURN(binlog_flush_pending_rows_event(true));
{
/*
If thd->lock is set, then we are not inside a stored function.
In that case, mysql_unlock_tables() will be called after this
binlog_query(), so we have to flush the pending rows event
with the STMT_END_F set to unlock all tables at the slave side
as well.
We will not flush the pending event, if thd->lock is NULL.
This means that we are inside a stored function or trigger, so
the flushing will be done inside the top-most
close_thread_tables().
*/
if (this->lock)
DBUG_RETURN(binlog_flush_pending_rows_event(TRUE));
DBUG_RETURN(0);
}
/* Otherwise, we fall through */
case THD::STMT_QUERY_TYPE:
/*
Most callers of binlog_query() ignore the error code, assuming
that the statement will always be written to the binlog. In
case of error above, we therefore just continue and write the
statement to the binary log.
The MYSQL_LOG::write() function will set the STMT_END_F flag and
flush the pending rows event if necessary.
*/
{
Query_log_event qinfo(this, query, query_len, is_trans, suppress_use);
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
DBUG_RETURN(mysql_bin_log.write(&qinfo));
/*
Binlog table maps will be irrelevant after a Query_log_event
(they are just removed on the slave side) so after the query
log event is written to the binary log, we pretend that no
table maps were written.
*/
int error= mysql_bin_log.write(&qinfo);
binlog_table_maps= 0;
DBUG_RETURN(error);
}
break;

View File

@ -735,11 +735,20 @@ public:
ulong version;
uint current_tablenr;
enum enum_flags {
BACKUPS_AVAIL = (1U << 0) /* There are backups available */
};
/*
Flags with information about the open tables state.
*/
uint state_flags;
/*
This constructor serves for creation of Open_tables_state instances
which are used as backup storage.
*/
Open_tables_state() {};
Open_tables_state() : state_flags(0U) { }
Open_tables_state(ulong version_arg);
@ -753,6 +762,7 @@ public:
open_tables= temporary_tables= handler_tables= derived_tables= 0;
lock= locked_tables= 0;
prelocked_mode= NON_PRELOCKED;
state_flags= 0U;
}
};
@ -900,8 +910,9 @@ public:
#ifndef MYSQL_CLIENT
/*
Public interface to write rows to the binlog
Public interface to write RBR events to the binlog
*/
int binlog_write_table_map(TABLE *table, bool is_transactional);
int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, my_size_t colcnt,
const byte *buf);
@ -945,6 +956,11 @@ public:
int binlog_flush_pending_rows_event(bool stmt_end);
void binlog_delete_pending_rows_event();
private:
uint binlog_table_maps; // Number of table maps currently in the binlog
public:
#endif
#endif /* HAVE_ROW_BASED_REPLICATION */
#ifndef MYSQL_CLIENT
@ -1543,7 +1559,6 @@ class select_create: public select_insert {
HA_CREATE_INFO *create_info;
MYSQL_LOCK *lock;
Field **field;
bool create_table_written;
public:
select_create (TABLE_LIST *table,
HA_CREATE_INFO *create_info_par,
@ -1552,11 +1567,11 @@ public:
List<Item> &select_fields,enum_duplicates duplic, bool ignore)
:select_insert (NULL, NULL, &select_fields, 0, 0, duplic, ignore), create_table(table),
extra_fields(&fields_par),keys(&keys_par), create_info(create_info_par),
lock(0), create_table_written(FALSE)
lock(0)
{}
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
void binlog_show_create_table();
void binlog_show_create_table(TABLE **tables, uint count);
void store_values(List<Item> &values);
void send_error(uint errcode,const char *err);
bool send_eof();

View File

@ -1711,7 +1711,7 @@ pthread_handler_t handle_delayed_insert(void *arg)
{
thd->fatal_error();
strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
goto end;
goto err;
}
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
sigset_t set;
@ -1724,13 +1724,13 @@ pthread_handler_t handle_delayed_insert(void *arg)
if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
{
thd->fatal_error(); // Abort waiting inserts
goto end;
goto err;
}
if (!(di->table->file->table_flags() & HA_CAN_INSERT_DELAYED))
{
thd->fatal_error();
my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
goto end;
goto err;
}
di->table->copy_blobs=1;
@ -1859,6 +1859,16 @@ pthread_handler_t handle_delayed_insert(void *arg)
pthread_cond_broadcast(&di->cond_client); // If waiting clients
}
err:
/*
mysql_lock_tables() can potentially start a transaction and write
a table map. In the event of an error, that transaction has to be
rolled back. We only need to roll back a potential statement
transaction, since real transactions are rolled back in
close_thread_tables().
*/
ha_rollback_stmt(thd);
end:
/*
di should be unlinked from the thread handler list and have no active
@ -2493,9 +2503,25 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
{
DBUG_ENTER("select_create::prepare");
class MY_HOOKS : public TABLEOP_HOOKS {
public:
MY_HOOKS(select_create *x) : ptr(x) { }
virtual void do_prelock(TABLE **tables, uint count)
{
if (binlog_row_based)
ptr->binlog_show_create_table(tables, count);
}
private:
select_create *ptr;
};
MY_HOOKS hooks(this);
unit= u;
table= create_table_from_items(thd, create_info, create_table,
extra_fields, keys, &values, &lock);
extra_fields, keys, &values, &lock,
&hooks);
if (!table)
DBUG_RETURN(-1); // abort() deletes table
@ -2533,7 +2559,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
void
select_create::binlog_show_create_table()
select_create::binlog_show_create_table(TABLE **tables, uint count)
{
/*
Note 1: In RBR mode, we generate a CREATE TABLE statement for the
@ -2556,18 +2582,19 @@ select_create::binlog_show_create_table()
on rollback, we clear the OPTION_STATUS_NO_TRANS_UPDATE bit of
thd->options.
*/
DBUG_ASSERT(binlog_row_based && !create_table_written);
DBUG_ASSERT(binlog_row_based);
DBUG_ASSERT(tables && *tables && count > 0);
thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE;
char buf[2048];
String query(buf, sizeof(buf), system_charset_info);
query.length(0); // Have to zero it since constructor doesn't
TABLE_LIST tables;
memset(&tables, 0, sizeof(tables));
tables.table = table;
TABLE_LIST table_list;
memset(&table_list, 0, sizeof(table_list));
table_list.table = *tables;
int result= store_create_info(thd, &tables, &query, create_info);
int result= store_create_info(thd, &table_list, &query, create_info);
DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
thd->binlog_query(THD::STMT_QUERY_TYPE,
query.ptr(), query.length(),
@ -2578,16 +2605,6 @@ select_create::binlog_show_create_table()
void select_create::store_values(List<Item> &values)
{
/*
Before writing the first row, we write the CREATE TABLE statement
to the binlog.
*/
if (binlog_row_based && !create_table_written)
{
binlog_show_create_table();
create_table_written= TRUE;
}
fill_record_n_invoke_before_triggers(thd, field, values, 1,
table->triggers, TRG_EVENT_INSERT);
}
@ -2607,16 +2624,6 @@ void select_create::send_error(uint errcode,const char *err)
bool select_create::send_eof()
{
/*
If no rows where written to the binary log, we write the CREATE
TABLE statement to the binlog.
*/
if (binlog_row_based && !create_table_written)
{
binlog_show_create_table();
create_table_written= TRUE;
}
bool tmp=select_insert::send_eof();
if (tmp)
abort();

View File

@ -414,19 +414,6 @@ bool mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
#ifndef EMBEDDED_LIBRARY
if (mysql_bin_log.is_open())
{
#ifdef HAVE_ROW_BASED_REPLICATION
/*
We need to do the job that is normally done inside
binlog_query() here, which is to ensure that the pending event
is written before tables are unlocked and before any other
events are written. We also need to update the table map
version for the binary log to mark that table maps are invalid
after this point.
*/
if (binlog_row_based)
thd->binlog_flush_pending_rows_event(true);
else
#endif
{
/*
Make sure last block (the one which caused the error) gets

View File

@ -2069,7 +2069,8 @@ TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
List<create_field> *extra_fields,
List<Key> *keys,
List<Item> *items,
MYSQL_LOCK **lock)
MYSQL_LOCK **lock,
TABLEOP_HOOKS *hooks)
{
TABLE tmp_table; // Used during 'create_field()'
TABLE_SHARE share;
@ -2148,6 +2149,7 @@ TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
save us from that ?
*/
table->reginfo.lock_type=TL_WRITE;
hooks->prelock(&table, 1); // Call prelock hooks
if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
{

View File

@ -1415,7 +1415,7 @@ bool multi_update::send_data(List<Item> &not_used_values)
memcpy((char*) tmp_table->field[0]->ptr,
(char*) table->file->ref, table->file->ref_length);
/* Write row, ignoring duplicated updates to a row */
if (error= tmp_table->file->ha_write_row(tmp_table->record[0]))
if ((error= tmp_table->file->ha_write_row(tmp_table->record[0])))
{
if (error != HA_ERR_FOUND_DUPP_KEY &&
error != HA_ERR_FOUND_DUPP_UNIQUE &&

View File

@ -533,7 +533,8 @@ typedef struct st_table_list
struct st_table_list *next_name_resolution_table;
/* Index names in a "... JOIN ... USE/IGNORE INDEX ..." clause. */
List<String> *use_index, *ignore_index;
TABLE *table; /* opened table */
TABLE *table; /* opened table */
uint table_id; /* table id (from binlog) for opened table */
/*
select_result for derived table to pass it from table creation to table
filling procedure