mirror of https://github.com/MariaDB/server.git synced 2026-01-06 05:22:24 +03:00
mariadb/sql/sql_load.cc
unknown 60272e750e WL#3146 "less locking in auto_increment":
this is a cleanup patch for our current auto_increment handling:
new names for auto_increment variables in THD, new methods to manipulate them
(see sql_class.h), some move into handler::, causing less backup/restore
work when executing substatements. 
This hopefully makes the logic clearer; less work is needed in
mysql_insert().
By cleaning up, using different variables for different purposes (instead
of one for 3 things...), we fix those bugs, which someone may want to fix
in 5.0 too:
BUG#20339 "stored procedure using LAST_INSERT_ID() does not replicate
statement-based"
BUG#20341 "stored function inserting into one auto_increment puts bad
data in slave"
BUG#19243 "wrong LAST_INSERT_ID() after ON DUPLICATE KEY UPDATE"
(now if a row is updated, LAST_INSERT_ID() will return its id)
and re-fixes:
BUG#6880 "LAST_INSERT_ID() value changes during multi-row INSERT"
(already fixed differently by Ramil in 4.1)
Test of documented behaviour of mysql_insert_id() (there was no test).
The behaviour changes introduced are:
- LAST_INSERT_ID() now returns "the first autogenerated auto_increment value
successfully inserted", instead of "the first autogenerated auto_increment
value if any row was successfully inserted", see auto_increment.test.
Same for mysql_insert_id(), see mysql_client_test.c.
- LAST_INSERT_ID() returns the id of the updated row if ON DUPLICATE KEY
UPDATE, see auto_increment.test. Same for mysql_insert_id(), see
mysql_client_test.c.
- LAST_INSERT_ID() does not change if no autogenerated value was successfully 
inserted (it used to then be 0), see auto_increment.test.
- if in INSERT SELECT no autogenerated value was successfully inserted,
mysql_insert_id() now returns the id of the last inserted row (it already
did this for INSERT VALUES), see mysql_client_test.c.
- if INSERT SELECT uses LAST_INSERT_ID(X), mysql_insert_id() now returns X
(it already did this for INSERT VALUES), see mysql_client_test.c.
- NDB now behaves like other engines wrt SET INSERT_ID: with INSERT IGNORE,
the id passed in SET INSERT_ID is re-used until a row succeeds; SET INSERT_ID
influences not only the first row now.
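
The LAST_INSERT_ID() rules listed above can be illustrated with a toy model. This is only a sketch: the two field names are taken from this commit message, while the struct, its methods and the "0 means no value" convention are hypothetical and are not the server's THD interface.

```cpp
#include <cassert>
#include <cstdint>

// Toy model, not server code. Field names follow the commit message;
// everything else here is an assumption made for illustration.
struct SessionModel {
  std::uint64_t first_successful_insert_id_in_prev_stmt = 0;
  std::uint64_t first_successful_insert_id_in_cur_stmt = 0;

  // Call once per row that was successfully written with an autogenerated id.
  void record_successful_autogenerated_id(std::uint64_t id) {
    assert(id != 0);  // in this model 0 means "no value yet"
    if (first_successful_insert_id_in_cur_stmt == 0)
      first_successful_insert_id_in_cur_stmt = id;  // only the first one is kept
  }

  // Call when the statement ends.
  void end_statement() {
    // If nothing autogenerated was inserted, the previous value is left alone.
    if (first_successful_insert_id_in_cur_stmt != 0)
      first_successful_insert_id_in_prev_stmt =
          first_successful_insert_id_in_cur_stmt;
    first_successful_insert_id_in_cur_stmt = 0;
  }

  // What a later LAST_INSERT_ID() would report in this model.
  std::uint64_t last_insert_id() const {
    return first_successful_insert_id_in_prev_stmt;
  }
};
```

In this model a statement that inserts ids 5 and 6 leaves last_insert_id() at 5, and a following statement that inserts nothing autogenerated leaves it at 5 as well, rather than resetting it to 0.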

Additionally, when unlocking a table we check that the thread is not keeping
a next_insert_id (as the table is unlocked that id is potentially out-of-date);
forgetting about this next_insert_id is done in a new
handler::ha_release_auto_increment().

Finally we prepare for engines capable of reserving finite-length intervals
of auto_increment values: we store such intervals in THD. The next step
(to be done by the replication team in 5.1) is to read those intervals from
THD and actually store them in the statement-based binary log. NDB
will be a good engine to test that.
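
As a rough illustration of what storing such intervals could look like, here is a minimal sketch. It is not the Discrete_interval / Discrete_intervals_list code from sql/structs.h: the type names, members and the merge-on-append behaviour are all assumptions of this example, and overflow of the 64-bit arithmetic is ignored.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch of an interval of integers with a fixed step.
struct IntervalSketch {
  std::uint64_t first;      // first value of the interval
  std::uint64_t count;      // number of values in the interval
  std::uint64_t increment;  // step between consecutive values, assumed > 0

  // The value at which a directly following interval would start.
  std::uint64_t next_after() const { return first + count * increment; }

  bool contains(std::uint64_t v) const {
    assert(increment > 0);
    if (count == 0 || v < first) return false;
    const std::uint64_t offset = v - first;
    return offset % increment == 0 && offset / increment < count;
  }
};

// Hypothetical list of intervals. Merging an appended interval into the
// previous one when it continues it is a design choice of this sketch.
class IntervalListSketch {
 public:
  void append(const IntervalSketch &iv) {
    if (!items_.empty()) {
      IntervalSketch &tail = items_.back();
      if (tail.increment == iv.increment && tail.next_after() == iv.first) {
        tail.count += iv.count;  // extend the tail instead of adding an entry
        return;
      }
    }
    items_.push_back(iv);
  }
  std::size_t size() const { return items_.size(); }
  const IntervalSketch &at(std::size_t i) const { return items_.at(i); }

 private:
  std::vector<IntervalSketch> items_;
};
```

Merging keeps the list short when an engine hands out consecutive intervals, which would matter if the list were later written to the binary log; whether the real classes do this is not stated here.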


mysql-test/extra/binlog_tests/binlog.test:
  Testing that if INSERT_ID is set to a value too big for the
  column's type, the binlogged INSERT_ID is the truncated value
  (important if slave has a column of a "wider" numeric type).
  Testing binlogging of INSERT_ID with INSERT DELAYED, to be sure that 
  we binlog an INSERT_ID event only for the delayed rows which use one.
mysql-test/extra/rpl_tests/rpl_insert_id.test:
  Testcase for BUG#20339 "stored procedure using
  LAST_INSERT_ID() does not replicate statement-based".
  Testcase for BUG#20341 "stored function inserting into one
  auto_increment puts bad data in slave".
mysql-test/extra/rpl_tests/rpl_loaddata.test:
  Test that LOAD DATA INFILE sets a value for a future LAST_INSERT_ID().
mysql-test/r/auto_increment.result:
  Behaviour change: when INSERT fails totally (does not even succeed
  partially and then roll back), don't change last_insert_id().
  Behaviour change: LAST_INSERT_ID() is now the first successfully inserted,
  autogenerated, id.
  Behaviour change: if INSERT ON DUPLICATE KEY UPDATE, if the table has auto_increment
  and a row is updated, then LAST_INSERT_ID() returns the id of this row.
mysql-test/r/binlog_row_binlog.result:
  result update
mysql-test/r/binlog_stm_binlog.result:
  result update
mysql-test/r/insert.result:
  result update
mysql-test/r/rpl_insert_id.result:
  result update
mysql-test/r/rpl_loaddata.result:
  result update
mysql-test/r/rpl_ndb_auto_inc.result:
  ndb's behaviour is now like other engines wrt SET INSERT_ID
  in a multi-row INSERT:
  - with INSERT IGNORE: the id passed in SET INSERT_ID is re-used until
  a row succeeds.
  - generally, SET INSERT_ID sets the first value and other values are
  simply computed from this first value, whereas previously
  the 2nd and subsequent values were not influenced by SET INSERT_ID;
  this good change is due to the removal of "thd->next_insert_id=0"
  from ha_ndbcluster.
mysql-test/t/auto_increment.test:
  A testcase of BUG#19243: if ON DUPLICATE KEY UPDATE updates a row,
  LAST_INSERT_ID() now returns the id of the row.
  Test of new behaviour of last_insert_id() when no autogenerated value was
  inserted, or when only some autogenerated value (not the first of them) was
  inserted.
mysql-test/t/insert.test:
  testing INSERT IGNORE re-using generated values
sql/ha_federated.cc:
  update for new variables.
sql/ha_ndbcluster.cc:
  handler::auto_increment_column_changed not needed, equivalent to
  (insert_id_for_cur_row > 0).
  thd->next_insert_id=0 not needed anymore; it was used to force
  handler::update_auto_increment() to call ha_ndbcluster::get_auto_increment()
  for each row of a multi-row INSERT, now this happens naturally
  because NDB says "I have reserved you *one* value" in get_auto_increment(),
  so handler::update_auto_increment() calls again for next row.
sql/handler.cc:
  More comments, use of new methods and variables. Hopes to be clearer
  than current code.
  thd->prev_insert_id not in THD anymore: it is managed locally by inserters
  (like mysql_insert()).
  THD::clear_next_insert_id is now equivalent to
  handler::next_insert_id > 0.
  get_auto_increment() reserves an interval of values from the engine,
  uses this interval for next rows of the statement, until interval
  is exhausted then it asks for another interval (of a bigger size
  than the first one; size doubles until reaching 65535 then it stays constant).
  If doing statement-based binlogging, intervals are remembered in a list
  for storage in the binlog.
  For "forced" insert_id values (SET INSERT_ID or replication slave),
  forced_auto_inc_intervals is non-empty and the handler takes its intervals
  from there, without calling get_auto_increment().
  ha_release_auto_increment() resets the handler's auto_increment variables;
  it calls release_auto_increment() which is handler-dependent and
  serves to return to the engine any unused tail of the last used
  interval.
  If ending a statement, next_insert_id>0 means that autoinc values have been
  generated or taken from the master's binlog (in a replication slave) so
  we clear those values read from binlog, so that next top- or sub-
  statement does not use them.
sql/handler.h:
  handler::auto_increment_changed can be replaced by
  (handler::insert_id_for_cur_row > 0).
  THD::next_insert_id moves into handler (more natural, and prepares
  for the day when we'll support a single statement inserting into
  two tables - "multi-table INSERT" like we have UPDATE - will this
  happen?).
  This move makes the backup/restore of THD::next_insert_id when entering
  a substatement unneeded, as each substatement has its own handler
  objects.
sql/item_func.cc:
  new names for variables.
  For the setting of what mysql_insert_id() will return to the client,
  LAST_INSERT_ID(X) used to simply pretend that the generated autoinc
  value for the current row was X, but this led to having no reliable
  way to know the really generated value, so we now have a bool:
  thd->arg_of_last_insert_id_function which enables us to know that
  LAST_INSERT_ID(X) was called (and then X can be found in
  thd->first_successful_insert_id_in_prev_stmt).
sql/log.cc:
  new variable names for insert_ids. Removing some unused variables in the slow
  log.
sql/log_event.cc:
  new variable names, comments. Preparing for when masters won't binlog
  LAST_INSERT_ID if it was 0.
sql/set_var.cc:
  new variable names.
  The last change repeats how Bar fixed BUG#20392
  "INSERT_ID session variable has weird value" in 5.0.
sql/sql_class.cc:
  new variables for insert_id. In THD::cleanup_after_query() we fix
  BUG#20339 "stored procedure using LAST_INSERT_ID() does not replicate
  statement-based" (will one want to fix it in 5.0?). Many comments
  about what stored functions do to auto_increment.
  In reset|restore_sub_statement_state(), we need to backup less
  auto_inc variables as some of them have moved to the handler;
  we backup/restore those which are about the current top- or sub-
  statement, *not* those about the statement-based binlog
  (which evolve as the top- and sub-statement execute).
  Because we split THD::last_insert_id into 
  THD::first_successful_insert_id_in_prev_stmt and
  THD::auto_inc_intervals_for_binlog (among others), we fix
  BUG#20341 "stored function inserting into one auto_increment
  puts bad data in slave": indeed we can afford to not backup/restore
  THD::auto_inc_intervals_for_binlog (which fixes the bug) while still
  backing up / restoring THD::first_successful_insert_id_in_prev_stmt
  (ensuring that the top-level LAST_INSERT_ID() is not affected by INSERTs
  done by sub-statements, as is desirable and tested in rpl_insert_id.test).
sql/sql_class.h:
  new variables and methods for auto_increment.
  Some THD members move into handler (those which are really about
  the table being inserted), some stay in THD (those which are
  about what a future LAST_INSERT_ID() should return, or about
  what should be stored into the statement-based binlog).
  THD::next_insert_id moves to handler::.
  THD::clear_next_insert_id removed (had become equivalent
  to next_insert_id > 0).
  THD::last_insert_id becomes four:
  THD::first_successful_insert_id_in_cur_stmt,
  THD::auto_inc_intervals_for_binlog,
  handler::insert_id_for_cur_row,
  THD::first_successful_insert_id_in_prev_stmt.
  THD::current_insert_id becomes:
  THD::first_successful_insert_id_in_prev_stmt_for_binlog
  THD::prev_insert_id is removed, handler can just use
  handler::insert_id_for_cur_row instead (which is more accurate:
  for the first row, prev_insert_id was set before get_auto_increment
  was called, so was 0, causing a call to
  get_auto_increment() for the 2nd row if the 1st row fails;
  here we don't need the call as insert_id_for_cur_row has
  the value of the first row).
  THD::last_insert_id_used becomes: stmt_depends_on_first_row_in_prev_stmt
  THD::insert_id_used is removed (equivalent to
  auto_inc_intervals_for_binlog non empty).
  The interval returned by get_auto_increment() and currently being
  consumed is handler::auto_inc_interval_for_cur_row.
  Comments to explain each of them.
  select_insert::last_insert_id becomes autoinc_value_of_last_inserted_row.
sql/sql_insert.cc:
  the "id" variable is not changed for each row now; it used to compensate for
  this contradiction:
  - thd->last_insert_id's supposed job was to keep the id of the first row
  - but it was updated for every row
  - so mysql_insert() made sure to catch its first value and restore it at the end of stmt.
  Now THD keeps the first value in first_successful_insert_id_in_cur_stmt,
  and value of the row in insert_id_for_cur_row. So "id" only serves to fill
  mysql_insert_id(), as depending on some conditions, "id" must be different
  values.
  Prev_insert_id moves from THD to write_record().
  We now set LAST_INSERT_ID() in ON DUPLICATE KEY UPDATE too (BUG#19243).
  In an INSERT DELAYED, we still "reset auto-increment caching" but differently
  (by calling ha_release_auto_increment()).
sql/sql_load.cc:
  no need to fiddle with "id", THD maintains
  THD::first_successful_insert_id_in_cur_stmt by itself and correctly now.
  ha_release_auto_increment() is now (logically) called before we unlock
  the table.
sql/sql_parse.cc:
  update to new variable names.
  Assertion that reset_thd_for_next_command() is not called for every
  substatement of a routine (I'm not against it, but if we do this change,
  statement-based binlogging needs some adjustments).
sql/sql_select.cc:
  update for new variable names
sql/sql_table.cc:
  next_insert_id not needed in mysql_alter_table(), THD manages.
sql/sql_update.cc:
  update for new variable names.
  Even though this is UPDATE, an insert id can be generated (by
  LAST_INSERT_ID(X)) and should be recorded because mysql_insert_id() wants
  to know about it.
sql/structs.h:
  A class for "discrete" intervals (intervals of integer numbers with a certain
  increment between them): Discrete_interval, and a class for a list of such
  intervals: Discrete_intervals_list
tests/mysql_client_test.c:
  tests of behaviour of mysql_insert_id(): there were no such tests, while in
  our manual we document its behaviour. In comments you'll notice the behaviour
  changes introduced (there are 5).
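
The sql/handler.cc notes above say that the size of each reserved interval doubles until it reaches 65535 and then stays constant. A minimal sketch of that growth rule follows. The helper name is hypothetical, and starting from a size of 1 is an assumption of this example; the commit message only gives the doubling and the cap.

```cpp
#include <cassert>
#include <cstdint>

// Cap quoted in the commit message.
const std::uint64_t kMaxReservation = 65535;

// Hypothetical helper: size of the interval to request when
// `intervals_already_reserved` intervals were already used in this statement.
// The starting size of 1 is an assumption of this sketch.
std::uint64_t reservation_size(unsigned intervals_already_reserved) {
  // 2^16 already exceeds the cap, so avoid shifting any further.
  if (intervals_already_reserved >= 16) return kMaxReservation;
  const std::uint64_t doubled = std::uint64_t(1) << intervals_already_reserved;
  const std::uint64_t result =
      doubled < kMaxReservation ? doubled : kMaxReservation;
  assert(result >= 1 && result <= kMaxReservation);
  return result;
}
```

Under these assumptions the requested sizes run 1, 2, 4, and so on up to 32768, after which every further request is for 65535 values.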
2006-07-09 17:52:19 +02:00

1265 lines
35 KiB
C++

/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* Copy data from a textfile to table */
#include "mysql_priv.h"
#include <my_dir.h>
#include <m_ctype.h>
#include "sql_repl.h"
#include "sp_head.h"
#include "sql_trigger.h"
class READ_INFO {
File file;
byte *buffer, /* Buffer for read text */
*end_of_buff; /* Data in buffer ends here */
uint buff_length, /* Length of buffer */
max_length; /* Max length of row */
char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
uint field_term_length,line_term_length,enclosed_length;
int field_term_char,line_term_char,enclosed_char,escape_char;
int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof;
bool need_end_io_cache;
IO_CACHE cache;
NET *io_net;
public:
bool error,line_cuted,found_null,enclosed;
byte *row_start, /* Found row starts here */
*row_end; /* Found row ends here */
CHARSET_INFO *read_charset;
READ_INFO(File file,uint tot_length,CHARSET_INFO *cs,
String &field_term,String &line_start,String &line_term,
String &enclosed,int escape,bool get_it_from_net, bool is_fifo);
~READ_INFO();
int read_field();
int read_fixed_length(void);
int next_line(void);
char unescape(char chr);
int terminator(char *ptr,uint length);
bool find_start_of_fields();
/*
We need to force cache close before destructor is invoked to log
the last read block
*/
void end_io_cache()
{
::end_io_cache(&cache);
need_end_io_cache = 0;
}
/*
Either this method, or we need to make cache public
Arg must be set from mysql_load() since constructor does not see
either the table or THD value
*/
void set_io_cache_arg(void* arg) { cache.arg = arg; }
};
static int read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
List<Item> &fields_vars, List<Item> &set_fields,
List<Item> &set_values, READ_INFO &read_info,
ulong skip_lines,
bool ignore_check_option_errors);
static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
List<Item> &fields_vars, List<Item> &set_fields,
List<Item> &set_values, READ_INFO &read_info,
String &enclosed, ulong skip_lines,
bool ignore_check_option_errors);
static bool write_execute_load_query_log_event(THD *thd,
bool duplicates, bool ignore,
bool transactional_table);
/*
Execute LOAD DATA query
SYNOPSIS
mysql_load()
thd - current thread
ex - sql_exchange object representing source file and its parsing rules
table_list - list of tables to which we are loading data
fields_vars - list of fields and variables to which we read
data from file
set_fields - list of fields mentioned in set clause
set_values - expressions to assign to fields in previous list
handle_duplicates - indicates whether we should emit an error or
replace the row if we meet duplicates.
ignore - indicates whether we should ignore duplicates
read_file_from_client - is this LOAD DATA LOCAL ?
RETURN VALUES
TRUE - error / FALSE - success
*/
bool mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
List<Item> &fields_vars, List<Item> &set_fields,
List<Item> &set_values,
enum enum_duplicates handle_duplicates, bool ignore,
bool read_file_from_client)
{
char name[FN_REFLEN];
File file;
TABLE *table= NULL;
int error;
String *field_term=ex->field_term,*escaped=ex->escaped;
String *enclosed=ex->enclosed;
bool is_fifo=0;
#ifndef EMBEDDED_LIBRARY
LOAD_FILE_INFO lf_info;
#endif
char *db = table_list->db; // This is never null
/*
If path for file is not defined, we will use the current database.
If this is not set, we will use the directory where the table to be
loaded is located
*/
char *tdb= thd->db ? thd->db : db; // Result is never null
ulong skip_lines= ex->skip_lines;
bool transactional_table;
DBUG_ENTER("mysql_load");
#ifdef EMBEDDED_LIBRARY
read_file_from_client = 0; //server is always in the same process
#endif
if (escaped->length() > 1 || enclosed->length() > 1)
{
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
MYF(0));
DBUG_RETURN(TRUE);
}
/*
This needs to be done before external_lock
*/
ha_enable_transaction(thd, FALSE);
if (open_and_lock_tables(thd, table_list))
DBUG_RETURN(TRUE);
if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
&thd->lex->select_lex.top_join_list,
table_list,
&thd->lex->select_lex.leaf_tables, FALSE,
INSERT_ACL | UPDATE_ACL))
DBUG_RETURN(-1);
if (!table_list->table || // do not support join view
!table_list->updatable || // and derived tables
check_key_in_view(thd, table_list))
{
my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "LOAD");
DBUG_RETURN(TRUE);
}
if (table_list->prepare_where(thd, 0, TRUE) ||
table_list->prepare_check_option(thd))
{
DBUG_RETURN(TRUE);
}
/*
Let us emit an error if we are loading data to table which is used
in subselect in SET clause like we do it for INSERT.
The main thing to fix to remove this restriction is to ensure that the
table is marked to be 'used for insert' in which case we should never
mark this table as 'const table' (ie, one that has only one row).
*/
if (unique_table(thd, table_list, table_list->next_global))
{
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
DBUG_RETURN(TRUE);
}
table= table_list->table;
transactional_table= table->file->has_transactions();
if (table->found_next_number_field)
table->mark_auto_increment_column();
if (!fields_vars.elements)
{
Field **field;
for (field=table->field; *field ; field++)
fields_vars.push_back(new Item_field(*field));
bitmap_set_all(table->write_set);
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
/*
Let us also prepare SET clause, although it is probably empty
in this case.
*/
if (setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
DBUG_RETURN(TRUE);
}
else
{ // Part field list
/* TODO: use this conds for 'WITH CHECK OPTIONS' */
if (setup_fields(thd, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
setup_fields(thd, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
check_that_all_fields_are_given_values(thd, table, table_list))
DBUG_RETURN(TRUE);
/*
Check whether the TIMESTAMP field with the auto-set feature was
specified explicitly.
*/
if (table->timestamp_field)
{
if (bitmap_is_set(table->write_set,
table->timestamp_field->field_index))
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
else
{
bitmap_set_bit(table->write_set,
table->timestamp_field->field_index);
}
}
/* Fix the expressions in SET clause */
if (setup_fields(thd, 0, set_values, MARK_COLUMNS_READ, 0, 0))
DBUG_RETURN(TRUE);
}
uint tot_length=0;
bool use_blobs= 0, use_vars= 0;
List_iterator_fast<Item> it(fields_vars);
Item *item;
while ((item= it++))
{
if (item->type() == Item::FIELD_ITEM)
{
Field *field= ((Item_field*)item)->field;
if (field->flags & BLOB_FLAG)
{
use_blobs= 1;
tot_length+= 256; // Will be extended if needed
}
else
tot_length+= field->field_length;
}
else
use_vars= 1;
}
if (use_blobs && !ex->line_term->length() && !field_term->length())
{
my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
MYF(0));
DBUG_RETURN(TRUE);
}
if (use_vars && !field_term->length() && !enclosed->length())
{
my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
DBUG_RETURN(TRUE);
}
/* We can't give an error in the middle when using LOCAL files */
if (read_file_from_client && handle_duplicates == DUP_ERROR)
ignore= 1;
#ifndef EMBEDDED_LIBRARY
if (read_file_from_client)
{
(void)net_request_file(&thd->net,ex->file_name);
file = -1;
}
else
#endif
{
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
ex->file_name+=dirname_length(ex->file_name);
#endif
if (!dirname_length(ex->file_name))
{
strxnmov(name, FN_REFLEN-1, mysql_real_data_home, tdb, NullS);
(void) fn_format(name, ex->file_name, name, "",
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
}
else
{
(void) fn_format(name, ex->file_name, mysql_real_data_home, "",
MY_RELATIVE_PATH | MY_UNPACK_FILENAME);
#if !defined(__WIN__) && ! defined(__NETWARE__)
MY_STAT stat_info;
if (!my_stat(name,&stat_info,MYF(MY_WME)))
DBUG_RETURN(TRUE);
// if we are not in slave thread, the file must be:
if (!thd->slave_thread &&
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
((stat_info.st_mode & S_IFREG) == S_IFREG ||
(stat_info.st_mode & S_IFIFO) == S_IFIFO)))
{
my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), name);
DBUG_RETURN(TRUE);
}
if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
is_fifo = 1;
#endif
}
if ((file=my_open(name,O_RDONLY,MYF(MY_WME))) < 0)
DBUG_RETURN(TRUE);
}
COPY_INFO info;
bzero((char*) &info,sizeof(info));
info.ignore= ignore;
info.handle_duplicates=handle_duplicates;
info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
READ_INFO read_info(file,tot_length,thd->variables.collation_database,
*field_term,*ex->line_start, *ex->line_term, *enclosed,
info.escape_char, read_file_from_client, is_fifo);
if (read_info.error)
{
if (file >= 0)
my_close(file,MYF(0)); // no files in net reading
DBUG_RETURN(TRUE); // Can't allocate buffers
}
#ifndef EMBEDDED_LIBRARY
if (mysql_bin_log.is_open())
{
lf_info.thd = thd;
lf_info.wrote_create_file = 0;
lf_info.last_pos_in_file = HA_POS_ERROR;
lf_info.log_delayed= transactional_table;
read_info.set_io_cache_arg((void*) &lf_info);
}
#endif /*!EMBEDDED_LIBRARY*/
thd->count_cuted_fields= CHECK_FIELD_WARN; /* calc cuted fields */
thd->cuted_fields=0L;
/* Skip lines if there is a line terminator */
if (ex->line_term->length())
{
/* ex->skip_lines needs to be preserved for logging */
while (skip_lines > 0)
{
skip_lines--;
if (read_info.next_line())
break;
}
}
if (!(error=test(read_info.error)))
{
table->next_number_field=table->found_next_number_field;
if (ignore ||
handle_duplicates == DUP_REPLACE)
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
if (!thd->prelocked_mode)
table->file->ha_start_bulk_insert((ha_rows) 0);
table->copy_blobs=1;
thd->no_trans_update= 0;
thd->abort_on_warning= (!ignore &&
(thd->variables.sql_mode &
(MODE_STRICT_TRANS_TABLES |
MODE_STRICT_ALL_TABLES)));
if (!field_term->length() && !enclosed->length())
error= read_fixed_length(thd, info, table_list, fields_vars,
set_fields, set_values, read_info,
skip_lines, ignore);
else
error= read_sep_field(thd, info, table_list, fields_vars,
set_fields, set_values, read_info,
*enclosed, skip_lines, ignore);
if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
{
table->file->print_error(my_errno, MYF(0));
error= 1;
}
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->next_number_field=0;
}
ha_enable_transaction(thd, TRUE);
if (file >= 0)
my_close(file,MYF(0));
free_blobs(table); /* if pack_blob was used */
table->copy_blobs=0;
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
/*
We must invalidate the table in query cache before binlog writing and
ha_autocommit_...
*/
query_cache_invalidate3(thd, table_list, 0);
if (error)
{
if (transactional_table)
ha_autocommit_or_rollback(thd,error);
if (read_file_from_client)
while (!read_info.next_line())
;
#ifndef EMBEDDED_LIBRARY
if (mysql_bin_log.is_open())
{
{
/*
Make sure last block (the one which caused the error) gets
logged. This is needed because otherwise after write of (to
the binlog, not to read_info (which is a cache))
Delete_file_log_event the bad block will remain in read_info
(because pre_read is not called at the end of the last
block; remember pre_read is called whenever a new block is
read from disk). At the end of mysql_load(), the destructor
of read_info will call end_io_cache() which will flush
read_info, so we will finally have this in the binlog:
Append_block # The last successful block
Delete_file
Append_block # The failing block
which is nonsense.
Or could also be (for a small file)
Create_file # The failing block
which is nonsense (Delete_file is not written in this case because
Create_file has not been written; then, when read_info is destroyed,
end_io_cache() is called, which writes Create_file).
*/
read_info.end_io_cache();
/* If the file was not empty, wrote_create_file is true */
if (lf_info.wrote_create_file)
{
if ((info.copied || info.deleted) && !transactional_table)
write_execute_load_query_log_event(thd, handle_duplicates,
ignore, transactional_table);
else
{
Delete_file_log_event d(thd, db, transactional_table);
d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
mysql_bin_log.write(&d);
}
}
}
}
#endif /*!EMBEDDED_LIBRARY*/
error= -1; // Error on read
goto err;
}
sprintf(name, ER(ER_LOAD_INFO), (ulong) info.records, (ulong) info.deleted,
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
send_ok(thd,info.copied+info.deleted,0L,name);
if (!transactional_table)
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
#ifndef EMBEDDED_LIBRARY
if (mysql_bin_log.is_open())
{
#ifdef HAVE_ROW_BASED_REPLICATION
/*
We need to do the job that is normally done inside
binlog_query() here, which is to ensure that the pending event
is written before tables are unlocked and before any other
events are written. We also need to update the table map
version for the binary log to mark that table maps are invalid
after this point.
*/
if (thd->current_stmt_binlog_row_based)
thd->binlog_flush_pending_rows_event(true);
else
#endif
{
/*
As already explained above, we need to call end_io_cache() or the last
block will be logged only after Execute_load_query_log_event (which is
wrong), when read_info is destroyed.
*/
read_info.end_io_cache();
if (lf_info.wrote_create_file)
{
write_execute_load_query_log_event(thd, handle_duplicates,
ignore, transactional_table);
}
}
}
#endif /*!EMBEDDED_LIBRARY*/
if (transactional_table)
error=ha_autocommit_or_rollback(thd,error);
err:
table->file->ha_release_auto_increment();
if (thd->lock)
{
mysql_unlock_tables(thd, thd->lock);
thd->lock=0;
}
thd->abort_on_warning= 0;
DBUG_RETURN(error);
}
/* Not a very useful function; just to avoid duplication of code */
static bool write_execute_load_query_log_event(THD *thd,
bool duplicates, bool ignore,
bool transactional_table)
{
Execute_load_query_log_event
e(thd, thd->query, thd->query_length,
(char*)thd->lex->fname_start - (char*)thd->query,
(char*)thd->lex->fname_end - (char*)thd->query,
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
transactional_table, FALSE);
e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
return mysql_bin_log.write(&e);
}
/****************************************************************************
** Read of rows of fixed size + optional garbage + optional newline
****************************************************************************/
static int
read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
List<Item> &fields_vars, List<Item> &set_fields,
List<Item> &set_values, READ_INFO &read_info,
ulong skip_lines, bool ignore_check_option_errors)
{
List_iterator_fast<Item> it(fields_vars);
Item_field *sql_field;
TABLE *table= table_list->table;
ulonglong id;
bool no_trans_update;
DBUG_ENTER("read_fixed_length");
id= 0;
while (!read_info.read_fixed_length())
{
if (thd->killed)
{
thd->send_kill_message();
DBUG_RETURN(1);
}
if (skip_lines)
{
/*
We could implement this with a simple seek if:
- We are not using LOAD DATA LOCAL INFILE
- escape character is ""
- line starting prefix is ""
*/
skip_lines--;
continue;
}
it.rewind();
byte *pos=read_info.row_start;
#ifdef HAVE_purify
read_info.row_end[0]=0;
#endif
no_trans_update= !table->file->has_transactions();
restore_record(table, s->default_values);
/*
There are no variables in the fields_vars list in this format, so
this conversion is safe.
*/
while ((sql_field= (Item_field*) it++))
{
Field *field= sql_field->field;
if (field == table->next_number_field)
table->auto_increment_field_not_null= TRUE;
/*
No fields specified in fields_vars list can be null in this format.
Mark field as not null, we should do this for each row because of
restore_record...
*/
field->set_notnull();
if (pos == read_info.row_end)
{
thd->cuted_fields++; /* Not enough fields */
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARN_TOO_FEW_RECORDS,
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
}
else
{
uint length;
byte save_chr;
if (field == table->next_number_field)
table->auto_increment_field_not_null= TRUE;
if ((length=(uint) (read_info.row_end-pos)) >
field->field_length)
length=field->field_length;
save_chr=pos[length]; pos[length]='\0'; // Safeguard against malloc
field->store((char*) pos,length,read_info.read_charset);
pos[length]=save_chr;
if ((pos+=length) > read_info.row_end)
pos= read_info.row_end; /* Fills rest with space */
}
}
if (pos != read_info.row_end)
{
thd->cuted_fields++; /* Too long row */
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARN_TOO_MANY_RECORDS,
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
}
if (thd->killed ||
fill_record_n_invoke_before_triggers(thd, set_fields, set_values,
ignore_check_option_errors,
table->triggers,
TRG_EVENT_INSERT))
DBUG_RETURN(1);
switch (table_list->view_check_option(thd,
ignore_check_option_errors)) {
case VIEW_CHECK_SKIP:
read_info.next_line();
goto continue_loop;
case VIEW_CHECK_ERROR:
DBUG_RETURN(-1);
}
if (write_record(thd, table, &info))
DBUG_RETURN(1);
thd->no_trans_update= no_trans_update;
/*
We don't need to reset auto-increment field since we are restoring
its default value at the beginning of each loop iteration.
*/
if (read_info.next_line()) // Skip to next line
break;
if (read_info.line_cuted)
{
thd->cuted_fields++; /* Too long row */
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARN_TOO_MANY_RECORDS,
ER(ER_WARN_TOO_MANY_RECORDS), thd->row_count);
}
thd->row_count++;
continue_loop:;
}
DBUG_RETURN(test(read_info.error));
}
static int
read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
List<Item> &fields_vars, List<Item> &set_fields,
List<Item> &set_values, READ_INFO &read_info,
String &enclosed, ulong skip_lines,
bool ignore_check_option_errors)
{
List_iterator_fast<Item> it(fields_vars);
Item *item;
TABLE *table= table_list->table;
uint enclosed_length;
ulonglong id;
bool no_trans_update;
DBUG_ENTER("read_sep_field");
enclosed_length=enclosed.length();
id= 0;
no_trans_update= !table->file->has_transactions();
for (;;it.rewind())
{
if (thd->killed)
{
thd->send_kill_message();
DBUG_RETURN(1);
}
restore_record(table, s->default_values);
while ((item= it++))
{
uint length;
byte *pos;
if (read_info.read_field())
break;
/* If this line is to be skipped we don't want to fill field or var */
if (skip_lines)
continue;
pos=read_info.row_start;
length=(uint) (read_info.row_end-pos);
if ((!read_info.enclosed &&
(enclosed_length && length == 4 &&
!memcmp(pos, STRING_WITH_LEN("NULL")))) ||
(length == 1 && read_info.found_null))
{
if (item->type() == Item::FIELD_ITEM)
{
Field *field= ((Item_field *)item)->field;
field->reset();
field->set_null();
if (field == table->next_number_field)
table->auto_increment_field_not_null= TRUE;
if (!field->maybe_null())
{
if (field->type() == FIELD_TYPE_TIMESTAMP)
((Field_timestamp*) field)->set_time();
else if (field != table->next_number_field)
field->set_warning(MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARN_NULL_TO_NOTNULL, 1);
}
}
else
((Item_user_var_as_out_param *)item)->set_null_value(
read_info.read_charset);
continue;
}
if (item->type() == Item::FIELD_ITEM)
{
Field *field= ((Item_field *)item)->field;
field->set_notnull();
read_info.row_end[0]=0; // Safe to change end marker
if (field == table->next_number_field)
table->auto_increment_field_not_null= TRUE;
field->store((char*) pos, length, read_info.read_charset);
}
else
((Item_user_var_as_out_param *)item)->set_value((char*) pos, length,
read_info.read_charset);
}
if (read_info.error)
break;
if (skip_lines)
{
skip_lines--;
continue;
}
if (item)
{
/* No field was read on this line, so the input file has simply ended */
if (item == fields_vars.head())
break;
for (; item ; item= it++)
{
if (item->type() == Item::FIELD_ITEM)
{
/*
QQ: We probably should not throw warning for each field.
But how about intention to always have the same number
of warnings in THD::cuted_fields (and get rid of cuted_fields
in the end ?)
*/
thd->cuted_fields++;
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARN_TOO_FEW_RECORDS,
ER(ER_WARN_TOO_FEW_RECORDS), thd->row_count);
}
else
((Item_user_var_as_out_param *)item)->set_null_value(
read_info.read_charset);
}
}
if (thd->killed ||
fill_record_n_invoke_before_triggers(thd, set_fields, set_values,
ignore_check_option_errors,
table->triggers,
TRG_EVENT_INSERT))
DBUG_RETURN(1);
switch (table_list->view_check_option(thd,
ignore_check_option_errors)) {
case VIEW_CHECK_SKIP:
read_info.next_line();
goto continue_loop;
case VIEW_CHECK_ERROR:
DBUG_RETURN(-1);
}
if (write_record(thd, table, &info))
DBUG_RETURN(1);
/*
We don't need to reset auto-increment field since we are restoring
its default value at the beginning of each loop iteration.
*/
thd->no_trans_update= no_trans_update;
if (read_info.next_line()) // Skip to next line
break;
if (read_info.line_cuted)
{
thd->cuted_fields++; /* Too long row */
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
thd->row_count);
if (thd->killed)
DBUG_RETURN(1);
}
thd->row_count++;
continue_loop:;
}
DBUG_RETURN(test(read_info.error));
}
/* Unescape all escape characters, mark \N as null */
char
READ_INFO::unescape(char chr)
{
switch(chr) {
case 'n': return '\n';
case 't': return '\t';
case 'r': return '\r';
case 'b': return '\b';
case '0': return 0; // Ascii null
case 'Z': return '\032'; // Win32 end of file
case 'N': found_null=1;
/* fall through */
default: return chr;
}
}
/*
Read a line using buffering
If the last line is empty (in line mode) then it isn't output
*/
READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
String &field_term, String &line_start, String &line_term,
String &enclosed_par, int escape, bool get_it_from_net,
bool is_fifo)
:file(file_par),escape_char(escape)
{
read_charset= cs;
field_term_ptr=(char*) field_term.ptr();
field_term_length= field_term.length();
line_term_ptr=(char*) line_term.ptr();
line_term_length= line_term.length();
if (line_start.length() == 0)
{
line_start_ptr=0;
start_of_line= 0;
}
else
{
line_start_ptr=(char*) line_start.ptr();
line_start_end=line_start_ptr+line_start.length();
start_of_line= 1;
}
/* If field_terminator == line_terminator, don't use line_terminator */
if (field_term_length == line_term_length &&
!memcmp(field_term_ptr,line_term_ptr,field_term_length))
{
line_term_length=0;
line_term_ptr=(char*) "";
}
enclosed_char= (enclosed_length=enclosed_par.length()) ?
(uchar) enclosed_par[0] : INT_MAX;
field_term_char= field_term_length ? (uchar) field_term_ptr[0] : INT_MAX;
line_term_char= line_term_length ? (uchar) line_term_ptr[0] : INT_MAX;
error=eof=found_end_of_line=found_null=line_cuted=0;
buff_length=tot_length;
/* Set up a stack for unget, needed when terminators are longer than one char */
uint length=max(field_term_length,line_term_length)+1;
set_if_bigger(length,line_start.length());
stack=stack_pos=(int*) sql_alloc(sizeof(int)*length);
if (!(buffer=(byte*) my_malloc(buff_length+1,MYF(0))))
error=1; /* purecov: inspected */
else
{
end_of_buff=buffer+buff_length;
if (init_io_cache(&cache,(get_it_from_net) ? -1 : file, 0,
(get_it_from_net) ? READ_NET :
(is_fifo ? READ_FIFO : READ_CACHE),0L,1,
MYF(MY_WME)))
{
my_free((gptr) buffer,MYF(0)); /* purecov: inspected */
error=1;
}
else
{
/*
init_io_cache() will not initialize read_function member
if the cache is READ_NET. So we work around the problem with a
manual assignment
*/
need_end_io_cache = 1;
#ifndef EMBEDDED_LIBRARY
if (get_it_from_net)
cache.read_function = _my_b_net_read;
if (mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
(IO_CACHE_CALLBACK) log_loaded_block;
#endif
}
}
}
READ_INFO::~READ_INFO()
{
if (!error)
{
if (need_end_io_cache)
::end_io_cache(&cache);
my_free((gptr) buffer,MYF(0));
error=1;
}
}
#define GET (stack_pos != stack ? *--stack_pos : my_b_get(&cache))
#define PUSH(A) *(stack_pos++)=(A)
inline int READ_INFO::terminator(char *ptr,uint length)
{
int chr=0; // Keep gcc happy
uint i;
for (i=1 ; i < length ; i++)
{
if ((chr=GET) != *++ptr)
{
break;
}
}
if (i == length)
return 1;
PUSH(chr);
while (i-- > 1)
PUSH((uchar) *--ptr);
return 0;
}
int READ_INFO::read_field()
{
int chr,found_enclosed_char;
byte *to,*new_buffer;
found_null=0;
if (found_end_of_line)
return 1; // One has to call next_line
/* Skip until we find 'line_start' */
if (start_of_line)
{ // Skip until line_start
start_of_line=0;
if (find_start_of_fields())
return 1;
}
if ((chr=GET) == my_b_EOF)
{
found_end_of_line=eof=1;
return 1;
}
to=buffer;
if (chr == enclosed_char)
{
found_enclosed_char=enclosed_char;
*to++=(byte) chr; // Kept in case the field proves not to be enclosed
}
else
{
found_enclosed_char= INT_MAX;
PUSH(chr);
}
for (;;)
{
while ( to < end_of_buff)
{
chr = GET;
#ifdef USE_MB
if ((my_mbcharlen(read_charset, chr) > 1) &&
to+my_mbcharlen(read_charset, chr) <= end_of_buff)
{
uchar* p = (uchar*)to;
*to++ = chr;
int ml = my_mbcharlen(read_charset, chr);
int i;
for (i=1; i<ml; i++) {
chr = GET;
if (chr == my_b_EOF)
goto found_eof;
*to++ = chr;
}
if (my_ismbchar(read_charset,
(const char *)p,
(const char *)to))
continue;
for (i=0; i<ml; i++)
PUSH((uchar) *--to);
chr = GET;
}
#endif
if (chr == my_b_EOF)
goto found_eof;
if (chr == escape_char)
{
if ((chr=GET) == my_b_EOF)
{
*to++= (byte) escape_char;
goto found_eof;
}
/*
When escape_char == enclosed_char, we treat it like we do for
handling quotes in SQL parsing -- you can double-up the
escape_char to include it literally, but it doesn't do escapes
like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
with data like: "fie""ld1", "field2"
*/
if (escape_char != enclosed_char || chr == escape_char)
{
*to++ = (byte) unescape((char) chr);
continue;
}
PUSH(chr);
chr= escape_char;
}
#ifdef ALLOW_LINESEPARATOR_IN_STRINGS
if (chr == line_term_char)
#else
if (chr == line_term_char && found_enclosed_char == INT_MAX)
#endif
{
if (terminator(line_term_ptr,line_term_length))
{ // Maybe unexpected linefeed
enclosed=0;
found_end_of_line=1;
row_start=buffer;
row_end= to;
return 0;
}
}
if (chr == found_enclosed_char)
{
if ((chr=GET) == found_enclosed_char)
{ // Remove duplicated
*to++ = (byte) chr;
continue;
}
// End of enclosed field if followed by field_term or line_term
if (chr == my_b_EOF ||
(chr == line_term_char && terminator(line_term_ptr,
line_term_length)))
{ // Maybe unexpected linefeed
enclosed=1;
found_end_of_line=1;
row_start=buffer+1;
row_end= to;
return 0;
}
if (chr == field_term_char &&
terminator(field_term_ptr,field_term_length))
{
enclosed=1;
row_start=buffer+1;
row_end= to;
return 0;
}
/*
The string didn't terminate yet.
Store back next character for the loop
*/
PUSH(chr);
/* copy the found term character to 'to' */
chr= found_enclosed_char;
}
else if (chr == field_term_char && found_enclosed_char == INT_MAX)
{
if (terminator(field_term_ptr,field_term_length))
{
enclosed=0;
row_start=buffer;
row_end= to;
return 0;
}
}
*to++ = (byte) chr;
}
/*
We come here if the buffer is too small. Enlarge it and continue.
*/
if (!(new_buffer=(byte*) my_realloc((char*) buffer,buff_length+1+IO_SIZE,
MYF(MY_WME))))
return (error=1);
to=new_buffer + (to-buffer);
buffer=new_buffer;
buff_length+=IO_SIZE;
end_of_buff=buffer+buff_length;
}
found_eof:
enclosed=0;
found_end_of_line=eof=1;
row_start=buffer;
row_end=to;
return 0;
}
/*
Read a row with fixed length.
NOTES
The row may not be fixed size on disk if there are escape
characters in the file.
IMPLEMENTATION NOTE
One can't use fixed length with a multi-byte charset
RETURN
0 ok
1 error
*/
int READ_INFO::read_fixed_length()
{
int chr;
byte *to;
if (found_end_of_line)
return 1; // One has to call next_line
if (start_of_line)
{ // Skip until line_start
start_of_line=0;
if (find_start_of_fields())
return 1;
}
to=row_start=buffer;
while (to < end_of_buff)
{
if ((chr=GET) == my_b_EOF)
goto found_eof;
if (chr == escape_char)
{
if ((chr=GET) == my_b_EOF)
{
*to++= (byte) escape_char;
goto found_eof;
}
*to++ =(byte) unescape((char) chr);
continue;
}
if (chr == line_term_char)
{
if (terminator(line_term_ptr,line_term_length))
{ // Maybe unexpected linefeed
found_end_of_line=1;
row_end= to;
return 0;
}
}
*to++ = (byte) chr;
}
row_end=to; // Found full line
return 0;
found_eof:
found_end_of_line=eof=1;
row_start=buffer;
row_end=to;
return to == buffer ? 1 : 0;
}
int READ_INFO::next_line()
{
line_cuted=0;
start_of_line= line_start_ptr != 0;
if (found_end_of_line || eof)
{
found_end_of_line=0;
return eof;
}
found_end_of_line=0;
if (!line_term_length)
return 0; // No lines
for (;;)
{
int chr = GET;
#ifdef USE_MB
if (my_mbcharlen(read_charset, chr) > 1)
{
for (int i=1;
chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
i++)
chr = GET;
if (chr == escape_char)
continue;
}
#endif
if (chr == my_b_EOF)
{
eof=1;
return 1;
}
if (chr == escape_char)
{
line_cuted=1;
if (GET == my_b_EOF)
return 1;
continue;
}
if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
return 0;
line_cuted=1;
}
}
bool READ_INFO::find_start_of_fields()
{
int chr;
try_again:
do
{
if ((chr=GET) == my_b_EOF)
{
found_end_of_line=eof=1;
return 1;
}
} while ((char) chr != line_start_ptr[0]);
for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
{
chr=GET; // Eof will be checked later
if ((char) chr != *ptr)
{ // Can't be line_start
PUSH(chr);
while (--ptr != line_start_ptr)
{ // Restart with next char
PUSH((uchar) *ptr);
}
goto try_again;
}
}
return 0;
}