diff --git a/mysql-test/suite/innodb/r/bulk_load.result b/mysql-test/suite/innodb/r/bulk_load.result
new file mode 100644
index 00000000000..1e3d9ba877a
--- /dev/null
+++ b/mysql-test/suite/innodb/r/bulk_load.result
@@ -0,0 +1,50 @@
+CREATE TABLE t1(f1 INT NOT NULL,f2 INT NOT NULL)ENGINE=InnoDB;
+INSERT INTO t1 SELECT seq, seq from seq_1_to_131072;
+INSERT INTO t1 VALUES(131073, 131073), (131074, 131073);
+SELECT * INTO OUTFILE "VARDIR/tmp/t1.outfile" FROM t1;
+# successful load statement using bulk insert
+CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
+f2 INT NOT NULL)ENGINE=InnoDB;
+SET unique_checks=0, foreign_key_checks=0;
+LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
+SELECT COUNT(*) FROM t2;
+COUNT(*)
+131074
+CHECK TABLE t2 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t2	check	status	OK
+DROP TABLE t2;
+CREATE TABLE t2(f1 INT NOT NULL, PRIMARY KEY(f1 DESC),
+f2 INT NOT NULL)ENGINE=InnoDB;
+LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
+SELECT COUNT(*) FROM t2;
+COUNT(*)
+131074
+CHECK TABLE t2 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t2	check	status	OK
+DROP TABLE t2;
+# load statement using bulk insert fails during secondary index
+CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
+f2 INT NOT NULL UNIQUE KEY)ENGINE=InnoDB;
+LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
+ERROR HY000: Got error 1 "Operation not permitted" during COMMIT
+SELECT COUNT(*) FROM t2;
+COUNT(*)
+0
+CHECK TABLE t2 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t2	check	status	OK
+DROP TABLE t2;
+# load statement using bulk insert fails during primary index
+CREATE TABLE t2(f1 INT NOT NULL,
+f2 INT NOT NULL PRIMARY KEY)ENGINE=InnoDB;
+LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
+ERROR 23000: Duplicate entry '131073' for key 'PRIMARY'
+SELECT COUNT(*) FROM t2;
+COUNT(*)
+0
+CHECK TABLE t2 EXTENDED;
+Table	Op	Msg_type	Msg_text
+test.t2	check	status	OK
+DROP TABLE t2, t1;
diff --git a/mysql-test/suite/innodb/t/bulk_load.opt b/mysql-test/suite/innodb/t/bulk_load.opt
new file mode 100644
index 00000000000..c856c2d215a
--- /dev/null
+++ b/mysql-test/suite/innodb/t/bulk_load.opt
@@ -0,0 +1 @@
+--innodb_sort_buffer_size=65536
diff --git a/mysql-test/suite/innodb/t/bulk_load.test b/mysql-test/suite/innodb/t/bulk_load.test
new file mode 100644
index 00000000000..30c25839ba2
--- /dev/null
+++ b/mysql-test/suite/innodb/t/bulk_load.test
@@ -0,0 +1,52 @@
+--source include/have_innodb.inc
+--source include/have_sequence.inc
+--source include/big_test.inc
+
+CREATE TABLE t1(f1 INT NOT NULL,f2 INT NOT NULL)ENGINE=InnoDB;
+INSERT INTO t1 SELECT seq, seq from seq_1_to_131072;
+INSERT INTO t1 VALUES(131073, 131073), (131074, 131073);
+--replace_result $MYSQLTEST_VARDIR VARDIR
+--disable_cursor_protocol
+--disable_ps2_protocol
+eval SELECT * INTO OUTFILE "$MYSQLTEST_VARDIR/tmp/t1.outfile" FROM t1;
+--enable_ps2_protocol
+--enable_cursor_protocol
+
+--echo # successful load statement using bulk insert
+CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
+		f2 INT NOT NULL)ENGINE=InnoDB;
+SET unique_checks=0, foreign_key_checks=0;
+--replace_result $MYSQLTEST_VARDIR VARDIR
+eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
+SELECT COUNT(*) FROM t2;
+CHECK TABLE t2 EXTENDED;
+DROP TABLE t2;
+
+CREATE TABLE t2(f1 INT NOT NULL, PRIMARY KEY(f1 DESC),
+		f2 INT NOT NULL)ENGINE=InnoDB;
+--replace_result $MYSQLTEST_VARDIR VARDIR
+eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
+SELECT COUNT(*) FROM t2;
+CHECK TABLE t2 EXTENDED;
+DROP TABLE t2;
+
+--echo # load statement using bulk insert fails during secondary index
+CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
+		f2 INT NOT NULL UNIQUE KEY)ENGINE=InnoDB;
+--replace_result $MYSQLTEST_VARDIR VARDIR
+--error ER_ERROR_DURING_COMMIT
+eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
+SELECT COUNT(*) FROM t2;
+CHECK TABLE t2 EXTENDED;
+DROP TABLE t2;
+
+--echo # load statement using bulk insert fails during primary index
+CREATE TABLE t2(f1 INT NOT NULL,
+		f2 INT NOT NULL PRIMARY KEY)ENGINE=InnoDB;
+--replace_result $MYSQLTEST_VARDIR VARDIR
+--error ER_DUP_ENTRY
+eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
+SELECT COUNT(*) FROM t2;
+CHECK TABLE t2 EXTENDED;
+--remove_file $MYSQLTEST_VARDIR/tmp/t1.outfile
+DROP TABLE t2, t1;
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index b51c9ca12eb..0d329f5e029 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -725,7 +725,15 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
       table->file->print_error(my_errno, MYF(0));
       error= 1;
     }
-    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+    if (!error)
+    {
+      int err= table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+      if (err == HA_ERR_FOUND_DUPP_KEY)
+      {
+        error= 1;
+        my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
+      }
+    }
     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
     table->next_number_field=0;
   }
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index a97dcba1b35..e8279ad819a 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -15851,7 +15851,8 @@ ha_innobase::extra(
 		/* Allow a subsequent INSERT into an empty table
 		if !unique_checks && !foreign_key_checks. */
 		if (dberr_t err = trx->bulk_insert_apply()) {
-			return err;
+			return convert_error_code_to_mysql(
+				err, 0, trx->mysql_thd);
 		}
 		break;
 	}
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index fc39a9d24d9..47961ab3146 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -447,12 +447,21 @@ class row_merge_bulk_t
   /** Block for encryption */
   row_merge_block_t *m_crypt_block= nullptr;
 public:
+  /** If this is false, bulk_insert_buffered() is invoked only once
+  for the primary key, for the first row; that row is written by
+  load_one_row(), and subsequent rows go through
+  row_ins_clust_index_entry(). For secondary indexes, or when this
+  is true, bulk_insert_buffered() is invoked for every row. */
+  const bool m_sort_primary_key;
   /** Constructor.
   Create all merge files, merge buffer for all the table indexes
   expect fts indexes.
   Create a merge block which is used to write IO operation
-  @param table table which undergoes bulk insert operation */
-  row_merge_bulk_t(dict_table_t *table);
+  @param table table which undergoes bulk insert operation
+  @param sort_primary_key whether to sort the primary key for the
+  bulk operation; for a LOAD statement, InnoDB skips the
+  primary key sorting */
+  row_merge_bulk_t(dict_table_t *table, bool sort_primary_key);
   /** Destructor.
   Remove all merge files, merge buffer for all
   table indexes. */
@@ -498,4 +507,9 @@ public:
 
   /** Init temporary files for each index */
   void init_tmp_file();
+
+  /** Load one row into the primary index
+  @param trx bulk transaction
+  @return error code */
+  dberr_t load_one_row(trx_t *trx);
 };
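The m_sort_primary_key contract above is easiest to see in isolation. The following standalone sketch is illustration only, not InnoDB code: ToyBulkStore and its members are invented stand-ins, and only the control flow mirrors the documented behaviour of the flag.

// Toy model of row_merge_bulk_t's primary-key dispatch; invented names.
#include <iostream>
#include <vector>

struct ToyBulkStore {
  const bool sort_primary_key;          // mirrors m_sort_primary_key
  std::vector<int> pk_buffer;           // stands in for the PK merge buffer

  // Returns true if the row was buffered, false if the caller must
  // insert it directly (the row_ins_clust_index_entry() analogue).
  bool buffer_pk_row(int key) {
    if (!sort_primary_key) {
      if (pk_buffer.empty()) {          // only the very first row is
        pk_buffer.push_back(key);       // buffered; it is then flushed by
        return true;                    // the load_one_row() analogue
      }
      return false;                     // subsequent rows: direct insert
    }
    pk_buffer.push_back(key);           // otherwise: buffer and sort all
    return true;
  }
};

int main() {
  ToyBulkStore load_path{false, {}};    // LOAD DATA: no PK sorting
  for (int key : {3, 1, 2})
    std::cout << key << (load_path.buffer_pk_row(key)
                         ? " buffered\n" : " inserted directly\n");
}

With sort_primary_key=false (the LOAD DATA case), only the first key is buffered; the remaining keys take the direct-insert path, which is the split that load_one_row() and row_ins_clust_index_entry() implement in the hunks below.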
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index 84973040b4c..4af53053b77 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -451,12 +451,13 @@ public:
   }
 
   /** Notify the start of a bulk insert operation
-  @param table table to do bulk operation */
-  void start_bulk_insert(dict_table_t *table)
+  @param table table to do bulk operation
+  @param also_primary start the bulk insert operation for the primary index */
+  void start_bulk_insert(dict_table_t *table, bool also_primary)
   {
     first|= BULK;
     if (!table->is_temporary())
-      bulk_store= new row_merge_bulk_t(table);
+      bulk_store= new row_merge_bulk_t(table, also_primary);
   }
 
   /** Notify the end of a bulk insert operation */
@@ -511,6 +512,12 @@ public:
     return bulk_store && is_bulk_insert();
   }
 
+  /** @return whether InnoDB has to skip sorting the clustered index */
+  bool skip_sort_pk() const
+  {
+    return bulk_store && !bulk_store->m_sort_primary_key;
+  }
+
   /** Free bulk insert operation */
   void clear_bulk_buffer()
   {
@@ -1152,17 +1159,22 @@ public:
     return false;
   }
 
-  /** @return logical modification time of a table only
-  if the table has bulk buffer exist in the transaction */
-  trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
+  /**
+  @return logical modification time of a table
+  @retval nullptr if the table doesn't have a bulk buffer or
+  can skip sorting for the primary key */
+  trx_mod_table_time_t *use_bulk_buffer(dict_index_t *index) noexcept
   {
     if (UNIV_LIKELY(!bulk_insert))
       return nullptr;
-    ut_ad(table->skip_alter_undo || !check_unique_secondary);
-    ut_ad(table->skip_alter_undo || !check_foreigns);
-    auto it= mod_tables.find(table);
+    ut_ad(index->table->skip_alter_undo || !check_unique_secondary);
+    ut_ad(index->table->skip_alter_undo || !check_foreigns);
+    auto it= mod_tables.find(index->table);
     if (it == mod_tables.end() || !it->second.bulk_buffer_exist())
       return nullptr;
+    /* Avoid using the bulk buffer for a LOAD statement */
+    if (index->is_clust() && it->second.skip_sort_pk())
+      return nullptr;
     return &it->second;
   }
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 2d297ade4c2..4df909afb27 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2811,7 +2811,7 @@ avoid_bulk:
 			trx_start_if_not_started(trx, true);
 			trx->bulk_insert = true;
 			auto m = trx->mod_tables.emplace(index->table, 0);
-			m.first->second.start_bulk_insert(index->table);
+			m.first->second.start_bulk_insert(index->table, true);
 			err = m.first->second.bulk_insert_buffered(
 				*entry, *index, trx);
 			goto err_exit;
@@ -3430,7 +3430,12 @@ row_ins_index_entry(
 			return(DB_LOCK_WAIT);});
 
 	if (index->is_btree()) {
-		if (auto t= trx->check_bulk_buffer(index->table)) {
+		/* If InnoDB skips the sorting of the primary
+		index for the bulk insert operation, then it
+		will have called load_one_row() for the first
+		inserted row and must not use the buffer for
+		subsequent insert statements. */
+		if (auto t= trx->use_bulk_buffer(index)) {
 			/* MDEV-25036 FIXME:
 			row_ins_check_foreign_constraint() check
 			should be done before buffering the insert
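To make the use_bulk_buffer() decision concrete, here is a minimal standalone model. ToyTrx and ToyIndex are invented types, not the InnoDB API; the sketch only restates the nullptr rule from trx0trx.h: during a bulk insert, the clustered index bypasses the buffer when primary-key sorting is skipped, while secondary indexes keep buffering every row.

// Toy model of trx_t::use_bulk_buffer(); invented names, not InnoDB code.
#include <iostream>

struct ToyIndex { bool clustered; const char *name; };

struct ToyTrx {
  bool bulk_insert;
  bool sort_primary_key;   // the row_merge_bulk_t::m_sort_primary_key analogue

  // The nullptr result is modeled as bool: may this index use the buffer?
  bool use_bulk_buffer(const ToyIndex &index) const {
    if (!bulk_insert)
      return false;                     // not in a bulk operation at all
    if (index.clustered && !sort_primary_key)
      return false;                     // LOAD DATA: skip the PK buffer
    return true;                        // buffer, sort, apply at commit
  }
};

int main() {
  ToyTrx load{true, false};             // a LOAD DATA style transaction
  for (ToyIndex index : {ToyIndex{true, "PRIMARY"}, ToyIndex{false, "f2"}})
    std::cout << index.name << ": "
              << (load.use_bulk_buffer(index) ? "buffered" : "direct")
              << '\n';
}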
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index 944ca66aeb3..fba8467f095 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -5053,7 +5053,9 @@ dberr_t row_merge_bulk_t::alloc_block()
   return DB_SUCCESS;
 }
 
-row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table)
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table,
+                                   bool sort_primary_key)
+  : m_sort_primary_key(sort_primary_key)
 {
   ulint n_index= 0;
   for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
@@ -5179,6 +5181,33 @@ dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
   return DB_SUCCESS;
 }
 
+ATTRIBUTE_COLD
+dberr_t row_merge_bulk_t::load_one_row(trx_t *trx)
+{
+  /* Load the single row into the clustered index. BtrBulk has
+  nothing to do for bulk insert here and is used only as an
+  interface to insert a single row. */
+  dict_index_t *index= m_merge_buf[0].index;
+  BtrBulk btr_bulk(index, trx);
+  ut_ad(m_merge_buf[0].n_tuples == 1);
+  dberr_t err= row_merge_insert_index_tuples(index, index->table,
+                                             OS_FILE_CLOSED, nullptr,
+                                             &m_merge_buf[0], &btr_bulk,
+                                             0, 0, 0, nullptr,
+                                             index->table->space_id,
+                                             nullptr,
+                                             m_blob_file.fd == OS_FILE_CLOSED
+                                             ? nullptr : &m_blob_file);
+  if (err != DB_SUCCESS)
+    trx->error_info= index;
+  else if (index->table->persistent_autoinc)
+    btr_write_autoinc(index, 1);
+  err= btr_bulk.finish(err);
+  if (err == DB_SUCCESS && index->is_clust())
+    index->table->stat_n_rows= 1;
+  return err;
+}
+
 dberr_t row_merge_bulk_t::bulk_insert_buffered(const dtuple_t &row,
                                                const dict_index_t &ind,
                                                trx_t *trx)
@@ -5254,6 +5283,8 @@ add_to_buf:
   }
 
 func_exit:
+  if (!m_sort_primary_key && ind.is_clust())
+    err= load_one_row(trx);
   if (large_tuple_heap)
     mem_heap_free(large_tuple_heap);
   return err;
@@ -5325,9 +5356,16 @@ func_exit:
 
 dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
 {
-  ulint i= 0;
-  for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
-       index; index= UT_LIST_GET_NEXT(indexes, index))
+  dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+  ut_ad(index->is_clust());
+  ulint i= !m_sort_primary_key;
+  if (i)
+    /* For the clustered index, InnoDB calls load_one_row() while
+    buffering the first insert and uses row_ins_clust_index_entry()
+    for subsequent rows. So skip the clustered index while applying
+    the buffered insert operations. */
+    index= UT_LIST_GET_NEXT(indexes, index);
+  for (; index; index= UT_LIST_GET_NEXT(indexes, index))
   {
     if (!index->is_btree())
       continue;
diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc
index 302947af9a5..53acf06fae6 100644
--- a/storage/innobase/trx/trx0rec.cc
+++ b/storage/innobase/trx/trx0rec.cc
@@ -1865,7 +1865,9 @@ trx_undo_report_row_operation(
 		} else if (index->table->is_temporary()) {
 		} else if (trx_has_lock_x(*trx, *index->table)
 			   && index->table->bulk_trx_id == trx->id) {
-			m.first->second.start_bulk_insert(index->table);
+			m.first->second.start_bulk_insert(
+				index->table,
+				thd_sql_command(trx->mysql_thd) != SQLCOM_LOAD);
 
 			if (dberr_t err = m.first->second.bulk_insert_buffered(
 				    *clust_entry, *index, trx)) {
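A note on the test expectations: with unique_checks=0, a duplicate in the UNIQUE secondary index is not seen when the row is written, but only when the sorted bulk buffer is applied at commit, which is why the test expects ER_ERROR_DURING_COMMIT rather than ER_DUP_ENTRY there. The sketch below models that commit-time detection under those assumptions; apply_unique_index is an invented name, not an InnoDB function.

// Toy model of commit-time duplicate detection in a sorted unique index.
#include <algorithm>
#include <iostream>
#include <vector>

// Returns true if applying the buffered unique index finds a duplicate;
// mirrors the bulk_insert_apply()/write_to_table() stage of the patch.
bool apply_unique_index(std::vector<int> buffered_keys) {
  std::sort(buffered_keys.begin(), buffered_keys.end());
  return std::adjacent_find(buffered_keys.begin(), buffered_keys.end())
         != buffered_keys.end();
}

int main() {
  // f2 values from the test data: ..., 131072, then 131073 twice.
  std::vector<int> keys{131071, 131072, 131073, 131073};
  if (apply_unique_index(keys))
    std::cout << "duplicate found at commit -> ER_ERROR_DURING_COMMIT\n";
}

The primary-key case is the opposite: because LOAD DATA does not buffer clustered-index rows after the first one, the duplicate key 131073 is caught by a direct insert and surfaces immediately as ER_DUP_ENTRY, exactly as the last test case expects.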