From 2e2b2a046942bb8f743202c946300b9a3b0d58fd Mon Sep 17 00:00:00 2001
From: Nikita Malyavin
Date: Wed, 28 Dec 2022 23:05:46 +0300
Subject: [PATCH] MDEV-15990 Refactor write_record and fix idempotent replication

See also MDEV-30046.

Idempotent write_row works the same way as REPLACE: if there is a duplicate record in the table, it is deleted and re-inserted, with the same update optimization.

The code in Rows_log_event::write_row was basically copy-pasted from write_record.

What's done:

The REPLACE operation was unified across replication and SQL. It is now represented as a Write_record class that holds the whole state and allows reusing some resources between the row writes. The REPLACE, IODKU and single-insert implementations are split across different methods, resulting in much cleaner code. The entry point is preserved as a single Write_record::write_record() call; the implementation to call is chosen at construction time.

This allowed several optimizations:

1. The table key list is not iterated for every row. We find the last unique key in the order of checking once and preserve it across the rows. See last_uniq_key().

2. handler::referenced_by_foreign_key (which acquires a global lock in the InnoDB implementation) was also called per row. Now all the table config that allows the optimized replace is folded into a single boolean field, can_optimize. All the fields to check even fit into a single register on a 64-bit platform.

3. The DUP_REPLACE and DUP_UPDATE cases now have one less level of indirection.

4. modified_non_trans_tables is checked and set only when it's really needed.

5. Obsolete bitmap manipulations are removed.

Also:
* Unify the replace initialization step across implementations: add prepare_for_replace and finalize_replace.
* alloca is removed in favor of mem_root allocation. This memory is reused across the rows.
* An rpl-related callback is added to the replace branch, meaning that an extra check is made per row replace even for the common case. It can be avoided with templates if considered a problem.
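As a reading aid (not part of the patch): a minimal standalone C++ sketch of the shape described above, where per-statement state is computed once at construction and a single write_record() entry point is reused for every row. All names here (RowWriter, DupMode, find_last_unique_key) are illustrative stand-ins, not the actual MariaDB identifiers.

#include <cstdio>

enum class DupMode { ERROR, UPDATE, REPLACE };

class RowWriter
{
  int (RowWriter::*impl)();     // implementation chosen once, at construction
  unsigned last_unique_key;     // per-statement state, reused for every row

  int single_insert() { std::puts("plain insert"); return 0; }
  int dup_update()    { std::puts("INSERT .. ON DUPLICATE KEY UPDATE"); return 0; }
  int replace_row()   { std::puts("REPLACE: update, or delete + insert"); return 0; }

  // Stand-in for scanning the key list once instead of once per row
  static unsigned find_last_unique_key() { return 0; }

public:
  explicit RowWriter(DupMode mode)
    : last_unique_key(find_last_unique_key())
  {
    switch (mode)
    {
    case DupMode::ERROR:   impl= &RowWriter::single_insert; break;
    case DupMode::UPDATE:  impl= &RowWriter::dup_update;    break;
    case DupMode::REPLACE: impl= &RowWriter::replace_row;   break;
    }
  }
  // The single per-row entry point, like Write_record::write_record()
  int write_record() { return (this->*impl)(); }
};

int main()
{
  RowWriter w(DupMode::REPLACE);   // one setup per statement
  for (int i= 0; i < 3; i++)       // the row loop pays no per-row setup cost
    w.write_record();
  return 0;
}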
--- .../main/long_unique_bugs_replication.result | 48 +- .../main/long_unique_bugs_replication.test | 46 +- sql/handler.cc | 6 - sql/handler.h | 8 +- sql/log_event.h | 21 +- sql/log_event_server.cc | 273 ++--- sql/sql_class.h | 9 +- sql/sql_insert.cc | 1084 +++++++++-------- sql/sql_insert.h | 87 +- sql/sql_load.cc | 29 +- sql/sql_parse.cc | 4 +- sql/sql_table.cc | 5 +- 12 files changed, 873 insertions(+), 747 deletions(-) diff --git a/mysql-test/main/long_unique_bugs_replication.result b/mysql-test/main/long_unique_bugs_replication.result index 39b0ebe26d2..ab6674f886f 100644 --- a/mysql-test/main/long_unique_bugs_replication.result +++ b/mysql-test/main/long_unique_bugs_replication.result @@ -9,20 +9,64 @@ insert into t1 values (2,2); update t1 set a1 = 'd' limit 1; update t1 set a1 = 'd2' where i1= 2; connection slave; +connection slave; +select * from t1; +i1 a1 +1 d +2 d2 connection master; drop table t1; +connection slave; +connection master; # # MDEV-32093 long uniques break old->new replication # -connection slave; create table t1 (id int not null, b1 varchar(255) not null, b2 varchar(2550) not null, unique (id), unique key (b1,b2) using hash) default charset utf8mb3; set global slave_exec_mode=idempotent; binlog 'aRf2ZA8BAAAA/AAAAAABAAAAAAQAMTAuNS4xNS1NYXJpYURCLWxvZwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABpF/ZkEzgNAAgAEgAEBAQEEgAA5AAEGggAAAAICAgCAAAACgoKAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEEwQADQgICAoKCgFRmTlk'; binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==bBf2ZBcBAAAANAAAAMwHAAAAAHEAAAAAAAEABP/wj6QAAAEAYgEAZa6/VU0JAAAANteqUw=='; binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==bBf2ZBcBAAAANAAAAMwHAAAAAHEAAAAAAAEABP/wj6QAAAEAYgEAZa6/VU0JAAAANteqUw=='; binlog 'bBf2ZBMBAAAANAAAAHUkAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AaTGFIg==bBf2ZBgBAAAASAAAAL0kAAAAAHEAAAAAAAEABP//8I+kAAABAGIBAGWuv1VNCQAAAPBuWwAAAQBiAQBlrr9VTQkAAADxS9Lu'; -drop table t1; +connection slave; +select * from t1; +id b1 b2 +23406 b e +connection master; set global slave_exec_mode=default; +drop table t1; +# +# End of 10.4 tests +# +# Idempotent scenario, which triggers REPLACE code to be used in the +# event, i.e. duplicated record will be deleted and then re-inserted. 
+create table t1 (i1 int, a1 text, unique key i1 (a1)) engine=myisam; +connection slave; +connection slave; +set @save_slave_exec_mode= @@slave_exec_mode; +set global slave_exec_mode = idempotent; +insert into t1 values (1,1); +connection master; +insert into t1 values (2,1); +connection slave; +connection slave; +select * from t1; +i1 a1 +2 1 +connection master; +insert into t1 values (3,3); +update t1 set a1 = 'd' limit 1; +update t1 set a1 = 'd2' where i1= 3; +connection slave; +connection slave; +select * from t1; +i1 a1 +2 d +3 d2 +set global slave_exec_mode = @save_slave_exec_mode; +connection master; +drop table t1; +connection slave; +connection master; # # End of 10.4 tests # diff --git a/mysql-test/main/long_unique_bugs_replication.test b/mysql-test/main/long_unique_bugs_replication.test index 9c44d13e6a5..f2225cde101 100644 --- a/mysql-test/main/long_unique_bugs_replication.test +++ b/mysql-test/main/long_unique_bugs_replication.test @@ -16,17 +16,21 @@ update t1 set a1 = 'd' limit 1; update t1 set a1 = 'd2' where i1= 2; sync_slave_with_master; - +connection slave; +select * from t1; connection master; drop table t1; +sync_slave_with_master; +connection master; + + --echo # --echo # MDEV-32093 long uniques break old->new replication --echo # # this is techically a bug in replication, but it needs an old master # so we'll run it as a non-replicated test with BINLOG command -sync_slave_with_master; create table t1 (id int not null, b1 varchar(255) not null, b2 varchar(2550) not null, unique (id), unique key (b1,b2) using hash) default charset utf8mb3; set global slave_exec_mode=idempotent; @@ -40,10 +44,46 @@ binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A== ### UPDATE t1 WHERE (42127, 'b', 'e', 39952170926) SET (23406, 'b', 'e', 39952170926) binlog 'bBf2ZBMBAAAANAAAAHUkAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AaTGFIg==bBf2ZBgBAAAASAAAAL0kAAAAAHEAAAAAAAEABP//8I+kAAABAGIBAGWuv1VNCQAAAPBuWwAAAQBiAQBlrr9VTQkAAADxS9Lu'; -drop table t1; +sync_slave_with_master; +select * from t1; +connection master; set global slave_exec_mode=default; +drop table t1; --echo # --echo # End of 10.4 tests --echo # + +--echo # Idempotent scenario, which triggers REPLACE code to be used in the +--echo # event, i.e. duplicated record will be deleted and then re-inserted. +create table t1 (i1 int, a1 text, unique key i1 (a1)) engine=myisam; + +sync_slave_with_master; +connection slave; +set @save_slave_exec_mode= @@slave_exec_mode; +set global slave_exec_mode = idempotent; +insert into t1 values (1,1); +connection master; +insert into t1 values (2,1); +sync_slave_with_master; +connection slave; +select * from t1; +connection master; +insert into t1 values (3,3); +update t1 set a1 = 'd' limit 1; +update t1 set a1 = 'd2' where i1= 3; +sync_slave_with_master; + +connection slave; +select * from t1; +set global slave_exec_mode = @save_slave_exec_mode; +connection master; +drop table t1; +sync_slave_with_master; +connection master; + +--echo # +--echo # End of 10.4 tests +--echo # + --source include/rpl_end.inc diff --git a/sql/handler.cc b/sql/handler.cc index f770296f8df..6ac0ee5b9a4 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -5003,12 +5003,6 @@ uint handler::get_dup_key(int error) DBUG_RETURN(errkey); } -bool handler::has_dup_ref() const -{ - DBUG_ASSERT(lookup_errkey != (uint)-1 || errkey != (uint)-1); - return ha_table_flags() & HA_DUPLICATE_POS || lookup_errkey != (uint)-1; -} - /** Delete all files with extension from bas_ext(). 
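The handler::has_dup_ref() helper removed in the handler.cc hunk above is inlined into Write_record::locate_dup_record() later in this patch (see the sql_insert.cc hunks). A minimal standalone model of the decision it encoded, with a made-up MockHandler type standing in for the real handler interface — fetch the duplicate row by stored position when the engine keeps one (HA_DUPLICATE_POS, or a long-unique lookup recorded in lookup_errkey), otherwise re-read it through the duplicated key:

#include <cstdint>
#include <cstdio>

struct MockHandler                 // hypothetical stand-in for class handler
{
  bool has_duplicate_pos;          // engine stores a row id for the dup row
  uint32_t lookup_errkey;          // set by long-unique checks, ~0u if unset

  int rnd_pos()    { std::puts("fetch old row by stored position"); return 0; }
  int index_read() { std::puts("re-read old row via the duplicated key"); return 0; }
};

static int locate_dup_record(MockHandler &h)
{
  // Same predicate the patch inlines in place of handler::has_dup_ref()
  if (h.has_duplicate_pos || h.lookup_errkey != ~0u)
    return h.rnd_pos();
  return h.index_read();
}

int main()
{
  MockHandler by_pos{true, ~0u}, by_key{false, ~0u};
  locate_dup_record(by_pos);   // position-based path (e.g. MyISAM)
  locate_dup_record(by_key);   // key-based path
  return 0;
}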
diff --git a/sql/handler.h b/sql/handler.h index eeeee42c2ca..efa4b126d2c 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -3138,7 +3138,6 @@ protected: Table_flags cached_table_flags; /* Set on init() and open() */ ha_rows estimation_rows_to_insert; - handler *lookup_handler; /* Statistics for the query. Updated if handler_stats.active is set */ ha_handler_stats active_handler_stats; void set_handler_stats(); @@ -3147,6 +3146,7 @@ public: uchar *ref; /* Pointer to current row */ uchar *dup_ref; /* Pointer to duplicate row */ uchar *lookup_buffer; + handler *lookup_handler; /* General statistics for the table like number of row, file sizes etc */ ha_statistics stats; @@ -3348,9 +3348,8 @@ public: handler(handlerton *ht_arg, TABLE_SHARE *share_arg) :table_share(share_arg), table(0), estimation_rows_to_insert(0), - lookup_handler(this), - ht(ht_arg), ref(0), lookup_buffer(NULL), handler_stats(NULL), - end_range(NULL), implicit_emptied(0), + ht(ht_arg), ref(0), lookup_buffer(NULL), lookup_handler(this), + handler_stats(NULL), end_range(NULL), implicit_emptied(0), mark_trx_read_write_done(0), check_table_binlog_row_based_done(0), check_table_binlog_row_based_result(0), @@ -3543,7 +3542,6 @@ public: virtual void print_error(int error, myf errflag); virtual bool get_error_message(int error, String *buf); uint get_dup_key(int error); - bool has_dup_ref() const; /** Retrieves the names of the table and the key for which there was a duplicate entry in the case of HA_ERR_FOREIGN_DUPLICATE_KEY. diff --git a/sql/log_event.h b/sql/log_event.h index 3263637ecf6..9403e15e4c6 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -53,6 +53,7 @@ #include "rpl_record.h" #include "rpl_reporting.h" #include "sql_class.h" /* THD */ +#include "sql_insert.h" #endif #include "rpl_gtid.h" @@ -4930,7 +4931,7 @@ public: enum_logged_status logged_status() override { return LOGGED_TABLE_MAP; } bool is_valid() const override { return m_memory != NULL; /* we check malloc */ } - int get_data_size() override { return (uint) m_data_size; } + int get_data_size() override { return (uint) m_data_size; } #ifdef MYSQL_SERVER #ifdef HAVE_REPLICATION bool is_part_of_group() override { return 1; } @@ -5345,7 +5346,6 @@ protected: int find_key(); // Find a best key to use in find_row() int find_row(rpl_group_info *); - int write_row(rpl_group_info *, const bool); int update_sequence(); // Unpack the current row into m_table->record[0], but with @@ -5410,8 +5410,9 @@ private: The member function will return 0 if all went OK, or a non-zero error code otherwise. */ - virtual - int do_before_row_operations(const Slave_reporting_capability *const log) = 0; + virtual + int do_before_row_operations(rpl_group_info *log, + COPY_INFO*, Write_record*) = 0; /* Primitive to clean up after a sequence of row executions. 
@@ -5489,6 +5490,7 @@ public: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) uint8 get_trg_event_map() override; + int incomplete_record_callback(rpl_group_info *rgi); #endif private: @@ -5499,7 +5501,10 @@ private: #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - int do_before_row_operations(const Slave_reporting_capability *const) override; + Write_record *m_write_record; + int write_row(rpl_group_info *, bool); + int do_before_row_operations(rpl_group_info *rgi, + COPY_INFO*, Write_record*) override; int do_after_row_operations(const Slave_reporting_capability *const,int) override; int do_exec_row(rpl_group_info *) override; #endif @@ -5587,7 +5592,8 @@ protected: #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - int do_before_row_operations(const Slave_reporting_capability *const) override; + int do_before_row_operations(rpl_group_info *rgi, + COPY_INFO*, Write_record*) override; int do_after_row_operations(const Slave_reporting_capability *const,int) override; int do_exec_row(rpl_group_info *) override; #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */ @@ -5672,7 +5678,8 @@ protected: #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - int do_before_row_operations(const Slave_reporting_capability *const) override; + int do_before_row_operations(rpl_group_info *rgi, + COPY_INFO*, Write_record*) override; int do_after_row_operations(const Slave_reporting_capability *const,int) override; int do_exec_row(rpl_group_info *) override; #endif diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index b1058dd0b6b..49fbc11c436 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -6133,7 +6133,10 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) Rows_log_event::Db_restore_ctx restore_ctx(this); master_had_triggers= table->master_had_triggers; bool transactional_table= table->file->has_transactions_and_rollback(); - table->file->prepare_for_insert(get_general_type_code() != WRITE_ROWS_EVENT); + this->slave_exec_mode= slave_exec_mode_options; // fix the mode + + table->file->prepare_for_insert(get_general_type_code() != WRITE_ROWS_EVENT + || slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); /* table == NULL means that this table should not be replicated @@ -6178,10 +6181,10 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) } m_table->mark_columns_per_binlog_row_image(); - this->slave_exec_mode= slave_exec_mode_options; // fix the mode - - // Do event specific preparations - error= do_before_row_operations(rli); + COPY_INFO copy_info; + Write_record write_record; + // Do event specific preparations + error= do_before_row_operations(rgi, ©_info, &write_record); /* Bug#56662 Assertion failed: next_insert_id == 0, file handler.cc @@ -7547,8 +7550,21 @@ bool Write_rows_compressed_log_event::write() #if defined(HAVE_REPLICATION) -int -Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) + +int Write_rows_log_event::incomplete_record_callback(rpl_group_info *rgi) +{ + restore_record(m_table,record[1]); + int error= unpack_current_row(rgi); + if (!error && m_table->s->long_unique_table) + error= m_table->update_virtual_fields(m_table->file, VCOL_UPDATE_FOR_WRITE); + return error; +} + + +int +Write_rows_log_event::do_before_row_operations(rpl_group_info *rgi, + COPY_INFO* copy_info, + Write_record* write_record) { int error= 0; @@ -7622,6 +7638,39 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability 
m_table->mark_auto_increment_column(true); } + if (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT && + (m_table->file->ha_table_flags() & HA_DUPLICATE_POS || + m_table->s->long_unique_table)) + error= m_table->file->ha_rnd_init_with_error(0); + + if (!error) + { + bzero(copy_info, sizeof *copy_info); + copy_info->handle_duplicates= + slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT ? + DUP_REPLACE : DUP_ERROR; + copy_info->table_list= m_table->pos_in_table_list; + + int (*callback)(void *, void*)= NULL; + if (!get_flags(COMPLETE_ROWS_F)) + { + /* + If row is incomplete we will use the record found to fill + missing columns. + */ + callback= [](void *e, void* r)->int { + auto rgi= static_cast<rpl_group_info*>(r); + auto event= static_cast<Write_rows_log_event*>(e); + return event->incomplete_record_callback(rgi); + }; + } + new (write_record) Write_record(thd, m_table, copy_info, + m_table->versioned(VERS_TIMESTAMP), + m_table->triggers && do_invoke_trigger(), + NULL, callback, this, rgi); + m_write_record= write_record; + } + return error; } @@ -7663,7 +7712,15 @@ Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability * { m_table->file->print_error(local_error, MYF(0)); } - return error? error : local_error; + int rnd_error= 0; + if (m_table->file->inited) + { + DBUG_ASSERT(slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); + DBUG_ASSERT(m_table->file->ha_table_flags() & HA_DUPLICATE_POS || + m_table->s->long_unique_table); + rnd_error= m_table->file->ha_rnd_end(); + } + return error? error : local_error ? local_error : rnd_error; } bool Rows_log_event::process_triggers(trg_event_type event, @@ -7686,17 +7743,6 @@ bool Rows_log_event::process_triggers(trg_event_type event, DBUG_RETURN(result); } -/* - Check if there are more UNIQUE keys after the given key. -*/ -static int -last_uniq_key(TABLE *table, uint keyno) -{ - while (++keyno < table->s->keys) - if (table->key_info[keyno].flags & HA_NOSAME) - return 0; - return 1; -} /** Check if an error is a duplicate key error. @@ -7759,22 +7805,20 @@ is_duplicate_key_error(int errcode) */ int -Rows_log_event::write_row(rpl_group_info *rgi, - const bool overwrite) +Write_rows_log_event::write_row(rpl_group_info *rgi, + const bool overwrite) { DBUG_ENTER("write_row"); DBUG_ASSERT(m_table != NULL && thd != NULL); TABLE *table= m_table; // pointer to event's table - int error; - int UNINIT_VAR(keynum); const bool invoke_triggers= (m_table->triggers && do_invoke_trigger()); - auto_afree_ptr<char> key(NULL); prepare_record(table, m_width, true); /* unpack row into table->record[0] */ - if (unlikely((error= unpack_current_row(rgi)))) + int error= unpack_current_row(rgi); + if (unlikely(error)) { table->file->print_error(error, MYF(0)); DBUG_RETURN(error); @@ -7843,178 +7887,11 @@ Rows_log_event::write_row(rpl_group_info *rgi, my_sleep(20000);); if (table->s->sequence) error= update_sequence(); - else while (unlikely(error= table->file->ha_write_row(table->record[0]))) + else { - if (error == HA_ERR_LOCK_DEADLOCK || - error == HA_ERR_LOCK_WAIT_TIMEOUT || - (keynum= table->file->get_dup_key(error)) < 0 || - !overwrite) - { - DBUG_PRINT("info",("get_dup_key returns %d)", keynum)); - /* - Deadlock, waiting for lock or just an error from the handler - such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
- Retrieval of the duplicate key number may fail - - either because the error was not "duplicate key" error - - or because the information which key is not available - */ - table->file->print_error(error, MYF(0)); - DBUG_RETURN(error); - } - /* - We need to retrieve the old row into record[1] to be able to - either update or delete the offending record. We either: + error= m_write_record->write_record(); - - use rnd_pos() with a row-id (available as dupp_row) to the - offending row, if that is possible (MyISAM and Blackhole), or else - - - use index_read_idx() with the key that is duplicated, to - retrieve the offending row. - */ - if (table->file->ha_table_flags() & HA_DUPLICATE_POS) - { - DBUG_PRINT("info",("Locating offending record using rnd_pos()")); - - if ((error= table->file->ha_rnd_init_with_error(0))) - { - DBUG_RETURN(error); - } - - error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref); - if (unlikely(error)) - { - DBUG_PRINT("info",("rnd_pos() returns error %d",error)); - table->file->print_error(error, MYF(0)); - DBUG_RETURN(error); - } - table->file->ha_rnd_end(); - } - else - { - DBUG_PRINT("info",("Locating offending record using index_read_idx()")); - - if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) - { - DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE")); - DBUG_RETURN(my_errno); - } - - if (key.get() == NULL) - { - key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); - if (key.get() == NULL) - { - DBUG_PRINT("info",("Can't allocate key buffer")); - DBUG_RETURN(ENOMEM); - } - } - - key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, - 0); - error= table->file->ha_index_read_idx_map(table->record[1], keynum, - (const uchar*)key.get(), - HA_WHOLE_KEY, - HA_READ_KEY_EXACT); - if (unlikely(error)) - { - DBUG_PRINT("info",("index_read_idx() returns %s", HA_ERR(error))); - table->file->print_error(error, MYF(0)); - DBUG_RETURN(error); - } - } - - /* - Now, record[1] should contain the offending row. That - will enable us to update it or, alternatively, delete it (so - that we can insert the new row afterwards). - */ - if (table->s->long_unique_table) - { - /* same as for REPLACE/ODKU */ - table->move_fields(table->field, table->record[1], table->record[0]); - table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE); - table->move_fields(table->field, table->record[0], table->record[1]); - } - - /* - If row is incomplete we will use the record found to fill - missing columns. - */ - if (!get_flags(COMPLETE_ROWS_F)) - { - restore_record(table,record[1]); - error= unpack_current_row(rgi); - if (table->s->long_unique_table) - table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_WRITE); - } - - DBUG_PRINT("debug",("preparing for update: before and after image")); - DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength); - DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength); - - /* - REPLACE is defined as either INSERT or DELETE + INSERT. If - possible, we can replace it with an UPDATE, but that will not - work on InnoDB if FOREIGN KEY checks are necessary. - - I (Matz) am not sure of the reason for the last_uniq_key() - check as, but I'm guessing that it's something along the - following lines. - - Suppose that we got the duplicate key to be a key that is not - the last unique key for the table and we perform an update: - then there might be another key for which the unique check will - fail, so we're better off just deleting the row and inserting - the correct row.
- - Additionally we don't use UPDATE if rbr triggers should be invoked - - when triggers are used we want a simple and predictable execution path. - */ - if (last_uniq_key(table, keynum) && !invoke_triggers && - !table->file->referenced_by_foreign_key()) - { - DBUG_PRINT("info",("Updating row using ha_update_row()")); - error= table->file->ha_update_row(table->record[1], - table->record[0]); - switch (error) { - - case HA_ERR_RECORD_IS_THE_SAME: - DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from" - " ha_update_row()")); - error= 0; - - case 0: - break; - - default: - DBUG_PRINT("info",("ha_update_row() returns error %d",error)); - table->file->print_error(error, MYF(0)); - } - - DBUG_RETURN(error); - } - else - { - DBUG_PRINT("info",("Deleting offending row and trying to write new one again")); - if (invoke_triggers && - unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, - TRUE))) - error= HA_ERR_GENERIC; // in case if error is not set yet - else - { - if (unlikely((error= table->file->ha_delete_row(table->record[1])))) - { - DBUG_PRINT("info",("ha_delete_row() returns error %d",error)); - table->file->print_error(error, MYF(0)); - DBUG_RETURN(error); - } - if (invoke_triggers && - unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_AFTER, - TRUE))) - DBUG_RETURN(HA_ERR_GENERIC); // in case if error is not set yet - } - /* Will retry ha_write_row() with the offending row removed. */ - } + DBUG_RETURN(error ? m_write_record->last_errno() : 0); } if (invoke_triggers && @@ -8663,7 +8540,8 @@ bool Delete_rows_compressed_log_event::write() #if defined(HAVE_REPLICATION) int -Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +Delete_rows_log_event::do_before_row_operations(rpl_group_info *rgi, + COPY_INFO*, Write_record*) { /* Increment the global status delete count variable @@ -8793,7 +8671,8 @@ void Update_rows_log_event::init(MY_BITMAP const *cols) #if defined(HAVE_REPLICATION) int -Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +Update_rows_log_event::do_before_row_operations(rpl_group_info *, + COPY_INFO*, Write_record*) { /* Increment the global status update count variable diff --git a/sql/sql_class.h b/sql/sql_class.h index 4edc4a081b3..b48af3788a7 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -6395,6 +6395,8 @@ public: int send_data(List<Item> &items) override; }; +class Write_record; // defined in sql_insert.h + class select_insert :public select_result_interceptor { public: @@ -6402,13 +6404,14 @@ class select_insert :public select_result_interceptor { TABLE_LIST *table_list; TABLE *table; List<Item> *fields; + Write_record *write; ulonglong autoinc_value_of_last_inserted_row; // autogenerated or not COPY_INFO info; bool insert_into_view; select_insert(THD *thd_arg, TABLE_LIST *table_list_par, TABLE *table_par, List<Item> *fields_par, List<Item> *update_fields, List<Item> *update_values, enum_duplicates duplic, - bool ignore, select_result *sel_ret_list); + bool ignore, select_result *sel_ret_list, Write_record *write); ~select_insert(); int prepare(List<Item> &list, SELECT_LEX_UNIT *u) override; int prepare2(JOIN *join) override; @@ -6442,9 +6445,9 @@ public: Table_specification_st *create_info_par, Alter_info *alter_info_arg, List<Item> &select_fields,enum_duplicates duplic, bool ignore, - TABLE_LIST *select_tables_arg): + TABLE_LIST *select_tables_arg, Write_record *write): select_insert(thd_arg, table_arg, NULL, &select_fields, 0, 0, duplic, - ignore, NULL), + ignore, NULL, write),
create_info(create_info_par), select_tables(select_tables_arg), alter_info(alter_info_arg), diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 6d0cbf90022..d3c38217bea 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -82,6 +82,7 @@ #include "debug.h" // debug_crash_here #include #include "rpl_rli.h" +#include "scope.h" #ifdef WITH_WSREP #include "wsrep_mysqld.h" /* wsrep_append_table_keys() */ @@ -693,6 +694,30 @@ Field **TABLE::field_to_fill() return triggers && triggers->nullable_fields() ? triggers->nullable_fields() : field; } +int prepare_for_replace(TABLE *table, enum_duplicates handle_duplicates, + bool ignore) +{ + bool create_lookup_handler= handle_duplicates != DUP_ERROR; + if (ignore || handle_duplicates != DUP_ERROR) + { + create_lookup_handler= true; + table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + if (table->file->ha_table_flags() & HA_DUPLICATE_POS || + table->s->long_unique_table) + { + if (table->file->ha_rnd_init_with_error(false)) + return 1; + } + } + return table->file->prepare_for_insert(create_lookup_handler); +} + +int finalize_replace(TABLE *table, enum_duplicates handle_duplicates, + bool ignore) +{ + return table->file->inited ? table->file->ha_rnd_end() : 0; +} + /** INSERT statement implementation @@ -786,6 +811,8 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, DBUG_RETURN(TRUE); value_count= values->elements; + Write_record write; + if ((res= mysql_prepare_insert(thd, table_list, fields, values, update_fields, update_values, duplic, ignore, &unused_conds, FALSE, &cache_insert_values))) @@ -878,7 +905,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, } its.rewind (); thd->get_stmt_da()->reset_current_row_for_warning(0); - + /* Restore the current context. */ ctx_state.restore_state(context, table_list); @@ -944,18 +971,8 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, if (lock_type != TL_WRITE_DELAYED) #endif /* EMBEDDED_LIBRARY */ { - bool create_lookup_handler= duplic != DUP_ERROR; - if (duplic != DUP_ERROR || ignore) - { - create_lookup_handler= true; - table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - if (table->file->ha_table_flags() & HA_DUPLICATE_POS) - { - if (table->file->ha_rnd_init_with_error(0)) - goto abort; - } - } - table->file->prepare_for_insert(create_lookup_handler); + if (prepare_for_replace(table, info.handle_duplicates, info.ignore)) + DBUG_RETURN(1); /** This is a simple check for the case when the table has a trigger that reads from it, or when the statement invokes a stored function @@ -1027,6 +1044,9 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, THD_STAGE_INFO(thd, stage_update); + + new (&write) Write_record(thd, table, &info, result); + if (duplic == DUP_UPDATE) { restore_record(table,s->default_values); // Get empty record @@ -1194,7 +1214,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, } else #endif - error= write_record(thd, table, &info, result); + error= write.write_record(); if (unlikely(error)) break; info.accepted_rows++; @@ -1254,8 +1274,7 @@ values_loop_end: if (duplic != DUP_ERROR || ignore) { table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - if (table->file->ha_table_flags() & HA_DUPLICATE_POS) - table->file->ha_rnd_end(); + finalize_replace(table, duplic, ignore); } transactional_table= table->file->has_transactions_and_rollback(); @@ -1891,31 +1910,74 @@ int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, } - /* Check if there is more uniq keys after field */ - -static int last_uniq_key(TABLE *table, const KEY *key, uint keynr) +/* Check if there is more uniq 
keys after field */ +static ushort get_last_unique_key(TABLE *table, KEY *key_info) { + for (uint key= table->s->keys; key; --key) + if (key_info[key-1].flags & HA_NOSAME) + return key-1; + return MAX_KEY; +} + +/** + An update optimization can be done in REPLACE only when certain criteria are met. + In particular, if the "error key" was the last one to check. If not, there can + be other keys that can fail the uniqueness check after we delete the row. + + The problem arises with the order of the key checking. It is divided into two + parts: + 1. Higher-level handler::ha_write_row goes first. It checks for long unique + and period overlaps (the latter is not supported by REPLACE yet). + 2. The engine's usual unique index check. + However, the order of the keys in TABLE_SHARE::key_info is: + * first, engine unique keys + * then long unique, which is not seen by the engine + A WITHOUT OVERLAPS key has no particular order; + see sort_keys in sql_table.cc + + So we need to account for this order. + @return last unique key to check for duplication, or MAX_KEY if none. + */ +ushort Write_record::get_last_unique_key() const +{ + if (info->handle_duplicates != DUP_REPLACE) + return MAX_KEY; // Not used. + if (!table->s->key_info) + return MAX_KEY; /* When an underlying storage engine informs that the unique key conflicts are not reported in the ascending order by setting the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this information to determine the last key conflict. - + The information about the last key conflict will be used to do a replace of the new row on the conflicting row, rather than doing a delete (of old row) + insert (of new row). - + Hence check for this flag and disable replacing the last row - by returning 0 always. Returning 0 will result in doing + by returning MAX_KEY always. Returning MAX_KEY will result in doing a delete + insert always. */ if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER) - return 0; + return MAX_KEY; - while (++keynr < table->s->keys) - if (key[keynr].flags & HA_NOSAME) - return 0; - return 1; + /* + Note that TABLE_SHARE and TABLE see long uniques differently: + - TABLE_SHARE sees as HA_KEY_ALG_LONG_HASH and HA_NOSAME + - TABLE sees as usual non-unique indexes + */ + return + table->key_info[0].flags & HA_NOSAME ? + /* + We have an in-engine unique, which should be checked last. + Go through the TABLE list, which knows nothing about long uniques. + */ + ::get_last_unique_key(table, table->key_info) : + /* + We can only have long uniques. + Go through the TABLE_SHARE list, where long uniques can be found. + */ + ::get_last_unique_key(table, table->s->key_info); } @@ -1944,19 +2006,353 @@ int vers_insert_history_row(TABLE *table) } /* + Retrieve the old (duplicated) row into record[1] to be able to + either update or delete the offending record. We either: + + - use ha_rnd_pos() with a row-id (available as dup_ref) to the + offending row, if that is possible (MyISAM and Blackhole, also long unique + and application-time periods), or else + + - use ha_index_read_idx_map() with the key that is duplicated, to + retrieve the offending row.
+ */ +int Write_record::locate_dup_record() +{ + handler *h= table->file; + int error= 0; + if (h->ha_table_flags() & HA_DUPLICATE_POS || h->lookup_errkey != (uint)-1) + { + DBUG_PRINT("info", ("Locating offending record using rnd_pos()")); + + error= h->ha_rnd_pos(table->record[1], h->dup_ref); + if (unlikely(error)) + { + DBUG_PRINT("info", ("rnd_pos() returns error %d",error)); + h->print_error(error, MYF(0)); + } + } + else + { + DBUG_PRINT("info", + ("Locating offending record using ha_index_read_idx_map")); + + if (h->lookup_handler) + h= h->lookup_handler; + + error= h->extra(HA_EXTRA_FLUSH_CACHE); + if (unlikely(error)) + { + DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE")); + return my_errno; + } + + if (unlikely(key == NULL)) + { + key= static_cast<uchar*>(thd->alloc(table->s->max_unique_length)); + if (key == NULL) + { + DBUG_PRINT("info",("Can't allocate key buffer")); + return ENOMEM; + } + } + + key_copy(key, table->record[0], table->key_info + key_nr, 0); + + error= h->ha_index_read_idx_map(table->record[1], key_nr, key, + HA_WHOLE_KEY, HA_READ_KEY_EXACT); + if (unlikely(error)) + { + DBUG_PRINT("info", ("index_read_idx() returns %d", error)); + h->print_error(error, MYF(0)); + } + } + + if (unlikely(error)) + return error; + + if (table->vfield) + { + /* + We have not yet called update_virtual_fields() + in handler methods for the just read row in record[1]. + */ + table->move_fields(table->field, table->record[1], table->record[0]); + error= table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE); + table->move_fields(table->field, table->record[0], table->record[1]); + } + return error; +} + +/** + REPLACE is defined as either INSERT or DELETE + INSERT. If + possible, we can replace it with an UPDATE, but that will not + work on InnoDB if FOREIGN KEY checks are necessary. + + Suppose that we got the duplicate key to be a key that is not + the last unique key for the table and we perform an update: + then there might be another key for which the unique check will + fail, so we're better off just deleting the row and trying to insert again.
+ + Additionally we don't use ha_update_row in the following cases: + * when triggers should be invoked, as it may spoil the intermediate NEW_ROW + representation + * when the table is referenced by foreign keys, as there can be different ON DELETE/ON UPDATE + actions +*/ +int Write_record::replace_row(ha_rows *inserted, ha_rows *deleted) +{ + int error; + while (unlikely(error=table->file->ha_write_row(table->record[0]))) + { + error= prepare_handle_duplicate(error); + if (unlikely(error)) + return restore_on_error(); + + if (incomplete_records_cb && (error= incomplete_records_cb(arg1, arg2))) + return restore_on_error(); + + if (can_optimize && key_nr == last_unique_key) + { + DBUG_PRINT("info", ("Updating row using ha_update_row()")); + if (table->versioned(VERS_TRX_ID)) + table->vers_start_field()->store(0, false); + + error= table->file->ha_update_row(table->record[1], table->record[0]); + + if (likely(!error)) + ++*deleted; + else if (error != HA_ERR_RECORD_IS_THE_SAME) + return on_ha_error(error); + + if (versioned) + { + store_record(table, record[2]); + error= vers_insert_history_row(table); + restore_record(table, record[2]); + if (unlikely(error)) + return on_ha_error(error); + } + + break; + } + else + { + DBUG_PRINT("info", ("Deleting offending row and trying to write" + " new one again")); + + auto *trg = table->triggers; + if (use_triggers && trg->process_triggers(table->in_use, TRG_EVENT_DELETE, + TRG_ACTION_BEFORE, TRUE)) + return restore_on_error(); + if (!versioned) + error = table->file->ha_delete_row(table->record[1]); + else + { + store_record(table, record[2]); + restore_record(table, record[1]); + table->vers_update_end(); + error = table->file->ha_update_row(table->record[1], table->record[0]); + restore_record(table, record[2]); + } + + if (unlikely(error)) + return on_ha_error(error); + ++*deleted; + + if (use_triggers) + { + error= trg->process_triggers(table->in_use, TRG_EVENT_DELETE, + TRG_ACTION_AFTER, TRUE); + if (unlikely(error)) + return restore_on_error(); + } + } + } + + /* + If more than one iteration of the above loop is done, from + the second one the row being inserted will have an explicit + value in the autoinc field, which was set at the first call of + handler::update_auto_increment(). This value is saved to avoid + thd->insert_id_for_cur_row becoming 0. Use this saved autoinc + value. + */ + if (table->file->insert_id_for_cur_row == 0) + table->file->insert_id_for_cur_row = insert_id_for_cur_row; + + return after_insert(inserted); +} + + +int Write_record::insert_on_duplicate_update(ha_rows *inserted, + ha_rows *updated) +{ + int res= table->file->ha_write_row(table->record[0]); + if (likely(!res)) + return after_insert(inserted); + + res= prepare_handle_duplicate(res); + if (unlikely(res)) + return restore_on_error(); + + ulonglong prev_insert_id_for_cur_row= 0; + /* + We don't check for other UNIQUE keys - the first row + that matches, is updated. If update causes a conflict again, + an error is returned + */ + DBUG_ASSERT(table->insert_values != NULL); + store_record(table,insert_values); + restore_record(table,record[1]); + table->reset_default_fields(); + + /* + in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can + change per row. Thus, we have to do reset_default_fields() per row. + Twice (before insert and before update).
+ */ + DBUG_ASSERT(info->update_fields->elements == + info->update_values->elements); + if (fill_record_n_invoke_before_triggers(thd, table, + *info->update_fields, + *info->update_values, + info->ignore, + TRG_EVENT_UPDATE)) + return restore_on_error(); + + bool different_records= (!records_are_comparable(table) || + compare_record(table)); + /* + Default fields must be updated before checking view updateability. + This branch of INSERT is executed only when a UNIQUE key was violated + with the ON DUPLICATE KEY UPDATE option. In this case the INSERT + operation is transformed to an UPDATE, and the default fields must + be updated as if this is an UPDATE. + */ + if (different_records && table->default_field) + table->evaluate_update_default_function(); + + /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */ + res= info->table_list->view_check_option(table->in_use, info->ignore); + if (unlikely(res)) + { + if (res == VIEW_CHECK_ERROR) + return restore_on_error(); + DBUG_ASSERT(res == VIEW_CHECK_SKIP); + return 0; + } + + table->file->restore_auto_increment(prev_insert_id); + info->touched++; + if (different_records) + { + int error= table->file->ha_update_row(table->record[1], table->record[0]); + + if (unlikely(error) && error != HA_ERR_RECORD_IS_THE_SAME) + { + if (info->ignore && !table->file->is_fatal_error(error, HA_CHECK_ALL)) + { + if (!(thd->variables.old_behavior & + OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) + table->file->print_error(error, MYF(ME_WARNING)); + return 0; + } + return on_ha_error(error); + } + + if (error != HA_ERR_RECORD_IS_THE_SAME) + { + ++*updated; + if (table->versioned() && + table->vers_check_update(*info->update_fields)) + { + if (versioned) + { + store_record(table, record[2]); + error= vers_insert_history_row(table); + restore_record(table, record[2]); + if (unlikely(error)) + return on_ha_error(error); + } + ++*inserted; + } + } + /* + If ON DUP KEY UPDATE updates a row instead of inserting + one, it's like a regular UPDATE statement: it should not + affect the value of a next SELECT LAST_INSERT_ID() or + mysql_insert_id(). Except if LAST_INSERT_ID(#) was in the + INSERT query, which is handled separately by + THD::arg_of_last_insert_id_function. + */ + prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row; + insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0; + + ++*inserted; // Conforms to the older behavior + + if (use_triggers + && table->triggers->process_triggers(thd, TRG_EVENT_UPDATE, + TRG_ACTION_AFTER, TRUE)) + return restore_on_error(); + } + + /* + Only update next_insert_id if the AUTO_INCREMENT value was explicitly + updated, so we don't update next_insert_id with the value from the + row being updated. Otherwise reset next_insert_id to what it was + before the duplicate key error, since that value is unused. + */ + if (table->next_number_field_updated) + { + DBUG_ASSERT(table->next_number_field != NULL); + + table->file->adjust_next_insert_id_after_explicit_value( + table->next_number_field->val_int()); + } + else if (prev_insert_id_for_cur_row) + { + table->file->restore_auto_increment(prev_insert_id_for_cur_row); + } + + return send_data(); +} + +bool Write_record::is_fatal_error(int error) +{ + return error == HA_ERR_LOCK_DEADLOCK || + error == HA_ERR_LOCK_WAIT_TIMEOUT || + table->file->is_fatal_error(error, HA_CHECK_ALL); +} + +int Write_record::single_insert(ha_rows *inserted) +{ + DBUG_EXECUTE_IF("rpl_write_record_small_sleep_gtid_100_200", + { + uint64 seq= thd->rgi_slave ?
thd->rgi_slave->current_gtid.seq_no : 0; + if (seq == 100 || seq == 200) + my_sleep(20000); + }); + + int error= table->file->ha_write_row(table->record[0]); + if (unlikely(error)) + { + DEBUG_SYNC(thd, "write_row_noreplace"); + if (!info->ignore || is_fatal_error(error)) + return on_ha_error(error); + if (!(thd->variables.old_behavior & + OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) + table->file->print_error(error, MYF(ME_WARNING)); + table->file->restore_auto_increment(prev_insert_id); + return 0; + } + return after_insert(inserted); +} + +/** Write a record to table with optional deleting of conflicting records, invoke proper triggers if needed. - SYNOPSIS - write_record() - thd - thread context - table - table to which record should be written - info - COPY_INFO structure describing handling of duplicates - and which is used for counting number of records inserted - and deleted. - sink - result sink for the RETURNING clause - - NOTE + @note Once this record will be written to table after insert trigger will be invoked. If instead of inserting new record we will update old one then both on update triggers will work instead. Similarly both on @@ -1965,466 +2361,156 @@ int vers_insert_history_row(TABLE *table) Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have transactions. - RETURN VALUE + @return 0 - success non-0 - error */ - - -int write_record(THD *thd, TABLE *table, COPY_INFO *info, select_result *sink) +int Write_record::write_record() { - int error, trg_error= 0; - char *key=0; - MY_BITMAP *save_read_set, *save_write_set; - ulonglong prev_insert_id= table->file->next_insert_id; - ulonglong insert_id_for_cur_row= 0; - ulonglong prev_insert_id_for_cur_row= 0; - DBUG_ENTER("write_record"); - + ha_rows inserted= 0, updated= 0; + prev_insert_id= table->file->next_insert_id; info->records++; - save_read_set= table->read_set; - save_write_set= table->write_set; - - DBUG_EXECUTE_IF("rpl_write_record_small_sleep_gtid_100_200", - { - if (thd->rgi_slave && (thd->rgi_slave->current_gtid.seq_no == 100 || - thd->rgi_slave->current_gtid.seq_no == 200)) - my_sleep(20000); - }); - if (info->handle_duplicates == DUP_REPLACE || - info->handle_duplicates == DUP_UPDATE) + ignored_error= false; + int res= 0; + switch(info->handle_duplicates) { - while (unlikely(error=table->file->ha_write_row(table->record[0]))) - { - uint key_nr; - /* - If we do more than one iteration of this loop, from the second one the - row will have an explicit value in the autoinc field, which was set at - the first call of handler::update_auto_increment(). So we must save - the autogenerated value to avoid thd->insert_id_for_cur_row to become - 0. - */ - if (table->file->insert_id_for_cur_row > 0) - insert_id_for_cur_row= table->file->insert_id_for_cur_row; - else - table->file->insert_id_for_cur_row= insert_id_for_cur_row; - bool is_duplicate_key_error; - if (table->file->is_fatal_error(error, HA_CHECK_ALL)) - goto err; - is_duplicate_key_error= - table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP); - if (!is_duplicate_key_error) - { - /* - We come here when we had an ignorable error which is not a duplicate - key error. In this we ignore error if ignore flag is set, otherwise - report error as usual. We will not do any duplicate key processing. 
- */ - if (info->ignore) - { - table->file->print_error(error, MYF(ME_WARNING)); - goto after_trg_or_ignored_err; /* Ignoring a not fatal error */ - } - goto err; - } - if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0)) - { - error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */ - goto err; - } - DEBUG_SYNC(thd, "write_row_replace"); + case DUP_ERROR: + res= single_insert(&inserted); + break; + case DUP_UPDATE: + res= insert_on_duplicate_update(&inserted, &updated); + info->updated+= updated; + break; + case DUP_REPLACE: + res= replace_row(&inserted, &updated); + if (versioned) + info->updated+= updated; + else + info->deleted+= updated; + } - /* Read all columns for the row we are going to replace */ - table->use_all_columns(); - /* - Don't allow REPLACE to replace a row when a auto_increment column - was used. This ensures that we don't get a problem when the - whole range of the key has been used. - */ - if (info->handle_duplicates == DUP_REPLACE && - key_nr == table->s->next_number_index && insert_id_for_cur_row > 0) - goto err; - if (table->file->has_dup_ref()) - { - /* - If engine doesn't support HA_DUPLICATE_POS, the handler may init to - INDEX, but dup_ref could also be set by lookup_handled (and then, - lookup_errkey is set, f.ex. long unique duplicate). + info->copied+= inserted; + if (inserted || updated) + notify_non_trans_table_modified(); + return res; +} - In such case, handler would stay uninitialized, so do it here. - */ - bool init_lookup_handler= table->file->lookup_errkey != (uint)-1 && - table->file->inited == handler::NONE; - if (init_lookup_handler && table->file->ha_rnd_init_with_error(false)) - goto err; - DBUG_ASSERT(table->file->inited == handler::RND); - int rnd_pos_err= table->file->ha_rnd_pos(table->record[1], - table->file->dup_ref); +int Write_record::prepare_handle_duplicate(int error) +{ + /* + If we do more than one iteration of the replace loop, from the second one the + row will have an explicit value in the autoinc field, which was set at + the first call of handler::update_auto_increment(). So we must save + the autogenerated value to avoid thd->insert_id_for_cur_row to become + 0. + */ + if (table->file->insert_id_for_cur_row > 0) + insert_id_for_cur_row= table->file->insert_id_for_cur_row; + else + table->file->insert_id_for_cur_row= insert_id_for_cur_row; - if (init_lookup_handler) - table->file->ha_rnd_end(); - if (rnd_pos_err) - goto err; - } - else - { - if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */ - { - error=my_errno; - goto err; - } + if (is_fatal_error(error)) + return on_ha_error(error); - if (!key) - { - if (!(key=(char*) my_safe_alloca(table->s->max_unique_length))) - { - error=ENOMEM; - goto err; - } - } - key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0); - key_part_map keypart_map= (1 << table->key_info[key_nr].user_defined_key_parts) - 1; - if ((error= (table->file->ha_index_read_idx_map(table->record[1], - key_nr, (uchar*) key, - keypart_map, - HA_READ_KEY_EXACT)))) - goto err; - } - if (table->vfield) - { - /* - We have not yet called update_virtual_fields(VOL_UPDATE_FOR_READ) - in handler methods for the just read row in record[1]. 
- */ - table->move_fields(table->field, table->record[1], table->record[0]); - int verr = table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE); - table->move_fields(table->field, table->record[0], table->record[1]); - if (verr) - goto err; - } - if (info->handle_duplicates == DUP_UPDATE) - { - int res= 0; - /* - We don't check for other UNIQUE keys - the first row - that matches, is updated. If update causes a conflict again, - an error is returned - */ - DBUG_ASSERT(table->insert_values != NULL); - store_record(table,insert_values); - restore_record(table,record[1]); - table->reset_default_fields(); - - /* - in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can - change per row. Thus, we have to do reset_default_fields() per row. - Twice (before insert and before update). - */ - DBUG_ASSERT(info->update_fields->elements == - info->update_values->elements); - if (fill_record_n_invoke_before_triggers(thd, table, - *info->update_fields, - *info->update_values, - info->ignore, - TRG_EVENT_UPDATE)) - goto before_trg_err; - - bool different_records= (!records_are_comparable(table) || - compare_record(table)); - /* - Default fields must be updated before checking view updateability. - This branch of INSERT is executed only when a UNIQUE key was violated - with the ON DUPLICATE KEY UPDATE option. In this case the INSERT - operation is transformed to an UPDATE, and the default fields must - be updated as if this is an UPDATE. - */ - if (different_records && table->default_field) - table->evaluate_update_default_function(); - - /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */ - res= info->table_list->view_check_option(table->in_use, info->ignore); - if (res == VIEW_CHECK_SKIP) - goto after_trg_or_ignored_err; - if (res == VIEW_CHECK_ERROR) - goto before_trg_err; - - table->file->restore_auto_increment(prev_insert_id); - info->touched++; - if (different_records) - { - if (unlikely(error=table->file->ha_update_row(table->record[1], - table->record[0])) && - error != HA_ERR_RECORD_IS_THE_SAME) - { - if (info->ignore && - !table->file->is_fatal_error(error, HA_CHECK_ALL)) - { - if (!(thd->variables.old_behavior & - OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE)) - table->file->print_error(error, MYF(ME_WARNING)); - goto after_trg_or_ignored_err; - } - goto err; - } - - if (error != HA_ERR_RECORD_IS_THE_SAME) - { - info->updated++; - if (table->versioned() && - table->vers_check_update(*info->update_fields)) - { - if (table->versioned(VERS_TIMESTAMP)) - { - store_record(table, record[2]); - if ((error= vers_insert_history_row(table))) - { - info->last_errno= error; - table->file->print_error(error, MYF(0)); - trg_error= 1; - restore_record(table, record[2]); - goto after_trg_or_ignored_err; - } - restore_record(table, record[2]); - } - info->copied++; - } - } - else - error= 0; - /* - If ON DUP KEY UPDATE updates a row instead of inserting - one, it's like a regular UPDATE statement: it should not - affect the value of a next SELECT LAST_INSERT_ID() or - mysql_insert_id(). Except if LAST_INSERT_ID(#) was in the - INSERT query, which is handled separately by - THD::arg_of_last_insert_id_function. 
- */ - prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row; - insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0; - trg_error= (table->triggers && - table->triggers->process_triggers(thd, TRG_EVENT_UPDATE, - TRG_ACTION_AFTER, TRUE)); - info->copied++; - } - - /* - Only update next_insert_id if the AUTO_INCREMENT value was explicitly - updated, so we don't update next_insert_id with the value from the - row being updated. Otherwise reset next_insert_id to what it was - before the duplicate key error, since that value is unused. - */ - if (table->next_number_field_updated) - { - DBUG_ASSERT(table->next_number_field != NULL); - - table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int()); - } - else if (prev_insert_id_for_cur_row) - { - table->file->restore_auto_increment(prev_insert_id_for_cur_row); - } - goto ok; - } - else /* DUP_REPLACE */ - { - /* - The manual defines the REPLACE semantics that it is either - an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in - InnoDB do not function in the defined way if we allow MySQL - to convert the latter operation internally to an UPDATE. - We also should not perform this conversion if we have - timestamp field with ON UPDATE which is different from DEFAULT. - Another case when conversion should not be performed is when - we have ON DELETE trigger on table so user may notice that - we cheat here. Note that it is ok to do such conversion for - tables which have ON UPDATE but have no ON DELETE triggers, - we just should not expose this fact to users by invoking - ON UPDATE triggers. - - Note, TABLE_SHARE and TABLE see long uniques differently: - - TABLE_SHARE sees as HA_KEY_ALG_LONG_HASH and HA_NOSAME - - TABLE sees as usual non-unique indexes - */ - bool is_long_unique= table->s->key_info && - table->s->key_info[key_nr].algorithm == - HA_KEY_ALG_LONG_HASH; - if ((is_long_unique ? - /* - We have a long unique. Test that there are no in-engine - uniques and the current long unique is the last long unique. - */ - !(table->key_info[0].flags & HA_NOSAME) && - last_uniq_key(table, table->s->key_info, key_nr) : - /* - We have a normal key - not a long unique. - Test is the current normal key is unique and - it is the last normal unique. - */ - last_uniq_key(table, table->key_info, key_nr)) && - !table->file->referenced_by_foreign_key() && - (!table->triggers || !table->triggers->has_delete_triggers())) - { - /* - Optimized dup handling via UPDATE (and insert history for versioned). - */ - if (table->versioned(VERS_TRX_ID)) - { - DBUG_ASSERT(table->vers_write); - bitmap_set_bit(table->write_set, table->vers_start_field()->field_index); - table->file->column_bitmaps_signal(); - table->vers_start_field()->store(0, false); - } - if (unlikely(error= table->file->ha_update_row(table->record[1], - table->record[0])) && - error != HA_ERR_RECORD_IS_THE_SAME) - goto err; - if (likely(!error)) - { - info->deleted++; - if (!table->file->has_transactions()) - thd->transaction->stmt.modified_non_trans_table= TRUE; - if (table->versioned(VERS_TIMESTAMP) && table->vers_write) - { - store_record(table, record[2]); - error= vers_insert_history_row(table); - restore_record(table, record[2]); - if (unlikely(error)) - goto err; - } - } - else - error= 0; // error was HA_ERR_RECORD_IS_THE_SAME - /* - Since we pretend that we have done insert we should call - its after triggers. 
- */ - goto after_trg_n_copied_inc; - } - else - { - /* - Normal dup handling via DELETE (or UPDATE to history for versioned) - and repeating the cycle of INSERT. - */ - if (table->triggers && - table->triggers->process_triggers(thd, TRG_EVENT_DELETE, - TRG_ACTION_BEFORE, TRUE)) - goto before_trg_err; - - bool do_delete= !table->versioned(VERS_TIMESTAMP); - if (do_delete) - error= table->file->ha_delete_row(table->record[1]); - else - { - /* Update existing row to history */ - store_record(table, record[2]); - restore_record(table, record[1]); - table->vers_update_end(); - error= table->file->ha_update_row(table->record[1], - table->record[0]); - restore_record(table, record[2]); - if (error == HA_ERR_FOUND_DUPP_KEY || /* Unique index, any SE */ - error == HA_ERR_FOREIGN_DUPLICATE_KEY || /* Unique index, InnoDB */ - error == HA_ERR_RECORD_IS_THE_SAME) /* No index */ - { - /* Such history row was already generated from previous cycles */ - error= table->file->ha_delete_row(table->record[1]); - do_delete= true; - } - } - if (unlikely(error)) - goto err; - if (do_delete) - info->deleted++; - else - info->updated++; - if (!table->file->has_transactions_and_rollback()) - thd->transaction->stmt.modified_non_trans_table= TRUE; - if (table->triggers && - table->triggers->process_triggers(thd, TRG_EVENT_DELETE, - TRG_ACTION_AFTER, TRUE)) - { - trg_error= 1; - goto after_trg_or_ignored_err; - } - /* Let us attempt do write_row() once more */ - } - } - } - + bool is_duplicate_key_error= + table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP); + if (!is_duplicate_key_error) + { /* - If more than one iteration of the above while loop is done, from - the second one the row being inserted will have an explicit - value in the autoinc field, which was set at the first call of - handler::update_auto_increment(). This value is saved to avoid - thd->insert_id_for_cur_row becoming 0. Use this saved autoinc - value. - */ - if (table->file->insert_id_for_cur_row == 0) - table->file->insert_id_for_cur_row= insert_id_for_cur_row; - - /* - Restore column maps if they where replaced during an duplicate key - problem. + We come here when we had an ignorable error which is not a duplicate + key error. In this we ignore error if ignore flag is set, otherwise + report error as usual. We will not do any duplicate key processing. 
*/ - if (info->ignore) - { - table->file->print_error(error, MYF(ME_WARNING)); - goto after_trg_or_ignored_err; /* Ignoring a not fatal error */ - } - goto err; - } - if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0)) - { - error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */ - goto err; - } - DEBUG_SYNC(thd, "write_row_replace"); + if (info->ignore) + { table->file->print_error(error, MYF(ME_WARNING)); + ignored_error= true; + return 1; /* Ignoring a not fatal error */ + } + return on_ha_error(error); } -after_trg_n_copied_inc: - info->copied++; - thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row); - trg_error= (table->triggers && - table->triggers->process_triggers(thd, TRG_EVENT_INSERT, - TRG_ACTION_AFTER, TRUE)); + key_nr= table->file->get_dup_key(error); -ok: + if (unlikely(key_nr == std::numeric_limits<ushort>::max())) + return on_ha_error(HA_ERR_FOUND_DUPP_KEY); /* Database can't find key */ + + DEBUG_SYNC(thd, "write_row_replace"); + + /* Read all columns for the row we are going to replace */ + table->use_all_columns(); + /* + Don't allow REPLACE to replace a row when an auto_increment column + was used. This ensures that we don't get a problem when the + whole range of the key has been used. + */ + if (info->handle_duplicates == DUP_REPLACE && table->next_number_field && + key_nr == table->s->next_number_index && insert_id_for_cur_row > 0) + return on_ha_error(error); + + error= locate_dup_record(); + if (error) + return on_ha_error(error); + + return 0; +} + +inline +void Write_record::notify_non_trans_table_modified() +{ + if (!table->file->has_transactions_and_rollback()) + thd->transaction->stmt.modified_non_trans_table= TRUE; +} + +inline +int Write_record::on_ha_error(int error) +{ + info->last_errno= error; + table->file->print_error(error,MYF(0)); + DBUG_PRINT("info", ("Returning error %d", error)); + return restore_on_error(); +} + +inline +int Write_record::restore_on_error() +{ + table->file->restore_auto_increment(prev_insert_id); + return ignored_error ? 0 : 1; +} + +inline +int Write_record::after_insert(ha_rows *inserted) +{ + ++*inserted; + thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row); + return after_ins_trg() || send_data(); +} + +inline +int Write_record::after_ins_trg() +{ + int res= 0; + if (use_triggers) + res= table->triggers->process_triggers(thd, TRG_EVENT_INSERT, + TRG_ACTION_AFTER, TRUE); + return res; +} + +inline +int Write_record::send_data() +{ /* We send the row after writing it to the table so that the correct values are sent to the client. Otherwise it won't show autoinc values (generated inside the handler::ha_write()) and values updated in ON DUPLICATE KEY UPDATE. */ - if (sink) - { - if (sink->send_data(thd->lex->returning()->item_list) < 0) - trg_error= 1; - } - -after_trg_or_ignored_err: - if (key) - my_safe_afree(key,table->s->max_unique_length); - if (!table->file->has_transactions_and_rollback()) - thd->transaction->stmt.modified_non_trans_table= TRUE; - DBUG_RETURN(trg_error); - -err: - info->last_errno= error; - table->file->print_error(error,MYF(0)); - -before_trg_err: - table->file->restore_auto_increment(prev_insert_id); - if (key) - my_safe_afree(key, table->s->max_unique_length); - table->column_bitmaps_set(save_read_set, save_write_set); - DBUG_RETURN(1); + return sink ?
sink->send_data(thd->lex->returning()->item_list) < 0 : 0;
 }
+
 /******************************************************************************
   Check that there aren't any null_fields
 ******************************************************************************/
@@ -3715,6 +3801,7 @@ bool Delayed_insert::handle_inserts(void)
   ulong max_rows;
   bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
   delayed_row *row;
+  Write_record write;
   DBUG_ENTER("handle_inserts");
   /* Allow client to insert new rows */
@@ -3855,7 +3942,9 @@ bool Delayed_insert::handle_inserts(void)
                                             VCOL_UPDATE_FOR_WRITE);
     }
 
-    if (unlikely(tmp_error || write_record(&thd, table, &info, NULL)))
+    new (&write) Write_record(&thd, table, &info);
+
+    if (unlikely(tmp_error || write.write_record()))
     {
       info.error_count++;				// Ignore errors
       thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
@@ -4080,10 +4169,12 @@ select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
                              List<Item> *update_values, enum_duplicates duplic,
                              bool ignore_check_option_errors,
-                             select_result *result):
+                             select_result *result,
+                             Write_record *write):
   select_result_interceptor(thd_arg),
   sel_result(result), table_list(table_list_par), table(table_par), fields(fields_par),
+  write(write),
   autoinc_value_of_last_inserted_row(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
 {
@@ -4264,18 +4355,10 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
 #endif
   thd->cuted_fields=0;
-  bool create_lookup_handler= info.handle_duplicates != DUP_ERROR;
-  if (info.ignore || info.handle_duplicates != DUP_ERROR)
-  {
-    create_lookup_handler= true;
-    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
-    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
-    {
-      if (table->file->ha_rnd_init_with_error(0))
-        DBUG_RETURN(1);
-    }
-  }
-  table->file->prepare_for_insert(create_lookup_handler);
+
+  if (prepare_for_replace(table, info.handle_duplicates, info.ignore))
+    DBUG_RETURN(1);
+
   if (info.handle_duplicates == DUP_REPLACE &&
       (!table->triggers || !table->triggers->has_delete_triggers()))
     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
@@ -4290,6 +4373,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
     table->prepare_triggers_for_insert_stmt_or_event();
     table->mark_columns_needed_for_insert();
   }
+  new (write) Write_record(thd, table, &info, sel_result);
   DBUG_RETURN(res);
 }
@@ -4380,7 +4464,7 @@ int select_insert::send_data(List<Item> &values)
     }
   }
 
-  error= write_record(thd, table, &info, sel_result);
+  error= write->write_record();
   table->auto_increment_field_not_null= FALSE;
 
   if (likely(!error))
@@ -4467,9 +4551,8 @@ bool select_insert::prepare_eof()
   if (likely(!error) && unlikely(thd->is_error()))
    error= thd->get_stmt_da()->sql_errno();
 
-  if (info.ignore || info.handle_duplicates != DUP_ERROR)
-    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
-      table->file->ha_rnd_end();
+  if (unlikely(finalize_replace(table, info.handle_duplicates, info.ignore)))
+    DBUG_RETURN(1);
   if (error <= 0)
   {
     error= table->file->extra(HA_EXTRA_END_ALTER_COPY);
@@ -4615,8 +4698,7 @@ void select_insert::abort_result_set()
       if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
         table->file->ha_end_bulk_insert();
 
-      if (table->file->inited)
-        table->file->ha_rnd_end();
+      finalize_replace(table, info.handle_duplicates, info.ignore);
       table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
       table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
       table->file->extra(HA_EXTRA_ABORT_ALTER_COPY);
@@ -5076,18 +5158,8 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
restore_record(table,s->default_values);      // Get empty record
   thd->cuted_fields=0;
-  bool create_lookup_handler= info.handle_duplicates != DUP_ERROR;
-  if (info.ignore || info.handle_duplicates != DUP_ERROR)
-  {
-    create_lookup_handler= true;
-    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
-    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
-    {
-      if (table->file->ha_rnd_init_with_error(0))
-        DBUG_RETURN(1);
-    }
-  }
-  table->file->prepare_for_insert(create_lookup_handler);
+  if (prepare_for_replace(table, info.handle_duplicates, info.ignore))
+    DBUG_RETURN(1);
   if (info.handle_duplicates == DUP_REPLACE &&
       (!table->triggers || !table->triggers->has_delete_triggers()))
     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
@@ -5107,6 +5179,7 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
     table->mark_columns_needed_for_insert();
   // Mark table as used
   table->query_id= thd->query_id;
+  new (write) Write_record(thd, table, &info, sel_result);
   DBUG_RETURN(0);
 }
@@ -5504,10 +5577,7 @@ void select_create::abort_result_set()
       thd->restore_tmp_table_share(saved_tmp_table_share);
     }
 
-    if (table->file->inited &&
-        (info.ignore || info.handle_duplicates != DUP_ERROR) &&
-        (table->file->ha_table_flags() & HA_DUPLICATE_POS))
-      table->file->ha_rnd_end();
+    finalize_replace(table, info.handle_duplicates, info.ignore);
     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
     table->auto_increment_field_not_null= FALSE;
diff --git a/sql/sql_insert.h b/sql/sql_insert.h
index 176c861cfad..163708d70c5 100644
--- a/sql/sql_insert.h
+++ b/sql/sql_insert.h
@@ -23,6 +23,7 @@
 typedef List<Item> List_item;
 typedef struct st_copy_info COPY_INFO;
+
 int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
                          List<Item> &fields, List_item *values,
                          List<Item> &update_fields,
@@ -46,7 +47,10 @@ int write_record(THD *thd, TABLE *table, COPY_INFO *info,
 void kill_delayed_threads(void);
 bool binlog_create_table(THD *thd, TABLE *table, bool replace);
 bool binlog_drop_table(THD *thd, TABLE *table);
-
+int prepare_for_replace(TABLE *table, enum_duplicates handle_duplicates,
+                        bool ignore);
+int finalize_replace(TABLE *table, enum_duplicates handle_duplicates,
+                     bool ignore);
 static inline void restore_default_record_for_insert(TABLE *t)
 {
   restore_record(t,s->default_values);
@@ -54,6 +58,87 @@ static inline void restore_default_record_for_insert(TABLE *t)
     t->triggers->default_extra_null_bitmap();
 }
 
+
+class Write_record
+{
+  THD *thd;
+  TABLE *table;
+  COPY_INFO *info;
+
+  ulonglong prev_insert_id;
+  ulonglong insert_id_for_cur_row= 0;
+  uchar *key;
+  ushort key_nr;
+
+  ushort last_unique_key;
+  bool use_triggers;
+  bool versioned;
+  bool can_optimize;
+  bool ignored_error;
+
+  int (*incomplete_records_cb)(void *arg1, void *arg2);
+  void *arg1, *arg2;
+  select_result *sink;
+
+  ushort get_last_unique_key() const;
+  // FINALIZATION
+  void notify_non_trans_table_modified();
+  int after_insert(ha_rows *inserted);
+  int after_ins_trg();
+  int send_data();
+
+  int on_ha_error(int error);
+  int restore_on_error();
+
+  bool is_fatal_error(int error);
+  int prepare_handle_duplicate(int error);
+  int locate_dup_record();
+
+  int replace_row(ha_rows *inserted, ha_rows *deleted);
+  int insert_on_duplicate_update(ha_rows *inserted, ha_rows *updated);
+  int single_insert(ha_rows *inserted);
+public:
+
+  /**
+    @param thd   thread context
+    @param info  COPY_INFO structure describing the handling of duplicates;
+                 it is also used to count the number of records inserted
+                 and deleted.
+    @param sink  result sink for the RETURNING clause
+    @param table  the target table
+    @param versioned  whether the table is system-versioned (VERS_TIMESTAMP)
+    @param use_triggers  whether the table's triggers should be processed
+  */
+  Write_record(THD *thd, TABLE *table, COPY_INFO *info,
+               bool versioned, bool use_triggers, select_result *sink,
+               int (*incomplete_records_cb)(void *, void *),
+               void *arg1, void *arg2):
+          thd(thd), table(table), info(info), insert_id_for_cur_row(0),
+          key(NULL), last_unique_key(get_last_unique_key()),
+          use_triggers(use_triggers), versioned(versioned),
+          incomplete_records_cb(incomplete_records_cb), arg1(arg1), arg2(arg2),
+          sink(sink)
+  {
+    if (info->handle_duplicates == DUP_REPLACE)
+    {
+      bool has_delete_triggers= use_triggers &&
+                                table->triggers->has_delete_triggers();
+      bool referenced_by_fk= table->file->referenced_by_foreign_key();
+      can_optimize= !referenced_by_fk && !has_delete_triggers;
+    }
+  }
+  Write_record(THD *thd, TABLE *table, COPY_INFO *info,
+               select_result *sink= NULL):
+          Write_record(thd, table, info, table->versioned(VERS_TIMESTAMP),
+                       table->triggers, sink, NULL, NULL, NULL)
+  {}
+  Write_record() = default; // dummy, to allow later (lazy) initialization
+
+  /* Main entry point, see docs in sql_insert.cc */
+  int write_record();
+
+  int last_errno() { return info->last_errno; }
+};
 #ifdef EMBEDDED_LIBRARY
 inline void kill_delayed_threads(void) {}
 #endif
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index fa4f8a57607..26b64d99bf4 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -684,14 +684,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
   thd->abort_on_warning= !ignore && thd->is_strict_mode();
   thd->get_stmt_da()->reset_current_row_for_warning(1);
-  bool create_lookup_handler= handle_duplicates != DUP_ERROR;
-  if ((table_list->table->file->ha_table_flags() & HA_DUPLICATE_POS))
-  {
-    create_lookup_handler= true;
-    if ((error= table_list->table->file->ha_rnd_init_with_error(0)))
-      goto err;
-  }
-  table->file->prepare_for_insert(create_lookup_handler);
+  if (prepare_for_replace(table, info.handle_duplicates, info.ignore))
+    DBUG_RETURN(1);
+
   thd_progress_init(thd, 2);
   fix_rownum_pointers(thd, thd->lex->current_select, &info.copied);
   if (table_list->table->validate_default_values_of_unset_fields(thd))
@@ -712,8 +707,8 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
                            set_fields, set_values, read_info,
                            *ex->enclosed, skip_lines, ignore);
-  if (table_list->table->file->ha_table_flags() & HA_DUPLICATE_POS)
-    table_list->table->file->ha_rnd_end();
+  if (unlikely(finalize_replace(table, handle_duplicates, ignore)))
+    DBUG_RETURN(1);
   thd_proc_info(thd, "End bulk insert");
   if (likely(!error))
@@ -988,6 +983,8 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
   if ((thd->progress.max_counter= read_info.file_length()) == ~(my_off_t) 0)
     progress_reports= 0;
+  Write_record write(thd, table, &info, NULL);
+
   while (!read_info.read_fixed_length())
   {
     if (thd->killed)
@@ -1068,7 +1065,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
       DBUG_RETURN(-1);
     }
-    err= write_record(thd, table, &info);
+    err= write.write_record();
     table->auto_increment_field_not_null= FALSE;
     if (err)
       DBUG_RETURN(1);
@@ -1117,6 +1114,8 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
   if ((thd->progress.max_counter= read_info.file_length()) == ~(my_off_t) 0)
     progress_reports= 0;
+  Write_record write(thd, table, &info, NULL);
+
   for (;;it.rewind())
   {
     if (thd->killed)
@@ -1218,7 +1217,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
       DBUG_RETURN(-1);
     }
-    err= 
write_record(thd, table, &info); + err= write.write_record(); table->auto_increment_field_not_null= FALSE; if (err) DBUG_RETURN(1); @@ -1262,7 +1261,9 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_ENTER("read_xml_field"); no_trans_update_stmt= !table->file->has_transactions_and_rollback(); - + + Write_record write(thd, table, &info, NULL); + for ( ; ; it.rewind()) { bool err; @@ -1339,7 +1340,7 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - err= write_record(thd, table, &info); + err= write.write_record(); table->auto_increment_field_not_null= false; if (err) DBUG_RETURN(1); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 3657f8127d2..758a57a0967 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4799,6 +4799,7 @@ mysql_execute_command(THD *thd, bool is_called_from_prepared_stmt) select_lex->context.table_list= select_lex->context.first_name_resolution_table= second_table; res= mysql_insert_select_prepare(thd, result); + Write_record write; if (!res && (sel_result= new (thd->mem_root) select_insert(thd, first_table, @@ -4808,7 +4809,8 @@ mysql_execute_command(THD *thd, bool is_called_from_prepared_stmt) &lex->value_list, lex->duplicates, lex->ignore, - result))) + result, + &write))) { if (lex->analyze_stmt) ((select_result_interceptor*)sel_result)->disable_my_ok_calls(); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index ff6adb49363..089d364e73b 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -13063,6 +13063,8 @@ bool Sql_cmd_create_table_like::execute(THD *thd) DEBUG_SYNC(thd, "wsrep_create_table_as_select"); + Write_record write; + /* select_create is currently not re-execution friendly and needs to be created for every execution of a PS/SP. @@ -13074,7 +13076,8 @@ bool Sql_cmd_create_table_like::execute(THD *thd) select_lex->item_list, lex->duplicates, lex->ignore, - select_tables))) + select_tables, + &write))) { /* CREATE from SELECT give its SELECT_LEX for SELECT,
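-- 
Reviewer note, not part of the patch: all call sites above follow the same
shape, so a compact usage sketch of the new API may help review. The
Write_record constructor picks the REPLACE / IODKU / plain-insert
implementation once, and write_record() is then invoked per row, exactly as
the LOAD DATA loops do. Names are taken from this patch; next_row() is a
hypothetical row producer standing in for the reader loop:

    /* Sketch only; assumes sql_insert.h is included and thd, table and
       info are set up as in mysql_load() above. */
    if (prepare_for_replace(table, info.handle_duplicates, info.ignore))
      return 1;                      /* inits lookup handler / HA_DUPLICATE_POS scan */

    Write_record write(thd, table, &info, NULL);   /* NULL: no RETURNING sink */
    while (next_row())               /* hypothetical: fills table->record[0] */
    {
      if (write.write_record())     /* error already reported; see last_errno() */
        return 1;
    }
    return finalize_replace(table, info.handle_duplicates, info.ignore);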