1
0
mirror of https://github.com/MariaDB/server.git synced 2025-10-24 07:13:33 +03:00

Clean up and speed up interfaces for binary row logging

MDEV-21605 Clean up and speed up interfaces for binary row logging
MDEV-21617 Bug fix for previous version of this code

The intention is to have as few 'if' as possible in ha_write() and
related functions. This is done by pre-calculating once per statement the
row_logging state for all tables.

Benefits are simpler and faster code both when binary logging is disabled
and when it's enabled.

Changes:
- Added handler->row_logging to make it easy to check it table should be
  row logged. This also made it easier to disabling row logging for system,
  internal and temporary tables.
- The tables row_logging capabilities are checked once per "statements
  that updates tables" in THD::binlog_prepare_for_row_logging() which
  is called when needed from THD::decide_logging_format().
- Removed most usage of tmp_disable_binlog(), reenable_binlog() and
  temporary saving and setting of thd->variables.option_bits.
- Moved checks that can't change during a statement from
  check_table_binlog_row_based() to check_table_binlog_row_based_internal()
- Removed flag row_already_logged (used by sequence engine)
- Moved binlog_log_row() to a handler::
- Moved write_locked_table_maps() to THD::binlog_write_table_maps() as
  most other related binlog functions are in THD.
- Removed binlog_write_table_map() and binlog_log_row_internal() as
  they are now obsolete as 'has_transactions()' is pre-calculated in
  prepare_for_row_logging().
- Remove 'is_transactional' argument from binlog_write_table_map() as this
  can now be read from handler.
- Changed order of 'if's in handler::external_lock() and wsrep_mysqld.h
  to first evaluate fast and likely cases before more complex ones.
- Added error checking in ha_write_row() and related functions if
  binlog_log_row() failed.
- Don't clear check_table_binlog_row_based_result in
  clear_cached_table_binlog_row_based_flag() as it's not needed.
- THD::clear_binlog_table_maps() has been replaced with
  THD::reset_binlog_for_next_statement()
- Added 'MYSQL_OPEN_IGNORE_LOGGING_FORMAT' flag to open_and_lock_tables()
  to avoid calculating of binary log format for internal opens. This flag
  is also used to avoid reading statistics tables for internal tables.
- Added OPTION_BINLOG_LOG_OFF as a simple way to turn of binlog temporary
  for create (instead of using THD::sql_log_bin_off.
- Removed flag THD::sql_log_bin_off (not needed anymore)
- Speed up THD::decide_logging_format() by remembering if blackhole engine
  is used and avoid a loop over all tables if it's not used
  (the common case).
- THD::decide_logging_format() is not called anymore if no tables are used
  for the statement. This will speed up pure stored procedure code with
  about 5%+ according to some simple tests.
- We now get annotated events on slave if a CREATE ... SELECT statement
  is transformed on the slave from statement to row logging.
- In the original code, the master could come into a state where row
  logging is enforced for all future events if statement could be used.
  This is now partly fixed.

Other changes:
- Ensure that all tables used by a statement has query_id set.
- Had to restore the row_logging flag for not used tables in
  THD::binlog_write_table_maps (not normal scenario)
- Removed injector::transaction::use_table(server_id_type sid, table tbl)
  as it's not used.
- Cleaned up set_slave_thread_options()
- Some more DBUG_ENTER/DBUG_RETURN, code comments and minor indentation
  changes.
- Ensure we only call THD::decide_logging_format_low() once in
  mysql_insert() (inefficiency).
- Don't annotate INSERT DELAYED
- Removed zeroing pos_in_table_list in THD::open_temporary_table() as it's
  already 0
This commit is contained in:
Monty
2020-01-28 23:23:51 +02:00
parent f51df1dc78
commit 91ab42a823
35 changed files with 570 additions and 428 deletions

View File

@@ -0,0 +1,22 @@
--source include/have_innodb.inc
--source include/have_binlog_format_row.inc
reset master;
CREATE TABLE t1 (
id INT,
k INT,
c CHAR(8),
KEY (k),
PRIMARY KEY (id),
FOREIGN KEY (id) REFERENCES t1 (k)
) ENGINE=InnoDB;
LOCK TABLES t1 WRITE;
SET SESSION FOREIGN_KEY_CHECKS= OFF;
SET AUTOCOMMIT=OFF;
INSERT INTO t1 VALUES (1,1,'foo');
DROP TABLE t1;
SET SESSION FOREIGN_KEY_CHECKS= ON;
SET AUTOCOMMIT=ON;
source include/show_binlog_events.inc;

View File

@@ -85,9 +85,12 @@ master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */ master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
master-bin.000001 # Gtid # # GTID #-#-# master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; create table t1 (a int) master-bin.000001 # Query # # use `test`; create table t1 (a int)
master-bin.000001 # Gtid # # BEGIN GTID #-#-# master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `test`.`t1`/* Generated to handle failed CREATE OR REPLACE */ master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `test`.`t1`/* Generated to handle failed CREATE OR REPLACE */
master-bin.000001 # Query # # ROLLBACK master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; create temporary table t9 (a int)
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # DROP TEMPORARY TABLE IF EXISTS `test`.`t9`/* Generated to handle failed CREATE OR REPLACE */
connection server_2; connection server_2;
show tables; show tables;
Tables_in_test Tables_in_test
@@ -154,7 +157,7 @@ slave-bin.000001 # Query # # use `test`; create table t4 (server_2_to_be_delete
slave-bin.000001 # Gtid # # GTID #-#-# slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; create table t1 (new_table int) slave-bin.000001 # Query # # use `test`; create table t1 (new_table int)
slave-bin.000001 # Gtid # # BEGIN GTID #-#-# slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` ( slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
`a` int(11) DEFAULT NULL `a` int(11) DEFAULT NULL
) )
slave-bin.000001 # Annotate_rows # # create table t2 select * from t9 slave-bin.000001 # Annotate_rows # # create table t2 select * from t9
@@ -223,26 +226,12 @@ Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000001 # Gtid # # GTID #-#-# slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; create table t1 (a int) slave-bin.000001 # Query # # use `test`; create table t1 (a int)
slave-bin.000001 # Gtid # # BEGIN GTID #-#-# slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Annotate_rows # # insert into t1 values (0),(1),(2) slave-bin.000001 # Query # # use `test`; insert into t1 values (0),(1),(2)
slave-bin.000001 # Table_map # # table_id: # (test.t1)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # BEGIN GTID #-#-# slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` ( slave-bin.000001 # Query # # use `test`; create table t2 engine=myisam select * from t1
`a` int(11) DEFAULT NULL slave-bin.000001 # Gtid # # GTID #-#-#
) ENGINE=MyISAM slave-bin.000001 # Query # # use `test`; create or replace table t2 engine=innodb select * from t1
slave-bin.000001 # Annotate_rows # # create table t2 engine=myisam select * from t1
slave-bin.000001 # Table_map # # table_id: # (test.t2)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
`a` int(11) DEFAULT NULL
) ENGINE=InnoDB
slave-bin.000001 # Annotate_rows # # create or replace table t2 engine=innodb select * from t1
slave-bin.000001 # Table_map # # table_id: # (test.t2)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Xid # # COMMIT /* XID */
connection server_1; connection server_1;
drop table t1; drop table t1;
# #

View File

@@ -160,6 +160,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` ( slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
`a` int(11) DEFAULT NULL `a` int(11) DEFAULT NULL
) )
slave-bin.000001 # Annotate_rows # # create table t2 select * from t9
slave-bin.000001 # Table_map # # table_id: # (test.t2) slave-bin.000001 # Table_map # # table_id: # (test.t2)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT slave-bin.000001 # Query # # COMMIT
@@ -171,6 +172,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t5` ( slave-bin.000001 # Query # # use `test`; CREATE TABLE `t5` (
`a` int(11) DEFAULT NULL `a` int(11) DEFAULT NULL
) )
slave-bin.000001 # Annotate_rows # # create table t5 select * from t9
slave-bin.000001 # Table_map # # table_id: # (test.t5) slave-bin.000001 # Table_map # # table_id: # (test.t5)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT slave-bin.000001 # Query # # COMMIT

View File

@@ -56,6 +56,7 @@ create or replace table t1 (a int primary key) select a from t2;
# Same with temporary table # Same with temporary table
create temporary table t9 (a int); create temporary table t9 (a int);
--error ER_DUP_ENTRY --error ER_DUP_ENTRY
create or replace temporary table t9 (a int primary key) select a from t2; create or replace temporary table t9 (a int primary key) select a from t2;

View File

@@ -1241,15 +1241,9 @@ Events::load_events_from_db(THD *thd)
TRUE); TRUE);
/* All the dmls to mysql.events tables are stmt bin-logged. */ /* All the dmls to mysql.events tables are stmt bin-logged. */
bool save_binlog_row_based; table->file->row_logging= 0;
if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row()))
thd->set_current_stmt_binlog_format_stmt();
(void) table->file->ha_update_row(table->record[1], table->record[0]); (void) table->file->ha_update_row(table->record[1], table->record[0]);
if (save_binlog_row_based)
thd->set_current_stmt_binlog_format_row();
delete et; delete et;
continue; continue;
} }

View File

@@ -2131,12 +2131,10 @@ int ha_partition::copy_partitions(ulonglong * const copied,
} }
else else
{ {
THD *thd= ha_thd();
/* Copy record to new handler */ /* Copy record to new handler */
(*copied)++; (*copied)++;
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ DBUG_ASSERT(!m_new_file[new_part]->row_logging);
result= m_new_file[new_part]->ha_write_row(m_rec0); result= m_new_file[new_part]->ha_write_row(m_rec0);
reenable_binlog(thd);
if (result) if (result)
goto error; goto error;
} }
@@ -4374,11 +4372,10 @@ int ha_partition::write_row(const uchar * buf)
start_part_bulk_insert(thd, part_id); start_part_bulk_insert(thd, part_id);
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ DBUG_ASSERT(!m_file[part_id]->row_logging);
error= m_file[part_id]->ha_write_row(buf); error= m_file[part_id]->ha_write_row(buf);
if (have_auto_increment && !table->s->next_number_keypart) if (have_auto_increment && !table->s->next_number_keypart)
set_auto_increment_if_higher(table->next_number_field); set_auto_increment_if_higher(table->next_number_field);
reenable_binlog(thd);
exit: exit:
table->auto_increment_field_not_null= saved_auto_inc_field_not_null; table->auto_increment_field_not_null= saved_auto_inc_field_not_null;
@@ -4457,12 +4454,11 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
m_last_part= new_part_id; m_last_part= new_part_id;
start_part_bulk_insert(thd, new_part_id); start_part_bulk_insert(thd, new_part_id);
DBUG_ASSERT(!m_file[new_part_id]->row_logging);
if (new_part_id == old_part_id) if (new_part_id == old_part_id)
{ {
DBUG_PRINT("info", ("Update in partition %u", (uint) new_part_id)); DBUG_PRINT("info", ("Update in partition %u", (uint) new_part_id));
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[new_part_id]->ha_update_row(old_data, new_data); error= m_file[new_part_id]->ha_update_row(old_data, new_data);
reenable_binlog(thd);
goto exit; goto exit;
} }
else else
@@ -4481,16 +4477,12 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
table->next_number_field= NULL; table->next_number_field= NULL;
DBUG_PRINT("info", ("Update from partition %u to partition %u", DBUG_PRINT("info", ("Update from partition %u to partition %u",
(uint) old_part_id, (uint) new_part_id)); (uint) old_part_id, (uint) new_part_id));
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[new_part_id]->ha_write_row((uchar*) new_data); error= m_file[new_part_id]->ha_write_row((uchar*) new_data);
reenable_binlog(thd);
table->next_number_field= saved_next_number_field; table->next_number_field= saved_next_number_field;
if (unlikely(error)) if (unlikely(error))
goto exit; goto exit;
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[old_part_id]->ha_delete_row(old_data); error= m_file[old_part_id]->ha_delete_row(old_data);
reenable_binlog(thd);
if (unlikely(error)) if (unlikely(error))
goto exit; goto exit;
} }
@@ -4592,9 +4584,8 @@ int ha_partition::delete_row(const uchar *buf)
if (!bitmap_is_set(&(m_part_info->lock_partitions), m_last_part)) if (!bitmap_is_set(&(m_part_info->lock_partitions), m_last_part))
DBUG_RETURN(HA_ERR_NOT_IN_LOCK_PARTITIONS); DBUG_RETURN(HA_ERR_NOT_IN_LOCK_PARTITIONS);
tmp_disable_binlog(thd); DBUG_ASSERT(!m_file[m_last_part]->row_logging);
error= m_file[m_last_part]->ha_delete_row(buf); error= m_file[m_last_part]->ha_delete_row(buf);
reenable_binlog(thd);
DBUG_RETURN(error); DBUG_RETURN(error);
} }

