diff --git a/mysql-test/r/binlog_row_mix_innodb_myisam.result b/mysql-test/r/binlog_row_mix_innodb_myisam.result index ae66f98739d..a192d243bb0 100644 --- a/mysql-test/r/binlog_row_mix_innodb_myisam.result +++ b/mysql-test/r/binlog_row_mix_innodb_myisam.result @@ -359,15 +359,6 @@ show binlog events from 102; Log_name Pos Event_type Server_id End_log_pos Info master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Query 1 # use `test`; BEGIN -master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` ( - `a` int(11) NOT NULL DEFAULT '0', - `b` int(11) DEFAULT NULL, - PRIMARY KEY (`a`) -) ENGINE=InnoDB -master-bin.000001 # Table_map 1 # table_id: # (test.t2) -master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2 master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F @@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2 master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Query 1 # use `test`; BEGIN -master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` ( - `a` int(11) NOT NULL DEFAULT '0', - `b` int(11) DEFAULT NULL, - PRIMARY KEY (`a`) -) ENGINE=InnoDB -master-bin.000001 # Table_map 1 # table_id: # (test.t2) -master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2 master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Table_map 1 # table_id: # (test.t1) diff --git a/sql/log.cc b/sql/log.cc index 05758fd6e7d..0b1b94ac576 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -32,11 +32,22 @@ #include +/* + Define placement versions of operator new and operator delete since + we cannot be sure that the include exists. + */ +inline void *operator new(size_t, void *ptr) { return ptr; } +inline void *operator new[](size_t, void *ptr) { return ptr; } +inline void operator delete(void*, void*) { /* Do nothing */ } +inline void operator delete[](void*, void*) { /* Do nothing */ } + /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_USER_HOST_SIZE 512 #define MAX_TIME_SIZE 32 +#define FLAGSTR(V,F) ((V)&(F)?#F" ":"") + LOGGER logger; MYSQL_BIN_LOG mysql_bin_log; @@ -70,23 +81,96 @@ char *make_default_log_name(char *buff,const char* log_ext) } /* - This is a POD. Please keep it that way! - - Don't add constructors, destructors, or virtual functions. + Helper class to store binary log transaction data. */ -struct binlog_trx_data { +class binlog_trx_data { +public: + enum { + UNDEF_POS = ~ (my_off_t) 0 + }; + + binlog_trx_data() +#ifdef HAVE_ROW_BASED_REPLICATION + : m_pending(0), before_stmt_pos(UNDEF_POS) +#endif + { + trans_log.end_of_file= max_binlog_cache_size; + } + + ~binlog_trx_data() + { +#ifdef HAVE_ROW_BASED_REPLICATION + DBUG_ASSERT(pending() == NULL); +#endif + close_cached_file(&trans_log); + } + + my_off_t position() const { + return my_b_tell(&trans_log); + } + bool empty() const { #ifdef HAVE_ROW_BASED_REPLICATION - return pending == NULL && my_b_tell(&trans_log) == 0; + return pending() == NULL && my_b_tell(&trans_log) == 0; #else return my_b_tell(&trans_log) == 0; #endif } - binlog_trx_data() {} - IO_CACHE trans_log; // The transaction cache + + /* + Truncate the transaction cache to a certain position. This + includes deleting the pending event. + */ + void truncate(my_off_t pos) + { #ifdef HAVE_ROW_BASED_REPLICATION - Rows_log_event *pending; // The pending binrows event + delete pending(); + set_pending(0); +#endif + reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); + } + + /* + Reset the entire contents of the transaction cache, emptying it + completely. + */ + void reset() { + if (!empty()) + truncate(0); +#ifdef HAVE_ROW_BASED_REPLICATION + before_stmt_pos= UNDEF_POS; +#endif + trans_log.end_of_file= max_binlog_cache_size; + } + +#ifdef HAVE_ROW_BASED_REPLICATION + Rows_log_event *pending() const + { + return m_pending; + } + + void set_pending(Rows_log_event *const pending) + { + m_pending= pending; + } +#endif + + IO_CACHE trans_log; // The transaction cache + +private: +#ifdef HAVE_ROW_BASED_REPLICATION + /* + Pending binrows event. This event is the event where the rows are + currently written. + */ + Rows_log_event *m_pending; + +public: + /* + Binlog position before the start of the current statement. + */ + my_off_t before_stmt_pos; #endif }; @@ -1149,6 +1233,69 @@ void Log_to_csv_event_handler:: } + /* + Save position of binary log transaction cache. + + SYNPOSIS + binlog_trans_log_savepos() + + thd The thread to take the binlog data from + pos Pointer to variable where the position will be stored + + DESCRIPTION + + Save the current position in the binary log transaction cache into + the variable pointed to by 'pos' + */ + +static void +binlog_trans_log_savepos(THD *thd, my_off_t *pos) +{ + DBUG_ENTER("binlog_trans_log_savepos"); + DBUG_ASSERT(pos != NULL); + if (thd->ha_data[binlog_hton.slot] == NULL) + thd->binlog_setup_trx_data(); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + DBUG_ASSERT(mysql_bin_log.is_open()); + *pos= trx_data->position(); + DBUG_PRINT("return", ("*pos=%u", *pos)); + DBUG_VOID_RETURN; +} + + +/* + Truncate the binary log transaction cache. + + SYNPOSIS + binlog_trans_log_truncate() + + thd The thread to take the binlog data from + pos Position to truncate to + + DESCRIPTION + + Truncate the binary log to the given position. Will not change + anything else. + + */ +static void +binlog_trans_log_truncate(THD *thd, my_off_t pos) +{ + DBUG_ENTER("binlog_trans_log_truncate"); + DBUG_PRINT("enter", ("pos=%u", pos)); + + DBUG_ASSERT(thd->ha_data[binlog_hton.slot] != NULL); + /* Only true if binlog_trans_log_savepos() wasn't called before */ + DBUG_ASSERT(pos != ~(my_off_t) 0); + + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + trx_data->truncate(pos); + DBUG_VOID_RETURN; +} + + /* this function is mostly a placeholder. conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open) @@ -1175,27 +1322,62 @@ static int binlog_close_connection(handlerton *hton, THD *thd) { binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; - IO_CACHE *trans_log= &trx_data->trans_log; DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty()); - close_cached_file(trans_log); - thd->ha_data[binlog_hton->slot]= 0; + thd->ha_data[binlog_hton.slot]= 0; + trx_data->~binlog_trx_data(); my_free((gptr)trx_data, MYF(0)); return 0; } +/* + End a transaction. + + SYNOPSIS + binlog_end_trans() + + thd The thread whose transaction should be ended + trx_data Pointer to the transaction data to use + end_ev The end event to use, or NULL + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + + DESCRIPTION + + End the currently open transaction. The transaction can be either + a real transaction (if 'all' is true) or a statement transaction + (if 'all' is false). + + If 'end_ev' is NULL, the transaction is a rollback of only + transactional tables, so the transaction cache will be truncated + to either just before the last opened statement transaction (if + 'all' is false), or reset completely (if 'all' is true). + */ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) +binlog_end_trans(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev, bool all) { DBUG_ENTER("binlog_end_trans"); int error=0; IO_CACHE *trans_log= &trx_data->trans_log; + DBUG_PRINT("enter", ("transaction: %s, end_ev=%p", + all ? "all" : "stmt", end_ev)); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); - - /* NULL denotes ROLLBACK with nothing to replicate */ + /* + NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of + only transactional tables. If the transaction contain changes to + any non-transactiona tables, we need write the transaction and log + a ROLLBACK last. + */ if (end_ev != NULL) { /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + We can always end the statement when ending a transaction since transactions are not allowed inside stored functions. If they were, we would have to ensure that we're not ending a statement @@ -1204,38 +1386,55 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, #ifdef HAVE_ROW_BASED_REPLICATION thd->binlog_flush_pending_rows_event(TRUE); #endif - error= mysql_bin_log.write(thd, trans_log, end_ev); + /* + We write the transaction cache to the binary log if either we're + committing the entire transaction, or if we are doing an + autocommit outside a transaction. + */ + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + { + error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev); + trx_data->reset(); +#ifdef HAVE_ROW_BASED_REPLICATION + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); +#endif + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } + } } #ifdef HAVE_ROW_BASED_REPLICATION else { -#ifdef HAVE_ROW_BASED_REPLICATION - thd->binlog_delete_pending_rows_event(); -#endif + /* + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. + + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + + */ + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + trx_data->reset(); + else + trx_data->truncate(trx_data->before_stmt_pos); // ...statement + + /* + We need to step the table map version on a rollback to ensure + that a new table map event is generated instead of the one that + was written to the thrown-away transaction cache. + */ + mysql_bin_log.update_table_map_version(); } - - /* - We need to step the table map version both after writing the - entire transaction to the log file and after rolling back the - transaction. - - We need to step the table map version after writing the - transaction cache to disk. In addition, we need to step the table - map version on a rollback to ensure that a new table map event is - generated instead of the one that was written to the thrown-away - transaction cache. - */ - mysql_bin_log.update_table_map_version(); #endif - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } - reinit_io_cache(trans_log, WRITE_CACHE, (my_off_t) 0, 0, 1); // cannot fail - trans_log->end_of_file= max_binlog_cache_size; DBUG_RETURN(error); } @@ -1252,26 +1451,31 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) static int binlog_commit(handlerton *hton, THD *thd, bool all) { + int error= 0; DBUG_ENTER("binlog_commit"); binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_ASSERT(mysql_bin_log.is_open() && - (all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))); + DBUG_ASSERT(mysql_bin_log.is_open()); - if (trx_data->empty()) + if (all && trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log() + trx_data->reset(); DBUG_RETURN(0); } - if (all) + if (all) { Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) - DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev)); + int error= binlog_end_trans(thd, trx_data, &qev, all); + DBUG_RETURN(error); } else - DBUG_RETURN(binlog_end_trans(thd, trx_data, &invisible_commit)); + { + int error= binlog_end_trans(thd, trx_data, &invisible_commit, all); + DBUG_RETURN(error); + } } static int binlog_rollback(handlerton *hton, THD *thd, bool all) @@ -1281,13 +1485,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - /* - First assert is guaranteed - see trans_register_ha() call below. - The second must be true. If it is not, we're registering - unnecessary, doing extra work. The cause should be found and eliminated - */ - DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); - DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty()); + DBUG_ASSERT(mysql_bin_log.is_open()); + + if (trx_data->empty()) { + trx_data->reset(); + DBUG_RETURN(0); + } + /* Update the binary log with a BEGIN/ROLLBACK block if we have cached some queries and we updated some non-transactional @@ -1299,10 +1503,10 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) - error= binlog_end_trans(thd, trx_data, &qev); + error= binlog_end_trans(thd, trx_data, &qev, all); } else - error= binlog_end_trans(thd, trx_data, 0); + error= binlog_end_trans(thd, trx_data, 0, all); DBUG_RETURN(error); } @@ -1330,11 +1534,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) { DBUG_ENTER("binlog_savepoint_set"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(&trx_data->trans_log)); - *(my_off_t *)sv= my_b_tell(&trx_data->trans_log); + binlog_trans_log_savepos(thd, (my_off_t*) sv); /* Write it to the binary log */ int const error= @@ -1349,7 +1550,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); + DBUG_ASSERT(mysql_bin_log.is_open()); /* Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some @@ -1364,7 +1565,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) thd->query, thd->query_length, TRUE, FALSE); DBUG_RETURN(error); } - reinit_io_cache(trans_log, WRITE_CACHE, *(my_off_t *)sv, 0, 0); + binlog_trans_log_truncate(thd, *(my_off_t*)sv); DBUG_RETURN(0); } @@ -2494,7 +2695,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) thread. If the transaction involved MyISAM tables, it should go into binlog even on rollback. */ - (void) pthread_mutex_lock(&LOCK_thread_count); + VOID(pthread_mutex_lock(&LOCK_thread_count)); /* Save variables so that we can reopen the log */ save_name=name; @@ -2526,7 +2727,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) my_free((gptr) save_name, MYF(0)); err: - (void) pthread_mutex_unlock(&LOCK_thread_count); + VOID(pthread_mutex_unlock(&LOCK_thread_count)); pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); @@ -3092,18 +3293,76 @@ int THD::binlog_setup_trx_data() ha_data[binlog_hton->slot]= 0; DBUG_RETURN(1); // Didn't manage to set it up } - trx_data->trans_log.end_of_file= max_binlog_cache_size; + + trx_data= new (ha_data[binlog_hton.slot]) binlog_trx_data; + DBUG_RETURN(0); } +#ifdef HAVE_ROW_BASED_REPLICATION +/* + Function to start a statement and optionally a transaction for the + binary log. + + SYNOPSIS + binlog_start_trans_and_stmt() + + DESCRIPTION + + This function does three things: + - Start a transaction if not in autocommit mode or if a BEGIN + statement has been seen. + + - Start a statement transaction to allow us to truncate the binary + log. + + - Save the currrent binlog position so that we can roll back the + statement by truncating the transaction log. + + We only update the saved position if the old one was undefined, + the reason is that there are some cases (e.g., for CREATE-SELECT) + where the position is saved twice (e.g., both in + select_create::prepare() and THD::binlog_write_table_map()) , but + we should use the first. This means that calls to this function + can be used to start the statement before the first table map + event, to include some extra events. + */ + +void +THD::binlog_start_trans_and_stmt() +{ + DBUG_ENTER("binlog_start_trans_and_stmt"); + binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot]; + DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data)); + if (trx_data) + DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", + trx_data->before_stmt_pos)); + if (trx_data == NULL || + trx_data->before_stmt_pos == binlog_trx_data::UNDEF_POS) + { + /* + The call to binlog_trans_log_savepos() might create the trx_data + structure, if it didn't exist before, so we save the position + into an auto variable and then write it into the transaction + data for the binary log (i.e., trx_data). + */ + my_off_t pos= 0; + binlog_trans_log_savepos(this, &pos); + trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot]; + + trx_data->before_stmt_pos= pos; + + if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(this, TRUE, &binlog_hton); + trans_register_ha(this, FALSE, &binlog_hton); + } + DBUG_VOID_RETURN; +} + /* Write a table map to the binary log. - - This function is called from ha_external_lock() after the storage - engine has registered for the transaction. */ -#ifdef HAVE_ROW_BASED_REPLICATION int THD::binlog_write_table_map(TABLE *table, bool is_trans) { int error; @@ -3122,10 +3381,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) Table_map_log_event the_event(this, table, table->s->table_map_id, is_trans, flags); - if (is_trans) - trans_register_ha(this, - (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, - binlog_hton); + if (is_trans && binlog_table_maps == 0) + binlog_start_trans_and_stmt(); if ((error= mysql_bin_log.write(&the_event))) DBUG_RETURN(error); @@ -3146,7 +3403,7 @@ THD::binlog_get_pending_rows_event() const (since the trx_data is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending : NULL; + return trx_data ? trx_data->pending() : NULL; } void @@ -3159,7 +3416,7 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) (binlog_trx_data*) ha_data[binlog_hton->slot]; DBUG_ASSERT(trx_data); - trx_data->pending= ev; + trx_data->set_pending(ev); } @@ -3168,8 +3425,9 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) (either cached binlog if transaction, or disk binlog). Sets a new pending event. */ -int MYSQL_BIN_LOG:: - flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event) +int +MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, + Rows_log_event* event) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(mysql_bin_log.is_open()); @@ -3182,9 +3440,9 @@ int MYSQL_BIN_LOG:: DBUG_ASSERT(trx_data); - DBUG_PRINT("info", ("trx_data->pending=%p", trx_data->pending)); + DBUG_PRINT("info", ("trx_data->pending()=%p", trx_data->pending())); - if (Rows_log_event* pending= trx_data->pending) + if (Rows_log_event* pending= trx_data->pending()) { IO_CACHE *file= &log_file; @@ -3334,15 +3592,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - bool trans_log_in_use= my_b_tell(trans_log) != 0; - if (event_info->get_cache_stmt() && !trans_log_in_use) - trans_register_ha(thd, - (thd->options & - (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, - binlog_hton); - if (event_info->get_cache_stmt() || trans_log_in_use) + my_off_t trans_log_pos= my_b_tell(trans_log); + if (event_info->get_cache_stmt() || trans_log_pos != 0) { - DBUG_PRINT("info", ("Using trans_log")); + DBUG_PRINT("info", ("Using trans_log: cache=%d, trans_log_pos=%u", + event_info->get_cache_stmt(), + trans_log_pos)); + if (trans_log_pos == 0) + thd->binlog_start_trans_and_stmt(); file= trans_log; } /* @@ -3546,61 +3803,69 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) uint length; /* - Log "BEGIN" at the beginning of the transaction. - which may contain more than 1 SQL statement. - */ - if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + We only bother to write to the binary log if there is anything + to write. + */ + if (my_b_tell(cache) > 0) { - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); /* - Imagine this is rollback due to net timeout, after all statements of - the transaction succeeded. Then we want a zero-error code in BEGIN. - In other words, if there was a really serious error code it's already - in the statement's events, there is no need to put it also in this - internally generated event, and as this event is generated late it - would lead to false alarms. - This is safer than thd->clear_error() against kills at shutdown. + Log "BEGIN" at the beginning of the transaction. + which may contain more than 1 SQL statement. */ - qinfo.error_code= 0; - /* - Now this Query_log_event has artificial log_pos 0. It must be adjusted - to reflect the real position in the log. Not doing it would confuse the - slave: it would prevent this one from knowing where he is in the - master's binlog, which would result in wrong positions being shown to - the user, MASTER_POS_WAIT undue waiting etc. - */ - if (qinfo.write(&log_file)) - goto err; - } - /* Read from the file used to cache the queries .*/ - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - goto err; - length=my_b_bytes_in_cache(cache); - DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); - do - { - /* Write data to the binary log file */ - if (my_b_write(&log_file, cache->read_pos, length)) - goto err; - cache->read_pos=cache->read_end; // Mark buffer used up - DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); - } while ((length=my_b_fill(cache))); + if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + { + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); + /* + Imagine this is rollback due to net timeout, after all statements of + the transaction succeeded. Then we want a zero-error code in BEGIN. + In other words, if there was a really serious error code it's already + in the statement's events, there is no need to put it also in this + internally generated event, and as this event is generated late it + would lead to false alarms. + This is safer than thd->clear_error() against kills at shutdown. + */ + qinfo.error_code= 0; + /* + Now this Query_log_event has artificial log_pos 0. It must be adjusted + to reflect the real position in the log. Not doing it would confuse the + slave: it would prevent this one from knowing where he is in the + master's binlog, which would result in wrong positions being shown to + the user, MASTER_POS_WAIT undue waiting etc. + */ + if (qinfo.write(&log_file)) + goto err; + } + /* Read from the file used to cache the queries .*/ + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + goto err; + length=my_b_bytes_in_cache(cache); + DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); + do + { + /* Write data to the binary log file */ + if (my_b_write(&log_file, cache->read_pos, length)) + goto err; + cache->read_pos=cache->read_end; // Mark buffer used up + DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); + } while ((length=my_b_fill(cache))); - if (commit_event->write(&log_file)) - goto err; + if (commit_event && commit_event->write(&log_file)) + goto err; #ifndef DBUG_OFF -DBUG_skip_commit: + DBUG_skip_commit: #endif - if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); - if (cache->error) // Error on read - { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + if (flush_and_sync()) + goto err; + DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); + if (cache->error) // Error on read + { + sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); + write_error=1; // Don't give more errors + goto err; + } + signal_update(); } - signal_update(); + /* if commit_event is Xid_log_event, increase the number of prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated @@ -3609,7 +3874,7 @@ DBUG_skip_commit: If the commit_event is not Xid_log_event (then it's a Query_log_event) rotate binlog, if necessary. */ - if (commit_event->get_type_code() == XID_EVENT) + if (commit_event && commit_event->get_type_code() == XID_EVENT) { pthread_mutex_lock(&LOCK_prep_xids); prepared_xids++; @@ -4619,12 +4884,17 @@ int TC_LOG_BINLOG::log(THD *thd, my_xid xid) Xid_log_event xle(thd, xid); binlog_trx_data *trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value + /* + We always commit the entire transaction when writing an XID. Also + note that the return value is inverted. + */ + DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { pthread_mutex_lock(&LOCK_prep_xids); + DBUG_ASSERT(prepared_xids > 0); if (--prepared_xids == 0) pthread_cond_signal(&COND_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids); diff --git a/sql/sql_class.h b/sql/sql_class.h index 6b46c9676f7..95283ec2fc8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -930,6 +930,7 @@ public: /* Public interface to write RBR events to the binlog */ + void binlog_start_trans_and_stmt(); int binlog_write_table_map(TABLE *table, bool is_transactional); int binlog_write_row(TABLE* table, bool is_transactional, MY_BITMAP const* cols, my_size_t colcnt, diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index c0bc6628754..c5a2e8674e6 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1266,7 +1266,7 @@ err: if (thd->lex->current_select) thd->lex->current_select->no_error= 0; // Give error table->file->print_error(error,MYF(0)); - + before_trg_err: table->file->restore_auto_increment(prev_insert_id); if (key) @@ -2014,6 +2014,10 @@ err: rolled back. We only need to roll back a potential statement transaction, since real transactions are rolled back in close_thread_tables(). + + TODO: This is not true any more, table maps are generated on the + first call to ha_*_row() instead. Remove code that are used to + cover for the case outlined above. */ ha_rollback_stmt(thd); @@ -2357,6 +2361,7 @@ select_insert::prepare(List &values, SELECT_LEX_UNIT *u) DBUG_ENTER("select_insert::prepare"); unit= u; + /* Since table in which we are going to insert is added to the first select, LEX::current_select should point to the first select while @@ -2586,56 +2591,54 @@ void select_insert::send_error(uint errcode,const char *err) if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) my_message(errcode, err, MYF(0)); - if (!table) + /* + If the creation of the table failed (due to a syntax error, for + example), no table will have been opened and therefore 'table' + will be NULL. In that case, we still need to execute the rollback + and the end of the function to truncate the binary log, but we can + skip all the intermediate steps. + */ + if (table) { /* - This can only happen when using CREATE ... SELECT and the table was not - created becasue of an syntax error + If we are not in prelocked mode, we end the bulk insert started + before. */ - DBUG_VOID_RETURN; - } - if (!thd->prelocked_mode) - table->file->ha_end_bulk_insert(); - /* - If at least one row has been inserted/modified and will stay in the table - (the table doesn't have transactions) we must write to the binlog (and - the error code will make the slave stop). + if (!thd->prelocked_mode) + table->file->ha_end_bulk_insert(); - For many errors (example: we got a duplicate key error while - inserting into a MyISAM table), no row will be added to the table, - so passing the error to the slave will not help since there will - be an error code mismatch (the inserts will succeed on the slave - with no error). + /* + If at least one row has been inserted/modified and will stay in + the table (the table doesn't have transactions) we must write to + the binlog (and the error code will make the slave stop). - If we are using row-based replication we have two cases where this - code is executed: replication of CREATE-SELECT and replication of - INSERT-SELECT. + For many errors (example: we got a duplicate key error while + inserting into a MyISAM table), no row will be added to the table, + so passing the error to the slave will not help since there will + be an error code mismatch (the inserts will succeed on the slave + with no error). - When replicating a CREATE-SELECT statement, we shall not write the - events to the binary log and should thus not set - OPTION_STATUS_NO_TRANS_UPDATE. - - When replicating INSERT-SELECT, we shall not write the events to - the binary log for transactional table, but shall write all events - if there is one or more writes to non-transactional tables. In - this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a - write to a non-transactional table, otherwise it is cleared. - */ - if (info.copied || info.deleted || info.updated) - { - if (!table->file->has_transactions()) + If table creation failed, the number of rows modified will also be + zero, so no check for that is made. + */ + if (info.copied || info.deleted || info.updated) { - if (mysql_bin_log.is_open()) + DBUG_ASSERT(table != NULL); + if (!table->file->has_transactions()) { - thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + if (mysql_bin_log.is_open()) + { + thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, + table->file->has_transactions(), FALSE); + } + if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && + !can_rollback_data()) + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + query_cache_invalidate3(thd, table, 1); } - if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && - !can_rollback_data()) - thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; - query_cache_invalidate3(thd, table, 1); } } + ha_rollback_stmt(thd); table->file->ha_release_auto_increment(); DBUG_VOID_RETURN; @@ -2645,8 +2648,11 @@ void select_insert::send_error(uint errcode,const char *err) bool select_insert::send_eof() { int error,error2; + bool const trans_table= table->file->has_transactions(); ulonglong id; DBUG_ENTER("select_insert::send_eof"); + DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", + trans_table, table->file->table_type())); error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); @@ -2666,9 +2672,8 @@ bool select_insert::send_eof() are not logged in RBR) - We are using statement based replication */ - if (!table->file->has_transactions() && - (!table->s->tmp_table || - !thd->current_stmt_binlog_row_based)) + if (!trans_table && + (!table->s->tmp_table || !thd->current_stmt_binlog_row_based)) thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; } @@ -2684,11 +2689,22 @@ bool select_insert::send_eof() thd->clear_error(); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + trans_table, FALSE); + } + /* + We will call ha_autocommit_or_rollback() also for + non-transactional tables under row-based replication: there might + be events in the binary logs transaction, and we need to write + them to the binary log. + */ + if (trans_table || thd->current_stmt_binlog_row_based) + { + int const error2= ha_autocommit_or_rollback(thd, error); + if (error2 && !error) + error=error2; } - if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) - error=error2; table->file->ha_release_auto_increment(); + if (error) { table->file->print_error(error,MYF(0)); @@ -2885,14 +2901,19 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) class MY_HOOKS : public TABLEOP_HOOKS { public: MY_HOOKS(select_create *x) : ptr(x) { } - virtual void do_prelock(TABLE **tables, uint count) - { - if (ptr->get_thd()->current_stmt_binlog_row_based && - !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) - ptr->binlog_show_create_table(tables, count); - } private: + virtual void do_prelock(TABLE **tables, uint count) + { + TABLE const *const table = *tables; + if (ptr->get_thd()->current_stmt_binlog_row_based && + table->s->tmp_table == NO_TMP_TABLE && + !ptr->get_create_info()->table_existed) + { + ptr->binlog_show_create_table(tables, count); + } + } + select_create *ptr; }; @@ -2901,6 +2922,20 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) #endif unit= u; + +#ifdef HAVE_ROW_BASED_REPLICATION + /* + Start a statement transaction before the create if we are creating + a non-temporary table and are using row-based replication for the + statement. + */ + if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 && + thd->current_stmt_binlog_row_based) + { + thd->binlog_start_trans_and_stmt(); + } +#endif + if (!(table= create_table_from_items(thd, create_info, create_table, extra_fields, keys, &values, &thd->extra_lock, hook_ptr))) @@ -3048,8 +3083,17 @@ void select_create::abort() table->s->version= 0; hash_delete(&open_cache,(byte*) table); if (!create_info->table_existed) + { quick_rm_table(table_type, create_table->db, create_table->table_name, 0); + /* + We roll back the statement, including truncating the + transaction cache of the binary log, if the statement + failed. + */ + if (thd->current_stmt_binlog_row_based) + ha_rollback_stmt(thd); + } /* Tell threads waiting for refresh that something has happened */ if (version != refresh_version) broadcast_refresh();