1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-09 22:24:09 +03:00

Clean up and speed up interfaces for binary row logging

MDEV-21605 Clean up and speed up interfaces for binary row logging
MDEV-21617 Bug fix for previous version of this code

The intention is to have as few 'if' as possible in ha_write() and
related functions. This is done by pre-calculating once per statement the
row_logging state for all tables.

Benefits are simpler and faster code both when binary logging is disabled
and when it's enabled.

Changes:
- Added handler->row_logging to make it easy to check it table should be
  row logged. This also made it easier to disabling row logging for system,
  internal and temporary tables.
- The tables row_logging capabilities are checked once per "statements
  that updates tables" in THD::binlog_prepare_for_row_logging() which
  is called when needed from THD::decide_logging_format().
- Removed most usage of tmp_disable_binlog(), reenable_binlog() and
  temporary saving and setting of thd->variables.option_bits.
- Moved checks that can't change during a statement from
  check_table_binlog_row_based() to check_table_binlog_row_based_internal()
- Removed flag row_already_logged (used by sequence engine)
- Moved binlog_log_row() to a handler::
- Moved write_locked_table_maps() to THD::binlog_write_table_maps() as
  most other related binlog functions are in THD.
- Removed binlog_write_table_map() and binlog_log_row_internal() as
  they are now obsolete as 'has_transactions()' is pre-calculated in
  prepare_for_row_logging().
- Remove 'is_transactional' argument from binlog_write_table_map() as this
  can now be read from handler.
- Changed order of 'if's in handler::external_lock() and wsrep_mysqld.h
  to first evaluate fast and likely cases before more complex ones.
- Added error checking in ha_write_row() and related functions if
  binlog_log_row() failed.
- Don't clear check_table_binlog_row_based_result in
  clear_cached_table_binlog_row_based_flag() as it's not needed.
- THD::clear_binlog_table_maps() has been replaced with
  THD::reset_binlog_for_next_statement()
- Added 'MYSQL_OPEN_IGNORE_LOGGING_FORMAT' flag to open_and_lock_tables()
  to avoid calculating of binary log format for internal opens. This flag
  is also used to avoid reading statistics tables for internal tables.
- Added OPTION_BINLOG_LOG_OFF as a simple way to turn of binlog temporary
  for create (instead of using THD::sql_log_bin_off.
- Removed flag THD::sql_log_bin_off (not needed anymore)
- Speed up THD::decide_logging_format() by remembering if blackhole engine
  is used and avoid a loop over all tables if it's not used
  (the common case).
- THD::decide_logging_format() is not called anymore if no tables are used
  for the statement. This will speed up pure stored procedure code with
  about 5%+ according to some simple tests.
- We now get annotated events on slave if a CREATE ... SELECT statement
  is transformed on the slave from statement to row logging.
- In the original code, the master could come into a state where row
  logging is enforced for all future events if statement could be used.
  This is now partly fixed.

Other changes:
- Ensure that all tables used by a statement has query_id set.
- Had to restore the row_logging flag for not used tables in
  THD::binlog_write_table_maps (not normal scenario)
- Removed injector::transaction::use_table(server_id_type sid, table tbl)
  as it's not used.
- Cleaned up set_slave_thread_options()
- Some more DBUG_ENTER/DBUG_RETURN, code comments and minor indentation
  changes.
- Ensure we only call THD::decide_logging_format_low() once in
  mysql_insert() (inefficiency).
- Don't annotate INSERT DELAYED
- Removed zeroing pos_in_table_list in THD::open_temporary_table() as it's
  already 0
This commit is contained in:
Monty
2020-01-28 23:23:51 +02:00
parent f51df1dc78
commit 91ab42a823
35 changed files with 570 additions and 428 deletions

View File

@@ -0,0 +1,22 @@
--source include/have_innodb.inc
--source include/have_binlog_format_row.inc
reset master;
CREATE TABLE t1 (
id INT,
k INT,
c CHAR(8),
KEY (k),
PRIMARY KEY (id),
FOREIGN KEY (id) REFERENCES t1 (k)
) ENGINE=InnoDB;
LOCK TABLES t1 WRITE;
SET SESSION FOREIGN_KEY_CHECKS= OFF;
SET AUTOCOMMIT=OFF;
INSERT INTO t1 VALUES (1,1,'foo');
DROP TABLE t1;
SET SESSION FOREIGN_KEY_CHECKS= ON;
SET AUTOCOMMIT=ON;
source include/show_binlog_events.inc;

View File

@@ -85,9 +85,12 @@ master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; create table t1 (a int)
master-bin.000001 # Gtid # # BEGIN GTID #-#-#
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `test`.`t1`/* Generated to handle failed CREATE OR REPLACE */
master-bin.000001 # Query # # ROLLBACK
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; create temporary table t9 (a int)
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # DROP TEMPORARY TABLE IF EXISTS `test`.`t9`/* Generated to handle failed CREATE OR REPLACE */
connection server_2;
show tables;
Tables_in_test
@@ -154,7 +157,7 @@ slave-bin.000001 # Query # # use `test`; create table t4 (server_2_to_be_delete
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; create table t1 (new_table int)
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` (
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
`a` int(11) DEFAULT NULL
)
slave-bin.000001 # Annotate_rows # # create table t2 select * from t9
@@ -223,26 +226,12 @@ Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; create table t1 (a int)
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Annotate_rows # # insert into t1 values (0),(1),(2)
slave-bin.000001 # Table_map # # table_id: # (test.t1)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # use `test`; insert into t1 values (0),(1),(2)
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` (
`a` int(11) DEFAULT NULL
) ENGINE=MyISAM
slave-bin.000001 # Annotate_rows # # create table t2 engine=myisam select * from t1
slave-bin.000001 # Table_map # # table_id: # (test.t2)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
`a` int(11) DEFAULT NULL
) ENGINE=InnoDB
slave-bin.000001 # Annotate_rows # # create or replace table t2 engine=innodb select * from t1
slave-bin.000001 # Table_map # # table_id: # (test.t2)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Xid # # COMMIT /* XID */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; create table t2 engine=myisam select * from t1
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `test`; create or replace table t2 engine=innodb select * from t1
connection server_1;
drop table t1;
#

View File