View File

@@ -202,7 +202,11 @@ int ha_sequence::write_row(const uchar *buf)
DBUG_ENTER("ha_sequence::write_row"); DBUG_ENTER("ha_sequence::write_row");
DBUG_ASSERT(table->record[0] == buf); DBUG_ASSERT(table->record[0] == buf);
row_already_logged= 0; /*
Log to binary log even if this function has been called before
(The function ends by setting row_logging to 0)
*/
row_logging= row_logging_init;
if (unlikely(sequence->initialized == SEQUENCE::SEQ_IN_PREPARE)) if (unlikely(sequence->initialized == SEQUENCE::SEQ_IN_PREPARE))
{ {
/* This calls is from ha_open() as part of create table */ /* This calls is from ha_open() as part of create table */
@@ -218,6 +222,7 @@ int ha_sequence::write_row(const uchar *buf)
sequence->copy(&tmp_seq); sequence->copy(&tmp_seq);
if (likely(!(error= file->write_row(buf)))) if (likely(!(error= file->write_row(buf))))
sequence->initialized= SEQUENCE::SEQ_READY_TO_USE; sequence->initialized= SEQUENCE::SEQ_READY_TO_USE;
row_logging= 0;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE)) if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE))
@@ -262,10 +267,12 @@ int ha_sequence::write_row(const uchar *buf)
sequence->copy(&tmp_seq); sequence->copy(&tmp_seq);
rows_changed++; rows_changed++;
/* We have to do the logging while we hold the sequence mutex */ /* We have to do the logging while we hold the sequence mutex */
if (row_logging)
error= binlog_log_row(table, 0, buf, log_func); error= binlog_log_row(table, 0, buf, log_func);
row_already_logged= 1;
} }
/* Row is already logged, don't log it again in ha_write_row() */
row_logging= 0;
sequence->all_values_used= 0; sequence->all_values_used= 0;
if (!sequence_locked) if (!sequence_locked)
sequence->write_unlock(table); sequence->write_unlock(table);

View File

