mirror of
https://github.com/MariaDB/server.git
synced 2025-08-09 22:24:09 +03:00
Clean up and speed up interfaces for binary row logging
MDEV-21605 Clean up and speed up interfaces for binary row logging MDEV-21617 Bug fix for previous version of this code The intention is to have as few 'if' as possible in ha_write() and related functions. This is done by pre-calculating once per statement the row_logging state for all tables. Benefits are simpler and faster code both when binary logging is disabled and when it's enabled. Changes: - Added handler->row_logging to make it easy to check it table should be row logged. This also made it easier to disabling row logging for system, internal and temporary tables. - The tables row_logging capabilities are checked once per "statements that updates tables" in THD::binlog_prepare_for_row_logging() which is called when needed from THD::decide_logging_format(). - Removed most usage of tmp_disable_binlog(), reenable_binlog() and temporary saving and setting of thd->variables.option_bits. - Moved checks that can't change during a statement from check_table_binlog_row_based() to check_table_binlog_row_based_internal() - Removed flag row_already_logged (used by sequence engine) - Moved binlog_log_row() to a handler:: - Moved write_locked_table_maps() to THD::binlog_write_table_maps() as most other related binlog functions are in THD. - Removed binlog_write_table_map() and binlog_log_row_internal() as they are now obsolete as 'has_transactions()' is pre-calculated in prepare_for_row_logging(). - Remove 'is_transactional' argument from binlog_write_table_map() as this can now be read from handler. - Changed order of 'if's in handler::external_lock() and wsrep_mysqld.h to first evaluate fast and likely cases before more complex ones. - Added error checking in ha_write_row() and related functions if binlog_log_row() failed. - Don't clear check_table_binlog_row_based_result in clear_cached_table_binlog_row_based_flag() as it's not needed. - THD::clear_binlog_table_maps() has been replaced with THD::reset_binlog_for_next_statement() - Added 'MYSQL_OPEN_IGNORE_LOGGING_FORMAT' flag to open_and_lock_tables() to avoid calculating of binary log format for internal opens. This flag is also used to avoid reading statistics tables for internal tables. - Added OPTION_BINLOG_LOG_OFF as a simple way to turn of binlog temporary for create (instead of using THD::sql_log_bin_off. - Removed flag THD::sql_log_bin_off (not needed anymore) - Speed up THD::decide_logging_format() by remembering if blackhole engine is used and avoid a loop over all tables if it's not used (the common case). - THD::decide_logging_format() is not called anymore if no tables are used for the statement. This will speed up pure stored procedure code with about 5%+ according to some simple tests. - We now get annotated events on slave if a CREATE ... SELECT statement is transformed on the slave from statement to row logging. - In the original code, the master could come into a state where row logging is enforced for all future events if statement could be used. This is now partly fixed. Other changes: - Ensure that all tables used by a statement has query_id set. - Had to restore the row_logging flag for not used tables in THD::binlog_write_table_maps (not normal scenario) - Removed injector::transaction::use_table(server_id_type sid, table tbl) as it's not used. - Cleaned up set_slave_thread_options() - Some more DBUG_ENTER/DBUG_RETURN, code comments and minor indentation changes. - Ensure we only call THD::decide_logging_format_low() once in mysql_insert() (inefficiency). - Don't annotate INSERT DELAYED - Removed zeroing pos_in_table_list in THD::open_temporary_table() as it's already 0
This commit is contained in:
22
mysql-test/suite/binlog/t/foreign_key.test
Normal file
22
mysql-test/suite/binlog/t/foreign_key.test
Normal file
@@ -0,0 +1,22 @@
|
||||
--source include/have_innodb.inc
|
||||
--source include/have_binlog_format_row.inc
|
||||
|
||||
reset master;
|
||||
|
||||
CREATE TABLE t1 (
|
||||
id INT,
|
||||
k INT,
|
||||
c CHAR(8),
|
||||
KEY (k),
|
||||
PRIMARY KEY (id),
|
||||
FOREIGN KEY (id) REFERENCES t1 (k)
|
||||
) ENGINE=InnoDB;
|
||||
LOCK TABLES t1 WRITE;
|
||||
SET SESSION FOREIGN_KEY_CHECKS= OFF;
|
||||
SET AUTOCOMMIT=OFF;
|
||||
INSERT INTO t1 VALUES (1,1,'foo');
|
||||
DROP TABLE t1;
|
||||
SET SESSION FOREIGN_KEY_CHECKS= ON;
|
||||
SET AUTOCOMMIT=ON;
|
||||
|
||||
source include/show_binlog_events.inc;
|
@@ -85,9 +85,12 @@ master-bin.000001 # Gtid # # GTID #-#-#
|
||||
master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
|
||||
master-bin.000001 # Gtid # # GTID #-#-#
|
||||
master-bin.000001 # Query # # use `test`; create table t1 (a int)
|
||||
master-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
master-bin.000001 # Gtid # # GTID #-#-#
|
||||
master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `test`.`t1`/* Generated to handle failed CREATE OR REPLACE */
|
||||
master-bin.000001 # Query # # ROLLBACK
|
||||
master-bin.000001 # Gtid # # GTID #-#-#
|
||||
master-bin.000001 # Query # # use `test`; create temporary table t9 (a int)
|
||||
master-bin.000001 # Gtid # # GTID #-#-#
|
||||
master-bin.000001 # Query # # DROP TEMPORARY TABLE IF EXISTS `test`.`t9`/* Generated to handle failed CREATE OR REPLACE */
|
||||
connection server_2;
|
||||
show tables;
|
||||
Tables_in_test
|
||||
@@ -154,7 +157,7 @@ slave-bin.000001 # Query # # use `test`; create table t4 (server_2_to_be_delete
|
||||
slave-bin.000001 # Gtid # # GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; create table t1 (new_table int)
|
||||
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` (
|
||||
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
|
||||
`a` int(11) DEFAULT NULL
|
||||
)
|
||||
slave-bin.000001 # Annotate_rows # # create table t2 select * from t9
|
||||
@@ -223,26 +226,12 @@ Log_name Pos Event_type Server_id End_log_pos Info
|
||||
slave-bin.000001 # Gtid # # GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; create table t1 (a int)
|
||||
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
slave-bin.000001 # Annotate_rows # # insert into t1 values (0),(1),(2)
|
||||
slave-bin.000001 # Table_map # # table_id: # (test.t1)
|
||||
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
slave-bin.000001 # Query # # use `test`; insert into t1 values (0),(1),(2)
|
||||
slave-bin.000001 # Query # # COMMIT
|
||||
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` (
|
||||
`a` int(11) DEFAULT NULL
|
||||
) ENGINE=MyISAM
|
||||
slave-bin.000001 # Annotate_rows # # create table t2 engine=myisam select * from t1
|
||||
slave-bin.000001 # Table_map # # table_id: # (test.t2)
|
||||
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
slave-bin.000001 # Query # # COMMIT
|
||||
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
|
||||
`a` int(11) DEFAULT NULL
|
||||
) ENGINE=InnoDB
|
||||
slave-bin.000001 # Annotate_rows # # create or replace table t2 engine=innodb select * from t1
|
||||
slave-bin.000001 # Table_map # # table_id: # (test.t2)
|
||||
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
slave-bin.000001 # Xid # # COMMIT /* XID */
|
||||
slave-bin.000001 # Gtid # # GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; create table t2 engine=myisam select * from t1
|
||||
slave-bin.000001 # Gtid # # GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; create or replace table t2 engine=innodb select * from t1
|
||||
connection server_1;
|
||||
drop table t1;
|
||||
#
|
||||
|
@@ -160,6 +160,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
|
||||
`a` int(11) DEFAULT NULL
|
||||
)
|
||||
slave-bin.000001 # Annotate_rows # # create table t2 select * from t9
|
||||
slave-bin.000001 # Table_map # # table_id: # (test.t2)
|
||||
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
slave-bin.000001 # Query # # COMMIT
|
||||
@@ -171,6 +172,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
|
||||
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t5` (
|
||||
`a` int(11) DEFAULT NULL
|
||||
)
|
||||
slave-bin.000001 # Annotate_rows # # create table t5 select * from t9
|
||||
slave-bin.000001 # Table_map # # table_id: # (test.t5)
|
||||
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
slave-bin.000001 # Query # # COMMIT
|
||||
|
@@ -56,6 +56,7 @@ create or replace table t1 (a int primary key) select a from t2;
|
||||
|
||||
# Same with temporary table
|
||||
create temporary table t9 (a int);
|
||||
|
||||
--error ER_DUP_ENTRY
|
||||
create or replace temporary table t9 (a int primary key) select a from t2;
|
||||
|
||||
|
@@ -1241,15 +1241,9 @@ Events::load_events_from_db(THD *thd)
|
||||
TRUE);
|
||||
|
||||
/* All the dmls to mysql.events tables are stmt bin-logged. */
|
||||
bool save_binlog_row_based;
|
||||
if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row()))
|
||||
thd->set_current_stmt_binlog_format_stmt();
|
||||
|
||||
table->file->row_logging= 0;
|
||||
(void) table->file->ha_update_row(table->record[1], table->record[0]);
|
||||
|
||||
if (save_binlog_row_based)
|
||||
thd->set_current_stmt_binlog_format_row();
|
||||
|
||||
delete et;
|
||||
continue;
|
||||
}
|
||||
|
@@ -2131,12 +2131,10 @@ int ha_partition::copy_partitions(ulonglong * const copied,
|
||||
}
|
||||
else
|
||||
{
|
||||
THD *thd= ha_thd();
|
||||
/* Copy record to new handler */
|
||||
(*copied)++;
|
||||
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
|
||||
DBUG_ASSERT(!m_new_file[new_part]->row_logging);
|
||||
result= m_new_file[new_part]->ha_write_row(m_rec0);
|
||||
reenable_binlog(thd);
|
||||
if (result)
|
||||
goto error;
|
||||
}
|
||||
@@ -4374,11 +4372,10 @@ int ha_partition::write_row(const uchar * buf)
|
||||
|
||||
start_part_bulk_insert(thd, part_id);
|
||||
|
||||
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
|
||||
DBUG_ASSERT(!m_file[part_id]->row_logging);
|
||||
error= m_file[part_id]->ha_write_row(buf);
|
||||
if (have_auto_increment && !table->s->next_number_keypart)
|
||||
set_auto_increment_if_higher(table->next_number_field);
|
||||
reenable_binlog(thd);
|
||||
|
||||
exit:
|
||||
table->auto_increment_field_not_null= saved_auto_inc_field_not_null;
|
||||
@@ -4457,12 +4454,11 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
|
||||
|
||||
m_last_part= new_part_id;
|
||||
start_part_bulk_insert(thd, new_part_id);
|
||||
DBUG_ASSERT(!m_file[new_part_id]->row_logging);
|
||||
if (new_part_id == old_part_id)
|
||||
{
|
||||
DBUG_PRINT("info", ("Update in partition %u", (uint) new_part_id));
|
||||
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
|
||||
error= m_file[new_part_id]->ha_update_row(old_data, new_data);
|
||||
reenable_binlog(thd);
|
||||
goto exit;
|
||||
}
|
||||
else
|
||||
@@ -4481,16 +4477,12 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
|
||||
table->next_number_field= NULL;
|
||||
DBUG_PRINT("info", ("Update from partition %u to partition %u",
|
||||
(uint) old_part_id, (uint) new_part_id));
|
||||
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
|
||||
error= m_file[new_part_id]->ha_write_row((uchar*) new_data);
|
||||
reenable_binlog(thd);
|
||||
table->next_number_field= saved_next_number_field;
|
||||
if (unlikely(error))
|
||||
goto exit;
|
||||
|
||||
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
|
||||
error= m_file[old_part_id]->ha_delete_row(old_data);
|
||||
reenable_binlog(thd);
|
||||
if (unlikely(error))
|
||||
goto exit;
|
||||
}
|
||||
@@ -4592,9 +4584,8 @@ int ha_partition::delete_row(const uchar *buf)
|
||||
if (!bitmap_is_set(&(m_part_info->lock_partitions), m_last_part))
|
||||
DBUG_RETURN(HA_ERR_NOT_IN_LOCK_PARTITIONS);
|
||||
|
||||
tmp_disable_binlog(thd);
|
||||
DBUG_ASSERT(!m_file[m_last_part]->row_logging);
|
||||
error= m_file[m_last_part]->ha_delete_row(buf);
|
||||
reenable_binlog(thd);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
|
@@ -202,7 +202,11 @@ int ha_sequence::write_row(const uchar *buf)
|
||||
DBUG_ENTER("ha_sequence::write_row");
|
||||
DBUG_ASSERT(table->record[0] == buf);
|
||||
|
||||
row_already_logged= 0;
|
||||
/*
|
||||
Log to binary log even if this function has been called before
|
||||
(The function ends by setting row_logging to 0)
|
||||
*/
|
||||
row_logging= row_logging_init;
|
||||
if (unlikely(sequence->initialized == SEQUENCE::SEQ_IN_PREPARE))
|
||||
{
|
||||
/* This calls is from ha_open() as part of create table */
|
||||
@@ -218,6 +222,7 @@ int ha_sequence::write_row(const uchar *buf)
|
||||
sequence->copy(&tmp_seq);
|
||||
if (likely(!(error= file->write_row(buf))))
|
||||
sequence->initialized= SEQUENCE::SEQ_READY_TO_USE;
|
||||
row_logging= 0;
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE))
|
||||
@@ -262,10 +267,12 @@ int ha_sequence::write_row(const uchar *buf)
|
||||
sequence->copy(&tmp_seq);
|
||||
rows_changed++;
|
||||
/* We have to do the logging while we hold the sequence mutex */
|
||||
error= binlog_log_row(table, 0, buf, log_func);
|
||||
row_already_logged= 1;
|
||||
if (row_logging)
|
||||
error= binlog_log_row(table, 0, buf, log_func);
|
||||
}
|
||||
|
||||
/* Row is already logged, don't log it again in ha_write_row() */
|
||||
row_logging= 0;
|
||||
sequence->all_values_used= 0;
|
||||
if (!sequence_locked)
|
||||
sequence->write_unlock(table);
|
||||
|
315
sql/handler.cc
315
sql/handler.cc
@@ -3620,8 +3620,9 @@ int handler::update_auto_increment()
|
||||
variables->auto_increment_increment);
|
||||
auto_inc_intervals_count++;
|
||||
/* Row-based replication does not need to store intervals in binlog */
|
||||
if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open())
|
||||
&& !thd->is_current_stmt_binlog_format_row())
|
||||
if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) ||
|
||||
mysql_bin_log.is_open()) &&
|
||||
!thd->is_current_stmt_binlog_format_row())
|
||||
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
|
||||
append(auto_inc_interval_for_cur_row.minimum(),
|
||||
auto_inc_interval_for_cur_row.values(),
|
||||
@@ -6366,32 +6367,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
|
||||
1 Row needs to be logged
|
||||
*/
|
||||
|
||||
bool handler::check_table_binlog_row_based(bool binlog_row)
|
||||
bool handler::check_table_binlog_row_based()
|
||||
{
|
||||
if (table->versioned(VERS_TRX_ID))
|
||||
return false;
|
||||
if (unlikely((table->in_use->variables.sql_log_bin_off)))
|
||||
return 0; /* Called by partitioning engine */
|
||||
#ifdef WITH_WSREP
|
||||
if (!table->in_use->variables.sql_log_bin &&
|
||||
wsrep_thd_is_applying(table->in_use))
|
||||
return 0; /* wsrep patch sets sql_log_bin to silence binlogging
|
||||
from high priority threads */
|
||||
#endif /* WITH_WSREP */
|
||||
if (unlikely((!check_table_binlog_row_based_done)))
|
||||
{
|
||||
check_table_binlog_row_based_done= 1;
|
||||
check_table_binlog_row_based_result=
|
||||
check_table_binlog_row_based_internal(binlog_row);
|
||||
check_table_binlog_row_based_internal();
|
||||
}
|
||||
return check_table_binlog_row_based_result;
|
||||
}
|
||||
|
||||
bool handler::check_table_binlog_row_based_internal(bool binlog_row)
|
||||
bool handler::check_table_binlog_row_based_internal()
|
||||
{
|
||||
THD *thd= table->in_use;
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
if (!thd->variables.sql_log_bin &&
|
||||
wsrep_thd_is_applying(table->in_use))
|
||||
{
|
||||
/*
|
||||
wsrep patch sets sql_log_bin to silence binlogging from high
|
||||
priority threads
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
return (table->s->can_do_row_logging &&
|
||||
!table->versioned(VERS_TRX_ID) &&
|
||||
!(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) &&
|
||||
thd->is_current_stmt_binlog_format_row() &&
|
||||
/*
|
||||
Wsrep partially enables binary logging if it have not been
|
||||
@@ -6407,9 +6411,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
|
||||
|
||||
Otherwise, return 'true' if binary logging is on.
|
||||
*/
|
||||
IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
|
||||
IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) &&
|
||||
wsrep_thd_is_local(thd)) ||
|
||||
((WSREP(thd) ||
|
||||
((WSREP_NNULL(thd) ||
|
||||
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
|
||||
mysql_bin_log.is_open())),
|
||||
(thd->variables.option_bits & OPTION_BIN_LOG) &&
|
||||
@@ -6417,151 +6421,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
|
||||
}
|
||||
|
||||
|
||||
/** @brief
|
||||
Write table maps for all (manually or automatically) locked tables
|
||||
to the binary log. Also, if binlog_annotate_row_events is ON,
|
||||
write Annotate_rows event before the first table map.
|
||||
|
||||
SYNOPSIS
|
||||
write_locked_table_maps()
|
||||
thd Pointer to THD structure
|
||||
|
||||
DESCRIPTION
|
||||
This function will generate and write table maps for all tables
|
||||
that are locked by the thread 'thd'.
|
||||
|
||||
RETURN VALUE
|
||||
0 All OK
|
||||
1 Failed to write all table maps
|
||||
|
||||
SEE ALSO
|
||||
THD::lock
|
||||
*/
|
||||
|
||||
static int write_locked_table_maps(THD *thd)
|
||||
int handler::binlog_log_row(TABLE *table,
|
||||
const uchar *before_record,
|
||||
const uchar *after_record,
|
||||
Log_func *log_func)
|
||||
{
|
||||
DBUG_ENTER("write_locked_table_maps");
|
||||
DBUG_PRINT("enter", ("thd:%p thd->lock:%p "
|
||||
"thd->extra_lock: %p",
|
||||
thd, thd->lock, thd->extra_lock));
|
||||
bool error;
|
||||
THD *thd= table->in_use;
|
||||
DBUG_ENTER("binlog_log_row");
|
||||
|
||||
DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps()));
|
||||
if (!thd->binlog_table_maps &&
|
||||
thd->binlog_write_table_maps())
|
||||
DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
|
||||
|
||||
MYSQL_LOCK *locks[2];
|
||||
locks[0]= thd->extra_lock;
|
||||
locks[1]= thd->lock;
|
||||
my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd),
|
||||
true) &&
|
||||
thd->variables.binlog_annotate_row_events &&
|
||||
thd->query() && thd->query_length();
|
||||
|
||||
for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
|
||||
{
|
||||
MYSQL_LOCK const *const lock= locks[i];
|
||||
if (lock == NULL)
|
||||
continue;
|
||||
|
||||
TABLE **const end_ptr= lock->table + lock->table_count;
|
||||
for (TABLE **table_ptr= lock->table ;
|
||||
table_ptr != end_ptr ;
|
||||
++table_ptr)
|
||||
{
|
||||
TABLE *const table= *table_ptr;
|
||||
if (table->current_lock == F_WRLCK &&
|
||||
table->file->check_table_binlog_row_based(0))
|
||||
{
|
||||
if (binlog_write_table_map(thd, table, with_annotate))
|
||||
DBUG_RETURN(1);
|
||||
with_annotate= 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate)
|
||||
{
|
||||
DBUG_ENTER("binlog_write_table_map");
|
||||
DBUG_PRINT("info", ("table %s", table->s->table_name.str));
|
||||
/*
|
||||
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
|
||||
(e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
|
||||
compatible behavior with the STMT based replication even when
|
||||
the table is not transactional. In other words, if the operation
|
||||
fails while executing the insert phase nothing is written to the
|
||||
binlog.
|
||||
|
||||
Note that at this point, we check the type of a set of tables to
|
||||
create the table map events. In the function binlog_log_row(),
|
||||
which calls the current function, we check the type of the table
|
||||
of the current row.
|
||||
*/
|
||||
bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
|
||||
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
|
||||
table->file->has_transactions());
|
||||
int const error= thd->binlog_write_table_map(table, has_trans,
|
||||
&with_annotate);
|
||||
/*
|
||||
If an error occurs, it is the responsibility of the caller to
|
||||
roll back the transaction.
|
||||
*/
|
||||
if (unlikely(error))
|
||||
DBUG_RETURN(1);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
static int binlog_log_row_internal(TABLE* table,
|
||||
const uchar *before_record,
|
||||
const uchar *after_record,
|
||||
Log_func *log_func)
|
||||
{
|
||||
bool error= 0;
|
||||
THD *const thd= table->in_use;
|
||||
|
||||
/*
|
||||
If there are no table maps written to the binary log, this is
|
||||
the first row handled in this statement. In that case, we need
|
||||
to write table maps for all locked tables to the binary log.
|
||||
*/
|
||||
if (likely(!(error= ((thd->get_binlog_table_maps() == 0 &&
|
||||
write_locked_table_maps(thd))))))
|
||||
{
|
||||
/*
|
||||
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
|
||||
(i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
|
||||
compatible behavior with the STMT based replication even when
|
||||
the table is not transactional. In other words, if the operation
|
||||
fails while executing the insert phase nothing is written to the
|
||||
binlog. We need the same also for ALTER TABLE in the case we convert
|
||||
a shared table to a not shared table as in this case we will log all
|
||||
rows.
|
||||
*/
|
||||
bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
|
||||
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
|
||||
table->file->has_transactions());
|
||||
error= (*log_func)(thd, table, has_trans, before_record, after_record);
|
||||
}
|
||||
return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
|
||||
}
|
||||
|
||||
int binlog_log_row(TABLE* table, const uchar *before_record,
|
||||
const uchar *after_record, Log_func *log_func)
|
||||
{
|
||||
#ifdef WITH_WSREP
|
||||
THD *const thd= table->in_use;
|
||||
|
||||
/* only InnoDB tables will be replicated through binlog emulation */
|
||||
if ((WSREP_EMULATE_BINLOG(thd) &&
|
||||
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
|
||||
thd->wsrep_ignore_table == true)
|
||||
return 0;
|
||||
#endif
|
||||
|
||||
if (!table->file->check_table_binlog_row_based(1))
|
||||
return 0;
|
||||
return binlog_log_row_internal(table, before_record, after_record, log_func);
|
||||
error= (*log_func)(thd, table, row_logging_has_trans,
|
||||
before_record, after_record);
|
||||
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
|
||||
}
|
||||
|
||||
|
||||
@@ -6647,6 +6522,7 @@ int handler::ha_external_lock(THD *thd, int lock_type)
|
||||
int handler::ha_reset()
|
||||
{
|
||||
DBUG_ENTER("ha_reset");
|
||||
|
||||
/* Check that we have called all proper deallocation functions */
|
||||
DBUG_ASSERT((uchar*) table->def_read_set.bitmap +
|
||||
table->s->column_bitmap_size ==
|
||||
@@ -6662,6 +6538,10 @@ int handler::ha_reset()
|
||||
pushed_cond= NULL;
|
||||
tracker= NULL;
|
||||
mark_trx_read_write_done= 0;
|
||||
/*
|
||||
Disable row logging.
|
||||
*/
|
||||
row_logging= row_logging_init= 0;
|
||||
clear_cached_table_binlog_row_based_flag();
|
||||
/* Reset information about pushed engine conditions */
|
||||
cancel_pushed_idx_cond();
|
||||
@@ -6678,8 +6558,8 @@ static int wsrep_after_row(THD *thd)
|
||||
/* enforce wsrep_max_ws_rows */
|
||||
thd->wsrep_affected_rows++;
|
||||
if (wsrep_max_ws_rows &&
|
||||
wsrep_thd_is_local(thd) &&
|
||||
thd->wsrep_affected_rows > wsrep_max_ws_rows)
|
||||
thd->wsrep_affected_rows > wsrep_max_ws_rows &&
|
||||
wsrep_thd_is_local(thd))
|
||||
{
|
||||
trans_rollback_stmt(thd) || trans_rollback(thd);
|
||||
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
|
||||
@@ -6864,6 +6744,74 @@ static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec)
|
||||
|
||||
|
||||
/**
|
||||
Check if galera disables binary logging for this table
|
||||
|
||||
@return 0 Binary logging disabled
|
||||
@return 1 Binary logging can be enabled
|
||||
*/
|
||||
|
||||
|
||||
static inline bool wsrep_check_if_binlog_row(TABLE *table)
|
||||
{
|
||||
#ifdef WITH_WSREP
|
||||
THD *const thd= table->in_use;
|
||||
|
||||
/* only InnoDB tables will be replicated through binlog emulation */
|
||||
if ((WSREP_EMULATE_BINLOG(thd) &&
|
||||
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
|
||||
thd->wsrep_ignore_table == true)
|
||||
return 0;
|
||||
#endif
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Prepare handler for row logging
|
||||
|
||||
@return 0 if handler will not participate in row logging
|
||||
@return 1 handler will participate in row logging
|
||||
|
||||
This function is always safe to call on an opened table.
|
||||
*/
|
||||
|
||||
bool handler::prepare_for_row_logging()
|
||||
{
|
||||
DBUG_ENTER("handler::prepare_for_row_logging");
|
||||
|
||||
/* Check if we should have row logging */
|
||||
if (wsrep_check_if_binlog_row(table) &&
|
||||
check_table_binlog_row_based())
|
||||
{
|
||||
/*
|
||||
Row logging enabled. Intialize all variables and write
|
||||
annotated and table maps
|
||||
*/
|
||||
row_logging= row_logging_init= 1;
|
||||
|
||||
/*
|
||||
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
|
||||
(e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
|
||||
compatible behavior with the STMT based replication even when
|
||||
the table is not transactional. In other words, if the operation
|
||||
fails while executing the insert phase nothing is written to the
|
||||
binlog.
|
||||
*/
|
||||
row_logging_has_trans=
|
||||
((sql_command_flags[table->in_use->lex->sql_command] &
|
||||
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
|
||||
table->file->has_transactions());
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Check row_logging has not been properly cleared from previous command */
|
||||
DBUG_ASSERT(row_logging == 0);
|
||||
}
|
||||
DBUG_RETURN(row_logging);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Do all initialization needed for insert
|
||||
|
||||
@param force_update_handler Set to TRUE if we should always create an
|
||||
@@ -6891,7 +6839,6 @@ int handler::prepare_for_insert(bool force_update_handler)
|
||||
int handler::ha_write_row(const uchar *buf)
|
||||
{
|
||||
int error;
|
||||
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
|
||||
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
|
||||
m_lock_type == F_WRLCK);
|
||||
DBUG_ENTER("handler::ha_write_row");
|
||||
@@ -6911,13 +6858,17 @@ int handler::ha_write_row(const uchar *buf)
|
||||
{ error= write_row(buf); })
|
||||
|
||||
MYSQL_INSERT_ROW_DONE(error);
|
||||
if (likely(!error) && !row_already_logged)
|
||||
if (likely(!error))
|
||||
{
|
||||
rows_changed++;
|
||||
error= binlog_log_row(table, 0, buf, log_func);
|
||||
if (row_logging)
|
||||
{
|
||||
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
|
||||
error= binlog_log_row(table, 0, buf, log_func);
|
||||
}
|
||||
#ifdef WITH_WSREP
|
||||
if (table_share->tmp_table == NO_TMP_TABLE &&
|
||||
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
|
||||
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
|
||||
!error && (error= wsrep_after_row(ha_thd())))
|
||||
{
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
@@ -6932,10 +6883,8 @@ int handler::ha_write_row(const uchar *buf)
|
||||
int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
|
||||
{
|
||||
int error;
|
||||
Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
|
||||
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
|
||||
m_lock_type == F_WRLCK);
|
||||
|
||||
/*
|
||||
Some storage engines require that the new record is in record[0]
|
||||
(and the old record is in record[1]).
|
||||
@@ -6950,20 +6899,22 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
|
||||
(error= check_duplicate_long_entries_update(table, (uchar*) new_data)))
|
||||
return error;
|
||||
|
||||
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error,
|
||||
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0,
|
||||
{ error= update_row(old_data, new_data);})
|
||||
|
||||
MYSQL_UPDATE_ROW_DONE(error);
|
||||
if (likely(!error) && !row_already_logged)
|
||||
if (likely(!error))
|
||||
{
|
||||
rows_changed++;
|
||||
error= binlog_log_row(table, old_data, new_data, log_func);
|
||||
#ifdef WITH_WSREP
|
||||
if (table_share->tmp_table == NO_TMP_TABLE &&
|
||||
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
|
||||
if (row_logging)
|
||||
{
|
||||
return error;
|
||||
Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
|
||||
error= binlog_log_row(table, old_data, new_data, log_func);
|
||||
}
|
||||
#ifdef WITH_WSREP
|
||||
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
|
||||
!error && (error= wsrep_after_row(ha_thd())))
|
||||
return error;
|
||||
#endif /* WITH_WSREP */
|
||||
}
|
||||
return error;
|
||||
@@ -7000,7 +6951,6 @@ int handler::update_first_row(const uchar *new_data)
|
||||
int handler::ha_delete_row(const uchar *buf)
|
||||
{
|
||||
int error;
|
||||
Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
|
||||
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
|
||||
m_lock_type == F_WRLCK);
|
||||
/*
|
||||
@@ -7019,10 +6969,14 @@ int handler::ha_delete_row(const uchar *buf)
|
||||
if (likely(!error))
|
||||
{
|
||||
rows_changed++;
|
||||
error= binlog_log_row(table, buf, 0, log_func);
|
||||
if (row_logging)
|
||||
{
|
||||
Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
|
||||
error= binlog_log_row(table, buf, 0, log_func);
|
||||
}
|
||||
#ifdef WITH_WSREP
|
||||
if (table_share->tmp_table == NO_TMP_TABLE &&
|
||||
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
|
||||
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
|
||||
!error && (error= wsrep_after_row(ha_thd())))
|
||||
{
|
||||
return error;
|
||||
}
|
||||
@@ -7049,11 +7003,10 @@ int handler::ha_delete_row(const uchar *buf)
|
||||
int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
|
||||
{
|
||||
int error;
|
||||
|
||||
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
|
||||
mark_trx_read_write();
|
||||
|
||||
error = direct_update_rows(update_rows, found_rows);
|
||||
error= direct_update_rows(update_rows, found_rows);
|
||||
MYSQL_UPDATE_ROW_DONE(error);
|
||||
return error;
|
||||
}
|
||||
|
@@ -610,6 +610,7 @@ given at all. */
|
||||
#define HA_CREATE_USED_SEQUENCE (1UL << 25)
|
||||
|
||||
typedef ulonglong alter_table_operations;
|
||||
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
|
||||
|
||||
/*
|
||||
These flags are set by the parser and describes the type of
|
||||
@@ -3050,8 +3051,6 @@ public:
|
||||
bool mark_trx_read_write_done; /* mark_trx_read_write was called */
|
||||
bool check_table_binlog_row_based_done; /* check_table_binlog.. was called */
|
||||
bool check_table_binlog_row_based_result; /* cached check_table_binlog... */
|
||||
/* Set to 1 if handler logged last insert/update/delete operation */
|
||||
bool row_already_logged;
|
||||
/*
|
||||
TRUE <=> the engine guarantees that returned records are within the range
|
||||
being scanned.
|
||||
@@ -3192,10 +3191,16 @@ public:
|
||||
void end_psi_batch_mode();
|
||||
|
||||
bool set_top_table_fields;
|
||||
|
||||
struct TABLE *top_table;
|
||||
Field **top_table_field;
|
||||
uint top_table_fields;
|
||||
|
||||
/* If we have row logging enabled for this table */
|
||||
bool row_logging, row_logging_init;
|
||||
/* If the row logging should be done in transaction cache */
|
||||
bool row_logging_has_trans;
|
||||
|
||||
private:
|
||||
/**
|
||||
The lock type set by when calling::ha_external_lock(). This is
|
||||
@@ -3213,7 +3218,6 @@ private:
|
||||
/** Stores next_insert_id for handling duplicate key errors. */
|
||||
ulonglong m_prev_insert_id;
|
||||
|
||||
|
||||
public:
|
||||
handler(handlerton *ht_arg, TABLE_SHARE *share_arg)
|
||||
:table_share(share_arg), table(0),
|
||||
@@ -3223,7 +3227,6 @@ public:
|
||||
mark_trx_read_write_done(0),
|
||||
check_table_binlog_row_based_done(0),
|
||||
check_table_binlog_row_based_result(0),
|
||||
row_already_logged(0),
|
||||
in_range_check_pushed_down(FALSE), errkey(-1),
|
||||
key_used_on_scan(MAX_KEY),
|
||||
active_index(MAX_KEY), keyread(MAX_KEY),
|
||||
@@ -3242,6 +3245,7 @@ public:
|
||||
m_psi_locker(NULL),
|
||||
set_top_table_fields(FALSE), top_table(0),
|
||||
top_table_field(0), top_table_fields(0),
|
||||
row_logging(0), row_logging_init(0),
|
||||
m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0)
|
||||
{
|
||||
DBUG_PRINT("info",
|
||||
@@ -4598,13 +4602,17 @@ protected:
|
||||
virtual int delete_table(const char *name);
|
||||
|
||||
public:
|
||||
bool check_table_binlog_row_based(bool binlog_row);
|
||||
bool check_table_binlog_row_based();
|
||||
bool prepare_for_row_logging();
|
||||
int prepare_for_insert(bool force_update_handler= 0);
|
||||
int binlog_log_row(TABLE *table,
|
||||
const uchar *before_record,
|
||||
const uchar *after_record,
|
||||
Log_func *log_func);
|
||||
|
||||
inline void clear_cached_table_binlog_row_based_flag()
|
||||
{
|
||||
check_table_binlog_row_based_done= 0;
|
||||
check_table_binlog_row_based_result= 0;
|
||||
}
|
||||
private:
|
||||
/* Cache result to avoid extra calls */
|
||||
@@ -4619,7 +4627,7 @@ private:
|
||||
|
||||
private:
|
||||
void mark_trx_read_write_internal();
|
||||
bool check_table_binlog_row_based_internal(bool binlog_row);
|
||||
bool check_table_binlog_row_based_internal();
|
||||
|
||||
protected:
|
||||
/*
|
||||
@@ -5202,7 +5210,6 @@ int binlog_log_row(TABLE* table,
|
||||
if (unlikely(this_tracker)) \
|
||||
tracker->stop_tracking(table->in_use); \
|
||||
}
|
||||
int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate);
|
||||
void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag);
|
||||
void print_keydup_error(TABLE *table, KEY *key, myf errflag);
|
||||
|
||||
|
179
sql/log.cc
179
sql/log.cc
@@ -514,6 +514,12 @@ void Log_event_writer::add_status(enum_logged_status status)
|
||||
cache_data->add_status(status);
|
||||
}
|
||||
|
||||
void Log_event_writer::set_incident()
|
||||
{
|
||||
cache_data->set_incident();
|
||||
}
|
||||
|
||||
|
||||
class binlog_cache_mngr {
|
||||
public:
|
||||
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
|
||||
@@ -714,7 +720,6 @@ bool Log_to_csv_event_handler::
|
||||
uint field_index;
|
||||
Silence_log_table_errors error_handler;
|
||||
Open_tables_backup open_tables_backup;
|
||||
ulonglong save_thd_options;
|
||||
bool save_time_zone_used;
|
||||
DBUG_ENTER("log_general");
|
||||
|
||||
@@ -724,9 +729,6 @@ bool Log_to_csv_event_handler::
|
||||
*/
|
||||
save_time_zone_used= thd->time_zone_used;
|
||||
|
||||
save_thd_options= thd->variables.option_bits;
|
||||
thd->variables.option_bits&= ~OPTION_BIN_LOG;
|
||||
|
||||
table_list.init_one_table(&MYSQL_SCHEMA_NAME, &GENERAL_LOG_NAME, 0,
|
||||
TL_WRITE_CONCURRENT_INSERT);
|
||||
|
||||
@@ -806,7 +808,6 @@ bool Log_to_csv_event_handler::
|
||||
table->field[field_index]->set_default();
|
||||
}
|
||||
|
||||
/* log table entries are not replicated */
|
||||
if (table->file->ha_write_row(table->record[0]))
|
||||
goto err;
|
||||
|
||||
@@ -827,7 +828,6 @@ err:
|
||||
if (need_close)
|
||||
close_log_table(thd, &open_tables_backup);
|
||||
|
||||
thd->variables.option_bits= save_thd_options;
|
||||
thd->time_zone_used= save_time_zone_used;
|
||||
DBUG_RETURN(result);
|
||||
}
|
||||
@@ -880,7 +880,6 @@ bool Log_to_csv_event_handler::
|
||||
ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
|
||||
ulong query_time_micro= (ulong) (query_utime % 1000000);
|
||||
ulong lock_time_micro= (ulong) (lock_utime % 1000000);
|
||||
|
||||
DBUG_ENTER("Log_to_csv_event_handler::log_slow");
|
||||
|
||||
thd->push_internal_handler(& error_handler);
|
||||
@@ -997,7 +996,6 @@ bool Log_to_csv_event_handler::
|
||||
0, TRUE))
|
||||
goto err;
|
||||
|
||||
/* log table entries are not replicated */
|
||||
if (table->file->ha_write_row(table->record[0]))
|
||||
goto err;
|
||||
|
||||
@@ -1982,7 +1980,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
|
||||
if (cache_mngr->trx_cache.has_incident())
|
||||
error= mysql_bin_log.write_incident(thd);
|
||||
|
||||
thd->clear_binlog_table_maps();
|
||||
thd->reset_binlog_for_next_statement();
|
||||
|
||||
cache_mngr->reset(false, true);
|
||||
}
|
||||
@@ -2253,6 +2251,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
||||
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
|
||||
*/
|
||||
cache_mngr->reset(false, true);
|
||||
thd->reset_binlog_for_next_statement();
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd))
|
||||
@@ -2297,6 +2296,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
||||
*/
|
||||
if (!all)
|
||||
cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
|
||||
thd->reset_binlog_for_next_statement();
|
||||
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
@@ -2478,14 +2478,14 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
|
||||
|
||||
/*
|
||||
When a SAVEPOINT is executed inside a stored function/trigger we force the
|
||||
pending event to be flushed with a STMT_END_F flag and clear the table maps
|
||||
pending event to be flushed with a STMT_END_F flag and reset binlog
|
||||
as well to ensure that following DMLs will have a clean state to start
|
||||
with. ROLLBACK inside a stored routine has to finalize possibly existing
|
||||
current row-based pending event with cleaning up table maps. That ensures
|
||||
that following DMLs will have a clean state to start with.
|
||||
*/
|
||||
if (thd->in_sub_stmt)
|
||||
thd->clear_binlog_table_maps();
|
||||
thd->reset_binlog_for_next_statement();
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
@@ -5773,7 +5773,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
|
||||
We only update the saved position if the old one was undefined,
|
||||
the reason is that there are some cases (e.g., for CREATE-SELECT)
|
||||
where the position is saved twice (e.g., both in
|
||||
select_create::prepare() and THD::binlog_write_table_map()) , but
|
||||
select_create::prepare() and binlog_write_table_map()) , but
|
||||
we should use the first. This means that calls to this function
|
||||
can be used to start the statement before the first table map
|
||||
event, to include some extra events.
|
||||
@@ -5900,45 +5900,149 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
|
||||
DBUG_RETURN(err);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Prepare all tables that are updated for row logging
|
||||
|
||||
Annotate events and table maps are written by binlog_write_table_maps()
|
||||
*/
|
||||
|
||||
void THD::binlog_prepare_for_row_logging()
|
||||
{
|
||||
DBUG_ENTER("THD::binlog_prepare_for_row_logging");
|
||||
for (TABLE *table= open_tables ; table; table= table->next)
|
||||
{
|
||||
if (table->query_id == query_id && table->current_lock == F_WRLCK)
|
||||
table->file->prepare_for_row_logging();
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
/**
|
||||
Write annnotated row event (the query) if needed
|
||||
*/
|
||||
|
||||
bool THD::binlog_write_annotated_row(Log_event_writer *writer)
|
||||
{
|
||||
int error;
|
||||
DBUG_ENTER("THD::binlog_write_annotated_row");
|
||||
|
||||
if (!(IF_WSREP(!wsrep_fragments_certified_for_stmt(this), true) &&
|
||||
variables.binlog_annotate_row_events &&
|
||||
query_length()))
|
||||
DBUG_RETURN(0);
|
||||
|
||||
Annotate_rows_log_event anno(this, 0, false);
|
||||
if (unlikely((error= writer->write(&anno))))
|
||||
{
|
||||
if (my_errno == EFBIG)
|
||||
writer->set_incident();
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Write table map events for all tables that are using row logging.
|
||||
This includes all tables used by this statement, including tables
|
||||
used in triggers.
|
||||
|
||||
Also write annotate events and start transactions.
|
||||
This is using the "tables_with_row_logging" list prepared by
|
||||
THD::binlog_prepare_for_row_logging
|
||||
*/
|
||||
|
||||
bool THD::binlog_write_table_maps()
|
||||
{
|
||||
bool with_annotate;
|
||||
MYSQL_LOCK *locks[2], **locks_end= locks;
|
||||
DBUG_ENTER("THD::binlog_write_table_maps");
|
||||
|
||||
DBUG_ASSERT(!binlog_table_maps);
|
||||
DBUG_ASSERT(is_current_stmt_binlog_format_row());
|
||||
|
||||
/* Initialize cache_mngr once per statement */
|
||||
binlog_start_trans_and_stmt();
|
||||
with_annotate= 1; // Write annotate with first map
|
||||
|
||||
if ((*locks_end= extra_lock))
|
||||
locks_end++;
|
||||
if ((*locks_end= lock))
|
||||
locks_end++;
|
||||
|
||||
for (MYSQL_LOCK **cur_lock= locks ; cur_lock < locks_end ; cur_lock++)
|
||||
{
|
||||
TABLE **const end_ptr= (*cur_lock)->table + (*cur_lock)->table_count;
|
||||
for (TABLE **table_ptr= (*cur_lock)->table;
|
||||
table_ptr != end_ptr ;
|
||||
++table_ptr)
|
||||
{
|
||||
TABLE *table= *table_ptr;
|
||||
bool restore= 0;
|
||||
/*
|
||||
We have to also write table maps for tables that have not yet been
|
||||
used, like for tables in after triggers
|
||||
*/
|
||||
if (!table->file->row_logging &&
|
||||
table->query_id != query_id && table->current_lock == F_WRLCK)
|
||||
{
|
||||
if (table->file->prepare_for_row_logging())
|
||||
restore= 1;
|
||||
}
|
||||
if (table->file->row_logging)
|
||||
{
|
||||
if (binlog_write_table_map(table, with_annotate))
|
||||
DBUG_RETURN(1);
|
||||
with_annotate= 0;
|
||||
}
|
||||
if (restore)
|
||||
{
|
||||
/*
|
||||
Restore original setting so that it doesn't cause problem for the
|
||||
next statement
|
||||
*/
|
||||
table->file->row_logging= table->file->row_logging_init= 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
binlog_table_maps= 1; // Table maps written
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
This function writes a table map to the binary log.
|
||||
Note that in order to keep the signature uniform with related methods,
|
||||
we use a redundant parameter to indicate whether a transactional table
|
||||
was changed or not.
|
||||
|
||||
If with_annotate != NULL and
|
||||
*with_annotate = TRUE write also Annotate_rows before the table map.
|
||||
|
||||
@param table a pointer to the table.
|
||||
@param is_transactional @c true indicates a transactional table,
|
||||
otherwise @c false a non-transactional.
|
||||
@param with_annotate If true call binlog_write_annotated_row()
|
||||
|
||||
@return
|
||||
nonzero if an error pops up when writing the table map event.
|
||||
*/
|
||||
int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
|
||||
bool *with_annotate)
|
||||
|
||||
bool THD::binlog_write_table_map(TABLE *table, bool with_annotate)
|
||||
{
|
||||
int error;
|
||||
bool is_transactional= table->file->row_logging_has_trans;
|
||||
DBUG_ENTER("THD::binlog_write_table_map");
|
||||
DBUG_PRINT("enter", ("table: %p (%s: #%lu)",
|
||||
table, table->s->table_name.str,
|
||||
table->s->table_map_id));
|
||||
|
||||
/* Pre-conditions */
|
||||
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
|
||||
|
||||
/* Ensure that all events in a GTID group are in the same cache */
|
||||
if (variables.option_bits & OPTION_GTID_BEGIN)
|
||||
is_transactional= 1;
|
||||
|
||||
/* Pre-conditions */
|
||||
DBUG_ASSERT(is_current_stmt_binlog_format_row());
|
||||
DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open());
|
||||
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
|
||||
|
||||
Table_map_log_event
|
||||
the_event(this, table, table->s->table_map_id, is_transactional);
|
||||
|
||||
if (binlog_table_maps == 0)
|
||||
binlog_start_trans_and_stmt();
|
||||
|
||||
binlog_cache_mngr *const cache_mngr=
|
||||
(binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
|
||||
binlog_cache_data *cache_data= (cache_mngr->
|
||||
@@ -5946,25 +6050,17 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
|
||||
IO_CACHE *file= &cache_data->cache_log;
|
||||
Log_event_writer writer(file, cache_data);
|
||||
|
||||
if (with_annotate && *with_annotate)
|
||||
{
|
||||
Annotate_rows_log_event anno(table->in_use, is_transactional, false);
|
||||
/* Annotate event should be written not more than once */
|
||||
*with_annotate= 0;
|
||||
if (unlikely((error= writer.write(&anno))))
|
||||
{
|
||||
if (my_errno == EFBIG)
|
||||
cache_data->set_incident();
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
}
|
||||
if (with_annotate)
|
||||
if (binlog_write_annotated_row(&writer))
|
||||
DBUG_RETURN(1);
|
||||
|
||||
if (unlikely((error= writer.write(&the_event))))
|
||||
DBUG_RETURN(error);
|
||||
|
||||
binlog_table_maps++;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
This function retrieves a pending row event from a cache which is
|
||||
specified through the parameter @c is_transactional. Respectively, when it
|
||||
@@ -10848,7 +10944,7 @@ void wsrep_thd_binlog_trx_reset(THD * thd)
|
||||
cache_mngr->stmt_cache.reset();
|
||||
}
|
||||
}
|
||||
thd->clear_binlog_table_maps();
|
||||
thd->reset_binlog_for_next_statement();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
@@ -10875,7 +10971,6 @@ bool wsrep_stmt_rollback_is_safe(THD* thd)
|
||||
binlog_cache_mngr *cache_mngr=
|
||||
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
|
||||
|
||||
|
||||
if (binlog_hton && cache_mngr)
|
||||
{
|
||||
binlog_cache_data * trx_cache = &cache_mngr->trx_cache;
|
||||
|
@@ -1144,6 +1144,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
|
||||
|
||||
void make_default_log_name(char **out, const char* log_ext, bool once);
|
||||
void binlog_reset_cache(THD *thd);
|
||||
bool write_annotated_row(THD *thd);
|
||||
|
||||
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
|
||||
extern handlerton *binlog_hton;
|
||||
|
@@ -942,6 +942,7 @@ public:
|
||||
int write_footer();
|
||||
my_off_t pos() { return my_b_safe_tell(file); }
|
||||
void add_status(enum_logged_status status);
|
||||
void set_incident();
|
||||
|
||||
Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg,
|
||||
Binlog_crypt_data *cr= 0)
|
||||
|
@@ -799,7 +799,10 @@ my_bool Log_event::need_checksum()
|
||||
int Log_event_writer::write_internal(const uchar *pos, size_t len)
|
||||
{
|
||||
if (my_b_safe_write(file, pos, len))
|
||||
{
|
||||
DBUG_PRINT("error", ("write to log failed: %d", my_errno));
|
||||
return 1;
|
||||
}
|
||||
bytes_written+= len;
|
||||
return 0;
|
||||
}
|
||||
@@ -6153,13 +6156,10 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
|
||||
(tbl->s->db.str[tbl->s->db.length] == 0));
|
||||
DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
|
||||
|
||||
#ifdef MYSQL_SERVER
|
||||
binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
|
||||
sizeof(Binlog_type_info));
|
||||
for (uint i= 0; i < m_table->s->fields; i++)
|
||||
binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
|
||||
#endif
|
||||
|
||||
|
||||
m_data_size= TABLE_MAP_HEADER_LEN;
|
||||
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
|
||||
@@ -7079,14 +7079,14 @@ bool Rows_log_event::process_triggers(trg_event_type event,
|
||||
m_table->triggers->mark_fields_used(event);
|
||||
if (slave_run_triggers_for_rbr == SLAVE_RUN_TRIGGERS_FOR_RBR_YES)
|
||||
{
|
||||
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
|
||||
result= m_table->triggers->process_triggers(thd, event,
|
||||
time_type, old_row_is_record1);
|
||||
reenable_binlog(thd);
|
||||
time_type,
|
||||
old_row_is_record1);
|
||||
}
|
||||
else
|
||||
result= m_table->triggers->process_triggers(thd, event,
|
||||
time_type, old_row_is_record1);
|
||||
time_type,
|
||||
old_row_is_record1);
|
||||
|
||||
DBUG_RETURN(result);
|
||||
}
|
||||
|
@@ -423,10 +423,12 @@ rpl_slave_state::truncate_state_table(THD *thd)
|
||||
TABLE_LIST tlist;
|
||||
int err= 0;
|
||||
|
||||
tmp_disable_binlog(thd);
|
||||
tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE);
|
||||
if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
|
||||
tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
|
||||
NULL, TL_WRITE);
|
||||
if (!(err= open_and_lock_tables(thd, &tlist, FALSE,
|
||||
MYSQL_OPEN_IGNORE_LOGGING_FORMAT)))
|
||||
{
|
||||
DBUG_ASSERT(!tlist.table->file->row_logging);
|
||||
tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql",
|
||||
rpl_gtid_slave_state_table_name.str);
|
||||
err= tlist.table->file->ha_truncate();
|
||||
@@ -445,8 +447,6 @@ rpl_slave_state::truncate_state_table(THD *thd)
|
||||
}
|
||||
thd->mdl_context.release_transactional_locks();
|
||||
}
|
||||
|
||||
reenable_binlog(thd);
|
||||
return err;
|
||||
}
|
||||
|
||||
@@ -676,6 +676,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
|
||||
table_opened= true;
|
||||
table= tlist.table;
|
||||
hton= table->s->db_type();
|
||||
table->file->row_logging= 0; // No binary logging
|
||||
|
||||
if ((err= gtid_check_rpl_slave_state_table(table)))
|
||||
goto end;
|
||||
|
@@ -100,6 +100,7 @@ int injector::transaction::commit()
|
||||
}
|
||||
|
||||
|
||||
#ifdef TO_BE_DELETED
|
||||
int injector::transaction::use_table(server_id_type sid, table tbl)
|
||||
{
|
||||
DBUG_ENTER("injector::transaction::use_table");
|
||||
@@ -111,12 +112,12 @@ int injector::transaction::use_table(server_id_type sid, table tbl)
|
||||
|
||||
server_id_type save_id= m_thd->variables.server_id;
|
||||
m_thd->set_server_id(sid);
|
||||
error= m_thd->binlog_write_table_map(tbl.get_table(),
|
||||
tbl.is_transactional());
|
||||
DBUG_ASSERT(tbl.is_transactional() == tbl.get_table()->file->row_logging_has_trans);
|
||||
error= m_thd->binlog_write_table_map(tbl.get_table(), 0);
|
||||
m_thd->set_server_id(save_id);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
injector::transaction::binlog_pos injector::transaction::start_pos() const
|
||||
|
@@ -177,8 +177,9 @@ public:
|
||||
>0 Failure
|
||||
|
||||
*/
|
||||
#ifdef TO_BE_DELETED
|
||||
int use_table(server_id_type sid, table tbl);
|
||||
|
||||
#endif
|
||||
/*
|
||||
Commit a transaction.
|
||||
|
||||
|
23
sql/slave.cc
23
sql/slave.cc
@@ -3582,20 +3582,19 @@ void set_slave_thread_options(THD* thd)
|
||||
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
|
||||
only for client threads.
|
||||
*/
|
||||
ulonglong options= thd->variables.option_bits | OPTION_BIG_SELECTS;
|
||||
if (opt_log_slave_updates)
|
||||
options|= OPTION_BIN_LOG;
|
||||
else
|
||||
ulonglong options= (thd->variables.option_bits |
|
||||
OPTION_BIG_SELECTS | OPTION_BIN_LOG);
|
||||
if (!opt_log_slave_updates)
|
||||
options&= ~OPTION_BIN_LOG;
|
||||
thd->variables.option_bits= options;
|
||||
thd->variables.completion_type= 0;
|
||||
|
||||
/* For easier test in LOGGER::log_command */
|
||||
if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE)
|
||||
thd->variables.option_bits|= OPTION_LOG_OFF;
|
||||
options|= OPTION_LOG_OFF;
|
||||
thd->variables.option_bits= options;
|
||||
|
||||
thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements &
|
||||
LOG_SLOW_DISABLE_SLAVE);
|
||||
thd->variables.completion_type= 0;
|
||||
thd->variables.sql_log_slow=
|
||||
!MY_TEST(thd->variables.log_slow_disabled_statements &
|
||||
LOG_SLOW_DISABLE_SLAVE);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
@@ -8196,7 +8195,7 @@ void Rows_event_tracker::reset()
|
||||
|
||||
|
||||
/*
|
||||
Update log event tracking data.
|
||||
Update log event tracking data.
|
||||
|
||||
The first- and last- seen event binlog position get memorized, as
|
||||
well as the end-of-statement status of the last one.
|
||||
@@ -8206,6 +8205,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos,
|
||||
const char* buf,
|
||||
const Format_description_log_event *fdle)
|
||||
{
|
||||
DBUG_ENTER("Rows_event_tracker::update");
|
||||
if (!first_seen)
|
||||
{
|
||||
first_seen= pos;
|
||||
@@ -8214,6 +8214,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos,
|
||||
last_seen= pos;
|
||||
DBUG_ASSERT(stmt_end_seen == 0); // We can only have one
|
||||
stmt_end_seen= get_row_event_stmt_end(buf, fdle);
|
||||
DBUG_VOID_RETURN;
|
||||
};
|
||||
|
||||
|
||||
|
@@ -1942,7 +1942,9 @@ class Grant_tables
|
||||
if (res)
|
||||
DBUG_RETURN(res);
|
||||
|
||||
if (lock_tables(thd, first, counter, MYSQL_LOCK_IGNORE_TIMEOUT))
|
||||
if (lock_tables(thd, first, counter,
|
||||
MYSQL_LOCK_IGNORE_TIMEOUT |
|
||||
MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
p_user_table->set_table(tables[USER_TABLE].table);
|
||||
@@ -4397,7 +4399,8 @@ static USER_AUTH auth_no_password;
|
||||
|
||||
static int replace_user_table(THD *thd, const User_table &user_table,
|
||||
LEX_USER * const combo, privilege_t rights,
|
||||
const bool revoke_grant, const bool can_create_user,
|
||||
const bool revoke_grant,
|
||||
const bool can_create_user,
|
||||
const bool no_auto_create)
|
||||
{
|
||||
int error = -1;
|
||||
@@ -5426,6 +5429,7 @@ static int replace_column_table(GRANT_TABLE *g_t,
|
||||
|
||||
List_iterator <LEX_COLUMN> iter(columns);
|
||||
class LEX_COLUMN *column;
|
||||
|
||||
int error= table->file->ha_index_init(0, 1);
|
||||
if (unlikely(error))
|
||||
{
|
||||
@@ -5776,7 +5780,7 @@ static int replace_routine_table(THD *thd, GRANT_NAME *grant_name,
|
||||
*/
|
||||
|
||||
table->use_all_columns();
|
||||
restore_record(table, s->default_values); // Get empty record
|
||||
restore_record(table, s->default_values); // Get empty record
|
||||
table->field[0]->store(combo.host.str,combo.host.length, &my_charset_latin1);
|
||||
table->field[1]->store(db,(uint) strlen(db), &my_charset_latin1);
|
||||
table->field[2]->store(combo.user.str,combo.user.length, &my_charset_latin1);
|
||||
|
@@ -1027,9 +1027,7 @@ send_result_message:
|
||||
*save_next_global= table->next_global;
|
||||
table->next_local= table->next_global= 0;
|
||||
|
||||
tmp_disable_binlog(thd); // binlogging is done by caller if wanted
|
||||
result_code= admin_recreate_table(thd, table);
|
||||
reenable_binlog(thd);
|
||||
trans_commit_stmt(thd);
|
||||
trans_commit(thd);
|
||||
close_thread_tables(thd);
|
||||
|
@@ -734,8 +734,9 @@ bool close_cached_connection_tables(THD *thd, LEX_CSTRING *connection)
|
||||
Clear 'check_table_binlog_row_based_done' flag. For tables which were used
|
||||
by current substatement the flag is cleared as part of 'ha_reset()' call.
|
||||
For the rest of the open tables not used by current substament if this
|
||||
flag is enabled as part of current substatement execution, clear the flag
|
||||
explicitly.
|
||||
flag is enabled as part of current substatement execution,
|
||||
(for example when THD::binlog_write_table_maps() calls
|
||||
prepare_for_row_logging()), clear the flag explicitly.
|
||||
|
||||
NOTE
|
||||
The reason we reset query_id is that it's not enough to just test
|
||||
@@ -759,7 +760,7 @@ static void mark_used_tables_as_free_for_reuse(THD *thd, TABLE *table)
|
||||
table->query_id= 0;
|
||||
table->file->ha_reset();
|
||||
}
|
||||
else if (table->file->check_table_binlog_row_based_done)
|
||||
else
|
||||
table->file->clear_cached_table_binlog_row_based_flag();
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
@@ -1660,9 +1661,10 @@ static int set_partitions_as_used(TABLE_LIST *tl, TABLE *t)
|
||||
needed to remedy problem before retrying again.
|
||||
@retval FALSE 't' was not locked, not a VIEW or an error happened.
|
||||
*/
|
||||
|
||||
bool is_locked_view(THD *thd, TABLE_LIST *t)
|
||||
{
|
||||
DBUG_ENTER("check_locked_view");
|
||||
DBUG_ENTER("is_locked_view");
|
||||
/*
|
||||
Is this table a view and not a base table?
|
||||
(it is work around to allow to open view with locked tables,
|
||||
@@ -2212,8 +2214,8 @@ retry_share:
|
||||
my_error(ER_NOT_SEQUENCE, MYF(0), table_list->db.str, table_list->alias.str);
|
||||
DBUG_RETURN(true);
|
||||
}
|
||||
|
||||
table->init(thd, table_list);
|
||||
DBUG_ASSERT(thd->locked_tables_mode || table->file->row_logging == 0);
|
||||
|
||||
DBUG_RETURN(FALSE);
|
||||
|
||||
@@ -3738,10 +3740,11 @@ open_and_process_table(THD *thd, TABLE_LIST *tables, uint *counter, uint flags,
|
||||
temporary table or SEQUENCE (see sequence_insert()).
|
||||
*/
|
||||
DBUG_ASSERT(is_temporary_table(tables) || tables->table->s->sequence);
|
||||
if (tables->sequence && tables->table->s->table_type != TABLE_TYPE_SEQUENCE)
|
||||
if (tables->sequence &&
|
||||
tables->table->s->table_type != TABLE_TYPE_SEQUENCE)
|
||||
{
|
||||
my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str);
|
||||
DBUG_RETURN(true);
|
||||
my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str);
|
||||
DBUG_RETURN(true);
|
||||
}
|
||||
}
|
||||
else if (tables->open_type == OT_TEMPORARY_ONLY)
|
||||
@@ -5214,7 +5217,9 @@ bool open_and_lock_tables(THD *thd, const DDL_options_st &options,
|
||||
if (lock_tables(thd, tables, counter, flags))
|
||||
goto err;
|
||||
|
||||
(void) read_statistics_for_tables_if_needed(thd, tables);
|
||||
/* Don't read statistics tables when opening internal tables */
|
||||
if (!(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
|
||||
(void) read_statistics_for_tables_if_needed(thd, tables);
|
||||
|
||||
if (derived)
|
||||
{
|
||||
@@ -5348,12 +5353,19 @@ bool open_tables_only_view_structure(THD *thd, TABLE_LIST *table_list,
|
||||
static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list)
|
||||
{
|
||||
TABLE_LIST *table;
|
||||
DBUG_ENTER("mark_real_tables_as_free_for_reuse");
|
||||
|
||||
/*
|
||||
We have to make two loops as HA_EXTRA_DETACH_CHILDREN may
|
||||
remove items from the table list that we have to reset
|
||||
*/
|
||||
for (table= table_list; table; table= table->next_global)
|
||||
{
|
||||
if (!table->placeholder())
|
||||
{
|
||||
table->table->query_id= 0;
|
||||
}
|
||||
}
|
||||
for (table= table_list; table; table= table->next_global)
|
||||
{
|
||||
if (!table->placeholder())
|
||||
{
|
||||
/*
|
||||
@@ -5364,6 +5376,8 @@ static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list)
|
||||
*/
|
||||
table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
|
||||
}
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@@ -5428,7 +5442,7 @@ err:
|
||||
|
||||
bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
{
|
||||
TABLE_LIST *table;
|
||||
TABLE_LIST *table, *first_not_own;
|
||||
DBUG_ENTER("lock_tables");
|
||||
/*
|
||||
We can't meet statement requiring prelocking if we already
|
||||
@@ -5438,7 +5452,9 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
!thd->lex->requires_prelocking());
|
||||
|
||||
if (!tables && !thd->lex->requires_prelocking())
|
||||
DBUG_RETURN(thd->decide_logging_format(tables));
|
||||
DBUG_RETURN(0);
|
||||
|
||||
first_not_own= thd->lex->first_not_own_table();
|
||||
|
||||
/*
|
||||
Check for thd->locked_tables_mode to avoid a redundant
|
||||
@@ -5454,13 +5470,26 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
{
|
||||
DBUG_ASSERT(thd->lock == 0); // You must lock everything at once
|
||||
TABLE **start,**ptr;
|
||||
bool found_first_not_own= 0;
|
||||
|
||||
if (!(ptr=start=(TABLE**) thd->alloc(sizeof(TABLE*)*count)))
|
||||
DBUG_RETURN(TRUE);
|
||||
|
||||
/*
|
||||
Collect changes tables for table lock.
|
||||
Mark own tables with query id as this is needed by
|
||||
prepare_for_row_logging()
|
||||
*/
|
||||
for (table= tables; table; table= table->next_global)
|
||||
{
|
||||
if (table == first_not_own)
|
||||
found_first_not_own= 1;
|
||||
if (!table->placeholder())
|
||||
*(ptr++)= table->table;
|
||||
{
|
||||
*(ptr++)= table->table;
|
||||
if (!found_first_not_own)
|
||||
table->table->query_id= thd->query_id;
|
||||
}
|
||||
}
|
||||
|
||||
DEBUG_SYNC(thd, "before_lock_tables_takes_lock");
|
||||
@@ -5474,7 +5503,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
if (thd->lex->requires_prelocking() &&
|
||||
thd->lex->sql_command != SQLCOM_LOCK_TABLES)
|
||||
{
|
||||
TABLE_LIST *first_not_own= thd->lex->first_not_own_table();
|
||||
/*
|
||||
We just have done implicit LOCK TABLES, and now we have
|
||||
to emulate first open_and_lock_tables() after it.
|
||||
@@ -5492,7 +5520,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
{
|
||||
if (!table->placeholder())
|
||||
{
|
||||
table->table->query_id= thd->query_id;
|
||||
if (check_lock_and_start_stmt(thd, thd->lex, table))
|
||||
{
|
||||
mysql_unlock_tables(thd, thd->lock);
|
||||
@@ -5512,7 +5539,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
}
|
||||
else
|
||||
{
|
||||
TABLE_LIST *first_not_own= thd->lex->first_not_own_table();
|
||||
/*
|
||||
When open_and_lock_tables() is called for a single table out of
|
||||
a table list, the 'next_global' chain is temporarily broken. We
|
||||
@@ -5528,6 +5554,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
if (table->placeholder())
|
||||
continue;
|
||||
|
||||
table->table->query_id= thd->query_id;
|
||||
/*
|
||||
In a stored function or trigger we should ensure that we won't change
|
||||
a table that is already used by the calling statement.
|
||||
@@ -5567,7 +5594,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
|
||||
}
|
||||
|
||||
bool res= fix_all_session_vcol_exprs(thd, tables);
|
||||
if (!res)
|
||||
if (!res && !(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
|
||||
res= thd->decide_logging_format(tables);
|
||||
|
||||
DBUG_RETURN(res);
|
||||
@@ -9005,6 +9032,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
|
||||
*/
|
||||
if (open_and_lock_tables(thd, table_list, FALSE,
|
||||
(MYSQL_OPEN_IGNORE_FLUSH |
|
||||
MYSQL_OPEN_IGNORE_LOGGING_FORMAT |
|
||||
(table_list->lock_type < TL_WRITE_ALLOW_WRITE ?
|
||||
MYSQL_LOCK_IGNORE_TIMEOUT : 0))))
|
||||
{
|
||||
@@ -9016,6 +9044,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
|
||||
for (TABLE_LIST *tables= table_list; tables; tables= tables->next_global)
|
||||
{
|
||||
DBUG_ASSERT(tables->table->s->table_category == TABLE_CATEGORY_SYSTEM);
|
||||
tables->table->file->row_logging= 0;
|
||||
tables->table->use_all_columns();
|
||||
}
|
||||
lex->restore_backup_query_tables_list(&query_tables_list_backup);
|
||||
@@ -9103,8 +9132,9 @@ open_system_table_for_update(THD *thd, TABLE_LIST *one_table)
|
||||
{
|
||||
DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_SYSTEM);
|
||||
table->use_all_columns();
|
||||
/* This table instance is not row logged */
|
||||
table->file->row_logging= 0;
|
||||
}
|
||||
|
||||
DBUG_RETURN(table);
|
||||
}
|
||||
|
||||
@@ -9137,6 +9167,8 @@ open_log_table(THD *thd, TABLE_LIST *one_table, Open_tables_backup *backup)
|
||||
if ((table= open_ltable(thd, one_table, one_table->lock_type, flags)))
|
||||
{
|
||||
DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_LOG);
|
||||
DBUG_ASSERT(!table->file->row_logging);
|
||||
|
||||
/* Make sure all columns get assigned to a default value */
|
||||
table->use_all_columns();
|
||||
DBUG_ASSERT(table->s->no_replicate);
|
||||
|
@@ -125,6 +125,11 @@ TABLE *open_ltable(THD *thd, TABLE_LIST *table_list, thr_lock_type update,
|
||||
*/
|
||||
#define MYSQL_OPEN_IGNORE_REPAIR 0x10000
|
||||
|
||||
/**
|
||||
Don't call decide_logging_format. Used for statistic tables etc
|
||||
*/
|
||||
#define MYSQL_OPEN_IGNORE_LOGGING_FORMAT 0x20000
|
||||
|
||||
/** Please refer to the internals manual. */
|
||||
#define MYSQL_OPEN_REOPEN (MYSQL_OPEN_IGNORE_FLUSH |\
|
||||
MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |\
|
||||
|
@@ -636,7 +636,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
|
||||
m_current_stage_key(0), m_psi(0),
|
||||
in_sub_stmt(0), log_all_errors(0),
|
||||
binlog_unsafe_warning_flags(0),
|
||||
binlog_table_maps(0),
|
||||
bulk_param(0),
|
||||
table_map_for_update(0),
|
||||
m_examined_row_count(0),
|
||||
@@ -797,6 +796,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
|
||||
bzero((void*) ha_data, sizeof(ha_data));
|
||||
mysys_var=0;
|
||||
binlog_evt_union.do_union= FALSE;
|
||||
binlog_table_maps= FALSE;
|
||||
enable_slow_log= 0;
|
||||
durability_property= HA_REGULAR_DURABILITY;
|
||||
|
||||
@@ -1315,8 +1315,6 @@ void THD::init()
|
||||
else
|
||||
variables.option_bits&= ~OPTION_BIN_LOG;
|
||||
|
||||
variables.sql_log_bin_off= 0;
|
||||
|
||||
select_commands= update_commands= other_commands= 0;
|
||||
/* Set to handle counting of aborted connections */
|
||||
userstat_running= opt_userstat_running;
|
||||
@@ -5837,8 +5835,9 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
{
|
||||
DBUG_ENTER("THD::decide_logging_format");
|
||||
DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query()));
|
||||
DBUG_PRINT("info", ("variables.binlog_format: %lu",
|
||||
variables.binlog_format));
|
||||
DBUG_PRINT("info", ("binlog_format: %lu", (ulong) variables.binlog_format));
|
||||
DBUG_PRINT("info", ("current_stmt_binlog_format: %lu",
|
||||
(ulong) current_stmt_binlog_format));
|
||||
DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
|
||||
lex->get_stmt_unsafe_flags()));
|
||||
|
||||
@@ -5861,18 +5860,14 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
#endif /* WITH_WSREP */
|
||||
|
||||
if ((WSREP_EMULATE_BINLOG_NNULL(this) ||
|
||||
(mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) &&
|
||||
(mysql_bin_log.is_open() &&
|
||||
(variables.option_bits & OPTION_BIN_LOG))) &&
|
||||
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
|
||||
!binlog_filter->db_ok(db.str)))
|
||||
#else
|
||||
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
|
||||
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
|
||||
!binlog_filter->db_ok(db.str)))
|
||||
#endif /* WITH_WSREP */
|
||||
{
|
||||
|
||||
if (is_bulk_op())
|
||||
{
|
||||
if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
|
||||
@@ -5915,6 +5910,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
bool has_auto_increment_write_tables_not_first= FALSE;
|
||||
bool found_first_not_own_table= FALSE;
|
||||
bool has_write_tables_with_unsafe_statements= FALSE;
|
||||
bool blackhole_table_found= 0;
|
||||
|
||||
/*
|
||||
A pointer to a previous table that was changed.
|
||||
@@ -6040,6 +6036,10 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
if (prev_write_table && prev_write_table->file->ht !=
|
||||
table->file->ht)
|
||||
multi_write_engine= TRUE;
|
||||
|
||||
if (table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB)
|
||||
blackhole_table_found= 1;
|
||||
|
||||
if (share->non_determinstic_insert &&
|
||||
!(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE))
|
||||
has_write_tables_with_unsafe_statements= true;
|
||||
@@ -6285,7 +6285,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
is_current_stmt_binlog_format_row() ?
|
||||
"ROW" : "STATEMENT"));
|
||||
|
||||
if (variables.binlog_format == BINLOG_FORMAT_ROW &&
|
||||
if (blackhole_table_found &&
|
||||
variables.binlog_format == BINLOG_FORMAT_ROW &&
|
||||
(sql_command_flags[lex->sql_command] &
|
||||
(CF_UPDATES_DATA | CF_DELETES_DATA)))
|
||||
{
|
||||
@@ -6301,8 +6302,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
|
||||
table->lock_type >= TL_WRITE_ALLOW_WRITE)
|
||||
{
|
||||
table_names.append(&table->table_name);
|
||||
table_names.append(",");
|
||||
table_names.append(&table->table_name);
|
||||
table_names.append(",");
|
||||
}
|
||||
}
|
||||
if (!table_names.is_empty())
|
||||
@@ -6322,9 +6323,12 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
table_names.c_ptr());
|
||||
}
|
||||
}
|
||||
|
||||
if (is_write && is_current_stmt_binlog_format_row())
|
||||
binlog_prepare_for_row_logging();
|
||||
}
|
||||
#ifndef DBUG_OFF
|
||||
else
|
||||
{
|
||||
DBUG_PRINT("info", ("decision: no logging since "
|
||||
"mysql_bin_log.is_open() = %d "
|
||||
"and (options & OPTION_BIN_LOG) = 0x%llx "
|
||||
@@ -6334,22 +6338,23 @@ int THD::decide_logging_format(TABLE_LIST *tables)
|
||||
(variables.option_bits & OPTION_BIN_LOG),
|
||||
(uint) wsrep_binlog_format(),
|
||||
binlog_filter->db_ok(db.str)));
|
||||
#endif
|
||||
|
||||
if (WSREP_NNULL(this) && is_current_stmt_binlog_format_row())
|
||||
binlog_prepare_for_row_logging();
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int THD::decide_logging_format_low(TABLE *table)
|
||||
{
|
||||
DBUG_ENTER("decide_logging_format_low");
|
||||
/*
|
||||
INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
|
||||
can be unsafe.
|
||||
*/
|
||||
if(wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
|
||||
!is_current_stmt_binlog_format_row() &&
|
||||
!lex->is_stmt_unsafe() &&
|
||||
lex->sql_command == SQLCOM_INSERT &&
|
||||
lex->duplicates == DUP_UPDATE)
|
||||
INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
|
||||
can be unsafe.
|
||||
*/
|
||||
if (wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
|
||||
!is_current_stmt_binlog_format_row() &&
|
||||
!lex->is_stmt_unsafe() &&
|
||||
lex->duplicates == DUP_UPDATE)
|
||||
{
|
||||
uint unique_keys= 0;
|
||||
uint keys= table->s->keys, i= 0;
|
||||
@@ -6376,27 +6381,22 @@ exit:;
|
||||
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS);
|
||||
binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags();
|
||||
set_current_stmt_binlog_format_row_if_mixed();
|
||||
return 1;
|
||||
if (is_current_stmt_binlog_format_row())
|
||||
binlog_prepare_for_row_logging();
|
||||
DBUG_RETURN(1);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
Implementation of interface to write rows to the binary log through the
|
||||
thread. The thread is responsible for writing the rows it has
|
||||
inserted/updated/deleted.
|
||||
*/
|
||||
|
||||
#ifndef MYSQL_CLIENT
|
||||
|
||||
/*
|
||||
Template member function for ensuring that there is an rows log
|
||||
event of the apropriate type before proceeding.
|
||||
|
||||
PRE CONDITION:
|
||||
- Events of type 'RowEventT' have the type code 'type_code'.
|
||||
|
||||
|
||||
POST CONDITION:
|
||||
If a non-NULL pointer is returned, the pending event for thread 'thd' will
|
||||
be an event of type 'RowEventT' (which have the type code 'type_code')
|
||||
@@ -6814,7 +6814,8 @@ void THD::binlog_prepare_row_images(TABLE *table)
|
||||
*/
|
||||
if (table->s->primary_key < MAX_KEY &&
|
||||
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
|
||||
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
|
||||
!ha_check_storage_engine_flag(table->s->db_type(),
|
||||
HTON_NO_BINLOG_ROW_OPT))
|
||||
{
|
||||
/**
|
||||
Just to be sure that tmp_set is currently not in use as
|
||||
@@ -6859,7 +6860,7 @@ void THD::binlog_prepare_row_images(TABLE *table)
|
||||
|
||||
|
||||
|
||||
int THD::binlog_remove_pending_rows_event(bool clear_maps,
|
||||
int THD::binlog_remove_pending_rows_event(bool reset_stmt,
|
||||
bool is_transactional)
|
||||
{
|
||||
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
|
||||
@@ -6873,12 +6874,12 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
|
||||
|
||||
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
|
||||
|
||||
if (clear_maps)
|
||||
binlog_table_maps= 0;
|
||||
|
||||
if (reset_stmt)
|
||||
reset_binlog_for_next_statement();
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
|
||||
{
|
||||
DBUG_ENTER("THD::binlog_flush_pending_rows_event");
|
||||
@@ -6904,9 +6905,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
|
||||
if (stmt_end)
|
||||
{
|
||||
pending->set_flags(Rows_log_event::STMT_END_F);
|
||||
binlog_table_maps= 0;
|
||||
reset_binlog_for_next_statement();
|
||||
}
|
||||
|
||||
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
|
||||
is_transactional);
|
||||
}
|
||||
@@ -7278,8 +7278,11 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
|
||||
suppress_use, errcode);
|
||||
error= mysql_bin_log.write(&qinfo);
|
||||
}
|
||||
/*
|
||||
row logged binlog may not have been reset in the case of locked tables
|
||||
*/
|
||||
reset_binlog_for_next_statement();
|
||||
|
||||
binlog_table_maps= 0;
|
||||
DBUG_RETURN(error >= 0 ? error : 1);
|
||||
}
|
||||
|
||||
|
@@ -75,14 +75,15 @@ void set_thd_stage_info(void *thd,
|
||||
#include "wsrep_condition_variable.h"
|
||||
|
||||
class Wsrep_applier_service;
|
||||
|
||||
#endif /* WITH_WSREP */
|
||||
|
||||
class Reprepare_observer;
|
||||
class Relay_log_info;
|
||||
struct rpl_group_info;
|
||||
class Rpl_filter;
|
||||
class Query_log_event;
|
||||
class Load_log_event;
|
||||
class Log_event_writer;
|
||||
class sp_rcontext;
|
||||
class sp_cache;
|
||||
class Lex_input_stream;
|
||||
@@ -737,11 +738,6 @@ typedef struct system_variables
|
||||
my_bool query_cache_strip_comments;
|
||||
my_bool sql_log_slow;
|
||||
my_bool sql_log_bin;
|
||||
/*
|
||||
A flag to help detect whether binary logging was temporarily disabled
|
||||
(see tmp_disable_binlog(A) macro).
|
||||
*/
|
||||
my_bool sql_log_bin_off;
|
||||
my_bool binlog_annotate_row_events;
|
||||
my_bool binlog_direct_non_trans_update;
|
||||
my_bool column_compression_zlib_wrap;
|
||||
@@ -2576,14 +2572,17 @@ public:
|
||||
*/
|
||||
void binlog_start_trans_and_stmt();
|
||||
void binlog_set_stmt_begin();
|
||||
int binlog_write_table_map(TABLE *table, bool is_transactional,
|
||||
bool *with_annotate= 0);
|
||||
int binlog_write_row(TABLE* table, bool is_transactional,
|
||||
const uchar *buf);
|
||||
int binlog_delete_row(TABLE* table, bool is_transactional,
|
||||
const uchar *buf);
|
||||
int binlog_update_row(TABLE* table, bool is_transactional,
|
||||
const uchar *old_data, const uchar *new_data);
|
||||
bool prepare_handlers_for_update(uint flag);
|
||||
bool binlog_write_annotated_row(Log_event_writer *writer);
|
||||
void binlog_prepare_for_row_logging();
|
||||
bool binlog_write_table_maps();
|
||||
bool binlog_write_table_map(TABLE *table, bool with_annotate);
|
||||
static void binlog_prepare_row_images(TABLE* table);
|
||||
|
||||
void set_server_id(uint32 sid) { variables.server_id = sid; }
|
||||
@@ -2677,22 +2676,20 @@ private:
|
||||
*/
|
||||
enum_binlog_format current_stmt_binlog_format;
|
||||
|
||||
/*
|
||||
Number of outstanding table maps, i.e., table maps in the
|
||||
transaction cache.
|
||||
*/
|
||||
uint binlog_table_maps;
|
||||
public:
|
||||
|
||||
/* 1 if binlog table maps has been written */
|
||||
bool binlog_table_maps;
|
||||
|
||||
void issue_unsafe_warnings();
|
||||
void reset_unsafe_warnings()
|
||||
{ binlog_unsafe_warning_flags= 0; }
|
||||
|
||||
uint get_binlog_table_maps() const {
|
||||
return binlog_table_maps;
|
||||
}
|
||||
void clear_binlog_table_maps() {
|
||||
void reset_binlog_for_next_statement()
|
||||
{
|
||||
binlog_table_maps= 0;
|
||||
}
|
||||
|
||||
#endif /* MYSQL_CLIENT */
|
||||
|
||||
public:
|
||||
@@ -5170,11 +5167,10 @@ my_eof(THD *thd)
|
||||
#define tmp_disable_binlog(A) \
|
||||
{ulonglong tmp_disable_binlog__save_options= (A)->variables.option_bits; \
|
||||
(A)->variables.option_bits&= ~OPTION_BIN_LOG; \
|
||||
(A)->variables.sql_log_bin_off= 1;
|
||||
(A)->variables.option_bits|= OPTION_BIN_TMP_LOG_OFF;
|
||||
|
||||
#define reenable_binlog(A) \
|
||||
(A)->variables.option_bits= tmp_disable_binlog__save_options; \
|
||||
(A)->variables.sql_log_bin_off= 0;}
|
||||
(A)->variables.option_bits= tmp_disable_binlog__save_options; }
|
||||
|
||||
|
||||
inline date_conv_mode_t sql_mode_for_dates(THD *thd)
|
||||
|
@@ -959,6 +959,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
|
||||
goto values_loop_end;
|
||||
|
||||
THD_STAGE_INFO(thd, stage_update);
|
||||
thd->decide_logging_format_low(table);
|
||||
do
|
||||
{
|
||||
DBUG_PRINT("info", ("iteration %llu", iteration));
|
||||
@@ -1071,7 +1072,6 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
|
||||
break;
|
||||
}
|
||||
|
||||
thd->decide_logging_format_low(table);
|
||||
#ifndef EMBEDDED_LIBRARY
|
||||
if (lock_type == TL_WRITE_DELAYED)
|
||||
{
|
||||
@@ -3051,6 +3051,8 @@ bool Delayed_insert::open_and_lock_table()
|
||||
return TRUE;
|
||||
}
|
||||
table->copy_blobs= 1;
|
||||
|
||||
table->file->prepare_for_row_logging();
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
@@ -3110,6 +3112,8 @@ pthread_handler_t handle_delayed_insert(void *arg)
|
||||
at which rows are inserted cannot be determined in mixed mode.
|
||||
*/
|
||||
thd->set_current_stmt_binlog_format_row_if_mixed();
|
||||
/* Don't annotate insert delayed binlog events */
|
||||
thd->variables.binlog_annotate_row_events= 0;
|
||||
|
||||
/*
|
||||
Clone tickets representing protection against GRL and the lock on
|
||||
@@ -3314,8 +3318,19 @@ pthread_handler_t handle_delayed_insert(void *arg)
|
||||
di->table->file->delete_update_handler();
|
||||
di->group_count=0;
|
||||
mysql_audit_release(thd);
|
||||
/*
|
||||
Reset binlog. We can't call ha_reset() for the table as this will
|
||||
reset the table maps we have calculated earlier.
|
||||
*/
|
||||
mysql_mutex_lock(&di->mutex);
|
||||
}
|
||||
|
||||
/*
|
||||
Reset binlog. We can't call ha_reset() for the table as this will
|
||||
reset the table maps we have calculated earlier.
|
||||
*/
|
||||
thd->reset_binlog_for_next_statement();
|
||||
|
||||
if (di->tables_in_use)
|
||||
mysql_cond_broadcast(&di->cond_client); // If waiting clients
|
||||
}
|
||||
@@ -3407,9 +3422,7 @@ bool Delayed_insert::handle_inserts(void)
|
||||
{
|
||||
int error;
|
||||
ulong max_rows;
|
||||
bool has_trans = TRUE;
|
||||
bool using_ignore= 0, using_opt_replace= 0,
|
||||
using_bin_log= mysql_bin_log.is_open();
|
||||
bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
|
||||
delayed_row *row;
|
||||
DBUG_ENTER("handle_inserts");
|
||||
|
||||
@@ -3443,7 +3456,13 @@ bool Delayed_insert::handle_inserts(void)
|
||||
|
||||
if (table->file->ha_rnd_init_with_error(0))
|
||||
goto err;
|
||||
/*
|
||||
We have to call prepare_for_row_logging() as the second call to
|
||||
handler_writes() will not have called decide_logging_format.
|
||||
*/
|
||||
table->file->prepare_for_row_logging();
|
||||
table->file->prepare_for_insert();
|
||||
using_bin_log= table->file->row_logging;
|
||||
|
||||
/*
|
||||
We can't use row caching when using the binary log because if
|
||||
@@ -3452,6 +3471,7 @@ bool Delayed_insert::handle_inserts(void)
|
||||
*/
|
||||
if (!using_bin_log)
|
||||
table->file->extra(HA_EXTRA_WRITE_CACHE);
|
||||
|
||||
mysql_mutex_lock(&mutex);
|
||||
|
||||
while ((row=rows.get()))
|
||||
@@ -3480,8 +3500,8 @@ bool Delayed_insert::handle_inserts(void)
|
||||
Guaranteed that the INSERT DELAYED STMT will not be here
|
||||
in SBR when mysql binlog is enabled.
|
||||
*/
|
||||
DBUG_ASSERT(!(mysql_bin_log.is_open() &&
|
||||
!thd.is_current_stmt_binlog_format_row()));
|
||||
DBUG_ASSERT(!mysql_bin_log.is_open() ||
|
||||
thd.is_current_stmt_binlog_format_row());
|
||||
|
||||
/*
|
||||
This is the first value of an INSERT statement.
|
||||
@@ -3639,10 +3659,9 @@ bool Delayed_insert::handle_inserts(void)
|
||||
|
||||
TODO: Move the logging to last in the sequence of rows.
|
||||
*/
|
||||
has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
|
||||
table->file->has_transactions();
|
||||
if (thd.is_current_stmt_binlog_format_row() &&
|
||||
thd.binlog_flush_pending_rows_event(TRUE, has_trans))
|
||||
if (table->file->row_logging &&
|
||||
thd.binlog_flush_pending_rows_event(TRUE,
|
||||
table->file->row_logging_has_trans))
|
||||
goto err;
|
||||
|
||||
if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
|
||||
@@ -4548,6 +4567,18 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
|
||||
/* purecov: end */
|
||||
}
|
||||
table->s->table_creation_was_logged= save_table_creation_was_logged;
|
||||
if (!table->s->tmp_table)
|
||||
table->file->prepare_for_row_logging();
|
||||
|
||||
/*
|
||||
If slave is converting a statement event to row events, log the original
|
||||
create statement as an annotated row
|
||||
*/
|
||||
#ifdef HAVE_REPLICATION
|
||||
if (thd->slave_thread && opt_replicate_annotate_row_events &&
|
||||
thd->is_current_stmt_binlog_format_row())
|
||||
thd->variables.binlog_annotate_row_events= 1;
|
||||
#endif
|
||||
DBUG_RETURN(table);
|
||||
}
|
||||
|
||||
@@ -4792,6 +4823,7 @@ bool binlog_create_table(THD *thd, TABLE *table)
|
||||
logged
|
||||
*/
|
||||
thd->set_current_stmt_binlog_format_row();
|
||||
table->file->prepare_for_row_logging();
|
||||
return binlog_show_create_table(thd, table, 0) != 0;
|
||||
}
|
||||
|
||||
@@ -4854,6 +4886,9 @@ bool select_create::send_eof()
|
||||
if (table->s->tmp_table)
|
||||
thd->transaction.stmt.mark_created_temp_table();
|
||||
|
||||
if (thd->slave_thread)
|
||||
thd->variables.binlog_annotate_row_events= 0;
|
||||
|
||||
if (prepare_eof())
|
||||
{
|
||||
abort_result_set();
|
||||
|
@@ -7436,6 +7436,12 @@ void THD::reset_for_next_command(bool do_clear_error)
|
||||
DBUG_ENTER("THD::reset_for_next_command");
|
||||
DBUG_ASSERT(!spcont); /* not for substatements of routines */
|
||||
DBUG_ASSERT(!in_sub_stmt);
|
||||
/*
|
||||
Table maps should have been reset after previous statement except in the
|
||||
case where we have locked tables
|
||||
*/
|
||||
DBUG_ASSERT(binlog_table_maps == 0 ||
|
||||
locked_tables_mode == LTM_LOCK_TABLES);
|
||||
|
||||
if (likely(do_clear_error))
|
||||
{
|
||||
|
@@ -2168,14 +2168,13 @@ static bool finalize_install(THD *thd, TABLE *table, const LEX_CSTRING *name,
|
||||
of the insert into the plugin table, so that it is not replicated in
|
||||
row based mode.
|
||||
*/
|
||||
tmp_disable_binlog(thd);
|
||||
DBUG_ASSERT(!table->file->row_logging);
|
||||
table->use_all_columns();
|
||||
restore_record(table, s->default_values);
|
||||
table->field[0]->store(name->str, name->length, system_charset_info);
|
||||
table->field[1]->store(tmp->plugin_dl->dl.str, tmp->plugin_dl->dl.length,
|
||||
files_charset_info);
|
||||
error= table->file->ha_write_row(table->record[0]);
|
||||
reenable_binlog(thd);
|
||||
if (unlikely(error))
|
||||
{
|
||||
table->file->print_error(error, MYF(0));
|
||||
@@ -2322,9 +2321,8 @@ static bool do_uninstall(THD *thd, TABLE *table, const LEX_CSTRING *name)
|
||||
of the delete from the plugin table, so that it is not replicated in
|
||||
row based mode.
|
||||
*/
|
||||
tmp_disable_binlog(thd);
|
||||
table->file->row_logging= 0; // No logging
|
||||
error= table->file->ha_delete_row(table->record[0]);
|
||||
reenable_binlog(thd);
|
||||
if (unlikely(error))
|
||||
{
|
||||
table->file->print_error(error, MYF(0));
|
||||
|
@@ -180,6 +180,8 @@
|
||||
#define OPTION_NO_QUERY_CACHE (1ULL << 39) // SELECT, user
|
||||
#define OPTION_PROCEDURE_CLAUSE (1ULL << 40) // Internal usage
|
||||
#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern
|
||||
#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern
|
||||
#define OPTION_BIN_TMP_LOG_OFF (1ULL << 42) // disable binlog, intern
|
||||
|
||||
#define OPTION_LEX_FOUND_COMMENT (1ULL << 0) // intern, parser
|
||||
|
||||
|
@@ -580,7 +580,6 @@ void sequence_definition::adjust_values(longlong next_value)
|
||||
int sequence_definition::write_initial_sequence(TABLE *table)
|
||||
{
|
||||
int error;
|
||||
THD *thd= table->in_use;
|
||||
MY_BITMAP *save_write_set;
|
||||
|
||||
store_fields(table);
|
||||
@@ -588,15 +587,14 @@ int sequence_definition::write_initial_sequence(TABLE *table)
|
||||
table->s->sequence->copy(this);
|
||||
/*
|
||||
Sequence values will be replicated as a statement
|
||||
like 'create sequence'. So disable binary log temporarily
|
||||
like 'create sequence'. So disable row logging for this table & statement
|
||||
*/
|
||||
tmp_disable_binlog(thd);
|
||||
table->file->row_logging= table->file->row_logging_init= 0;
|
||||
save_write_set= table->write_set;
|
||||
table->write_set= &table->s->all_set;
|
||||
table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE;
|
||||
error= table->file->ha_write_row(table->record[0]);
|
||||
table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED;
|
||||
reenable_binlog(thd);
|
||||
table->write_set= save_write_set;
|
||||
if (unlikely(error))
|
||||
table->file->print_error(error, MYF(0));
|
||||
|
@@ -404,6 +404,7 @@ insert_server(THD *thd, FOREIGN_SERVER *server)
|
||||
/* need to open before acquiring THR_LOCK_plugin or it will deadlock */
|
||||
if (! (table= open_ltable(thd, &tables, TL_WRITE, MYSQL_LOCK_IGNORE_TIMEOUT)))
|
||||
goto end;
|
||||
table->file->row_logging= 0; // Don't log to binary log
|
||||
|
||||
/* insert the server into the table */
|
||||
if (unlikely(error= insert_server_record(table, server)))
|
||||
@@ -542,9 +543,9 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
|
||||
{
|
||||
int error;
|
||||
DBUG_ENTER("insert_server_record");
|
||||
tmp_disable_binlog(table->in_use);
|
||||
table->use_all_columns();
|
||||
DBUG_ASSERT(!table->file->row_logging);
|
||||
|
||||
table->use_all_columns();
|
||||
empty_record(table);
|
||||
|
||||
/* set the field that's the PK to the value we're looking for */
|
||||
@@ -577,8 +578,6 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
|
||||
}
|
||||
else
|
||||
error= ER_FOREIGN_SERVER_EXISTS;
|
||||
|
||||
reenable_binlog(table->in_use);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
@@ -895,7 +894,8 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server)
|
||||
{
|
||||
int error=0;
|
||||
DBUG_ENTER("update_server_record");
|
||||
tmp_disable_binlog(table->in_use);
|
||||
DBUG_ASSERT(!table->file->row_logging);
|
||||
|
||||
table->use_all_columns();
|
||||
/* set the field that's the PK to the value we're looking for */
|
||||
table->field[0]->store(server->server_name,
|
||||
@@ -931,7 +931,6 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server)
|
||||
}
|
||||
|
||||
end:
|
||||
reenable_binlog(table->in_use);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
@@ -956,7 +955,8 @@ delete_server_record(TABLE *table, LEX_CSTRING *name)
|
||||
{
|
||||
int error;
|
||||
DBUG_ENTER("delete_server_record");
|
||||
tmp_disable_binlog(table->in_use);
|
||||
DBUG_ASSERT(!table->file->row_logging);
|
||||
|
||||
table->use_all_columns();
|
||||
|
||||
/* set the field that's the PK to the value we're looking for */
|
||||
@@ -980,7 +980,6 @@ delete_server_record(TABLE *table, LEX_CSTRING *name)
|
||||
table->file->print_error(error, MYF(0));
|
||||
}
|
||||
|
||||
reenable_binlog(table->in_use);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
|
@@ -10471,6 +10471,7 @@ do_continue:;
|
||||
No additional logging of query is needed
|
||||
*/
|
||||
binlog_done= 1;
|
||||
DBUG_ASSERT(new_table->file->row_logging);
|
||||
new_table->mark_columns_needed_for_insert();
|
||||
thd->binlog_write_table_map(new_table, 1);
|
||||
}
|
||||
|
@@ -1330,7 +1330,8 @@ bool Table_triggers_list::prepare_record_accessors(TABLE *table)
|
||||
*/
|
||||
|
||||
bool Table_triggers_list::check_n_load(THD *thd, const LEX_CSTRING *db,
|
||||
const LEX_CSTRING *table_name, TABLE *table,
|
||||
const LEX_CSTRING *table_name,
|
||||
TABLE *table,
|
||||
bool names_only)
|
||||
{
|
||||
char path_buff[FN_REFLEN];
|
||||
|
@@ -5269,7 +5269,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl)
|
||||
/* used in RBR Triggers */
|
||||
master_had_triggers= 0;
|
||||
#endif
|
||||
|
||||
/* Catch wrong handling of the auto_increment_field_not_null. */
|
||||
DBUG_ASSERT(!auto_increment_field_not_null);
|
||||
auto_increment_field_not_null= FALSE;
|
||||
@@ -5284,7 +5283,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl)
|
||||
}
|
||||
|
||||
notnull_cond= 0;
|
||||
|
||||
DBUG_ASSERT(!file->keyread_enabled());
|
||||
|
||||
restore_record(this, s->default_values);
|
||||
@@ -7293,8 +7291,7 @@ void TABLE::mark_columns_per_binlog_row_image()
|
||||
If in RBR we may need to mark some extra columns,
|
||||
depending on the binlog-row-image command line argument.
|
||||
*/
|
||||
if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
|
||||
thd->is_current_stmt_binlog_format_row() &&
|
||||
if (file->row_logging &&
|
||||
!ha_check_storage_engine_flag(s->db_type(), HTON_NO_BINLOG_ROW_OPT))
|
||||
{
|
||||
/* if there is no PK, then mark all columns for the BI. */
|
||||
|
@@ -1124,13 +1124,11 @@ TABLE *THD::open_temporary_table(TMP_TABLE_SHARE *share,
|
||||
|
||||
table->reginfo.lock_type= TL_WRITE; /* Simulate locked */
|
||||
table->grant.privilege= TMP_TABLE_ACLS;
|
||||
table->query_id= query_id;
|
||||
share->tmp_table= (table->file->has_transaction_manager() ?
|
||||
TRANSACTIONAL_TMP_TABLE : NON_TRANSACTIONAL_TMP_TABLE);
|
||||
share->not_usable_by_query_cache= 1;
|
||||
|
||||
table->pos_in_table_list= 0;
|
||||
table->query_id= query_id;
|
||||
|
||||
/* Add table to the head of table list. */
|
||||
share->all_tmp_tables.push_front(table);
|
||||
|
||||
|
@@ -302,8 +302,10 @@ bool trans_commit_implicit(THD *thd)
|
||||
DBUG_RETURN(TRUE);
|
||||
|
||||
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
|
||||
{
|
||||
DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. "
|
||||
"Master and slave will have different GTID values"));
|
||||
}
|
||||
|
||||
if (thd->in_multi_stmt_transaction_mode() ||
|
||||
(thd->variables.option_bits & OPTION_TABLE_LOCK))
|
||||
@@ -361,9 +363,9 @@ bool trans_rollback(THD *thd)
|
||||
#ifdef HAVE_REPLICATION
|
||||
repl_semisync_master.wait_after_rollback(thd, FALSE);
|
||||
#endif
|
||||
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
|
||||
/* Reset the binlog transaction marker */
|
||||
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
|
||||
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG |
|
||||
OPTION_GTID_BEGIN);
|
||||
thd->transaction.all.reset();
|
||||
thd->lex->start_transaction_opt= 0;
|
||||
|
||||
|
@@ -222,7 +222,7 @@ extern wsrep_seqno_t wsrep_locked_seqno;
|
||||
/* use xxxxxx_NNULL macros when thd pointer is guaranteed to be non-null to
|
||||
* avoid compiler warnings (GCC 6 and later) */
|
||||
#define WSREP_NNULL(thd) \
|
||||
(WSREP_ON && thd->variables.wsrep_on)
|
||||
(thd->variables.wsrep_on && WSREP_ON)
|
||||
|
||||
#define WSREP(thd) \
|
||||
(thd && WSREP_NNULL(thd))
|
||||
|
Reference in New Issue
Block a user