@@ -160,6 +160,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` (
`a` int(11) DEFAULT NULL
)
slave-bin.000001 # Annotate_rows # # create table t2 select * from t9
slave-bin.000001 # Table_map # # table_id: # (test.t2)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT
@@ -171,6 +172,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `test`; CREATE TABLE `t5` (
`a` int(11) DEFAULT NULL
)
slave-bin.000001 # Annotate_rows # # create table t5 select * from t9
slave-bin.000001 # Table_map # # table_id: # (test.t5)
slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
slave-bin.000001 # Query # # COMMIT

View File

@@ -56,6 +56,7 @@ create or replace table t1 (a int primary key) select a from t2;
# Same with temporary table
create temporary table t9 (a int);
--error ER_DUP_ENTRY
create or replace temporary table t9 (a int primary key) select a from t2;

View File

@@ -1241,15 +1241,9 @@ Events::load_events_from_db(THD *thd)
TRUE);
/* All the dmls to mysql.events tables are stmt bin-logged. */
bool save_binlog_row_based;
if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row()))
thd->set_current_stmt_binlog_format_stmt();
table->file->row_logging= 0;
(void) table->file->ha_update_row(table->record[1], table->record[0]);
if (save_binlog_row_based)
thd->set_current_stmt_binlog_format_row();
delete et;
continue;
}

View File

@@ -2131,12 +2131,10 @@ int ha_partition::copy_partitions(ulonglong * const copied,
}
else
{
THD *thd= ha_thd();
/* Copy record to new handler */
(*copied)++;
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
DBUG_ASSERT(!m_new_file[new_part]->row_logging);
result= m_new_file[new_part]->ha_write_row(m_rec0);
reenable_binlog(thd);
if (result)
goto error;
}
@@ -4374,11 +4372,10 @@ int ha_partition::write_row(const uchar * buf)
start_part_bulk_insert(thd, part_id);
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
DBUG_ASSERT(!m_file[part_id]->row_logging);
error= m_file[part_id]->ha_write_row(buf);
if (have_auto_increment && !table->s->next_number_keypart)
set_auto_increment_if_higher(table->next_number_field);
reenable_binlog(thd);
exit:
table->auto_increment_field_not_null= saved_auto_inc_field_not_null;
@@ -4457,12 +4454,11 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
m_last_part= new_part_id;
start_part_bulk_insert(thd, new_part_id);
DBUG_ASSERT(!m_file[new_part_id]->row_logging);
if (new_part_id == old_part_id)
{
DBUG_PRINT("info", ("Update in partition %u", (uint) new_part_id));
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[new_part_id]->ha_update_row(old_data, new_data);
reenable_binlog(thd);
goto exit;
}
else
@@ -4481,16 +4477,12 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data)
table->next_number_field= NULL;
DBUG_PRINT("info", ("Update from partition %u to partition %u",
(uint) old_part_id, (uint) new_part_id));
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[new_part_id]->ha_write_row((uchar*) new_data);
reenable_binlog(thd);
table->next_number_field= saved_next_number_field;
if (unlikely(error))
goto exit;
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
error= m_file[old_part_id]->ha_delete_row(old_data);
reenable_binlog(thd);
if (unlikely(error))
goto exit;
}
@@ -4592,9 +4584,8 @@ int ha_partition::delete_row(const uchar *buf)
if (!bitmap_is_set(&(m_part_info->lock_partitions), m_last_part))
DBUG_RETURN(HA_ERR_NOT_IN_LOCK_PARTITIONS);
tmp_disable_binlog(thd);
DBUG_ASSERT(!m_file[m_last_part]->row_logging);
error= m_file[m_last_part]->ha_delete_row(buf);
reenable_binlog(thd);
DBUG_RETURN(error);
}

View File

@@ -202,7 +202,11 @@ int ha_sequence::write_row(const uchar *buf)
DBUG_ENTER("ha_sequence::write_row");
DBUG_ASSERT(table->record[0] == buf);
row_already_logged= 0;
/*
Log to binary log even if this function has been called before
(The function ends by setting row_logging to 0)
*/
row_logging= row_logging_init;
if (unlikely(sequence->initialized == SEQUENCE::SEQ_IN_PREPARE))
{
/* This calls is from ha_open() as part of create table */
@@ -218,6 +222,7 @@ int ha_sequence::write_row(const uchar *buf)
sequence->copy(&tmp_seq);
if (likely(!(error= file->write_row(buf))))
sequence->initialized= SEQUENCE::SEQ_READY_TO_USE;
row_logging= 0;
DBUG_RETURN(error);
}
if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE))
@@ -262,10 +267,12 @@ int ha_sequence::write_row(const uchar *buf)
sequence->copy(&tmp_seq);
rows_changed++;
/* We have to do the logging while we hold the sequence mutex */
error= binlog_log_row(table, 0, buf, log_func);
row_already_logged= 1;
if (row_logging)
error= binlog_log_row(table, 0, buf, log_func);
}
/* Row is already logged, don't log it again in ha_write_row() */
row_logging= 0;
sequence->all_values_used= 0;
if (!sequence_locked)
sequence->write_unlock(table);

View File

@@ -3620,8 +3620,9 @@ int handler::update_auto_increment()
variables->auto_increment_increment);
auto_inc_intervals_count++;
/* Row-based replication does not need to store intervals in binlog */
if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open())
&& !thd->is_current_stmt_binlog_format_row())
if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open()) &&
!thd->is_current_stmt_binlog_format_row())
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
append(auto_inc_interval_for_cur_row.minimum(),
auto_inc_interval_for_cur_row.values(),
@@ -6366,32 +6367,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
1 Row needs to be logged
*/
bool handler::check_table_binlog_row_based(bool binlog_row)
bool handler::check_table_binlog_row_based()
{
if (table->versioned(VERS_TRX_ID))
return false;
if (unlikely((table->in_use->variables.sql_log_bin_off)))
return 0; /* Called by partitioning engine */
#ifdef WITH_WSREP
if (!table->in_use->variables.sql_log_bin &&
wsrep_thd_is_applying(table->in_use))
return 0; /* wsrep patch sets sql_log_bin to silence binlogging
from high priority threads */
#endif /* WITH_WSREP */
if (unlikely((!check_table_binlog_row_based_done)))
{
check_table_binlog_row_based_done= 1;
check_table_binlog_row_based_result=
check_table_binlog_row_based_internal(binlog_row);
check_table_binlog_row_based_internal();
}
return check_table_binlog_row_based_result;
}
bool handler::check_table_binlog_row_based_internal(bool binlog_row)
bool handler::check_table_binlog_row_based_internal()
{
THD *thd= table->in_use;
#ifdef WITH_WSREP
if (!thd->variables.sql_log_bin &&
wsrep_thd_is_applying(table->in_use))
{
/*
wsrep patch sets sql_log_bin to silence binlogging from high
priority threads
*/
return 0;
}
#endif
return (table->s->can_do_row_logging &&
!table->versioned(VERS_TRX_ID) &&
!(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) &&
thd->is_current_stmt_binlog_format_row() &&
/*
Wsrep partially enables binary logging if it have not been
@@ -6407,9 +6411,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
Otherwise, return 'true' if binary logging is on.
*/
IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) &&
wsrep_thd_is_local(thd)) ||
((WSREP(thd) ||
((WSREP_NNULL(thd) ||
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())),
(thd->variables.option_bits & OPTION_BIN_LOG) &&
@@ -6417,151 +6421,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
}
/** @brief
Write table maps for all (manually or automatically) locked tables
to the binary log. Also, if binlog_annotate_row_events is ON,
write Annotate_rows event before the first table map.
SYNOPSIS
write_locked_table_maps()
thd Pointer to THD structure
DESCRIPTION
This function will generate and write table maps for all tables
that are locked by the thread 'thd'.
RETURN VALUE
0 All OK
1 Failed to write all table maps
SEE ALSO
THD::lock
*/
static int write_locked_table_maps(THD *thd)
int handler::binlog_log_row(TABLE *table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func)
{
DBUG_ENTER("write_locked_table_maps");
DBUG_PRINT("enter", ("thd:%p thd->lock:%p "
"thd->extra_lock: %p",
thd, thd->lock, thd->extra_lock));
bool error;
THD *thd= table->in_use;
DBUG_ENTER("binlog_log_row");
DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps()));
if (!thd->binlog_table_maps &&
thd->binlog_write_table_maps())
DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
MYSQL_LOCK *locks[2];
locks[0]= thd->extra_lock;
locks[1]= thd->lock;
my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd),
true) &&
thd->variables.binlog_annotate_row_events &&
thd->query() && thd->query_length();
for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
{
MYSQL_LOCK const *const lock= locks[i];
if (lock == NULL)
continue;
TABLE **const end_ptr= lock->table + lock->table_count;
for (TABLE **table_ptr= lock->table ;
table_ptr != end_ptr ;
++table_ptr)
{
TABLE *const table= *table_ptr;
if (table->current_lock == F_WRLCK &&
table->file->check_table_binlog_row_based(0))
{
if (binlog_write_table_map(thd, table, with_annotate))
DBUG_RETURN(1);
with_annotate= 0;
}
}
}
DBUG_RETURN(0);
}
int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate)
{
DBUG_ENTER("binlog_write_table_map");
DBUG_PRINT("info", ("table %s", table->s->table_name.str));
/*
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
(e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
compatible behavior with the STMT based replication even when
the table is not transactional. In other words, if the operation
fails while executing the insert phase nothing is written to the
binlog.
Note that at this point, we check the type of a set of tables to
create the table map events. In the function binlog_log_row(),
which calls the current function, we check the type of the table
of the current row.
*/
bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
table->file->has_transactions());
int const error= thd->binlog_write_table_map(table, has_trans,
&with_annotate);
/*
If an error occurs, it is the responsibility of the caller to
roll back the transaction.
*/
if (unlikely(error))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
static int binlog_log_row_internal(TABLE* table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func)
{
bool error= 0;
THD *const thd= table->in_use;
/*
If there are no table maps written to the binary log, this is
the first row handled in this statement. In that case, we need
to write table maps for all locked tables to the binary log.
*/
if (likely(!(error= ((thd->get_binlog_table_maps() == 0 &&
write_locked_table_maps(thd))))))
{
/*
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
(i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
compatible behavior with the STMT based replication even when
the table is not transactional. In other words, if the operation
fails while executing the insert phase nothing is written to the
binlog. We need the same also for ALTER TABLE in the case we convert
a shared table to a not shared table as in this case we will log all
rows.
*/
bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
table->file->has_transactions());
error= (*log_func)(thd, table, has_trans, before_record, after_record);
}
return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
}
int binlog_log_row(TABLE* table, const uchar *before_record,
const uchar *after_record, Log_func *log_func)
{
#ifdef WITH_WSREP
THD *const thd= table->in_use;
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
thd->wsrep_ignore_table == true)
return 0;
#endif
if (!table->file->check_table_binlog_row_based(1))
return 0;
return binlog_log_row_internal(table, before_record, after_record, log_func);
error= (*log_func)(thd, table, row_logging_has_trans,
before_record, after_record);
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
}
@@ -6647,6 +6522,7 @@ int handler::ha_external_lock(THD *thd, int lock_type)
int handler::ha_reset()
{
DBUG_ENTER("ha_reset");
/* Check that we have called all proper deallocation functions */
DBUG_ASSERT((uchar*) table->def_read_set.bitmap +
table->s->column_bitmap_size ==
@@ -6662,6 +6538,10 @@ int handler::ha_reset()
pushed_cond= NULL;
tracker= NULL;
mark_trx_read_write_done= 0;
/*
Disable row logging.
*/
row_logging= row_logging_init= 0;
clear_cached_table_binlog_row_based_flag();
/* Reset information about pushed engine conditions */
cancel_pushed_idx_cond();
@@ -6678,8 +6558,8 @@ static int wsrep_after_row(THD *thd)
/* enforce wsrep_max_ws_rows */
thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
wsrep_thd_is_local(thd) &&
thd->wsrep_affected_rows > wsrep_max_ws_rows)
thd->wsrep_affected_rows > wsrep_max_ws_rows &&
wsrep_thd_is_local(thd))
{
trans_rollback_stmt(thd) || trans_rollback(thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
@@ -6864,6 +6744,74 @@ static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec)
/**
Check if galera disables binary logging for this table
@return 0 Binary logging disabled
@return 1 Binary logging can be enabled
*/
static inline bool wsrep_check_if_binlog_row(TABLE *table)
{
#ifdef WITH_WSREP
THD *const thd= table->in_use;
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
!(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
thd->wsrep_ignore_table == true)
return 0;
#endif
return 1;
}
/**
Prepare handler for row logging
@return 0 if handler will not participate in row logging
@return 1 handler will participate in row logging
This function is always safe to call on an opened table.
*/
bool handler::prepare_for_row_logging()
{
DBUG_ENTER("handler::prepare_for_row_logging");
/* Check if we should have row logging */
if (wsrep_check_if_binlog_row(table) &&
check_table_binlog_row_based())
{
/*
Row logging enabled. Intialize all variables and write
annotated and table maps
*/
row_logging= row_logging_init= 1;
/*
We need to have a transactional behavior for SQLCOM_CREATE_TABLE
(e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
compatible behavior with the STMT based replication even when
the table is not transactional. In other words, if the operation
fails while executing the insert phase nothing is written to the
binlog.
*/
row_logging_has_trans=
((sql_command_flags[table->in_use->lex->sql_command] &
(CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
table->file->has_transactions());
}
else
{
/* Check row_logging has not been properly cleared from previous command */
DBUG_ASSERT(row_logging == 0);
}
DBUG_RETURN(row_logging);
}
/*
Do all initialization needed for insert
@param force_update_handler Set to TRUE if we should always create an
@@ -6891,7 +6839,6 @@ int handler::prepare_for_insert(bool force_update_handler)
int handler::ha_write_row(const uchar *buf)
{
int error;
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
DBUG_ENTER("handler::ha_write_row");
@@ -6911,13 +6858,17 @@ int handler::ha_write_row(const uchar *buf)
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
if (likely(!error) && !row_already_logged)
if (likely(!error))
{
rows_changed++;
error= binlog_log_row(table, 0, buf, log_func);
if (row_logging)
{
Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
error= binlog_log_row(table, 0, buf, log_func);
}
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
!error && (error= wsrep_after_row(ha_thd())))
{
DBUG_RETURN(error);
}
@@ -6932,10 +6883,8 @@ int handler::ha_write_row(const uchar *buf)
int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
int error;
Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
/*
Some storage engines require that the new record is in record[0]
(and the old record is in record[1]).
@@ -6950,20 +6899,22 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
(error= check_duplicate_long_entries_update(table, (uchar*) new_data)))
return error;
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error,
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
if (likely(!error) && !row_already_logged)
if (likely(!error))
{
rows_changed++;
error= binlog_log_row(table, old_data, new_data, log_func);
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
if (row_logging)
{
return error;
Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
error= binlog_log_row(table, old_data, new_data, log_func);
}
#ifdef WITH_WSREP
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
!error && (error= wsrep_after_row(ha_thd())))
return error;
#endif /* WITH_WSREP */
}
return error;
@@ -7000,7 +6951,6 @@ int handler::update_first_row(const uchar *new_data)
int handler::ha_delete_row(const uchar *buf)
{
int error;
Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
/*
@@ -7019,10 +6969,14 @@ int handler::ha_delete_row(const uchar *buf)
if (likely(!error))
{
rows_changed++;
error= binlog_log_row(table, buf, 0, log_func);
if (row_logging)
{
Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
error= binlog_log_row(table, buf, 0, log_func);
}
#ifdef WITH_WSREP
if (table_share->tmp_table == NO_TMP_TABLE &&
WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
!error && (error= wsrep_after_row(ha_thd())))
{
return error;
}
@@ -7049,11 +7003,10 @@ int handler::ha_delete_row(const uchar *buf)
int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
{
int error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();
error = direct_update_rows(update_rows, found_rows);
error= direct_update_rows(update_rows, found_rows);
MYSQL_UPDATE_ROW_DONE(error);
return error;
}

View File

@@ -610,6 +610,7 @@ given at all. */
#define HA_CREATE_USED_SEQUENCE (1UL << 25)
typedef ulonglong alter_table_operations;
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
/*
These flags are set by the parser and describes the type of
@@ -3050,8 +3051,6 @@ public:
bool mark_trx_read_write_done; /* mark_trx_read_write was called */
bool check_table_binlog_row_based_done; /* check_table_binlog.. was called */
bool check_table_binlog_row_based_result; /* cached check_table_binlog... */
/* Set to 1 if handler logged last insert/update/delete operation */
bool row_already_logged;
/*
TRUE <=> the engine guarantees that returned records are within the range
being scanned.
@@ -3192,10 +3191,16 @@ public:
void end_psi_batch_mode();
bool set_top_table_fields;
struct TABLE *top_table;
Field **top_table_field;
uint top_table_fields;
/* If we have row logging enabled for this table */
bool row_logging, row_logging_init;
/* If the row logging should be done in transaction cache */
bool row_logging_has_trans;
private:
/**
The lock type set by when calling::ha_external_lock(). This is
@@ -3213,7 +3218,6 @@ private:
/** Stores next_insert_id for handling duplicate key errors. */
ulonglong m_prev_insert_id;
public:
handler(handlerton *ht_arg, TABLE_SHARE *share_arg)
:table_share(share_arg), table(0),
@@ -3223,7 +3227,6 @@ public:
mark_trx_read_write_done(0),
check_table_binlog_row_based_done(0),
check_table_binlog_row_based_result(0),
row_already_logged(0),
in_range_check_pushed_down(FALSE), errkey(-1),
key_used_on_scan(MAX_KEY),
active_index(MAX_KEY), keyread(MAX_KEY),
@@ -3242,6 +3245,7 @@ public:
m_psi_locker(NULL),
set_top_table_fields(FALSE), top_table(0),
top_table_field(0), top_table_fields(0),
row_logging(0), row_logging_init(0),
m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0)
{
DBUG_PRINT("info",
@@ -4598,13 +4602,17 @@ protected:
virtual int delete_table(const char *name);
public:
bool check_table_binlog_row_based(bool binlog_row);
bool check_table_binlog_row_based();
bool prepare_for_row_logging();
int prepare_for_insert(bool force_update_handler= 0);
int binlog_log_row(TABLE *table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func);
inline void clear_cached_table_binlog_row_based_flag()
{
check_table_binlog_row_based_done= 0;
check_table_binlog_row_based_result= 0;
}
private:
/* Cache result to avoid extra calls */
@@ -4619,7 +4627,7 @@ private:
private:
void mark_trx_read_write_internal();
bool check_table_binlog_row_based_internal(bool binlog_row);
bool check_table_binlog_row_based_internal();
protected:
/*
@@ -5202,7 +5210,6 @@ int binlog_log_row(TABLE* table,
if (unlikely(this_tracker)) \
tracker->stop_tracking(table->in_use); \
}
int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate);
void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag);
void print_keydup_error(TABLE *table, KEY *key, myf errflag);

View File

@@ -514,6 +514,12 @@ void Log_event_writer::add_status(enum_logged_status status)
cache_data->add_status(status);
}
void Log_event_writer::set_incident()
{
cache_data->set_incident();
}
class binlog_cache_mngr {
public:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
@@ -714,7 +720,6 @@ bool Log_to_csv_event_handler::
uint field_index;
Silence_log_table_errors error_handler;
Open_tables_backup open_tables_backup;
ulonglong save_thd_options;
bool save_time_zone_used;
DBUG_ENTER("log_general");
@@ -724,9 +729,6 @@ bool Log_to_csv_event_handler::
*/
save_time_zone_used= thd->time_zone_used;
save_thd_options= thd->variables.option_bits;
thd->variables.option_bits&= ~OPTION_BIN_LOG;
table_list.init_one_table(&MYSQL_SCHEMA_NAME, &GENERAL_LOG_NAME, 0,
TL_WRITE_CONCURRENT_INSERT);
@@ -806,7 +808,6 @@ bool Log_to_csv_event_handler::
table->field[field_index]->set_default();
}
/* log table entries are not replicated */
if (table->file->ha_write_row(table->record[0]))
goto err;
@@ -827,7 +828,6 @@ err:
if (need_close)
close_log_table(thd, &open_tables_backup);
thd->variables.option_bits= save_thd_options;
thd->time_zone_used= save_time_zone_used;
DBUG_RETURN(result);
}
@@ -880,7 +880,6 @@ bool Log_to_csv_event_handler::
ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
ulong query_time_micro= (ulong) (query_utime % 1000000);
ulong lock_time_micro= (ulong) (lock_utime % 1000000);
DBUG_ENTER("Log_to_csv_event_handler::log_slow");
thd->push_internal_handler(& error_handler);
@@ -997,7 +996,6 @@ bool Log_to_csv_event_handler::
0, TRUE))
goto err;
/* log table entries are not replicated */
if (table->file->ha_write_row(table->record[0]))
goto err;
@@ -1982,7 +1980,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
if (cache_mngr->trx_cache.has_incident())
error= mysql_bin_log.write_incident(thd);
thd->clear_binlog_table_maps();
thd->reset_binlog_for_next_statement();
cache_mngr->reset(false, true);
}
@@ -2253,6 +2251,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
*/
cache_mngr->reset(false, true);
thd->reset_binlog_for_next_statement();
DBUG_RETURN(error);
}
if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd))
@@ -2297,6 +2296,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
*/
if (!all)
cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
thd->reset_binlog_for_next_statement();
DBUG_RETURN(error);
}
@@ -2478,14 +2478,14 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
/*
When a SAVEPOINT is executed inside a stored function/trigger we force the
pending event to be flushed with a STMT_END_F flag and clear the table maps
pending event to be flushed with a STMT_END_F flag and reset binlog
as well to ensure that following DMLs will have a clean state to start
with. ROLLBACK inside a stored routine has to finalize possibly existing
current row-based pending event with cleaning up table maps. That ensures
that following DMLs will have a clean state to start with.
*/
if (thd->in_sub_stmt)
thd->clear_binlog_table_maps();
thd->reset_binlog_for_next_statement();
DBUG_RETURN(0);
}
@@ -5773,7 +5773,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
We only update the saved position if the old one was undefined,
the reason is that there are some cases (e.g., for CREATE-SELECT)
where the position is saved twice (e.g., both in
select_create::prepare() and THD::binlog_write_table_map()) , but
select_create::prepare() and binlog_write_table_map()) , but
we should use the first. This means that calls to this function
can be used to start the statement before the first table map
event, to include some extra events.
@@ -5900,45 +5900,149 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
DBUG_RETURN(err);
}
/**
Prepare all tables that are updated for row logging
Annotate events and table maps are written by binlog_write_table_maps()
*/
void THD::binlog_prepare_for_row_logging()
{
DBUG_ENTER("THD::binlog_prepare_for_row_logging");
for (TABLE *table= open_tables ; table; table= table->next)
{
if (table->query_id == query_id && table->current_lock == F_WRLCK)
table->file->prepare_for_row_logging();
}
DBUG_VOID_RETURN;
}
/**
Write annnotated row event (the query) if needed
*/
bool THD::binlog_write_annotated_row(Log_event_writer *writer)
{
int error;
DBUG_ENTER("THD::binlog_write_annotated_row");
if (!(IF_WSREP(!wsrep_fragments_certified_for_stmt(this), true) &&
variables.binlog_annotate_row_events &&
query_length()))
DBUG_RETURN(0);
Annotate_rows_log_event anno(this, 0, false);
if (unlikely((error= writer->write(&anno))))
{
if (my_errno == EFBIG)
writer->set_incident();
DBUG_RETURN(error);
}
DBUG_RETURN(0);
}
/**
Write table map events for all tables that are using row logging.
This includes all tables used by this statement, including tables
used in triggers.
Also write annotate events and start transactions.
This is using the "tables_with_row_logging" list prepared by
THD::binlog_prepare_for_row_logging
*/
bool THD::binlog_write_table_maps()
{
bool with_annotate;
MYSQL_LOCK *locks[2], **locks_end= locks;
DBUG_ENTER("THD::binlog_write_table_maps");
DBUG_ASSERT(!binlog_table_maps);
DBUG_ASSERT(is_current_stmt_binlog_format_row());
/* Initialize cache_mngr once per statement */
binlog_start_trans_and_stmt();
with_annotate= 1; // Write annotate with first map
if ((*locks_end= extra_lock))
locks_end++;
if ((*locks_end= lock))
locks_end++;
for (MYSQL_LOCK **cur_lock= locks ; cur_lock < locks_end ; cur_lock++)
{
TABLE **const end_ptr= (*cur_lock)->table + (*cur_lock)->table_count;
for (TABLE **table_ptr= (*cur_lock)->table;
table_ptr != end_ptr ;
++table_ptr)
{
TABLE *table= *table_ptr;
bool restore= 0;
/*
We have to also write table maps for tables that have not yet been
used, like for tables in after triggers
*/
if (!table->file->row_logging &&
table->query_id != query_id && table->current_lock == F_WRLCK)
{
if (table->file->prepare_for_row_logging())
restore= 1;
}
if (table->file->row_logging)
{
if (binlog_write_table_map(table, with_annotate))
DBUG_RETURN(1);
with_annotate= 0;
}
if (restore)
{
/*
Restore original setting so that it doesn't cause problem for the
next statement
*/
table->file->row_logging= table->file->row_logging_init= 0;
}
}
}
binlog_table_maps= 1; // Table maps written
DBUG_RETURN(0);
}
/**
This function writes a table map to the binary log.
Note that in order to keep the signature uniform with related methods,
we use a redundant parameter to indicate whether a transactional table
was changed or not.
If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the table map.
@param table a pointer to the table.
@param is_transactional @c true indicates a transactional table,
otherwise @c false a non-transactional.
@param with_annotate If true call binlog_write_annotated_row()
@return
nonzero if an error pops up when writing the table map event.
*/
int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
bool *with_annotate)
bool THD::binlog_write_table_map(TABLE *table, bool with_annotate)
{
int error;
bool is_transactional= table->file->row_logging_has_trans;
DBUG_ENTER("THD::binlog_write_table_map");
DBUG_PRINT("enter", ("table: %p (%s: #%lu)",
table, table->s->table_name.str,
table->s->table_map_id));
/* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1;
/* Pre-conditions */
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
the_event(this, table, table->s->table_map_id, is_transactional);
if (binlog_table_maps == 0)
binlog_start_trans_and_stmt();
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
binlog_cache_data *cache_data= (cache_mngr->
@@ -5946,25 +6050,17 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
IO_CACHE *file= &cache_data->cache_log;
Log_event_writer writer(file, cache_data);
if (with_annotate && *with_annotate)
{
Annotate_rows_log_event anno(table->in_use, is_transactional, false);
/* Annotate event should be written not more than once */
*with_annotate= 0;
if (unlikely((error= writer.write(&anno))))
{
if (my_errno == EFBIG)
cache_data->set_incident();
DBUG_RETURN(error);
}
}
if (with_annotate)
if (binlog_write_annotated_row(&writer))
DBUG_RETURN(1);
if (unlikely((error= writer.write(&the_event))))
DBUG_RETURN(error);
binlog_table_maps++;
DBUG_RETURN(0);
}
/**
This function retrieves a pending row event from a cache which is
specified through the parameter @c is_transactional. Respectively, when it
@@ -10848,7 +10944,7 @@ void wsrep_thd_binlog_trx_reset(THD * thd)
cache_mngr->stmt_cache.reset();
}
}
thd->clear_binlog_table_maps();
thd->reset_binlog_for_next_statement();
DBUG_VOID_RETURN;
}
@@ -10875,7 +10971,6 @@ bool wsrep_stmt_rollback_is_safe(THD* thd)
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (binlog_hton && cache_mngr)
{
binlog_cache_data * trx_cache = &cache_mngr->trx_cache;

View File

@@ -1144,6 +1144,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd);
bool write_annotated_row(THD *thd);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton;

View File

@@ -942,6 +942,7 @@ public:
int write_footer();
my_off_t pos() { return my_b_safe_tell(file); }
void add_status(enum_logged_status status);
void set_incident();
Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg,
Binlog_crypt_data *cr= 0)

View File

@@ -799,7 +799,10 @@ my_bool Log_event::need_checksum()
int Log_event_writer::write_internal(const uchar *pos, size_t len)
{
if (my_b_safe_write(file, pos, len))
{
DBUG_PRINT("error", ("write to log failed: %d", my_errno));
return 1;
}
bytes_written+= len;
return 0;
}
@@ -6153,13 +6156,10 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
(tbl->s->db.str[tbl->s->db.length] == 0));
DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
#ifdef MYSQL_SERVER
binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
sizeof(Binlog_type_info));
for (uint i= 0; i < m_table->s->fields; i++)
binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
#endif
m_data_size= TABLE_MAP_HEADER_LEN;
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
@@ -7079,14 +7079,14 @@ bool Rows_log_event::process_triggers(trg_event_type event,
m_table->triggers->mark_fields_used(event);
if (slave_run_triggers_for_rbr == SLAVE_RUN_TRIGGERS_FOR_RBR_YES)
{
tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */
result= m_table->triggers->process_triggers(thd, event,
time_type, old_row_is_record1);
reenable_binlog(thd);
time_type,
old_row_is_record1);
}
else
result= m_table->triggers->process_triggers(thd, event,
time_type, old_row_is_record1);
time_type,
old_row_is_record1);
DBUG_RETURN(result);
}

View File

@@ -423,10 +423,12 @@ rpl_slave_state::truncate_state_table(THD *thd)
TABLE_LIST tlist;
int err= 0;
tmp_disable_binlog(thd);
tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE);
if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
NULL, TL_WRITE);
if (!(err= open_and_lock_tables(thd, &tlist, FALSE,
MYSQL_OPEN_IGNORE_LOGGING_FORMAT)))
{
DBUG_ASSERT(!tlist.table->file->row_logging);
tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql",
rpl_gtid_slave_state_table_name.str);
err= tlist.table->file->ha_truncate();
@@ -445,8 +447,6 @@ rpl_slave_state::truncate_state_table(THD *thd)
}
thd->mdl_context.release_transactional_locks();
}
reenable_binlog(thd);
return err;
}
@@ -676,6 +676,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table_opened= true;
table= tlist.table;
hton= table->s->db_type();
table->file->row_logging= 0; // No binary logging
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;

View File

@@ -100,6 +100,7 @@ int injector::transaction::commit()
}
#ifdef TO_BE_DELETED
int injector::transaction::use_table(server_id_type sid, table tbl)
{
DBUG_ENTER("injector::transaction::use_table");
@@ -111,12 +112,12 @@ int injector::transaction::use_table(server_id_type sid, table tbl)
server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid);
error= m_thd->binlog_write_table_map(tbl.get_table(),
tbl.is_transactional());
DBUG_ASSERT(tbl.is_transactional() == tbl.get_table()->file->row_logging_has_trans);
error= m_thd->binlog_write_table_map(tbl.get_table(), 0);
m_thd->set_server_id(save_id);
DBUG_RETURN(error);
}
#endif
injector::transaction::binlog_pos injector::transaction::start_pos() const

View File

@@ -177,8 +177,9 @@ public:
>0 Failure
*/
#ifdef TO_BE_DELETED
int use_table(server_id_type sid, table tbl);
#endif
/*
Commit a transaction.

View File

@@ -3582,20 +3582,19 @@ void set_slave_thread_options(THD* thd)
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
only for client threads.
*/
ulonglong options= thd->variables.option_bits | OPTION_BIG_SELECTS;
if (opt_log_slave_updates)
options|= OPTION_BIN_LOG;
else
ulonglong options= (thd->variables.option_bits |
OPTION_BIG_SELECTS | OPTION_BIN_LOG);
if (!opt_log_slave_updates)
options&= ~OPTION_BIN_LOG;
thd->variables.option_bits= options;
thd->variables.completion_type= 0;
/* For easier test in LOGGER::log_command */
if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE)
thd->variables.option_bits|= OPTION_LOG_OFF;
options|= OPTION_LOG_OFF;
thd->variables.option_bits= options;
thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements &
LOG_SLOW_DISABLE_SLAVE);
thd->variables.completion_type= 0;
thd->variables.sql_log_slow=
!MY_TEST(thd->variables.log_slow_disabled_statements &
LOG_SLOW_DISABLE_SLAVE);
DBUG_VOID_RETURN;
}
@@ -8196,7 +8195,7 @@ void Rows_event_tracker::reset()
/*
Update log event tracking data.
Update log event tracking data.
The first- and last- seen event binlog position get memorized, as
well as the end-of-statement status of the last one.
@@ -8206,6 +8205,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos,
const char* buf,
const Format_description_log_event *fdle)
{
DBUG_ENTER("Rows_event_tracker::update");
if (!first_seen)
{
first_seen= pos;
@@ -8214,6 +8214,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos,
last_seen= pos;
DBUG_ASSERT(stmt_end_seen == 0); // We can only have one
stmt_end_seen= get_row_event_stmt_end(buf, fdle);
DBUG_VOID_RETURN;
};

View File

@@ -1942,7 +1942,9 @@ class Grant_tables
if (res)
DBUG_RETURN(res);
if (lock_tables(thd, first, counter, MYSQL_LOCK_IGNORE_TIMEOUT))
if (lock_tables(thd, first, counter,
MYSQL_LOCK_IGNORE_TIMEOUT |
MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
DBUG_RETURN(-1);
p_user_table->set_table(tables[USER_TABLE].table);
@@ -4397,7 +4399,8 @@ static USER_AUTH auth_no_password;
static int replace_user_table(THD *thd, const User_table &user_table,
LEX_USER * const combo, privilege_t rights,
const bool revoke_grant, const bool can_create_user,
const bool revoke_grant,
const bool can_create_user,
const bool no_auto_create)
{
int error = -1;
@@ -5426,6 +5429,7 @@ static int replace_column_table(GRANT_TABLE *g_t,
List_iterator <LEX_COLUMN> iter(columns);
class LEX_COLUMN *column;
int error= table->file->ha_index_init(0, 1);
if (unlikely(error))
{
@@ -5776,7 +5780,7 @@ static int replace_routine_table(THD *thd, GRANT_NAME *grant_name,
*/
table->use_all_columns();
restore_record(table, s->default_values); // Get empty record
restore_record(table, s->default_values); // Get empty record
table->field[0]->store(combo.host.str,combo.host.length, &my_charset_latin1);
table->field[1]->store(db,(uint) strlen(db), &my_charset_latin1);
table->field[2]->store(combo.user.str,combo.user.length, &my_charset_latin1);

View File

@@ -1027,9 +1027,7 @@ send_result_message:
*save_next_global= table->next_global;
table->next_local= table->next_global= 0;
tmp_disable_binlog(thd); // binlogging is done by caller if wanted
result_code= admin_recreate_table(thd, table);
reenable_binlog(thd);
trans_commit_stmt(thd);
trans_commit(thd);
close_thread_tables(thd);

View File

@@ -734,8 +734,9 @@ bool close_cached_connection_tables(THD *thd, LEX_CSTRING *connection)
Clear 'check_table_binlog_row_based_done' flag. For tables which were used
by current substatement the flag is cleared as part of 'ha_reset()' call.
For the rest of the open tables not used by current substament if this
flag is enabled as part of current substatement execution, clear the flag
explicitly.
flag is enabled as part of current substatement execution,
(for example when THD::binlog_write_table_maps() calls
prepare_for_row_logging()), clear the flag explicitly.
NOTE
The reason we reset query_id is that it's not enough to just test
@@ -759,7 +760,7 @@ static void mark_used_tables_as_free_for_reuse(THD *thd, TABLE *table)
table->query_id= 0;
table->file->ha_reset();
}
else if (table->file->check_table_binlog_row_based_done)
else
table->file->clear_cached_table_binlog_row_based_flag();
}
DBUG_VOID_RETURN;
@@ -1660,9 +1661,10 @@ static int set_partitions_as_used(TABLE_LIST *tl, TABLE *t)
needed to remedy problem before retrying again.
@retval FALSE 't' was not locked, not a VIEW or an error happened.
*/
bool is_locked_view(THD *thd, TABLE_LIST *t)
{
DBUG_ENTER("check_locked_view");
DBUG_ENTER("is_locked_view");
/*
Is this table a view and not a base table?
(it is work around to allow to open view with locked tables,
@@ -2212,8 +2214,8 @@ retry_share:
my_error(ER_NOT_SEQUENCE, MYF(0), table_list->db.str, table_list->alias.str);
DBUG_RETURN(true);
}
table->init(thd, table_list);
DBUG_ASSERT(thd->locked_tables_mode || table->file->row_logging == 0);
DBUG_RETURN(FALSE);
@@ -3738,10 +3740,11 @@ open_and_process_table(THD *thd, TABLE_LIST *tables, uint *counter, uint flags,
temporary table or SEQUENCE (see sequence_insert()).
*/
DBUG_ASSERT(is_temporary_table(tables) || tables->table->s->sequence);
if (tables->sequence && tables->table->s->table_type != TABLE_TYPE_SEQUENCE)
if (tables->sequence &&
tables->table->s->table_type != TABLE_TYPE_SEQUENCE)
{
my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str);
DBUG_RETURN(true);
my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str);
DBUG_RETURN(true);
}
}
else if (tables->open_type == OT_TEMPORARY_ONLY)
@@ -5214,7 +5217,9 @@ bool open_and_lock_tables(THD *thd, const DDL_options_st &options,
if (lock_tables(thd, tables, counter, flags))
goto err;
(void) read_statistics_for_tables_if_needed(thd, tables);
/* Don't read statistics tables when opening internal tables */
if (!(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
(void) read_statistics_for_tables_if_needed(thd, tables);
if (derived)
{
@@ -5348,12 +5353,19 @@ bool open_tables_only_view_structure(THD *thd, TABLE_LIST *table_list,
static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list)
{
TABLE_LIST *table;
DBUG_ENTER("mark_real_tables_as_free_for_reuse");
/*
We have to make two loops as HA_EXTRA_DETACH_CHILDREN may
remove items from the table list that we have to reset
*/
for (table= table_list; table; table= table->next_global)
{
if (!table->placeholder())
{
table->table->query_id= 0;
}
}
for (table= table_list; table; table= table->next_global)
{
if (!table->placeholder())
{
/*
@@ -5364,6 +5376,8 @@ static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list)
*/
table->table->file->extra(HA_EXTRA_DETACH_CHILDREN);
}
}
DBUG_VOID_RETURN;
}
@@ -5428,7 +5442,7 @@ err:
bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
{
TABLE_LIST *table;
TABLE_LIST *table, *first_not_own;
DBUG_ENTER("lock_tables");
/*
We can't meet statement requiring prelocking if we already
@@ -5438,7 +5452,9 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
!thd->lex->requires_prelocking());
if (!tables && !thd->lex->requires_prelocking())
DBUG_RETURN(thd->decide_logging_format(tables));
DBUG_RETURN(0);
first_not_own= thd->lex->first_not_own_table();
/*
Check for thd->locked_tables_mode to avoid a redundant
@@ -5454,13 +5470,26 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
{
DBUG_ASSERT(thd->lock == 0); // You must lock everything at once
TABLE **start,**ptr;
bool found_first_not_own= 0;
if (!(ptr=start=(TABLE**) thd->alloc(sizeof(TABLE*)*count)))
DBUG_RETURN(TRUE);
/*
Collect changes tables for table lock.
Mark own tables with query id as this is needed by
prepare_for_row_logging()
*/
for (table= tables; table; table= table->next_global)
{
if (table == first_not_own)
found_first_not_own= 1;
if (!table->placeholder())
*(ptr++)= table->table;
{
*(ptr++)= table->table;
if (!found_first_not_own)
table->table->query_id= thd->query_id;
}
}
DEBUG_SYNC(thd, "before_lock_tables_takes_lock");
@@ -5474,7 +5503,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
if (thd->lex->requires_prelocking() &&
thd->lex->sql_command != SQLCOM_LOCK_TABLES)
{
TABLE_LIST *first_not_own= thd->lex->first_not_own_table();
/*
We just have done implicit LOCK TABLES, and now we have
to emulate first open_and_lock_tables() after it.
@@ -5492,7 +5520,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
{
if (!table->placeholder())
{
table->table->query_id= thd->query_id;
if (check_lock_and_start_stmt(thd, thd->lex, table))
{
mysql_unlock_tables(thd, thd->lock);
@@ -5512,7 +5539,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
}
else
{
TABLE_LIST *first_not_own= thd->lex->first_not_own_table();
/*
When open_and_lock_tables() is called for a single table out of
a table list, the 'next_global' chain is temporarily broken. We
@@ -5528,6 +5554,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
if (table->placeholder())
continue;
table->table->query_id= thd->query_id;
/*
In a stored function or trigger we should ensure that we won't change
a table that is already used by the calling statement.
@@ -5567,7 +5594,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags)
}
bool res= fix_all_session_vcol_exprs(thd, tables);
if (!res)
if (!res && !(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT))
res= thd->decide_logging_format(tables);
DBUG_RETURN(res);
@@ -9005,6 +9032,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
*/
if (open_and_lock_tables(thd, table_list, FALSE,
(MYSQL_OPEN_IGNORE_FLUSH |
MYSQL_OPEN_IGNORE_LOGGING_FORMAT |
(table_list->lock_type < TL_WRITE_ALLOW_WRITE ?
MYSQL_LOCK_IGNORE_TIMEOUT : 0))))
{
@@ -9016,6 +9044,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
for (TABLE_LIST *tables= table_list; tables; tables= tables->next_global)
{
DBUG_ASSERT(tables->table->s->table_category == TABLE_CATEGORY_SYSTEM);
tables->table->file->row_logging= 0;
tables->table->use_all_columns();
}
lex->restore_backup_query_tables_list(&query_tables_list_backup);
@@ -9103,8 +9132,9 @@ open_system_table_for_update(THD *thd, TABLE_LIST *one_table)
{
DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_SYSTEM);
table->use_all_columns();
/* This table instance is not row logged */
table->file->row_logging= 0;
}
DBUG_RETURN(table);
}
@@ -9137,6 +9167,8 @@ open_log_table(THD *thd, TABLE_LIST *one_table, Open_tables_backup *backup)
if ((table= open_ltable(thd, one_table, one_table->lock_type, flags)))
{
DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_LOG);
DBUG_ASSERT(!table->file->row_logging);
/* Make sure all columns get assigned to a default value */
table->use_all_columns();
DBUG_ASSERT(table->s->no_replicate);

View File

@@ -125,6 +125,11 @@ TABLE *open_ltable(THD *thd, TABLE_LIST *table_list, thr_lock_type update,
*/
#define MYSQL_OPEN_IGNORE_REPAIR 0x10000
/**
Don't call decide_logging_format. Used for statistic tables etc
*/
#define MYSQL_OPEN_IGNORE_LOGGING_FORMAT 0x20000
/** Please refer to the internals manual. */
#define MYSQL_OPEN_REOPEN (MYSQL_OPEN_IGNORE_FLUSH |\
MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |\

View File

@@ -636,7 +636,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_current_stage_key(0), m_psi(0),
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
bulk_param(0),
table_map_for_update(0),
m_examined_row_count(0),
@@ -797,6 +796,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
bzero((void*) ha_data, sizeof(ha_data));
mysys_var=0;
binlog_evt_union.do_union= FALSE;
binlog_table_maps= FALSE;
enable_slow_log= 0;
durability_property= HA_REGULAR_DURABILITY;
@@ -1315,8 +1315,6 @@ void THD::init()
else
variables.option_bits&= ~OPTION_BIN_LOG;
variables.sql_log_bin_off= 0;
select_commands= update_commands= other_commands= 0;
/* Set to handle counting of aborted connections */
userstat_running= opt_userstat_running;
@@ -5837,8 +5835,9 @@ int THD::decide_logging_format(TABLE_LIST *tables)
{
DBUG_ENTER("THD::decide_logging_format");
DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query()));
DBUG_PRINT("info", ("variables.binlog_format: %lu",
variables.binlog_format));
DBUG_PRINT("info", ("binlog_format: %lu", (ulong) variables.binlog_format));
DBUG_PRINT("info", ("current_stmt_binlog_format: %lu",
(ulong) current_stmt_binlog_format));
DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
lex->get_stmt_unsafe_flags()));
@@ -5861,18 +5860,14 @@ int THD::decide_logging_format(TABLE_LIST *tables)
DBUG_RETURN(-1);
}
}
#endif /* WITH_WSREP */
if ((WSREP_EMULATE_BINLOG_NNULL(this) ||
(mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) &&
(mysql_bin_log.is_open() &&
(variables.option_bits & OPTION_BIN_LOG))) &&
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db.str)))
#else
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db.str)))
#endif /* WITH_WSREP */
{
if (is_bulk_op())
{
if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
@@ -5915,6 +5910,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
bool has_auto_increment_write_tables_not_first= FALSE;
bool found_first_not_own_table= FALSE;
bool has_write_tables_with_unsafe_statements= FALSE;
bool blackhole_table_found= 0;
/*
A pointer to a previous table that was changed.
@@ -6040,6 +6036,10 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (prev_write_table && prev_write_table->file->ht !=
table->file->ht)
multi_write_engine= TRUE;
if (table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB)
blackhole_table_found= 1;
if (share->non_determinstic_insert &&
!(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE))
has_write_tables_with_unsafe_statements= true;
@@ -6285,7 +6285,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
is_current_stmt_binlog_format_row() ?
"ROW" : "STATEMENT"));
if (variables.binlog_format == BINLOG_FORMAT_ROW &&
if (blackhole_table_found &&
variables.binlog_format == BINLOG_FORMAT_ROW &&
(sql_command_flags[lex->sql_command] &
(CF_UPDATES_DATA | CF_DELETES_DATA)))
{
@@ -6301,8 +6302,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
table_names.append(&table->table_name);
table_names.append(",");
table_names.append(&table->table_name);
table_names.append(",");
}
}
if (!table_names.is_empty())
@@ -6322,9 +6323,12 @@ int THD::decide_logging_format(TABLE_LIST *tables)
table_names.c_ptr());
}
}
if (is_write && is_current_stmt_binlog_format_row())
binlog_prepare_for_row_logging();
}
#ifndef DBUG_OFF
else
{
DBUG_PRINT("info", ("decision: no logging since "
"mysql_bin_log.is_open() = %d "
"and (options & OPTION_BIN_LOG) = 0x%llx "
@@ -6334,22 +6338,23 @@ int THD::decide_logging_format(TABLE_LIST *tables)
(variables.option_bits & OPTION_BIN_LOG),
(uint) wsrep_binlog_format(),
binlog_filter->db_ok(db.str)));
#endif
if (WSREP_NNULL(this) && is_current_stmt_binlog_format_row())
binlog_prepare_for_row_logging();
}
DBUG_RETURN(0);
}
int THD::decide_logging_format_low(TABLE *table)
{
DBUG_ENTER("decide_logging_format_low");
/*
INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
can be unsafe.
*/
if(wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
!is_current_stmt_binlog_format_row() &&
!lex->is_stmt_unsafe() &&
lex->sql_command == SQLCOM_INSERT &&
lex->duplicates == DUP_UPDATE)
INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
can be unsafe.
*/
if (wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
!is_current_stmt_binlog_format_row() &&
!lex->is_stmt_unsafe() &&
lex->duplicates == DUP_UPDATE)
{
uint unique_keys= 0;
uint keys= table->s->keys, i= 0;
@@ -6376,27 +6381,22 @@ exit:;
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS);
binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags();
set_current_stmt_binlog_format_row_if_mixed();
return 1;
if (is_current_stmt_binlog_format_row())
binlog_prepare_for_row_logging();
DBUG_RETURN(1);
}
}
return 0;
DBUG_RETURN(0);
}
/*
Implementation of interface to write rows to the binary log through the
thread. The thread is responsible for writing the rows it has
inserted/updated/deleted.
*/
#ifndef MYSQL_CLIENT
/*
Template member function for ensuring that there is an rows log
event of the apropriate type before proceeding.
PRE CONDITION:
- Events of type 'RowEventT' have the type code 'type_code'.
POST CONDITION:
If a non-NULL pointer is returned, the pending event for thread 'thd' will
be an event of type 'RowEventT' (which have the type code 'type_code')
@@ -6814,7 +6814,8 @@ void THD::binlog_prepare_row_images(TABLE *table)
*/
if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
!ha_check_storage_engine_flag(table->s->db_type(),
HTON_NO_BINLOG_ROW_OPT))
{
/**
Just to be sure that tmp_set is currently not in use as
@@ -6859,7 +6860,7 @@ void THD::binlog_prepare_row_images(TABLE *table)
int THD::binlog_remove_pending_rows_event(bool clear_maps,
int THD::binlog_remove_pending_rows_event(bool reset_stmt,
bool is_transactional)
{
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
@@ -6873,12 +6874,12 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
if (clear_maps)
binlog_table_maps= 0;
if (reset_stmt)
reset_binlog_for_next_statement();
DBUG_RETURN(0);
}
int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
{
DBUG_ENTER("THD::binlog_flush_pending_rows_event");
@@ -6904,9 +6905,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
binlog_table_maps= 0;
reset_binlog_for_next_statement();
}
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
is_transactional);
}
@@ -7278,8 +7278,11 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
suppress_use, errcode);
error= mysql_bin_log.write(&qinfo);
}
/*
row logged binlog may not have been reset in the case of locked tables
*/
reset_binlog_for_next_statement();
binlog_table_maps= 0;
DBUG_RETURN(error >= 0 ? error : 1);
}

View File

@@ -75,14 +75,15 @@ void set_thd_stage_info(void *thd,
#include "wsrep_condition_variable.h"
class Wsrep_applier_service;
#endif /* WITH_WSREP */
class Reprepare_observer;
class Relay_log_info;
struct rpl_group_info;
class Rpl_filter;
class Query_log_event;
class Load_log_event;
class Log_event_writer;
class sp_rcontext;
class sp_cache;
class Lex_input_stream;
@@ -737,11 +738,6 @@ typedef struct system_variables
my_bool query_cache_strip_comments;
my_bool sql_log_slow;
my_bool sql_log_bin;
/*
A flag to help detect whether binary logging was temporarily disabled
(see tmp_disable_binlog(A) macro).
*/
my_bool sql_log_bin_off;
my_bool binlog_annotate_row_events;
my_bool binlog_direct_non_trans_update;
my_bool column_compression_zlib_wrap;
@@ -2576,14 +2572,17 @@ public:
*/
void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin();
int binlog_write_table_map(TABLE *table, bool is_transactional,
bool *with_annotate= 0);
int binlog_write_row(TABLE* table, bool is_transactional,
const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional,
const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional,
const uchar *old_data, const uchar *new_data);
bool prepare_handlers_for_update(uint flag);
bool binlog_write_annotated_row(Log_event_writer *writer);
void binlog_prepare_for_row_logging();
bool binlog_write_table_maps();
bool binlog_write_table_map(TABLE *table, bool with_annotate);
static void binlog_prepare_row_images(TABLE* table);
void set_server_id(uint32 sid) { variables.server_id = sid; }
@@ -2677,22 +2676,20 @@ private:
*/
enum_binlog_format current_stmt_binlog_format;
/*
Number of outstanding table maps, i.e., table maps in the
transaction cache.
*/
uint binlog_table_maps;
public:
/* 1 if binlog table maps has been written */
bool binlog_table_maps;
void issue_unsafe_warnings();
void reset_unsafe_warnings()
{ binlog_unsafe_warning_flags= 0; }
uint get_binlog_table_maps() const {
return binlog_table_maps;
}
void clear_binlog_table_maps() {
void reset_binlog_for_next_statement()
{
binlog_table_maps= 0;
}
#endif /* MYSQL_CLIENT */
public:
@@ -5170,11 +5167,10 @@ my_eof(THD *thd)
#define tmp_disable_binlog(A) \
{ulonglong tmp_disable_binlog__save_options= (A)->variables.option_bits; \
(A)->variables.option_bits&= ~OPTION_BIN_LOG; \
(A)->variables.sql_log_bin_off= 1;
(A)->variables.option_bits|= OPTION_BIN_TMP_LOG_OFF;
#define reenable_binlog(A) \
(A)->variables.option_bits= tmp_disable_binlog__save_options; \
(A)->variables.sql_log_bin_off= 0;}
(A)->variables.option_bits= tmp_disable_binlog__save_options; }
inline date_conv_mode_t sql_mode_for_dates(THD *thd)

View File

@@ -959,6 +959,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
goto values_loop_end;
THD_STAGE_INFO(thd, stage_update);
thd->decide_logging_format_low(table);
do
{
DBUG_PRINT("info", ("iteration %llu", iteration));
@@ -1071,7 +1072,6 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
break;
}
thd->decide_logging_format_low(table);
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
@@ -3051,6 +3051,8 @@ bool Delayed_insert::open_and_lock_table()
return TRUE;
}
table->copy_blobs= 1;
table->file->prepare_for_row_logging();
return FALSE;
}
@@ -3110,6 +3112,8 @@ pthread_handler_t handle_delayed_insert(void *arg)
at which rows are inserted cannot be determined in mixed mode.
*/
thd->set_current_stmt_binlog_format_row_if_mixed();
/* Don't annotate insert delayed binlog events */
thd->variables.binlog_annotate_row_events= 0;
/*
Clone tickets representing protection against GRL and the lock on
@@ -3314,8 +3318,19 @@ pthread_handler_t handle_delayed_insert(void *arg)
di->table->file->delete_update_handler();
di->group_count=0;
mysql_audit_release(thd);
/*
Reset binlog. We can't call ha_reset() for the table as this will
reset the table maps we have calculated earlier.
*/
mysql_mutex_lock(&di->mutex);
}
/*
Reset binlog. We can't call ha_reset() for the table as this will
reset the table maps we have calculated earlier.
*/
thd->reset_binlog_for_next_statement();
if (di->tables_in_use)
mysql_cond_broadcast(&di->cond_client); // If waiting clients
}
@@ -3407,9 +3422,7 @@ bool Delayed_insert::handle_inserts(void)
{
int error;
ulong max_rows;
bool has_trans = TRUE;
bool using_ignore= 0, using_opt_replace= 0,
using_bin_log= mysql_bin_log.is_open();
bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
delayed_row *row;
DBUG_ENTER("handle_inserts");
@@ -3443,7 +3456,13 @@ bool Delayed_insert::handle_inserts(void)
if (table->file->ha_rnd_init_with_error(0))
goto err;
/*
We have to call prepare_for_row_logging() as the second call to
handler_writes() will not have called decide_logging_format.
*/
table->file->prepare_for_row_logging();
table->file->prepare_for_insert();
using_bin_log= table->file->row_logging;
/*
We can't use row caching when using the binary log because if
@@ -3452,6 +3471,7 @@ bool Delayed_insert::handle_inserts(void)
*/
if (!using_bin_log)
table->file->extra(HA_EXTRA_WRITE_CACHE);
mysql_mutex_lock(&mutex);
while ((row=rows.get()))
@@ -3480,8 +3500,8 @@ bool Delayed_insert::handle_inserts(void)
Guaranteed that the INSERT DELAYED STMT will not be here
in SBR when mysql binlog is enabled.
*/
DBUG_ASSERT(!(mysql_bin_log.is_open() &&
!thd.is_current_stmt_binlog_format_row()));
DBUG_ASSERT(!mysql_bin_log.is_open() ||
thd.is_current_stmt_binlog_format_row());
/*
This is the first value of an INSERT statement.
@@ -3639,10 +3659,9 @@ bool Delayed_insert::handle_inserts(void)
TODO: Move the logging to last in the sequence of rows.
*/
has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
table->file->has_transactions();
if (thd.is_current_stmt_binlog_format_row() &&
thd.binlog_flush_pending_rows_event(TRUE, has_trans))
if (table->file->row_logging &&
thd.binlog_flush_pending_rows_event(TRUE,
table->file->row_logging_has_trans))
goto err;
if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
@@ -4548,6 +4567,18 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
/* purecov: end */
}
table->s->table_creation_was_logged= save_table_creation_was_logged;
if (!table->s->tmp_table)
table->file->prepare_for_row_logging();
/*
If slave is converting a statement event to row events, log the original
create statement as an annotated row
*/
#ifdef HAVE_REPLICATION
if (thd->slave_thread && opt_replicate_annotate_row_events &&
thd->is_current_stmt_binlog_format_row())
thd->variables.binlog_annotate_row_events= 1;
#endif
DBUG_RETURN(table);
}
@@ -4792,6 +4823,7 @@ bool binlog_create_table(THD *thd, TABLE *table)
logged
*/
thd->set_current_stmt_binlog_format_row();
table->file->prepare_for_row_logging();
return binlog_show_create_table(thd, table, 0) != 0;
}
@@ -4854,6 +4886,9 @@ bool select_create::send_eof()
if (table->s->tmp_table)
thd->transaction.stmt.mark_created_temp_table();
if (thd->slave_thread)
thd->variables.binlog_annotate_row_events= 0;
if (prepare_eof())
{
abort_result_set();

View File

@@ -7436,6 +7436,12 @@ void THD::reset_for_next_command(bool do_clear_error)
DBUG_ENTER("THD::reset_for_next_command");
DBUG_ASSERT(!spcont); /* not for substatements of routines */
DBUG_ASSERT(!in_sub_stmt);
/*
Table maps should have been reset after previous statement except in the
case where we have locked tables
*/
DBUG_ASSERT(binlog_table_maps == 0 ||
locked_tables_mode == LTM_LOCK_TABLES);
if (likely(do_clear_error))
{

View File

@@ -2168,14 +2168,13 @@ static bool finalize_install(THD *thd, TABLE *table, const LEX_CSTRING *name,
of the insert into the plugin table, so that it is not replicated in
row based mode.
*/
tmp_disable_binlog(thd);
DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns();
restore_record(table, s->default_values);
table->field[0]->store(name->str, name->length, system_charset_info);
table->field[1]->store(tmp->plugin_dl->dl.str, tmp->plugin_dl->dl.length,
files_charset_info);
error= table->file->ha_write_row(table->record[0]);
reenable_binlog(thd);
if (unlikely(error))
{
table->file->print_error(error, MYF(0));
@@ -2322,9 +2321,8 @@ static bool do_uninstall(THD *thd, TABLE *table, const LEX_CSTRING *name)
of the delete from the plugin table, so that it is not replicated in
row based mode.
*/
tmp_disable_binlog(thd);
table->file->row_logging= 0; // No logging
error= table->file->ha_delete_row(table->record[0]);
reenable_binlog(thd);
if (unlikely(error))
{
table->file->print_error(error, MYF(0));

View File

@@ -180,6 +180,8 @@
#define OPTION_NO_QUERY_CACHE (1ULL << 39) // SELECT, user
#define OPTION_PROCEDURE_CLAUSE (1ULL << 40) // Internal usage
#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern
#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern
#define OPTION_BIN_TMP_LOG_OFF (1ULL << 42) // disable binlog, intern
#define OPTION_LEX_FOUND_COMMENT (1ULL << 0) // intern, parser

View File

@@ -580,7 +580,6 @@ void sequence_definition::adjust_values(longlong next_value)
int sequence_definition::write_initial_sequence(TABLE *table)
{
int error;
THD *thd= table->in_use;
MY_BITMAP *save_write_set;
store_fields(table);
@@ -588,15 +587,14 @@ int sequence_definition::write_initial_sequence(TABLE *table)
table->s->sequence->copy(this);
/*
Sequence values will be replicated as a statement
like 'create sequence'. So disable binary log temporarily
like 'create sequence'. So disable row logging for this table & statement
*/
tmp_disable_binlog(thd);
table->file->row_logging= table->file->row_logging_init= 0;
save_write_set= table->write_set;
table->write_set= &table->s->all_set;
table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE;
error= table->file->ha_write_row(table->record[0]);
table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED;
reenable_binlog(thd);
table->write_set= save_write_set;
if (unlikely(error))
table->file->print_error(error, MYF(0));

View File

@@ -404,6 +404,7 @@ insert_server(THD *thd, FOREIGN_SERVER *server)
/* need to open before acquiring THR_LOCK_plugin or it will deadlock */
if (! (table= open_ltable(thd, &tables, TL_WRITE, MYSQL_LOCK_IGNORE_TIMEOUT)))
goto end;
table->file->row_logging= 0; // Don't log to binary log
/* insert the server into the table */
if (unlikely(error= insert_server_record(table, server)))
@@ -542,9 +543,9 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
{
int error;
DBUG_ENTER("insert_server_record");
tmp_disable_binlog(table->in_use);
table->use_all_columns();
DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns();
empty_record(table);
/* set the field that's the PK to the value we're looking for */
@@ -577,8 +578,6 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server)
}
else
error= ER_FOREIGN_SERVER_EXISTS;
reenable_binlog(table->in_use);
DBUG_RETURN(error);
}
@@ -895,7 +894,8 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server)
{
int error=0;
DBUG_ENTER("update_server_record");
tmp_disable_binlog(table->in_use);
DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns();
/* set the field that's the PK to the value we're looking for */
table->field[0]->store(server->server_name,
@@ -931,7 +931,6 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server)
}
end:
reenable_binlog(table->in_use);
DBUG_RETURN(error);
}
@@ -956,7 +955,8 @@ delete_server_record(TABLE *table, LEX_CSTRING *name)
{
int error;
DBUG_ENTER("delete_server_record");
tmp_disable_binlog(table->in_use);
DBUG_ASSERT(!table->file->row_logging);
table->use_all_columns();
/* set the field that's the PK to the value we're looking for */
@@ -980,7 +980,6 @@ delete_server_record(TABLE *table, LEX_CSTRING *name)
table->file->print_error(error, MYF(0));
}
reenable_binlog(table->in_use);
DBUG_RETURN(error);
}

View File

@@ -10471,6 +10471,7 @@ do_continue:;
No additional logging of query is needed
*/
binlog_done= 1;
DBUG_ASSERT(new_table->file->row_logging);
new_table->mark_columns_needed_for_insert();
thd->binlog_write_table_map(new_table, 1);
}

View File

@@ -1330,7 +1330,8 @@ bool Table_triggers_list::prepare_record_accessors(TABLE *table)
*/
bool Table_triggers_list::check_n_load(THD *thd, const LEX_CSTRING *db,
const LEX_CSTRING *table_name, TABLE *table,
const LEX_CSTRING *table_name,
TABLE *table,
bool names_only)
{
char path_buff[FN_REFLEN];

View File

@@ -5269,7 +5269,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl)
/* used in RBR Triggers */
master_had_triggers= 0;
#endif
/* Catch wrong handling of the auto_increment_field_not_null. */
DBUG_ASSERT(!auto_increment_field_not_null);
auto_increment_field_not_null= FALSE;
@@ -5284,7 +5283,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl)
}
notnull_cond= 0;
DBUG_ASSERT(!file->keyread_enabled());
restore_record(this, s->default_values);
@@ -7293,8 +7291,7 @@ void TABLE::mark_columns_per_binlog_row_image()
If in RBR we may need to mark some extra columns,
depending on the binlog-row-image command line argument.
*/
if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
thd->is_current_stmt_binlog_format_row() &&
if (file->row_logging &&
!ha_check_storage_engine_flag(s->db_type(), HTON_NO_BINLOG_ROW_OPT))
{
/* if there is no PK, then mark all columns for the BI. */

View File

@@ -1124,13 +1124,11 @@ TABLE *THD::open_temporary_table(TMP_TABLE_SHARE *share,
table->reginfo.lock_type= TL_WRITE; /* Simulate locked */
table->grant.privilege= TMP_TABLE_ACLS;
table->query_id= query_id;
share->tmp_table= (table->file->has_transaction_manager() ?
TRANSACTIONAL_TMP_TABLE : NON_TRANSACTIONAL_TMP_TABLE);
share->not_usable_by_query_cache= 1;
table->pos_in_table_list= 0;
table->query_id= query_id;
/* Add table to the head of table list. */
share->all_tmp_tables.push_front(table);

View File

@@ -302,8 +302,10 @@ bool trans_commit_implicit(THD *thd)
DBUG_RETURN(TRUE);
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
{
DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. "
"Master and slave will have different GTID values"));
}
if (thd->in_multi_stmt_transaction_mode() ||
(thd->variables.option_bits & OPTION_TABLE_LOCK))
@@ -361,9 +363,9 @@ bool trans_rollback(THD *thd)
#ifdef HAVE_REPLICATION
repl_semisync_master.wait_after_rollback(thd, FALSE);
#endif
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/* Reset the binlog transaction marker */
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG |
OPTION_GTID_BEGIN);
thd->transaction.all.reset();
thd->lex->start_transaction_opt= 0;

View File

@@ -222,7 +222,7 @@ extern wsrep_seqno_t wsrep_locked_seqno;
/* use xxxxxx_NNULL macros when thd pointer is guaranteed to be non-null to
* avoid compiler warnings (GCC 6 and later) */
#define WSREP_NNULL(thd) \
(WSREP_ON && thd->variables.wsrep_on)
(thd->variables.wsrep_on && WSREP_ON)
#define WSREP(thd) \
(thd && WSREP_NNULL(thd))