@@ -3620,8 +3620,9 @@ int handler::update_auto_increment()
variables->auto_increment_increment); variables->auto_increment_increment);
auto_inc_intervals_count++; auto_inc_intervals_count++;
/* Row-based replication does not need to store intervals in binlog */ /* Row-based replication does not need to store intervals in binlog */
if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open()) if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) ||
&& !thd->is_current_stmt_binlog_format_row()) mysql_bin_log.is_open()) &&
!thd->is_current_stmt_binlog_format_row())
thd->auto_inc_intervals_in_cur_stmt_for_binlog. thd->auto_inc_intervals_in_cur_stmt_for_binlog.
append(auto_inc_interval_for_cur_row.minimum(), append(auto_inc_interval_for_cur_row.minimum(),
auto_inc_interval_for_cur_row.values(), auto_inc_interval_for_cur_row.values(),
@@ -6366,32 +6367,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
1 Row needs to be logged 1 Row needs to be logged
*/ */
bool handler::check_table_binlog_row_based(bool binlog_row) bool handler::check_table_binlog_row_based()
{ {
if (table->versioned(VERS_TRX_ID))
return false;
if (unlikely((table->in_use->variables.sql_log_bin_off)))
return 0; /* Called by partitioning engine */
#ifdef WITH_WSREP
if (!table->in_use->variables.sql_log_bin &&
wsrep_thd_is_applying(table->in_use))
return 0; /* wsrep patch sets sql_log_bin to silence binlogging
from high priority threads */
#endif /* WITH_WSREP */
if (unlikely((!check_table_binlog_row_based_done))) if (unlikely((!check_table_binlog_row_based_done)))
{ {
check_table_binlog_row_based_done= 1; check_table_binlog_row_based_done= 1;
check_table_binlog_row_based_result= check_table_binlog_row_based_result=
check_table_binlog_row_based_internal(binlog_row); check_table_binlog_row_based_internal();
} }
return check_table_binlog_row_based_result; return check_table_binlog_row_based_result;
} }
bool handler::check_table_binlog_row_based_internal(bool binlog_row) bool handler::check_table_binlog_row_based_internal()
{ {
THD *thd= table->in_use; THD *thd= table->in_use;
#ifdef WITH_WSREP
if (!thd->variables.sql_log_bin &&
wsrep_thd_is_applying(table->in_use))
{
/*
wsrep patch sets sql_log_bin to silence binlogging from high
priority threads
*/
return 0;
}
#endif
return (table->s->can_do_row_logging && return (table->s->can_do_row_logging &&
!table->versioned(VERS_TRX_ID) &&
!(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) &&
thd->is_current_stmt_binlog_format_row() && thd->is_current_stmt_binlog_format_row() &&
/* /*
Wsrep partially enables binary logging if it have not been Wsrep partially enables binary logging if it have not been
@@ -6407,9 +6411,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
Otherwise, return 'true' if binary logging is on. Otherwise, return 'true' if binary logging is on.
*/ */
IF_WSREP(((WSREP_EMULATE_BINLOG(thd) && IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) &&
wsrep_thd_is_local(thd)) || wsrep_thd_is_local(thd)) ||
((WSREP(thd) || ((WSREP_NNULL(thd) ||
(thd->variables.option_bits & OPTION_BIN_LOG)) && (thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())), mysql_bin_log.is_open())),
(thd->variables.option_bits & OPTION_BIN_LOG) && (thd->variables.option_bits & OPTION_BIN_LOG) &&
@@ -6417,151 +6421,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
} }
/** @brief int handler::binlog_log_row(TABLE *table,
Write table maps for all (manually or automatically) locked tables
to the binary log. Also, if binlog_annotate_row_events is ON,
write Annotate_rows event before the first table map.
SYNOPSIS
write_locked_table_maps()
thd Pointer to THD structure
DESCRIPTION
This function will generate and write table maps for all tables
that are locked by the thread 'thd'.
RETURN VALUE
0 All OK
1 Failed to write all table maps
SEE ALSO
THD::lock
*/
static int write_locked_table_maps(THD *thd)
{
DBUG_ENTER("write_locked_table_maps");
DBUG_PRINT("enter", ("thd:%p thd->lock:%p "
"thd->extra_lock: %p",
thd, thd->lock, thd->extra_lock));
DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps()));
MYSQL_LOCK *locks[2];
locks[0]= thd->extra_lock;
locks[1]= thd->lock;
my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd),
true) &&
thd->variables.binlog_annotate_row_events &&
thd->query() && thd->query_length();
for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
{
MYSQL_LOCK const *const lock= locks[i];
if (lock == NULL)
continue;
TABLE **const end_ptr= lock->table + lock->table_count;
for (TABLE **table_ptr= lock->table ;
table_ptr != end_ptr ;
++table_ptr)
{
TABLE *const table= *table_ptr;
if (table->current_lock == F_WRLCK &&
table->file->check_table_binlog_row_based(0))
{
if (binlog_write_table_map(thd, table, with_annotate))
DBUG_RETURN(1);
with_annotate= 0;
}
}
}
DBUG_RETURN(0);
}
int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate)
{
DBUG_ENTER("binlog_write_table_map");
DBUG_PRINT("info", ("table %s", table->s->table_name.str));
/*
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
(e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
compatible behavior with the STMT based replication even when
the table is not transactional. In other words, if the operation
fails while executing the insert phase nothing is written to the
binlog.
Note that at this point, we check the type of a set of tables to
create the table map events. In the function binlog_log_row(),
which calls the current function, we check the type of the table
of the current row.
*/
bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
table->file->has_transactions());
int const error= thd->binlog_write_table_map(table, has_trans,
&with_annotate);
/*
If an error occurs, it is the responsibility of the caller to
roll back the transaction.
*/
if (unlikely(error))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
static int binlog_log_row_internal(TABLE* table,
const uchar *before_record, const uchar *before_record,
const uchar *after_record, const uchar *after_record,
Log_func *log_func) Log_func *log_func)
{ {
bool error= 0; bool error;
THD *const thd= table->in_use; THD *thd= table->in_use;
DBUG_ENTER("binlog_log_row");
/* if (!thd->binlog_table_maps &&
If there are no table maps written to the binary log, this is thd->binlog_write_table_maps())
the first row handled in this statement. In that case, we need DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
to write table maps for all locked tables to the binary log.
*/
if (likely(!(error= ((thd->get_binlog_table_maps() == 0 &&
write_locked_table_maps(thd))))))
{
/*
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
(i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
compatible behavior with the STMT based replication even when
the table is not transactional. In other words, if the operation
fails while executing the insert phase nothing is written to the
binlog. We need the same also for ALTER TABLE in the case we convert
a shared table to a not shared table as in this case we will log all
rows.
*/
bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
table->file->has_transactions());
error= (*log_func)(thd, table, has_trans, before_record, after_record);
}
return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
}
int binlog_log_row(TABLE* table, const uchar *before_record, error= (*log_func)(thd, table, row_logging_has_trans,
const uchar *after_record, Log_func *log_func) before_record, after_record);
{ DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
#ifdef WITH_WSREP
THD *const thd= table->in_use;
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
thd->wsrep_ignore_table == true)
return 0;
#endif
if (!table->file->check_table_binlog_row_based(1))
return 0;
return binlog_log_row_internal(table, before_record, after_record, log_func);
} }
@@ -6647,6 +6522,7 @@ int handler::ha_external_lock(THD *thd, int lock_type)
int handler::ha_reset() int handler::ha_reset()
{ {
DBUG_ENTER("ha_reset"); DBUG_ENTER("ha_reset");
/* Check that we have called all proper deallocation functions */ /* Check that we have called all proper deallocation functions */
DBUG_ASSERT((uchar*) table->def_read_set.bitmap + DBUG_ASSERT((uchar*) table->def_read_set.bitmap +
table->s->column_bitmap_size == table->s->column_bitmap_size ==
@@ -6662,6 +6538,10 @@ int handler::ha_reset()
pushed_cond= NULL; pushed_cond= NULL;
tracker= NULL; tracker= NULL;
mark_trx_read_write_done= 0; mark_trx_read_write_done= 0;
/*
Disable row logging.
*/
row_logging= row_logging_init= 0;
clear_cached_table_binlog_row_based_flag(); clear_cached_table_binlog_row_based_flag();
/* Reset information about pushed engine conditions */ /* Reset information about pushed engine conditions */
cancel_pushed_idx_cond(); cancel_pushed_idx_cond();
@@ -6678,8 +6558,8 @@ static int wsrep_after_row(THD *thd)
/* enforce wsrep_max_ws_rows */ /* enforce wsrep_max_ws_rows */
thd->wsrep_affected_rows++; thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows && if (wsrep_max_ws_rows &&
wsrep_thd_is_local(thd) && thd->wsrep_affected_rows > wsrep_max_ws_rows &&
thd->wsrep_affected_rows > wsrep_max_ws_rows) wsrep_thd_is_local(thd))
{ {
trans_rollback_stmt(thd) || trans_rollback(thd); trans_rollback_stmt(thd) || trans_rollback(thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
@@ -6864,6 +6744,74 @@ static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec)
/** /**
Check if galera disables binary logging for this table
@return 0 Binary logging disabled
@return 1 Binary logging can be enabled
*/
static inline bool wsrep_check_if_binlog_row(TABLE *table)
{
#ifdef WITH_WSREP
THD *const thd= table->in_use;
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
thd->wsrep_ignore_table == true)
return 0;
#endif
return 1;
}
/**
Prepare handler for row logging
@return 0 if handler will not participate in row logging
@return 1 handler will participate in row logging
This function is always safe to call on an opened table.
*/
bool handler::prepare_for_row_logging()
{
DBUG_ENTER("handler::prepare_for_row_logging");
/* Check if we should have row logging */
if (wsrep_check_if_binlog_row(table) &&
check_table_binlog_row_based())
{
/*
Row logging enabled. Intialize all variables and write
annotated and table maps
*/
row_logging= row_logging_init= 1;
/*
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
(e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
compatible behavior with the STMT based replication even when
the table is not transactional. In other words, if the operation
fails while executing the insert phase nothing is written to the
binlog.
*/
row_logging_has_trans=
((sql_command_flags[table->in_use->lex->sql_command] &
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
table->file->has_transactions());
}
else
{
/* Check row_logging has not been properly cleared from previous command */
DBUG_ASSERT(row_logging == 0);
}
DBUG_RETURN(row_logging);
}
/*
Do all initialization needed for insert Do all initialization needed for insert
@param force_update_handler Set to TRUE if we should always create an @param force_update_handler Set to TRUE if we should always create an
@@ -6891,7 +6839,6 @@ int handler::prepare_for_insert(bool force_update_handler)
int handler::ha_write_row(const uchar *buf) int handler::ha_write_row(const uchar *buf)
{ {
int error; int error;
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK); m_lock_type == F_WRLCK);
DBUG_ENTER("handler::ha_write_row"); DBUG_ENTER("handler::ha_write_row");
@@ -6911,13 +6858,17 @@ int handler::ha_write_row(const uchar *buf)
{ error= write_row(buf); }) { error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error); MYSQL_INSERT_ROW_DONE(error);
if (likely(!error) && !row_already_logged) if (likely(!error))
{ {
rows_changed++; rows_changed++;
if (row_logging)
{
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
error= binlog_log_row(table, 0, buf, log_func); error= binlog_log_row(table, 0, buf, log_func);
}
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE && if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) !error && (error= wsrep_after_row(ha_thd())))
{ {
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@@ -6932,10 +6883,8 @@ int handler::ha_write_row(const uchar *buf)
int handler::ha_update_row(const uchar *old_data, const uchar *new_data) int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{ {
int error; int error;
Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK); m_lock_type == F_WRLCK);
/* /*
Some storage engines require that the new record is in record[0] Some storage engines require that the new record is in record[0]
(and the old record is in record[1]). (and the old record is in record[1]).
@@ -6950,20 +6899,22 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
(error= check_duplicate_long_entries_update(table, (uchar*) new_data))) (error= check_duplicate_long_entries_update(table, (uchar*) new_data)))
return error; return error;
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error, TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);}) { error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error); MYSQL_UPDATE_ROW_DONE(error);
if (likely(!error) && !row_already_logged) if (likely(!error))
{ {
rows_changed++; rows_changed++;
error= binlog_log_row(table, old_data, new_data, log_func); if (row_logging)
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
{ {
return error; Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
error= binlog_log_row(table, old_data, new_data, log_func);
} }
#ifdef WITH_WSREP
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
!error && (error= wsrep_after_row(ha_thd())))
return error;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
} }
return error; return error;
@@ -7000,7 +6951,6 @@ int handler::update_first_row(const uchar *new_data)
int handler::ha_delete_row(const uchar *buf) int handler::ha_delete_row(const uchar *buf)
{ {
int error; int error;
Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK); m_lock_type == F_WRLCK);
/* /*
@@ -7019,10 +6969,14 @@ int handler::ha_delete_row(const uchar *buf)
if (likely(!error)) if (likely(!error))
{ {
rows_changed++; rows_changed++;
if (row_logging)
{
Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
error= binlog_log_row(table, buf, 0, log_func); error= binlog_log_row(table, buf, 0, log_func);
}
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE && if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) !error && (error= wsrep_after_row(ha_thd())))
{ {
return error; return error;
} }
@@ -7049,11 +7003,10 @@ int handler::ha_delete_row(const uchar *buf)
int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
{ {
int error; int error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write(); mark_trx_read_write();
error = direct_update_rows(update_rows, found_rows); error= direct_update_rows(update_rows, found_rows);
MYSQL_UPDATE_ROW_DONE(error); MYSQL_UPDATE_ROW_DONE(error);
return error; return error;
} }

View File

@@ -610,6 +610,7 @@ given at all. */
#define HA_CREATE_USED_SEQUENCE (1UL << 25) #define HA_CREATE_USED_SEQUENCE (1UL << 25)
typedef ulonglong alter_table_operations; typedef ulonglong alter_table_operations;
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
/* /*
These flags are set by the parser and describes the type of These flags are set by the parser and describes the type of
@@ -3050,8 +3051,6 @@ public:
bool mark_trx_read_write_done; /* mark_trx_read_write was called */ bool mark_trx_read_write_done; /* mark_trx_read_write was called */
bool check_table_binlog_row_based_done; /* check_table_binlog.. was called */ bool check_table_binlog_row_based_done; /* check_table_binlog.. was called */
bool check_table_binlog_row_based_result; /* cached check_table_binlog... */ bool check_table_binlog_row_based_result; /* cached check_table_binlog... */
/* Set to 1 if handler logged last insert/update/delete operation */
bool row_already_logged;
/* /*
TRUE <=> the engine guarantees that returned records are within the range TRUE <=> the engine guarantees that returned records are within the range
being scanned. being scanned.
@@ -3192,10 +3191,16 @@ public:
void end_psi_batch_mode(); void end_psi_batch_mode();
bool set_top_table_fields; bool set_top_table_fields;
struct TABLE *top_table; struct TABLE *top_table;
Field **top_table_field; Field **top_table_field;
uint top_table_fields; uint top_table_fields;
/* If we have row logging enabled for this table */
bool row_logging, row_logging_init;
/* If the row logging should be done in transaction cache */
bool row_logging_has_trans;
private: private:
/** /**
The lock type set by when calling::ha_external_lock(). This is The lock type set by when calling::ha_external_lock(). This is
@@ -3213,7 +3218,6 @@ private:
/** Stores next_insert_id for handling duplicate key errors. */ /** Stores next_insert_id for handling duplicate key errors. */
ulonglong m_prev_insert_id; ulonglong m_prev_insert_id;
public: public:
handler(handlerton *ht_arg, TABLE_SHARE *share_arg) handler(handlerton *ht_arg, TABLE_SHARE *share_arg)
:table_share(share_arg), table(0), :table_share(share_arg), table(0),
@@ -3223,7 +3227,6 @@ public:
mark_trx_read_write_done(0), mark_trx_read_write_done(0),
check_table_binlog_row_based_done(0), check_table_binlog_row_based_done(0),
check_table_binlog_row_based_result(0), check_table_binlog_row_based_result(0),
row_already_logged(0),
in_range_check_pushed_down(FALSE), errkey(-1), in_range_check_pushed_down(FALSE), errkey(-1),
key_used_on_scan(MAX_KEY), key_used_on_scan(MAX_KEY),
active_index(MAX_KEY), keyread(MAX_KEY), active_index(MAX_KEY), keyread(MAX_KEY),
@@ -3242,6 +3245,7 @@ public:
m_psi_locker(NULL), m_psi_locker(NULL),
set_top_table_fields(FALSE), top_table(0), set_top_table_fields(FALSE), top_table(0),
top_table_field(0), top_table_fields(0), top_table_field(0), top_table_fields(0),
row_logging(0), row_logging_init(0),
m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0) m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0)
{ {
DBUG_PRINT("info", DBUG_PRINT("info",
@@ -4598,13 +4602,17 @@ protected:
virtual int delete_table(const char *name); virtual int delete_table(const char *name);
public: public:
bool check_table_binlog_row_based(bool binlog_row); bool check_table_binlog_row_based();
bool prepare_for_row_logging();
int prepare_for_insert(bool force_update_handler= 0); int prepare_for_insert(bool force_update_handler= 0);
int binlog_log_row(TABLE *table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func);
inline void clear_cached_table_binlog_row_based_flag() inline void clear_cached_table_binlog_row_based_flag()
{ {
check_table_binlog_row_based_done= 0; check_table_binlog_row_based_done= 0;
check_table_binlog_row_based_result= 0;
} }
private: private:
/* Cache result to avoid extra calls */ /* Cache result to avoid extra calls */
@@ -4619,7 +4627,7 @@ private:
private: private:
void mark_trx_read_write_internal(); void mark_trx_read_write_internal();
bool check_table_binlog_row_based_internal(bool binlog_row); bool check_table_binlog_row_based_internal();
protected: protected:
/* /*
@@ -5202,7 +5210,6 @@ int binlog_log_row(TABLE* table,
if (unlikely(this_tracker)) \ if (unlikely(this_tracker)) \
tracker->stop_tracking(table->in_use); \ tracker->stop_tracking(table->in_use); \
} }
int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate);
void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag); void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag);
void print_keydup_error(TABLE *table, KEY *key, myf errflag); void print_keydup_error(TABLE *table, KEY *key, myf errflag);

View File

@@ -514,6 +514,12 @@ void Log_event_writer::add_status(enum_logged_status status)
cache_data->add_status(status); cache_data->add_status(status);
} }
void Log_event_writer::set_incident()
{
cache_data->set_incident();
}
class binlog_cache_mngr { class binlog_cache_mngr {
public: public:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size, binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
@@ -714,7 +720,6 @@ bool Log_to_csv_event_handler::
uint field_index; uint field_index;
Silence_log_table_errors error_handler; Silence_log_table_errors error_handler;
Open_tables_backup open_tables_backup; Open_tables_backup open_tables_backup;
ulonglong save_thd_options;
bool save_time_zone_used; bool save_time_zone_used;
DBUG_ENTER("log_general"); DBUG_ENTER("log_general");
@@ -724,9 +729,6 @@ bool Log_to_csv_event_handler::
*/ */
save_time_zone_used= thd->time_zone_used; save_time_zone_used= thd->time_zone_used;
save_thd_options= thd->variables.option_bits;
thd->variables.option_bits&= ~OPTION_BIN_LOG;
table_list.init_one_table(&MYSQL_SCHEMA_NAME, &GENERAL_LOG_NAME, 0, table_list.init_one_table(&MYSQL_SCHEMA_NAME, &GENERAL_LOG_NAME, 0,
TL_WRITE_CONCURRENT_INSERT); TL_WRITE_CONCURRENT_INSERT);
@@ -806,7 +808,6 @@ bool Log_to_csv_event_handler::
table->field[field_index]->set_default(); table->field[field_index]->set_default();
} }
/* log table entries are not replicated */
if (table->file->ha_write_row(table->record[0])) if (table->file->ha_write_row(table->record[0]))
goto err; goto err;
@@ -827,7 +828,6 @@ err:
if (need_close) if (need_close)
close_log_table(thd, &open_tables_backup); close_log_table(thd, &open_tables_backup);
thd->variables.option_bits= save_thd_options;
thd->time_zone_used= save_time_zone_used; thd->time_zone_used= save_time_zone_used;
DBUG_RETURN(result); DBUG_RETURN(result);
} }
@@ -880,7 +880,6 @@ bool Log_to_csv_event_handler::
ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
ulong query_time_micro= (ulong) (query_utime % 1000000); ulong query_time_micro= (ulong) (query_utime % 1000000);
ulong lock_time_micro= (ulong) (lock_utime % 1000000); ulong lock_time_micro= (ulong) (lock_utime % 1000000);
DBUG_ENTER("Log_to_csv_event_handler::log_slow"); DBUG_ENTER("Log_to_csv_event_handler::log_slow");
thd->push_internal_handler(& error_handler); thd->push_internal_handler(& error_handler);
@@ -997,7 +996,6 @@ bool Log_to_csv_event_handler::
0, TRUE)) 0, TRUE))
goto err; goto err;
/* log table entries are not replicated */
if (table->file->ha_write_row(table->record[0])) if (table->file->ha_write_row(table->record[0]))
goto err; goto err;
@@ -1982,7 +1980,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
if (cache_mngr->trx_cache.has_incident()) if (cache_mngr->trx_cache.has_incident())
error= mysql_bin_log.write_incident(thd); error= mysql_bin_log.write_incident(thd);
thd->clear_binlog_table_maps(); thd->reset_binlog_for_next_statement();
cache_mngr->reset(false, true); cache_mngr->reset(false, true);
} }
@@ -2253,6 +2251,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
*/ */
cache_mngr->reset(false, true); cache_mngr->reset(false, true);
thd->reset_binlog_for_next_statement();
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd)) if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd))
@@ -2297,6 +2296,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
*/ */
if (!all) if (!all)
cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
thd->reset_binlog_for_next_statement();
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@@ -2478,14 +2478,14 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
/* /*
When a SAVEPOINT is executed inside a stored function/trigger we force the When a SAVEPOINT is executed inside a stored function/trigger we force the
pending event to be flushed with a STMT_END_F flag and clear the table maps pending event to be flushed with a STMT_END_F flag and reset binlog
as well to ensure that following DMLs will have a clean state to start as well to ensure that following DMLs will have a clean state to start
with. ROLLBACK inside a stored routine has to finalize possibly existing with. ROLLBACK inside a stored routine has to finalize possibly existing
current row-based pending event with cleaning up table maps. That ensures current row-based pending event with cleaning up table maps. That ensures
that following DMLs will have a clean state to start with. that following DMLs will have a clean state to start with.
*/ */
if (thd->in_sub_stmt) if (thd->in_sub_stmt)
thd->clear_binlog_table_maps(); thd->reset_binlog_for_next_statement();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
@@ -5773,7 +5773,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
We only update the saved position if the old one was undefined, We only update the saved position if the old one was undefined,
the reason is that there are some cases (e.g., for CREATE-SELECT) the reason is that there are some cases (e.g., for CREATE-SELECT)
where the position is saved twice (e.g., both in where the position is saved twice (e.g., both in
select_create::prepare() and THD::binlog_write_table_map()) , but select_create::prepare() and binlog_write_table_map()) , but
we should use the first. This means that calls to this function we should use the first. This means that calls to this function
can be used to start the statement before the first table map can be used to start the statement before the first table map
event, to include some extra events. event, to include some extra events.
@@ -5900,45 +5900,149 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
DBUG_RETURN(err); DBUG_RETURN(err);
} }
/**
Prepare all tables that are updated for row logging
Annotate events and table maps are written by binlog_write_table_maps()
*/
void THD::binlog_prepare_for_row_logging()
{
DBUG_ENTER("THD::binlog_prepare_for_row_logging");
for (TABLE *table= open_tables ; table; table= table->next)
{
if (table->query_id == query_id && table->current_lock == F_WRLCK)
table->file->prepare_for_row_logging();
}
DBUG_VOID_RETURN;
}
/**
Write annnotated row event (the query) if needed
*/
bool THD::binlog_write_annotated_row(Log_event_writer *writer)
{
int error;
DBUG_ENTER("THD::binlog_write_annotated_row");
if (!(IF_WSREP(!wsrep_fragments_certified_for_stmt(this), true) &&
variables.binlog_annotate_row_events &&
query_length()))
DBUG_RETURN(0);
Annotate_rows_log_event anno(this, 0, false);
if (unlikely((error= writer->write(&anno))))
{
if (my_errno == EFBIG)
writer->set_incident();
DBUG_RETURN(error);
}
DBUG_RETURN(0);
}
/**
Write table map events for all tables that are using row logging.
This includes all tables used by this statement, including tables
used in triggers.
Also write annotate events and start transactions.
This is using the "tables_with_row_logging" list prepared by
THD::binlog_prepare_for_row_logging
*/
bool THD::binlog_write_table_maps()
{
bool with_annotate;
MYSQL_LOCK *locks[2], **locks_end= locks;
DBUG_ENTER("THD::binlog_write_table_maps");
DBUG_ASSERT(!binlog_table_maps);
DBUG_ASSERT(is_current_stmt_binlog_format_row());
/* Initialize cache_mngr once per statement */
binlog_start_trans_and_stmt();
with_annotate= 1; // Write annotate with first map
if ((*locks_end= extra_lock))
locks_end++;
if ((*locks_end= lock))
locks_end++;
for (MYSQL_LOCK **cur_lock= locks ; cur_lock < locks_end ; cur_lock++)
{
TABLE **const end_ptr= (*cur_lock)->table + (*cur_lock)->table_count;
for (TABLE **table_ptr= (*cur_lock)->table;
table_ptr != end_ptr ;
++table_ptr)
{
TABLE *table= *table_ptr;
bool restore= 0;
/*
We have to also write table maps for tables that have not yet been
used, like for tables in after triggers
*/
if (!table->file->row_logging &&
table->query_id != query_id && table->current_lock == F_WRLCK)
{
if (table->file->prepare_for_row_logging())
restore= 1;
}
if (table->file->row_logging)
{
if (binlog_write_table_map(table, with_annotate))
DBUG_RETURN(1);
with_annotate= 0;
}
if (restore)
{
/*
Restore original setting so that it doesn't cause problem for the
next statement
*/
table->file->row_logging= table->file->row_logging_init= 0;
}
}
}
binlog_table_maps= 1; // Table maps written
DBUG_RETURN(0);
}
/** /**
This function writes a table map to the binary log. This function writes a table map to the binary log.
Note that in order to keep the signature uniform with related methods, Note that in order to keep the signature uniform with related methods,
we use a redundant parameter to indicate whether a transactional table we use a redundant parameter to indicate whether a transactional table
was changed or not. was changed or not.
If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the table map.
@param table a pointer to the table. @param table a pointer to the table.
@param is_transactional @c true indicates a transactional table, @param with_annotate If true call binlog_write_annotated_row()
otherwise @c false a non-transactional.
@return @return
nonzero if an error pops up when writing the table map event. nonzero if an error pops up when writing the table map event.
*/ */
int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
bool *with_annotate) bool THD::binlog_write_table_map(TABLE *table, bool with_annotate)
{ {
int error; int error;
bool is_transactional= table->file->row_logging_has_trans;
DBUG_ENTER("THD::binlog_write_table_map"); DBUG_ENTER("THD::binlog_write_table_map");
DBUG_PRINT("enter", ("table: %p (%s: #%lu)", DBUG_PRINT("enter", ("table: %p (%s: #%lu)",
table, table->s->table_name.str, table, table->s->table_name.str,
table->s->table_map_id)); table->s->table_map_id));
/* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
/* Ensure that all events in a GTID group are in the same cache */ /* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN) if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1; is_transactional= 1;
/* Pre-conditions */
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event Table_map_log_event
the_event(this, table, table->s->table_map_id, is_transactional); the_event(this, table, table->s->table_map_id, is_transactional);
if (binlog_table_maps == 0)
binlog_start_trans_and_stmt();
binlog_cache_mngr *const cache_mngr= binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
binlog_cache_data *cache_data= (cache_mngr-> binlog_cache_data *cache_data= (cache_mngr->
@@ -5946,25 +6050,17 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
IO_CACHE *file= &cache_data->cache_log; IO_CACHE *file= &cache_data->cache_log;
Log_event_writer writer(file, cache_data); Log_event_writer writer(file, cache_data);
if (with_annotate && *with_annotate) if (with_annotate)
{ if (binlog_write_annotated_row(&writer))
Annotate_rows_log_event anno(table->in_use, is_transactional, false); DBUG_RETURN(1);
/* Annotate event should be written not more than once */
*with_annotate= 0;
if (unlikely((error= writer.write(&anno))))
{
if (my_errno == EFBIG)
cache_data->set_incident();
DBUG_RETURN(error);
}
}
if (unlikely((error= writer.write(&the_event)))) if (unlikely((error= writer.write(&the_event))))
DBUG_RETURN(error); DBUG_RETURN(error);
binlog_table_maps++;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/** /**
This function retrieves a pending row event from a cache which is This function retrieves a pending row event from a cache which is
specified through the parameter @c is_transactional. Respectively, when it specified through the parameter @c is_transactional. Respectively, when it
@@ -10848,7 +10944,7 @@ void wsrep_thd_binlog_trx_reset(THD * thd)
cache_mngr->stmt_cache.reset(); cache_mngr->stmt_cache.reset();
} }
} }
thd->clear_binlog_table_maps(); thd->reset_binlog_for_next_statement();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@@ -10875,7 +10971,6 @@ bool wsrep_stmt_rollback_is_safe(THD* thd)
binlog_cache_mngr *cache_mngr= binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (binlog_hton && cache_mngr) if (binlog_hton && cache_mngr)
{ {
binlog_cache_data * trx_cache = &cache_mngr->trx_cache; binlog_cache_data * trx_cache = &cache_mngr->trx_cache;

View File

@@ -1144,6 +1144,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
void make_default_log_name(char **out, const char* log_ext, bool once); void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd); void binlog_reset_cache(THD *thd);
bool write_annotated_row(THD *thd);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton; extern handlerton *binlog_hton;

View File

@@ -942,6 +942,7 @@ public:
int write_footer(); int write_footer();
my_off_t pos() { return my_b_safe_tell(file); } my_off_t pos() { return my_b_safe_tell(file); }
void add_status(enum_logged_status status); void add_status(enum_logged_status status);
void set_incident();
Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg, Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg,
Binlog_crypt_data *cr= 0) Binlog_crypt_data *cr= 0)

View File

@@ -799,7 +799,10 @@ my_bool Log_event::need_checksum()
int Log_event_writer::write_internal(const uchar *pos, size_t len) int Log_event_writer::write_internal(const uchar *pos, size_t len)
{ {
if (my_b_safe_write(file, pos, len)) if (my_b_safe_write(file, pos, len))
{
DBUG_PRINT("error", ("write to log failed: %d", my_errno));
return 1; return 1;
}
bytes_written+= len; bytes_written+= len;
return 0; return 0;
} }
@@ -6153,13 +6156,10 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
(tbl->s->db.str[tbl->s->db.length] == 0)); (tbl->s->db.str[tbl->s->db.length] == 0));
DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0); DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
#ifdef MYSQL_SERVER
binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields * binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
sizeof(Binlog_type_info)); sizeof(Binlog_type_info));
for (uint i= 0; i < m_table->s->fields; i++) for (uint i= 0; i < m_table->s->fields; i++)
binlog_type_info_array[i]= m_table->field[i]->binlog_type_info(); binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
#endif
m_data_size= TABLE_MAP_HEADER_LEN; m_data_size= TABLE_MAP_HEADER_LEN;
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;); DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
@@ -7079,14 +7079,14 @@ bool Rows_log_event::process_triggers(trg_event_type event,
m_table->triggers->mark_fields_used(event); m_table->triggers->mark_fields_used(event);
if (slave_run_triggers_for_rbr == SLAVE_RUN_TRIGGERS_FOR_RBR_YES) if (slave_run_triggers_for_rbr == SLAVE_RUN_TRIGGERS_FOR_RBR_YES)
{ {
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
result= m_table->triggers->process_triggers(thd, event, result= m_table->triggers->process_triggers(thd, event,
time_type, old_row_is_record1); time_type,
reenable_binlog(thd); old_row_is_record1);
} }
else else
result= m_table->triggers->process_triggers(thd, event, result= m_table->triggers->process_triggers(thd, event,
time_type, old_row_is_record1); time_type,
old_row_is_record1);
DBUG_RETURN(result); DBUG_RETURN(result);
} }

View File

@@ -423,10 +423,12 @@ rpl_slave_state::truncate_state_table(THD *thd)
TABLE_LIST tlist; TABLE_LIST tlist;
int err= 0; int err= 0;
tmp_disable_binlog(thd); tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE); NULL, TL_WRITE);
if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0))) if (!(err= open_and_lock_tables(thd, &tlist, FALSE,
MYSQL_OPEN_IGNORE_LOGGING_FORMAT)))
{ {
DBUG_ASSERT(!tlist.table->file->row_logging);
tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql", tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql",
rpl_gtid_slave_state_table_name.str); rpl_gtid_slave_state_table_name.str);
err= tlist.table->file->ha_truncate(); err= tlist.table->file->ha_truncate();
@@ -445,8 +447,6 @@ rpl_slave_state::truncate_state_table(THD *thd)
} }
thd->mdl_context.release_transactional_locks(); thd->mdl_context.release_transactional_locks();
} }
reenable_binlog(thd);
return err; return err;
} }
@@ -676,6 +676,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table_opened= true; table_opened= true;
table= tlist.table; table= tlist.table;
hton= table->s->db_type(); hton= table->s->db_type();
table->file->row_logging= 0; // No binary logging
if ((err= gtid_check_rpl_slave_state_table(table))) if ((err= gtid_check_rpl_slave_state_table(table)))
goto end; goto end;

View File

@@ -100,6 +100,7 @@ int injector::transaction::commit()
} }
#ifdef TO_BE_DELETED
int injector::transaction::use_table(server_id_type sid, table tbl) int injector::transaction::use_table(server_id_type sid, table tbl)
{ {
DBUG_ENTER("injector::transaction::use_table"); DBUG_ENTER("injector::transaction::use_table");
@@ -111,12 +112,12 @@ int injector::transaction::use_table(server_id_type sid, table tbl)
server_id_type save_id= m_thd->variables.server_id; server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid); m_thd->set_server_id(sid);
error= m_thd->binlog_write_table_map(tbl.get_table(), DBUG_ASSERT(tbl.is_transactional() == tbl.get_table()->file->row_logging_has_trans);
tbl.is_transactional()); error= m_thd->binlog_write_table_map(tbl.get_table(), 0);
m_thd->set_server_id(save_id); m_thd->set_server_id(save_id);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
#endif
injector::transaction::binlog_pos injector::transaction::start_pos() const injector::transaction::binlog_pos injector::transaction::start_pos() const

View File

@@ -177,8 +177,9 @@ public:
>0 Failure >0 Failure
*/ */
#ifdef TO_BE_DELETED
int use_table(server_id_type sid, table tbl); int use_table(server_id_type sid, table tbl);
#endif
/* /*
Commit a transaction. Commit a transaction.

View File

@@ -3582,19 +3582,18 @@ void set_slave_thread_options(THD* thd)
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
only for client threads. only for client threads.
*/ */
ulonglong options= thd->variables.option_bits | OPTION_BIG_SELECTS; ulonglong options= (thd->variables.option_bits |
if (opt_log_slave_updates) OPTION_BIG_SELECTS | OPTION_BIN_LOG);
options|= OPTION_BIN_LOG; if (!opt_log_slave_updates)
else
options&= ~OPTION_BIN_LOG; options&= ~OPTION_BIN_LOG;
thd->variables.option_bits= options;
thd->variables.completion_type= 0;
/* For easier test in LOGGER::log_command */ /* For easier test in LOGGER::log_command */
if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE) if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE)
thd->variables.option_bits|= OPTION_LOG_OFF; options|= OPTION_LOG_OFF;
thd->variables.option_bits= options;
thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements & thd->variables.completion_type= 0;
thd->variables.sql_log_slow=
!MY_TEST(thd->variables.log_slow_disabled_statements &
LOG_SLOW_DISABLE_SLAVE); LOG_SLOW_DISABLE_SLAVE);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@@ -8206,6 +8205,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos,
const char* buf, const char* buf,
const Format_description_log_event *fdle) const Format_description_log_event *fdle)
{ {
DBUG_ENTER("Rows_event_tracker::update");
if (!first_seen) if (!first_seen)
{ {
first_seen= pos; first_seen= pos;
@@ -8214,6 +8214,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos,
last_seen= pos; last_seen= pos;
DBUG_ASSERT(stmt_end_seen == 0); // We can only have one DBUG_ASSERT(stmt_end_seen == 0); // We can only have one
stmt_end_seen= get_row_event_stmt_end(buf, fdle); stmt_end_seen= get_row_event_stmt_end(buf, fdle);
DBUG_VOID_RETURN;
}; };

View File

@@ -1942,7 +1942,9 @@ class Grant_tables
if (res) if (res)
DBUG_RETURN(res); DBUG_RETURN(res);
if (lock_tables(thd, first, counter, MYSQL_LOCK_IGNORE_TIMEOUT)) if (lock_tables(thd, first, counter,
MYSQL_LOCK_IGNORE_TIMEOUT |
MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
DBUG_RETURN(-1); DBUG_RETURN(-1);
p_user_table->set_table(tables[USER_TABLE].table); p_user_table->set_table(tables[USER_TABLE].table);
@@ -4397,7 +4399,8 @@ static USER_AUTH auth_no_password;
static int replace_user_table(THD *thd, const User_table &user_table, static int replace_user_table(THD *thd, const User_table &user_table,
LEX_USER * const combo, privilege_t rights, LEX_USER * const combo, privilege_t rights,
const bool revoke_grant, const bool can_create_user, const bool revoke_grant,
const bool can_create_user,
const bool no_auto_create) const bool no_auto_create)
{ {
int error = -1; int error = -1;
@@ -5426,6 +5429,7 @@ static int replace_column_table(GRANT_TABLE *g_t,
List_iterator <LEX_COLUMN> iter(columns); List_iterator <LEX_COLUMN> iter(columns);
class LEX_COLUMN *column; class LEX_COLUMN *column;
int error= table->file->ha_index_init(0, 1); int error= table->file->ha_index_init(0, 1);
if (unlikely(error)) if (unlikely(error))
{ {

View File

@@ -1027,9 +1027,7 @@ send_result_message:
*save_next_global= table->next_global; *save_next_global= table->next_global;
table->next_local= table->next_global= 0; table->next_local= table->next_global= 0;
tmp_disable_binlog(thd); // binlogging is done by caller if wanted
result_code= admin_recreate_table(thd, table); result_code= admin_recreate_table(thd, table);
reenable_binlog(thd);
trans_commit_stmt(thd); trans_commit_stmt(thd);
trans_commit(thd); trans_commit(thd);
close_thread_tables(thd); close_thread_tables(thd);

View File

@@ -734,8 +734,9 @@ bool close_cached_connection_tables(THD *thd, LEX_CSTRING *connection)
Clear 'check_table_binlog_row_based_done' flag. For tables which were used Clear 'check_table_binlog_row_based_done' flag. For tables which were used
by current substatement the flag is cleared as part of 'ha_reset()' call. by current substatement the flag is cleared as part of 'ha_reset()' call.
For the rest of the open tables not used by current substament if this For the rest of the open tables not used by current substament if this
flag is enabled as part of current substatement execution, clear the flag flag is enabled as part of current substatement execution,
explicitly. (for example when THD::binlog_write_table_maps() calls
prepare_for_row_logging()), clear the flag explicitly.
NOTE NOTE
The reason we reset query_id is that it's not enough to just test The reason we reset query_id is that it's not enough to just test
@@ -759,7 +760,7 @@ static void mark_used_tables_as_free_for_reuse(THD *thd, TABLE *table)
table->query_id= 0; table->query_id= 0;
table->file->ha_reset(); table->file->ha_reset();
} }
else if (table->file->check_table_binlog_row_based_done) else
table->file->clear_cached_table_binlog_row_based_flag(); table->file->clear_cached_table_binlog_row_based_flag();
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
@@ -1660,9 +1661,10 @@ static int set_partitions_as_used(TABLE_LIST *tl, TABLE *t)
needed to remedy problem before retrying again. needed to remedy problem before retrying again.
@retval FALSE 't' was not locked, not a VIEW or an error happened. @retval FALSE 't' was not locked, not a VIEW or an error happened.
*/ */
bool is_locked_view(THD *thd, TABLE_LIST *t) bool is_locked_view(THD *thd, TABLE_LIST *t)
{ {
DBUG_ENTER("check_locked_view"); DBUG_ENTER("is_locked_view");
/* /*
Is this table a view and not a base table? Is this table a view and not a base table?
(it is work around to allow to open view with locked tables, (it is work around to allow to open view with locked tables,
@@ -2212,8 +2214,8 @@ retry_share:
my_error(ER_NOT_SEQUENCE, MYF(0), table_list->db.str, table_list->alias.str); my_error(ER_NOT_SEQUENCE, MYF(0), table_list->db.str, table_list->alias.str);
DBUG_RETURN(true); DBUG_RETURN(true);
} }
table->init(thd, table_list); table->init(thd, table_list);
DBUG_ASSERT(thd->locked_tables_mode || table->file->row_logging == 0);
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
@@ -3738,7 +3740,8 @@ open_and_process_table(THD *thd, TABLE_LIST *tables, uint *counter, uint flags,
temporary table or SEQUENCE (see sequence_insert()). temporary table or SEQUENCE (see sequence_insert()).
*/ */
DBUG_ASSERT(is_temporary_table(tables) || tables->table->s->sequence); DBUG_ASSERT(is_temporary_table(tables) || tables->table->s->sequence);
if (tables->sequence && tables->table->s->table_type != TABLE_TYPE_SEQUENCE) if (tables->sequence &&
tables->table->s->table_type != TABLE_TYPE_SEQUENCE)
{ {
my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str); my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str);
DBUG_RETURN(true); DBUG_RETURN(true);
@@ -5214,6 +5217,8 @@ bool open_and_lock_tables(THD *thd, const DDL_options_st &options,
if (lock_tables(thd, tables, counter, flags)) if (lock_tables(thd, tables, counter, flags))
goto err; goto err;
/* Don't read statistics tables when opening internal tables */
if (!(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
(void) read_statistics_for_tables_if_needed(thd, tables); (void) read_statistics_for_tables_if_needed(thd, tables);
if (derived) if (derived)
@@ -5348,12 +5353,19 @@ bool open_tables_only_view_structure(THD *thd, TABLE_LIST *table_list,
static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list) static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list)
{ {
TABLE_LIST *table; TABLE_LIST *table;
DBUG_ENTER("mark_real_tables_as_free_for_reuse");
/*
We have to make two loops as HA_EXTRA_DETACH_CHILDREN may
remove items from the table list that we have to reset
*/
for (table= table_list; table; table= table->next_global) for (table= table_list; table; table= table->next_global)
if (!table->placeholder())
{ {
if (!table->placeholder())
table->table->query_id= 0; table->table->query_id= 0;
} }
for (table= table_list; table; table= table->next_global) for (table= table_list; table; table= table->next_global)
{
if (!table->placeholder()) if (!table->placeholder())
{ {
/* /*
@@ -5364,6 +5376,8 @@ static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list)
*/ */
table->table->file->extra(HA_EXTRA_DETACH_CHILDREN); table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
} }
}
DBUG_VOID_RETURN;
} }
@@ -5428,7 +5442,7 @@ err:
bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
{ {
TABLE_LIST *table; TABLE_LIST *table, *first_not_own;
DBUG_ENTER("lock_tables"); DBUG_ENTER("lock_tables");
/* /*
We can't meet statement requiring prelocking if we already We can't meet statement requiring prelocking if we already
@@ -5438,7 +5452,9 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
!thd->lex->requires_prelocking()); !thd->lex->requires_prelocking());
if (!tables && !thd->lex->requires_prelocking()) if (!tables && !thd->lex->requires_prelocking())
DBUG_RETURN(thd->decide_logging_format(tables)); DBUG_RETURN(0);
first_not_own= thd->lex->first_not_own_table();
/* /*
Check for thd->locked_tables_mode to avoid a redundant Check for thd->locked_tables_mode to avoid a redundant
@@ -5454,13 +5470,26 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
{ {
DBUG_ASSERT(thd->lock == 0); // You must lock everything at once DBUG_ASSERT(thd->lock == 0); // You must lock everything at once
TABLE **start,**ptr; TABLE **start,**ptr;
bool found_first_not_own= 0;
if (!(ptr=start=(TABLE**) thd->alloc(sizeof(TABLE*)*count))) if (!(ptr=start=(TABLE**) thd->alloc(sizeof(TABLE*)*count)))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
/*
Collect changes tables for table lock.
Mark own tables with query id as this is needed by
prepare_for_row_logging()
*/
for (table= tables; table; table= table->next_global) for (table= tables; table; table= table->next_global)
{ {
if (table == first_not_own)
found_first_not_own= 1;
if (!table->placeholder()) if (!table->placeholder())
{
*(ptr++)= table->table; *(ptr++)= table->table;
if (!found_first_not_own)
table->table->query_id= thd->query_id;
}
} }
DEBUG_SYNC(thd, "before_lock_tables_takes_lock"); DEBUG_SYNC(thd, "before_lock_tables_takes_lock");
@@ -5474,7 +5503,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
if (thd->lex->requires_prelocking() && if (thd->lex->requires_prelocking() &&
thd->lex->sql_command != SQLCOM_LOCK_TABLES) thd->lex->sql_command != SQLCOM_LOCK_TABLES)
{ {
TABLE_LIST *first_not_own= thd->lex->first_not_own_table();
/* /*
We just have done implicit LOCK TABLES, and now we have We just have done implicit LOCK TABLES, and now we have
to emulate first open_and_lock_tables() after it. to emulate first open_and_lock_tables() after it.
@@ -5492,7 +5520,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
{ {
if (!table->placeholder()) if (!table->placeholder())
{ {
table->table->query_id= thd->query_id;
if (check_lock_and_start_stmt(thd, thd->lex, table)) if (check_lock_and_start_stmt(thd, thd->lex, table))
{ {
mysql_unlock_tables(thd, thd->lock); mysql_unlock_tables(thd, thd->lock);
@@ -5512,7 +5539,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
} }
else else
{ {
TABLE_LIST *first_not_own= thd->lex->first_not_own_table();
/* /*
When open_and_lock_tables() is called for a single table out of When open_and_lock_tables() is called for a single table out of
a table list, the 'next_global' chain is temporarily broken. We a table list, the 'next_global' chain is temporarily broken. We
@@ -5528,6 +5554,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
if (table->placeholder()) if (table->placeholder())
continue; continue;
table->table->query_id= thd->query_id;
/* /*
In a stored function or trigger we should ensure that we won't change In a stored function or trigger we should ensure that we won't change
a table that is already used by the calling statement. a table that is already used by the calling statement.
@@ -5567,7 +5594,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
} }
bool res= fix_all_session_vcol_exprs(thd, tables); bool res= fix_all_session_vcol_exprs(thd, tables);
if (!res) if (!res && !(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
res= thd->decide_logging_format(tables); res= thd->decide_logging_format(tables);
DBUG_RETURN(res); DBUG_RETURN(res);
@@ -9005,6 +9032,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
*/ */
if (open_and_lock_tables(thd, table_list, FALSE, if (open_and_lock_tables(thd, table_list, FALSE,
(MYSQL_OPEN_IGNORE_FLUSH | (MYSQL_OPEN_IGNORE_FLUSH |
MYSQL_OPEN_IGNORE_LOGGING_FORMAT |
(table_list->lock_type < TL_WRITE_ALLOW_WRITE ? (table_list->lock_type < TL_WRITE_ALLOW_WRITE ?
MYSQL_LOCK_IGNORE_TIMEOUT : 0)))) MYSQL_LOCK_IGNORE_TIMEOUT : 0))))
{ {
@@ -9016,6 +9044,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
for (TABLE_LIST *tables= table_list; tables; tables= tables->next_global) for (TABLE_LIST *tables= table_list; tables; tables= tables->next_global)
{ {
DBUG_ASSERT(tables->table->s->table_category == TABLE_CATEGORY_SYSTEM); DBUG_ASSERT(tables->table->s->table_category == TABLE_CATEGORY_SYSTEM);
tables->table->file->row_logging= 0;
tables->table->use_all_columns(); tables->table->use_all_columns();
} }
lex->restore_backup_query_tables_list(&query_tables_list_backup); lex->restore_backup_query_tables_list(&query_tables_list_backup);
@@ -9103,8 +9132,9 @@ open_system_table_for_update(THD *thd, TABLE_LIST *one_table)
{ {
DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_SYSTEM); DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_SYSTEM);
table->use_all_columns(); table->use_all_columns();
/* This table instance is not row logged */
table->file->row_logging= 0;
} }
DBUG_RETURN(table); DBUG_RETURN(table);
} }
@@ -9137,6 +9167,8 @@ open_log_table(THD *thd, TABLE_LIST *one_table, Open_tables_backup *backup)
if ((table= open_ltable(thd, one_table, one_table->lock_type, flags))) if ((table= open_ltable(thd, one_table, one_table->lock_type, flags)))
{ {
DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_LOG); DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_LOG);
DBUG_ASSERT(!table->file->row_logging);
/* Make sure all columns get assigned to a default value */ /* Make sure all columns get assigned to a default value */
table->use_all_columns(); table->use_all_columns();
DBUG_ASSERT(table->s->no_replicate); DBUG_ASSERT(table->s->no_replicate);

View File

@@ -125,6 +125,11 @@ TABLE *open_ltable(THD *thd, TABLE_LIST *table_list, thr_lock_type update,
*/ */
#define MYSQL_OPEN_IGNORE_REPAIR 0x10000 #define MYSQL_OPEN_IGNORE_REPAIR 0x10000
/**
Don't call decide_logging_format. Used for statistic tables etc
*/
#define MYSQL_OPEN_IGNORE_LOGGING_FORMAT 0x20000
/** Please refer to the internals manual. */ /** Please refer to the internals manual. */
#define MYSQL_OPEN_REOPEN (MYSQL_OPEN_IGNORE_FLUSH |\ #define MYSQL_OPEN_REOPEN (MYSQL_OPEN_IGNORE_FLUSH |\
MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |\ MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |\

View File

@@ -636,7 +636,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_current_stage_key(0), m_psi(0), m_current_stage_key(0), m_psi(0),
in_sub_stmt(0), log_all_errors(0), in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0), binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
bulk_param(0), bulk_param(0),
table_map_for_update(0), table_map_for_update(0),
m_examined_row_count(0), m_examined_row_count(0),
@@ -797,6 +796,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
bzero((void*) ha_data, sizeof(ha_data)); bzero((void*) ha_data, sizeof(ha_data));
mysys_var=0; mysys_var=0;
binlog_evt_union.do_union= FALSE; binlog_evt_union.do_union= FALSE;
binlog_table_maps= FALSE;
enable_slow_log= 0; enable_slow_log= 0;
durability_property= HA_REGULAR_DURABILITY; durability_property= HA_REGULAR_DURABILITY;
@@ -1315,8 +1315,6 @@ void THD::init()
else else
variables.option_bits&= ~OPTION_BIN_LOG; variables.option_bits&= ~OPTION_BIN_LOG;
variables.sql_log_bin_off= 0;
select_commands= update_commands= other_commands= 0; select_commands= update_commands= other_commands= 0;
/* Set to handle counting of aborted connections */ /* Set to handle counting of aborted connections */
userstat_running= opt_userstat_running; userstat_running= opt_userstat_running;
@@ -5837,8 +5835,9 @@ int THD::decide_logging_format(TABLE_LIST *tables)
{ {
DBUG_ENTER("THD::decide_logging_format"); DBUG_ENTER("THD::decide_logging_format");
DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query())); DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query()));
DBUG_PRINT("info", ("variables.binlog_format: %lu", DBUG_PRINT("info", ("binlog_format: %lu", (ulong) variables.binlog_format));
variables.binlog_format)); DBUG_PRINT("info", ("current_stmt_binlog_format: %lu",
(ulong) current_stmt_binlog_format));
DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
lex->get_stmt_unsafe_flags())); lex->get_stmt_unsafe_flags()));
@@ -5861,18 +5860,14 @@ int THD::decide_logging_format(TABLE_LIST *tables)
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
} }
#endif /* WITH_WSREP */
if ((WSREP_EMULATE_BINLOG_NNULL(this) || if ((WSREP_EMULATE_BINLOG_NNULL(this) ||
(mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) && (mysql_bin_log.is_open() &&
(variables.option_bits & OPTION_BIN_LOG))) &&
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT && !(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db.str))) !binlog_filter->db_ok(db.str)))
#else
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db.str)))
#endif /* WITH_WSREP */
{ {
if (is_bulk_op()) if (is_bulk_op())
{ {
if (wsrep_binlog_format() == BINLOG_FORMAT_STMT) if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
@@ -5915,6 +5910,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
bool has_auto_increment_write_tables_not_first= FALSE; bool has_auto_increment_write_tables_not_first= FALSE;
bool found_first_not_own_table= FALSE; bool found_first_not_own_table= FALSE;
bool has_write_tables_with_unsafe_statements= FALSE; bool has_write_tables_with_unsafe_statements= FALSE;
bool blackhole_table_found= 0;
/* /*
A pointer to a previous table that was changed. A pointer to a previous table that was changed.
@@ -6040,6 +6036,10 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (prev_write_table && prev_write_table->file->ht != if (prev_write_table && prev_write_table->file->ht !=
table->file->ht) table->file->ht)
multi_write_engine= TRUE; multi_write_engine= TRUE;
if (table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB)
blackhole_table_found= 1;
if (share->non_determinstic_insert && if (share->non_determinstic_insert &&
!(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE)) !(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE))
has_write_tables_with_unsafe_statements= true; has_write_tables_with_unsafe_statements= true;
@@ -6285,7 +6285,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
is_current_stmt_binlog_format_row() ? is_current_stmt_binlog_format_row() ?
"ROW" : "STATEMENT")); "ROW" : "STATEMENT"));
if (variables.binlog_format == BINLOG_FORMAT_ROW && if (blackhole_table_found &&
variables.binlog_format == BINLOG_FORMAT_ROW &&
(sql_command_flags[lex->sql_command] & (sql_command_flags[lex->sql_command] &
(CF_UPDATES_DATA | CF_DELETES_DATA))) (CF_UPDATES_DATA | CF_DELETES_DATA)))
{ {
@@ -6322,9 +6323,12 @@ int THD::decide_logging_format(TABLE_LIST *tables)
table_names.c_ptr()); table_names.c_ptr());
} }
} }
if (is_write && is_current_stmt_binlog_format_row())
binlog_prepare_for_row_logging();
} }
#ifndef DBUG_OFF
else else
{
DBUG_PRINT("info", ("decision: no logging since " DBUG_PRINT("info", ("decision: no logging since "
"mysql_bin_log.is_open() = %d " "mysql_bin_log.is_open() = %d "
"and (options & OPTION_BIN_LOG) = 0x%llx " "and (options & OPTION_BIN_LOG) = 0x%llx "
@@ -6334,21 +6338,22 @@ int THD::decide_logging_format(TABLE_LIST *tables)
(variables.option_bits & OPTION_BIN_LOG), (variables.option_bits & OPTION_BIN_LOG),
(uint) wsrep_binlog_format(), (uint) wsrep_binlog_format(),
binlog_filter->db_ok(db.str))); binlog_filter->db_ok(db.str)));
#endif if (WSREP_NNULL(this) && is_current_stmt_binlog_format_row())
binlog_prepare_for_row_logging();
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int THD::decide_logging_format_low(TABLE *table) int THD::decide_logging_format_low(TABLE *table)
{ {
DBUG_ENTER("decide_logging_format_low");
/* /*
INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
can be unsafe. can be unsafe.
*/ */
if(wsrep_binlog_format() <= BINLOG_FORMAT_STMT && if (wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
!is_current_stmt_binlog_format_row() && !is_current_stmt_binlog_format_row() &&
!lex->is_stmt_unsafe() && !lex->is_stmt_unsafe() &&
lex->sql_command == SQLCOM_INSERT &&
lex->duplicates == DUP_UPDATE) lex->duplicates == DUP_UPDATE)
{ {
uint unique_keys= 0; uint unique_keys= 0;
@@ -6376,20 +6381,15 @@ exit:;
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS); lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS);
binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags(); binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags();
set_current_stmt_binlog_format_row_if_mixed(); set_current_stmt_binlog_format_row_if_mixed();
return 1; if (is_current_stmt_binlog_format_row())
binlog_prepare_for_row_logging();
DBUG_RETURN(1);
} }
} }
return 0; DBUG_RETURN(0);
} }
/*
Implementation of interface to write rows to the binary log through the
thread. The thread is responsible for writing the rows it has
inserted/updated/deleted.
*/
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
/* /*
Template member function for ensuring that there is an rows log Template member function for ensuring that there is an rows log
event of the apropriate type before proceeding. event of the apropriate type before proceeding.
@@ -6814,7 +6814,8 @@ void THD::binlog_prepare_row_images(TABLE *table)
*/ */
if (table->s->primary_key < MAX_KEY && if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) && (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT)) !ha_check_storage_engine_flag(table->s->db_type(),
HTON_NO_BINLOG_ROW_OPT))
{ {
/** /**
Just to be sure that tmp_set is currently not in use as Just to be sure that tmp_set is currently not in use as
@@ -6859,7 +6860,7 @@ void THD::binlog_prepare_row_images(TABLE *table)
int THD::binlog_remove_pending_rows_event(bool clear_maps, int THD::binlog_remove_pending_rows_event(bool reset_stmt,
bool is_transactional) bool is_transactional)
{ {
DBUG_ENTER("THD::binlog_remove_pending_rows_event"); DBUG_ENTER("THD::binlog_remove_pending_rows_event");
@@ -6873,12 +6874,12 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
mysql_bin_log.remove_pending_rows_event(this, is_transactional); mysql_bin_log.remove_pending_rows_event(this, is_transactional);
if (clear_maps) if (reset_stmt)
binlog_table_maps= 0; reset_binlog_for_next_statement();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
{ {
DBUG_ENTER("THD::binlog_flush_pending_rows_event"); DBUG_ENTER("THD::binlog_flush_pending_rows_event");
@@ -6904,9 +6905,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
if (stmt_end) if (stmt_end)
{ {
pending->set_flags(Rows_log_event::STMT_END_F); pending->set_flags(Rows_log_event::STMT_END_F);
binlog_table_maps= 0; reset_binlog_for_next_statement();
} }
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
is_transactional); is_transactional);
} }
@@ -7278,8 +7278,11 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
suppress_use, errcode); suppress_use, errcode);
error= mysql_bin_log.write(&qinfo); error= mysql_bin_log.write(&qinfo);
} }
/*
row logged binlog may not have been reset in the case of locked tables
*/
reset_binlog_for_next_statement();
binlog_table_maps= 0;
DBUG_RETURN(error >= 0 ? error : 1); DBUG_RETURN(error >= 0 ? error : 1);
} }

