
MDEV-34703 LOAD DATA INFILE using Innodb bulk load aborts

Problem:
========
During a LOAD statement, the InnoDB bulk insert operation relies on the
temporary directory and crashes when tmpdir is exhausted.

Solution:
========
During bulk insert, the LOAD statement now builds the clustered index
one record at a time instead of one page at a time. By doing this,
InnoDB
1) avoids creating a temporary file for the clustered index, and
2) writes the undo log for the first insert operation alone.
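
In other words, only the first row of a LOAD passes through the buffered
bulk path; every later row is inserted into the clustered index directly,
so no sort buffer or temporary file is needed for the primary key. Below
is a minimal, self-contained C++ sketch of that decision; the function
names are hypothetical stand-ins for bulk_insert_buffered(),
load_one_row() and row_ins_clust_index_entry() in the patch that follows.

#include <cstdio>

enum sql_command { SQLCOM_INSERT, SQLCOM_LOAD };

// Hypothetical stand-ins for the real InnoDB entry points.
static void buffer_for_merge_sort(int pk)
{ std::printf("buffer pk=%d (sort + temporary-file path)\n", pk); }
static void flush_single_buffered_row()
{ std::printf("load_one_row(): flush the one buffered row via BtrBulk\n"); }
static void insert_clustered_directly(int pk)
{ std::printf("direct clustered insert pk=%d (no tmpdir use)\n", pk); }

static void bulk_insert_row(sql_command cmd, int pk, bool first_row)
{
  // Mirrors m_sort_primary_key below: LOAD skips primary-key sorting.
  const bool sort_primary_key = cmd != SQLCOM_LOAD;
  if (sort_primary_key)
    buffer_for_merge_sort(pk);       // classic bulk path, all rows
  else if (first_row)
  {
    buffer_for_merge_sort(pk);       // the sole buffered LOAD row;
    flush_single_buffered_row();     // per the commit message, only
  }                                  // this insert is undo-logged
  else
    insert_clustered_directly(pk);   // every subsequent LOAD row
}

int main()
{
  bool first = true;
  for (int pk = 1; pk <= 3; pk++, first = false)
    bulk_insert_row(SQLCOM_LOAD, pk, first);
  return 0;
}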
Author: Thirunarayanan Balathandayuthapani
Date:   2025-01-15 18:30:03 +05:30
parent  f4e999d753
commit  2d42e9ff7d
10 changed files with 203 additions and 20 deletions

View File

@@ -0,0 +1,50 @@
CREATE TABLE t1(f1 INT NOT NULL,f2 INT NOT NULL)ENGINE=InnoDB;
INSERT INTO t1 SELECT seq, seq from seq_1_to_131072;
INSERT INTO t1 VALUES(131073, 131073), (131074, 131073);
SELECT * INTO OUTFILE "VARDIR/tmp/t1.outfile" FROM t1;
# successful load statement using bulk insert
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL)ENGINE=InnoDB;
SET unique_checks=0, foreign_key_checks=0;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
COUNT(*)
131074
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2;
CREATE TABLE t2(f1 INT NOT NULL, PRIMARY KEY(f1 DESC),
f2 INT NOT NULL)ENGINE=InnoDB;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
COUNT(*)
131074
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2;
# load statement using bulk insert fails during secondary index
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL UNIQUE KEY)ENGINE=InnoDB;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
ERROR HY000: Got error 1 "Operation not permitted" during COMMIT
SELECT COUNT(*) FROM t2;
COUNT(*)
0
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2;
# load statement using bulk insert fails during primary index
CREATE TABLE t2(f1 INT NOT NULL,
f2 INT NOT NULL PRIMARY KEY)ENGINE=InnoDB;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
ERROR 23000: Duplicate entry '131073' for key 'PRIMARY'
SELECT COUNT(*) FROM t2;
COUNT(*)
0
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2, t1;

View File

@@ -0,0 +1 @@
--innodb_sort_buffer_size=65536

View File

@@ -0,0 +1,52 @@
--source include/have_innodb.inc
--source include/have_sequence.inc
--source include/big_test.inc
CREATE TABLE t1(f1 INT NOT NULL,f2 INT NOT NULL)ENGINE=InnoDB;
INSERT INTO t1 SELECT seq, seq from seq_1_to_131072;
INSERT INTO t1 VALUES(131073, 131073), (131074, 131073);
--replace_result $MYSQLTEST_VARDIR VARDIR
--disable_cursor_protocol
--disable_ps2_protocol
eval SELECT * INTO OUTFILE "$MYSQLTEST_VARDIR/tmp/t1.outfile" FROM t1;
--enable_ps2_protocol
--enable_cursor_protocol
--echo # successful load statement using bulk insert
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL)ENGINE=InnoDB;
SET unique_checks=0, foreign_key_checks=0;
--replace_result $MYSQLTEST_VARDIR VARDIR
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
DROP TABLE t2;
CREATE TABLE t2(f1 INT NOT NULL, PRIMARY KEY(f1 DESC),
f2 INT NOT NULL)ENGINE=InnoDB;
--replace_result $MYSQLTEST_VARDIR VARDIR
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
DROP TABLE t2;
--echo # load statement using bulk insert fails during secondary index
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL UNIQUE KEY)ENGINE=InnoDB;
--replace_result $MYSQLTEST_VARDIR VARDIR
--error ER_ERROR_DURING_COMMIT
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
DROP TABLE t2;
--echo # load statement using bulk insert fails during primary index
CREATE TABLE t2(f1 INT NOT NULL,
f2 INT NOT NULL PRIMARY KEY)ENGINE=InnoDB;
--replace_result $MYSQLTEST_VARDIR VARDIR
--error ER_DUP_ENTRY
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
--remove_file $MYSQLTEST_VARDIR/tmp/t1.outfile
DROP TABLE t2, t1;

View File

@@ -725,7 +725,15 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
       table->file->print_error(my_errno, MYF(0));
       error= 1;
     }
-    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+    if (!error)
+    {
+      int err= table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+      if (err == HA_ERR_FOUND_DUPP_KEY)
+      {
+        error= 1;
+        my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
+      }
+    }
     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
     table->next_number_field=0;
   }
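
This hunk is also why the new test expects a commit-time error: with bulk
load, the buffered secondary-index entries are sorted and applied only
when mysql_load() issues HA_EXTRA_NO_IGNORE_DUP_KEY at the end of the
statement (see the ha_innobase::extra() hunk below), so a duplicate in a
UNIQUE secondary index first surfaces here. A hedged sketch of the
mapping follows; end_bulk_load() is a hypothetical driver, while 121 is
the real value of HA_ERR_FOUND_DUPP_KEY from my_base.h.

#include <cstdio>

static const int HA_ERR_FOUND_DUPP_KEY = 121;  // real handler error code

// Hypothetical driver for what mysql_load() now does after the last row:
// apply the buffered bulk insert and turn a late duplicate into
// ER_ERROR_DURING_COMMIT instead of silently ignoring it.
static int end_bulk_load(int handler_err)
{
  if (handler_err == HA_ERR_FOUND_DUPP_KEY)
  {
    // Corresponds to the result-file line:
    // ERROR HY000: Got error 1 "Operation not permitted" during COMMIT
    std::printf("ER_ERROR_DURING_COMMIT raised; LOAD fails\n");
    return 1;
  }
  return 0;
}

int main()
{
  // Simulate bulk apply detecting a duplicate in a UNIQUE secondary index.
  return end_bulk_load(HA_ERR_FOUND_DUPP_KEY);
}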

View File

@@ -15851,7 +15851,8 @@ ha_innobase::extra(
 		/* Allow a subsequent INSERT into an empty table
 		if !unique_checks && !foreign_key_checks. */
 		if (dberr_t err = trx->bulk_insert_apply()) {
-			return err;
+			return convert_error_code_to_mysql(
+				err, 0, trx->mysql_thd);
 		}
 		break;
 	}

View File

@@ -447,12 +447,21 @@ class row_merge_bulk_t
   /** Block for encryption */
   row_merge_block_t *m_crypt_block= nullptr;
 public:
+  /** If this is false, there will be only one bulk_insert_buffered()
+  call for the primary key, followed by load_one_row() and then
+  row_ins_clust_index_entry() for subsequent rows. For secondary
+  indexes, or when this is true, bulk_insert_buffered() is invoked
+  for every row. */
+  const bool m_sort_primary_key;
   /** Constructor.
   Create all merge files, merge buffer for all the table indexes
   except fts indexes.
   Create a merge block which is used to write IO operation
-  @param table table which undergoes bulk insert operation */
-  row_merge_bulk_t(dict_table_t *table);
+  @param table            table which undergoes bulk insert operation
+  @param sort_primary_key Allow primary key sort for the bulk
+                          operation. In case of LOAD, InnoDB skips
+                          the primary key sorting */
+  row_merge_bulk_t(dict_table_t *table, bool sort_primary_key);
   /** Destructor.
   Remove all merge files, merge buffer for all table indexes. */
@@ -498,4 +507,9 @@ public:
   /** Init temporary files for each index */
   void init_tmp_file();
+
+  /** Load one row into the primary index
+  @param trx bulk transaction
+  @return error code */
+  dberr_t load_one_row(trx_t *trx);
 };

View File

@@ -451,12 +451,13 @@ public:
   }
   /** Notify the start of a bulk insert operation
-  @param table table to do bulk operation */
-  void start_bulk_insert(dict_table_t *table)
+  @param table        table to do bulk operation
+  @param also_primary start bulk insert operation for primary index */
+  void start_bulk_insert(dict_table_t *table, bool also_primary)
   {
     first|= BULK;
     if (!table->is_temporary())
-      bulk_store= new row_merge_bulk_t(table);
+      bulk_store= new row_merge_bulk_t(table, also_primary);
   }
   /** Notify the end of a bulk insert operation */
@@ -511,6 +512,12 @@ public:
     return bulk_store && is_bulk_insert();
   }
+  /** @return whether InnoDB has to skip sort for the clustered index */
+  bool skip_sort_pk() const
+  {
+    return bulk_store && !bulk_store->m_sort_primary_key;
+  }
   /** Free bulk insert operation */
   void clear_bulk_buffer()
   {
@@ -1152,17 +1159,22 @@ public:
     return false;
   }
-  /** @return logical modification time of a table only
-  if the table has bulk buffer exist in the transaction */
-  trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
+  /**
+  @return logical modification time of a table
+  @retval nullptr if the table has no bulk buffer, or if sorting
+                  can be skipped for the primary key */
+  trx_mod_table_time_t *use_bulk_buffer(dict_index_t *index) noexcept
   {
     if (UNIV_LIKELY(!bulk_insert))
       return nullptr;
-    ut_ad(table->skip_alter_undo || !check_unique_secondary);
-    ut_ad(table->skip_alter_undo || !check_foreigns);
-    auto it= mod_tables.find(table);
+    ut_ad(index->table->skip_alter_undo || !check_unique_secondary);
+    ut_ad(index->table->skip_alter_undo || !check_foreigns);
+    auto it= mod_tables.find(index->table);
     if (it == mod_tables.end() || !it->second.bulk_buffer_exist())
       return nullptr;
+    /* Avoid using the bulk buffer for a LOAD statement */
+    if (index->is_clust() && it->second.skip_sort_pk())
+      return nullptr;
     return &it->second;
   }
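
A hedged sketch of the dispatch that use_bulk_buffer() implements, with
toy types in place of trx_t and dict_index_t: under LOAD the clustered
index gets no buffer back, so row_ins_index_entry() falls through to the
ordinary insert path, while secondary indexes keep using the buffer.

#include <cstdio>

struct toy_index  { bool is_clust; };
struct toy_buffer { bool sort_primary_key; };  // m_sort_primary_key stand-in

// Stand-in for trx_t::use_bulk_buffer(): nullptr means "insert normally".
static toy_buffer *use_bulk_buffer(toy_buffer *buf, const toy_index &index)
{
  if (!buf)                                      // no bulk insert running
    return nullptr;
  if (index.is_clust && !buf->sort_primary_key)  // skip_sort_pk()
    return nullptr;                              // LOAD: PK rows go direct
  return buf;
}

int main()
{
  toy_buffer load_buffer{false};                 // LOAD skips PK sorting
  toy_index pk{true}, secondary{false};
  std::printf("clustered buffered: %s\n",
              use_bulk_buffer(&load_buffer, pk) ? "yes" : "no");
  std::printf("secondary buffered: %s\n",
              use_bulk_buffer(&load_buffer, secondary) ? "yes" : "no");
  return 0;
}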

View File

@@ -2811,7 +2811,7 @@ avoid_bulk:
 			trx_start_if_not_started(trx, true);
 			trx->bulk_insert = true;
 			auto m = trx->mod_tables.emplace(index->table, 0);
-			m.first->second.start_bulk_insert(index->table);
+			m.first->second.start_bulk_insert(index->table, true);
 			err = m.first->second.bulk_insert_buffered(
 				*entry, *index, trx);
 			goto err_exit;
@@ -3430,7 +3430,12 @@ row_ins_index_entry(
 			return(DB_LOCK_WAIT);});

 	if (index->is_btree()) {
-		if (auto t= trx->check_bulk_buffer(index->table)) {
+		/* If InnoDB skips the sorting of the primary
+		index for a bulk insert operation, then InnoDB
+		should have called load_one_row() for the first
+		insert and shouldn't use the buffer for
+		subsequent inserts. */
+		if (auto t= trx->use_bulk_buffer(index)) {
 			/* MDEV-25036 FIXME:
 			row_ins_check_foreign_constraint() check
 			should be done before buffering the insert

View File

@@ -5053,7 +5053,9 @@ dberr_t row_merge_bulk_t::alloc_block()
   return DB_SUCCESS;
 }

-row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table)
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table,
+                                   bool sort_primary_key)
+  : m_sort_primary_key(sort_primary_key)
 {
   ulint n_index= 0;
   for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
@@ -5179,6 +5181,33 @@ dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
   return DB_SUCCESS;
 }

+ATTRIBUTE_COLD
+dberr_t row_merge_bulk_t::load_one_row(trx_t *trx)
+{
+  /* Load the single row into the clustered index. BtrBulk has
+  nothing to do for a bulk insert here and is used only as an
+  interface to insert a single row. */
+  dict_index_t *index= m_merge_buf[0].index;
+  BtrBulk btr_bulk(index, trx);
+  ut_ad(m_merge_buf[0].n_tuples == 1);
+  dberr_t err= row_merge_insert_index_tuples(index, index->table,
+                                             OS_FILE_CLOSED, nullptr,
+                                             &m_merge_buf[0], &btr_bulk,
+                                             0, 0, 0, nullptr,
+                                             index->table->space_id,
+                                             nullptr,
+                                             m_blob_file.fd == OS_FILE_CLOSED
+                                             ? nullptr : &m_blob_file);
+  if (err != DB_SUCCESS)
+    trx->error_info= index;
+  else if (index->table->persistent_autoinc)
+    btr_write_autoinc(index, 1);
+  err= btr_bulk.finish(err);
+  if (err == DB_SUCCESS && index->is_clust())
+    index->table->stat_n_rows= 1;
+  return err;
+}
+
 dberr_t row_merge_bulk_t::bulk_insert_buffered(const dtuple_t &row,
                                                const dict_index_t &ind,
                                                trx_t *trx)
@@ -5254,6 +5283,8 @@ add_to_buf:
   }

 func_exit:
+  if (!m_sort_primary_key && ind.is_clust())
+    err= load_one_row(trx);
   if (large_tuple_heap)
     mem_heap_free(large_tuple_heap);
   return err;
@@ -5325,9 +5356,16 @@ func_exit:
 dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
 {
-  ulint i= 0;
-  for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
-       index; index= UT_LIST_GET_NEXT(indexes, index))
+  dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+  ut_ad(index->is_clust());
+  ulint i= !m_sort_primary_key;
+  if (i)
+    /* For the clustered index, InnoDB calls load_one_row() while
+    buffering the first insert and uses row_ins_clust_index_entry()
+    for subsequent rows, so skip the clustered index while applying
+    the buffered insert operation. */
+    index= UT_LIST_GET_NEXT(indexes, index);
+  for (; index; index= UT_LIST_GET_NEXT(indexes, index))
   {
     if (!index->is_btree())
       continue;

View File

@@ -1865,7 +1865,9 @@ trx_undo_report_row_operation(
 	} else if (index->table->is_temporary()) {
 	} else if (trx_has_lock_x(*trx, *index->table)
 		   && index->table->bulk_trx_id == trx->id) {
-		m.first->second.start_bulk_insert(index->table);
+		m.first->second.start_bulk_insert(
+			index->table,
+			thd_sql_command(trx->mysql_thd) != SQLCOM_LOAD);
 		if (dberr_t err = m.first->second.bulk_insert_buffered(
 			    *clust_entry, *index, trx)) {