View File

@@ -75,14 +75,15 @@ void set_thd_stage_info(void *thd,
#include "wsrep_condition_variable.h" #include "wsrep_condition_variable.h"
class Wsrep_applier_service; class Wsrep_applier_service;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
class Reprepare_observer; class Reprepare_observer;
class Relay_log_info; class Relay_log_info;
struct rpl_group_info; struct rpl_group_info;
class Rpl_filter; class Rpl_filter;
class Query_log_event; class Query_log_event;
class Load_log_event; class Load_log_event;
class Log_event_writer;
class sp_rcontext; class sp_rcontext;
class sp_cache; class sp_cache;
class Lex_input_stream; class Lex_input_stream;
@@ -737,11 +738,6 @@ typedef struct system_variables
my_bool query_cache_strip_comments; my_bool query_cache_strip_comments;
my_bool sql_log_slow; my_bool sql_log_slow;
my_bool sql_log_bin; my_bool sql_log_bin;
/*
A flag to help detect whether binary logging was temporarily disabled
(see tmp_disable_binlog(A) macro).
*/
my_bool sql_log_bin_off;
my_bool binlog_annotate_row_events; my_bool binlog_annotate_row_events;
my_bool binlog_direct_non_trans_update; my_bool binlog_direct_non_trans_update;
my_bool column_compression_zlib_wrap; my_bool column_compression_zlib_wrap;
@@ -2576,14 +2572,17 @@ public:
*/ */
void binlog_start_trans_and_stmt(); void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin(); void binlog_set_stmt_begin();
int binlog_write_table_map(TABLE *table, bool is_transactional,
bool *with_annotate= 0);
int binlog_write_row(TABLE* table, bool is_transactional, int binlog_write_row(TABLE* table, bool is_transactional,
const uchar *buf); const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional, int binlog_delete_row(TABLE* table, bool is_transactional,
const uchar *buf); const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional, int binlog_update_row(TABLE* table, bool is_transactional,
const uchar *old_data, const uchar *new_data); const uchar *old_data, const uchar *new_data);
bool prepare_handlers_for_update(uint flag);
bool binlog_write_annotated_row(Log_event_writer *writer);
void binlog_prepare_for_row_logging();
bool binlog_write_table_maps();
bool binlog_write_table_map(TABLE *table, bool with_annotate);
static void binlog_prepare_row_images(TABLE* table); static void binlog_prepare_row_images(TABLE* table);
void set_server_id(uint32 sid) { variables.server_id = sid; } void set_server_id(uint32 sid) { variables.server_id = sid; }
@@ -2677,22 +2676,20 @@ private:
*/ */
enum_binlog_format current_stmt_binlog_format; enum_binlog_format current_stmt_binlog_format;
/*
Number of outstanding table maps, i.e., table maps in the
transaction cache.
*/
uint binlog_table_maps;
public: public:
/* 1 if binlog table maps has been written */
bool binlog_table_maps;
void issue_unsafe_warnings(); void issue_unsafe_warnings();
void reset_unsafe_warnings() void reset_unsafe_warnings()
{ binlog_unsafe_warning_flags= 0; } { binlog_unsafe_warning_flags= 0; }
uint get_binlog_table_maps() const { void reset_binlog_for_next_statement()
return binlog_table_maps; {
}
void clear_binlog_table_maps() {
binlog_table_maps= 0; binlog_table_maps= 0;
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
public: public:
@@ -5170,11 +5167,10 @@ my_eof(THD *thd)
#define tmp_disable_binlog(A) \ #define tmp_disable_binlog(A) \
{ulonglong tmp_disable_binlog__save_options= (A)->variables.option_bits; \ {ulonglong tmp_disable_binlog__save_options= (A)->variables.option_bits; \
(A)->variables.option_bits&= ~OPTION_BIN_LOG; \ (A)->variables.option_bits&= ~OPTION_BIN_LOG; \
(A)->variables.sql_log_bin_off= 1; (A)->variables.option_bits|= OPTION_BIN_TMP_LOG_OFF;
#define reenable_binlog(A) \ #define reenable_binlog(A) \
(A)->variables.option_bits= tmp_disable_binlog__save_options; \ (A)->variables.option_bits= tmp_disable_binlog__save_options; }
(A)->variables.sql_log_bin_off= 0;}
inline date_conv_mode_t sql_mode_for_dates(THD *thd) inline date_conv_mode_t sql_mode_for_dates(THD *thd)

View File

@@ -959,6 +959,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
goto values_loop_end; goto values_loop_end;
THD_STAGE_INFO(thd, stage_update); THD_STAGE_INFO(thd, stage_update);
thd->decide_logging_format_low(table);
do do
{ {
DBUG_PRINT("info", ("iteration %llu", iteration)); DBUG_PRINT("info", ("iteration %llu", iteration));
@@ -1071,7 +1072,6 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
break; break;
} }
thd->decide_logging_format_low(table);
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED) if (lock_type == TL_WRITE_DELAYED)
{ {
@@ -3051,6 +3051,8 @@ bool Delayed_insert::open_and_lock_table()
return TRUE; return TRUE;
} }
table->copy_blobs= 1; table->copy_blobs= 1;
table->file->prepare_for_row_logging();
return FALSE; return FALSE;
} }
@@ -3110,6 +3112,8 @@ pthread_handler_t handle_delayed_insert(void *arg)
at which rows are inserted cannot be determined in mixed mode. at which rows are inserted cannot be determined in mixed mode.
*/ */
thd->set_current_stmt_binlog_format_row_if_mixed(); thd->set_current_stmt_binlog_format_row_if_mixed();
/* Don't annotate insert delayed binlog events */
thd->variables.binlog_annotate_row_events= 0;
/* /*
Clone tickets representing protection against GRL and the lock on Clone tickets representing protection against GRL and the lock on
@@ -3314,8 +3318,19 @@ pthread_handler_t handle_delayed_insert(void *arg)
di->table->file->delete_update_handler(); di->table->file->delete_update_handler();
di->group_count=0; di->group_count=0;
mysql_audit_release(thd); mysql_audit_release(thd);
/*
Reset binlog. We can't call ha_reset() for the table as this will
reset the table maps we have calculated earlier.
*/
mysql_mutex_lock(&di->mutex); mysql_mutex_lock(&di->mutex);
} }
/*
Reset binlog. We can't call ha_reset() for the table as this will
reset the table maps we have calculated earlier.
*/
thd->reset_binlog_for_next_statement();
if (di->tables_in_use) if (di->tables_in_use)
mysql_cond_broadcast(&di->cond_client); // If waiting clients mysql_cond_broadcast(&di->cond_client); // If waiting clients
} }
@@ -3407,9 +3422,7 @@ bool Delayed_insert::handle_inserts(void)
{ {
int error; int error;
ulong max_rows; ulong max_rows;
bool has_trans = TRUE; bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
bool using_ignore= 0, using_opt_replace= 0,
using_bin_log= mysql_bin_log.is_open();
delayed_row *row; delayed_row *row;
DBUG_ENTER("handle_inserts"); DBUG_ENTER("handle_inserts");
@@ -3443,7 +3456,13 @@ bool Delayed_insert::handle_inserts(void)
if (table->file->ha_rnd_init_with_error(0)) if (table->file->ha_rnd_init_with_error(0))
goto err; goto err;
/*
We have to call prepare_for_row_logging() as the second call to
handler_writes() will not have called decide_logging_format.
*/
table->file->prepare_for_row_logging();
table->file->prepare_for_insert(); table->file->prepare_for_insert();
using_bin_log= table->file->row_logging;
/* /*
We can't use row caching when using the binary log because if We can't use row caching when using the binary log because if
@@ -3452,6 +3471,7 @@ bool Delayed_insert::handle_inserts(void)
*/ */
if (!using_bin_log) if (!using_bin_log)
table->file->extra(HA_EXTRA_WRITE_CACHE); table->file->extra(HA_EXTRA_WRITE_CACHE);
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
while ((row=rows.get())) while ((row=rows.get()))
@@ -3480,8 +3500,8 @@ bool Delayed_insert::handle_inserts(void)
Guaranteed that the INSERT DELAYED STMT will not be here Guaranteed that the INSERT DELAYED STMT will not be here
in SBR when mysql binlog is enabled. in SBR when mysql binlog is enabled.
*/ */
DBUG_ASSERT(!(mysql_bin_log.is_open() && DBUG_ASSERT(!mysql_bin_log.is_open() ||
!thd.is_current_stmt_binlog_format_row())); thd.is_current_stmt_binlog_format_row());
/* /*
This is the first value of an INSERT statement. This is the first value of an INSERT statement.
@@ -3639,10 +3659,9 @@ bool Delayed_insert::handle_inserts(void)
TODO: Move the logging to last in the sequence of rows. TODO: Move the logging to last in the sequence of rows.
*/ */
has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || if (table->file->row_logging &&
table->file->has_transactions(); thd.binlog_flush_pending_rows_event(TRUE,
if (thd.is_current_stmt_binlog_format_row() && table->file->row_logging_has_trans))
thd.binlog_flush_pending_rows_event(TRUE, has_trans))
goto err; goto err;
if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE)))) if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
@@ -4548,6 +4567,18 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
/* purecov: end */ /* purecov: end */
} }
table->s->table_creation_was_logged= save_table_creation_was_logged; table->s->table_creation_was_logged= save_table_creation_was_logged;
if (!table->s->tmp_table)
table->file->prepare_for_row_logging();
/*
If slave is converting a statement event to row events, log the original
create statement as an annotated row
*/
#ifdef HAVE_REPLICATION
if (thd->slave_thread && opt_replicate_annotate_row_events &&
thd->is_current_stmt_binlog_format_row())
thd->variables.binlog_annotate_row_events= 1;
#endif
DBUG_RETURN(table); DBUG_RETURN(table);
} }
@@ -4792,6 +4823,7 @@ bool binlog_create_table(THD *thd, TABLE *table)
logged logged
*/ */
thd->set_current_stmt_binlog_format_row(); thd->set_current_stmt_binlog_format_row();
table->file->prepare_for_row_logging();
return binlog_show_create_table(thd, table, 0) != 0; return binlog_show_create_table(thd, table, 0) != 0;
} }
@@ -4854,6 +4886,9 @@ bool select_create::send_eof()
if (table->s->tmp_table) if (table->s->tmp_table)
thd->transaction.stmt.mark_created_temp_table(); thd->transaction.stmt.mark_created_temp_table();
if (thd->slave_thread)
thd->variables.binlog_annotate_row_events= 0;
if (prepare_eof()) if (prepare_eof())
{ {
abort_result_set(); abort_result_set();

View File

@@ -7436,6 +7436,12 @@ void THD::reset_for_next_command(bool do_clear_error)
DBUG_ENTER("THD::reset_for_next_command"); DBUG_ENTER("THD::reset_for_next_command");
DBUG_ASSERT(!spcont); /* not for substatements of routines */ DBUG_ASSERT(!spcont); /* not for substatements of routines */
DBUG_ASSERT(!in_sub_stmt); DBUG_ASSERT(!in_sub_stmt);
/*
Table maps should have been reset after previous statement except in the
case where we have locked tables
*/
DBUG_ASSERT(binlog_table_maps == 0 ||
locked_tables_mode == LTM_LOCK_TABLES);
if (likely(do_clear_error)) if (likely(do_clear_error))
{ {

View File

@@ -2168,14 +2168,13 @@ static bool finalize_install(THD *thd, TABLE *table, const LEX_CSTRING *name,
of the insert into the plugin table, so that it is not replicated in of the insert into the plugin table, so that it is not replicated in
row based mode. row based mode.
*/ */
tmp_disable_binlog(thd); DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns(); table->use_all_columns();
restore_record(table, s->default_values); restore_record(table, s->default_values);
table->field[0]->store(name->str, name->length, system_charset_info); table->field[0]->store(name->str, name->length, system_charset_info);
table->field[1]->store(tmp->plugin_dl->dl.str, tmp->plugin_dl->dl.length, table->field[1]->store(tmp->plugin_dl->dl.str, tmp->plugin_dl->dl.length,
files_charset_info); files_charset_info);
error= table->file->ha_write_row(table->record[0]); error= table->file->ha_write_row(table->record[0]);
reenable_binlog(thd);
if (unlikely(error)) if (unlikely(error))
{ {
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));
@@ -2322,9 +2321,8 @@ static bool do_uninstall(THD *thd, TABLE *table, const LEX_CSTRING *name)
of the delete from the plugin table, so that it is not replicated in of the delete from the plugin table, so that it is not replicated in
row based mode. row based mode.
*/ */
tmp_disable_binlog(thd); table->file->row_logging= 0; // No logging
error= table->file->ha_delete_row(table->record[0]); error= table->file->ha_delete_row(table->record[0]);
reenable_binlog(thd);
if (unlikely(error)) if (unlikely(error))
{ {
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));

View File

@@ -180,6 +180,8 @@
#define OPTION_NO_QUERY_CACHE (1ULL << 39) // SELECT, user #define OPTION_NO_QUERY_CACHE (1ULL << 39) // SELECT, user
#define OPTION_PROCEDURE_CLAUSE (1ULL << 40) // Internal usage #define OPTION_PROCEDURE_CLAUSE (1ULL << 40) // Internal usage
#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern #define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern
#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern
#define OPTION_BIN_TMP_LOG_OFF (1ULL << 42) // disable binlog, intern
#define OPTION_LEX_FOUND_COMMENT (1ULL << 0) // intern, parser #define OPTION_LEX_FOUND_COMMENT (1ULL << 0) // intern, parser

View File

@@ -580,7 +580,6 @@ void sequence_definition::adjust_values(longlong next_value)
int sequence_definition::write_initial_sequence(TABLE *table) int sequence_definition::write_initial_sequence(TABLE *table)
{ {
int error; int error;
THD *thd= table->in_use;
MY_BITMAP *save_write_set; MY_BITMAP *save_write_set;
store_fields(table); store_fields(table);
@@ -588,15 +587,14 @@ int sequence_definition::write_initial_sequence(TABLE *table)
table->s->sequence->copy(this); table->s->sequence->copy(this);
/* /*
Sequence values will be replicated as a statement Sequence values will be replicated as a statement
like 'create sequence'. So disable binary log temporarily like 'create sequence'. So disable row logging for this table & statement
*/ */
tmp_disable_binlog(thd); table->file->row_logging= table->file->row_logging_init= 0;
save_write_set= table->write_set; save_write_set= table->write_set;
table->write_set= &table->s->all_set; table->write_set= &table->s->all_set;
table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE; table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE;
error= table->file->ha_write_row(table->record[0]); error= table->file->ha_write_row(table->record[0]);
table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED; table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED;
reenable_binlog(thd);
table->write_set= save_write_set; table->write_set= save_write_set;
if (unlikely(error)) if (unlikely(error))
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));

View File

@@ -404,6 +404,7 @@ insert_server(THD *thd, FOREIGN_SERVER *server)
/* need to open before acquiring THR_LOCK_plugin or it will deadlock */ /* need to open before acquiring THR_LOCK_plugin or it will deadlock */
if (! (table= open_ltable(thd, &tables, TL_WRITE, MYSQL_LOCK_IGNORE_TIMEOUT))) if (! (table= open_ltable(thd, &tables, TL_WRITE, MYSQL_LOCK_IGNORE_TIMEOUT)))
goto end; goto end;
table->file->row_logging= 0; // Don't log to binary log
/* insert the server into the table */ /* insert the server into the table */
if (unlikely(error= insert_server_record(table, server))) if (unlikely(error= insert_server_record(table, server)))
@@ -542,9 +543,9 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
{ {
int error; int error;
DBUG_ENTER("insert_server_record"); DBUG_ENTER("insert_server_record");
tmp_disable_binlog(table->in_use); DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns();
table->use_all_columns();
empty_record(table); empty_record(table);
/* set the field that's the PK to the value we're looking for */ /* set the field that's the PK to the value we're looking for */
@@ -577,8 +578,6 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
} }
else else
error= ER_FOREIGN_SERVER_EXISTS; error= ER_FOREIGN_SERVER_EXISTS;
reenable_binlog(table->in_use);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@@ -895,7 +894,8 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server)
{ {
int error=0; int error=0;
DBUG_ENTER("update_server_record"); DBUG_ENTER("update_server_record");
tmp_disable_binlog(table->in_use); DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns(); table->use_all_columns();
/* set the field that's the PK to the value we're looking for */ /* set the field that's the PK to the value we're looking for */
table->field[0]->store(server->server_name, table->field[0]->store(server->server_name,
@@ -931,7 +931,6 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server)
} }
end: end:
reenable_binlog(table->in_use);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@@ -956,7 +955,8 @@ delete_server_record(TABLE *table, LEX_CSTRING *name)
{ {
int error; int error;
DBUG_ENTER("delete_server_record"); DBUG_ENTER("delete_server_record");
tmp_disable_binlog(table->in_use); DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns(); table->use_all_columns();
/* set the field that's the PK to the value we're looking for */ /* set the field that's the PK to the value we're looking for */
@@ -980,7 +980,6 @@ delete_server_record(TABLE *table, LEX_CSTRING *name)
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));
} }
reenable_binlog(table->in_use);
DBUG_RETURN(error); DBUG_RETURN(error);
} }

View File

@@ -10471,6 +10471,7 @@ do_continue:;
No additional logging of query is needed No additional logging of query is needed
*/ */
binlog_done= 1; binlog_done= 1;
DBUG_ASSERT(new_table->file->row_logging);
new_table->mark_columns_needed_for_insert(); new_table->mark_columns_needed_for_insert();
thd->binlog_write_table_map(new_table, 1); thd->binlog_write_table_map(new_table, 1);
} }

View File

@@ -1330,7 +1330,8 @@ bool Table_triggers_list::prepare_record_accessors(TABLE *table)
*/ */
bool Table_triggers_list::check_n_load(THD *thd, const LEX_CSTRING *db, bool Table_triggers_list::check_n_load(THD *thd, const LEX_CSTRING *db,
const LEX_CSTRING *table_name, TABLE *table, const LEX_CSTRING *table_name,
TABLE *table,
bool names_only) bool names_only)
{ {
char path_buff[FN_REFLEN]; char path_buff[FN_REFLEN];

View File

@@ -5269,7 +5269,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl)
/* used in RBR Triggers */ /* used in RBR Triggers */
master_had_triggers= 0; master_had_triggers= 0;
#endif #endif
/* Catch wrong handling of the auto_increment_field_not_null. */ /* Catch wrong handling of the auto_increment_field_not_null. */
DBUG_ASSERT(!auto_increment_field_not_null); DBUG_ASSERT(!auto_increment_field_not_null);
auto_increment_field_not_null= FALSE; auto_increment_field_not_null= FALSE;
@@ -5284,7 +5283,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl)
} }
notnull_cond= 0; notnull_cond= 0;
DBUG_ASSERT(!file->keyread_enabled()); DBUG_ASSERT(!file->keyread_enabled());
restore_record(this, s->default_values); restore_record(this, s->default_values);
@@ -7293,8 +7291,7 @@ void TABLE::mark_columns_per_binlog_row_image()
If in RBR we may need to mark some extra columns, If in RBR we may need to mark some extra columns,
depending on the binlog-row-image command line argument. depending on the binlog-row-image command line argument.
*/ */
if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && if (file->row_logging &&
thd->is_current_stmt_binlog_format_row() &&
!ha_check_storage_engine_flag(s->db_type(), HTON_NO_BINLOG_ROW_OPT)) !ha_check_storage_engine_flag(s->db_type(), HTON_NO_BINLOG_ROW_OPT))
{ {
/* if there is no PK, then mark all columns for the BI. */ /* if there is no PK, then mark all columns for the BI. */

View File

@@ -1124,13 +1124,11 @@ TABLE *THD::open_temporary_table(TMP_TABLE_SHARE *share,
table->reginfo.lock_type= TL_WRITE; /* Simulate locked */ table->reginfo.lock_type= TL_WRITE; /* Simulate locked */
table->grant.privilege= TMP_TABLE_ACLS; table->grant.privilege= TMP_TABLE_ACLS;
table->query_id= query_id;
share->tmp_table= (table->file->has_transaction_manager() ? share->tmp_table= (table->file->has_transaction_manager() ?
TRANSACTIONAL_TMP_TABLE : NON_TRANSACTIONAL_TMP_TABLE); TRANSACTIONAL_TMP_TABLE : NON_TRANSACTIONAL_TMP_TABLE);
share->not_usable_by_query_cache= 1; share->not_usable_by_query_cache= 1;
table->pos_in_table_list= 0;
table->query_id= query_id;
/* Add table to the head of table list. */ /* Add table to the head of table list. */
share->all_tmp_tables.push_front(table); share->all_tmp_tables.push_front(table);

View File

@@ -302,8 +302,10 @@ bool trans_commit_implicit(THD *thd)
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
if (thd->variables.option_bits & OPTION_GTID_BEGIN) if (thd->variables.option_bits & OPTION_GTID_BEGIN)
{
DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. " DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. "
"Master and slave will have different GTID values")); "Master and slave will have different GTID values"));
}
if (thd->in_multi_stmt_transaction_mode() || if (thd->in_multi_stmt_transaction_mode() ||
(thd->variables.option_bits & OPTION_TABLE_LOCK)) (thd->variables.option_bits & OPTION_TABLE_LOCK))
@@ -361,9 +363,9 @@ bool trans_rollback(THD *thd)
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
repl_semisync_master.wait_after_rollback(thd, FALSE); repl_semisync_master.wait_after_rollback(thd, FALSE);
#endif #endif
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/* Reset the binlog transaction marker */ /* Reset the binlog transaction marker */
thd->variables.option_bits&= ~OPTION_GTID_BEGIN; thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG |
OPTION_GTID_BEGIN);
thd->transaction.all.reset(); thd->transaction.all.reset();
thd->lex->start_transaction_opt= 0; thd->lex->start_transaction_opt= 0;

View File

@@ -222,7 +222,7 @@ extern wsrep_seqno_t wsrep_locked_seqno;
/* use xxxxxx_NNULL macros when thd pointer is guaranteed to be non-null to /* use xxxxxx_NNULL macros when thd pointer is guaranteed to be non-null to
* avoid compiler warnings (GCC 6 and later) */ * avoid compiler warnings (GCC 6 and later) */
#define WSREP_NNULL(thd) \ #define WSREP_NNULL(thd) \
(WSREP_ON && thd->variables.wsrep_on) (thd->variables.wsrep_on && WSREP_ON)
#define WSREP(thd) \ #define WSREP(thd) \
(thd && WSREP_NNULL(thd)) (thd && WSREP_NNULL(thd))