1
0
mirror of https://github.com/MariaDB/server.git synced 2025-12-24 11:21:21 +03:00
Bug #20662 "Infinite loop in CREATE TABLE IF NOT EXISTS ... SELECT
              with locked tables"
  Bug #20903 "Crash when using CREATE TABLE .. SELECT and triggers"
  Bug #24738 "CREATE TABLE ... SELECT is not isolated properly"
  Bug #24508 "Inconsistent results of CREATE TABLE ... SELECT when
              temporary table exists"

Deadlock occured when one tried to execute CREATE TABLE IF NOT
EXISTS ... SELECT statement under LOCK TABLES which held
read lock on target table.
Attempt to execute the same statement for already existing
target table with triggers caused server crashes.
Also concurrent execution of CREATE TABLE ... SELECT statement
and other statements involving target table suffered from
various races (some of which might've led to deadlocks).
Finally, attempt to execute CREATE TABLE ... SELECT in case
when a temporary table with same name was already present
led to the insertion of data into this temporary table and
creation of empty non-temporary table.
 
All above problems stemmed from the old implementation of CREATE
TABLE ... SELECT in which we created, opened and locked target
table without any special protection in a separate step and not
with the rest of tables used by this statement.
This underminded deadlock-avoidance approach used in server
and created window for races. It also excluded target table
from prelocking causing problems with trigger execution.

The patch solves these problems by implementing new approach to
handling of CREATE TABLE ... SELECT for base tables.
We try to open and lock table to be created at the same time as
the rest of tables used by this statement. If such table does not
exist at this moment we create and place in the table cache special
placeholder for it which prevents its creation or any other usage
by other threads.
We still use old approach for creation of temporary tables.

Note that we have separate fix for 5.0 since there we use slightly
different less intrusive approach.
This commit is contained in:
dlenev@mockturtle.local
2007-05-11 21:51:03 +04:00
parent 770219c15a
commit 4cafc8eeec
17 changed files with 1465 additions and 245 deletions

View File

@@ -3196,7 +3196,7 @@ static HA_CREATE_INFO *copy_create_info(HA_CREATE_INFO *lex_create_info)
Create a table
SYNOPSIS
mysql_create_table_internal()
mysql_create_table_no_lock()
thd Thread object
db Database
table_name Table name
@@ -3213,6 +3213,11 @@ static HA_CREATE_INFO *copy_create_info(HA_CREATE_INFO *lex_create_info)
DESCRIPTION
If one creates a temporary table, this is automatically opened
Note that this function assumes that caller already have taken
name-lock on table being created or used some other way to ensure
that concurrent operations won't intervene. mysql_create_table()
is a wrapper that can be used for this.
no_log is needed for the case of CREATE ... SELECT,
as the logging will be done later in sql_insert.cc
select_field_count is also used for CREATE ... SELECT,
@@ -3223,7 +3228,7 @@ static HA_CREATE_INFO *copy_create_info(HA_CREATE_INFO *lex_create_info)
TRUE error
*/
bool mysql_create_table_internal(THD *thd,
bool mysql_create_table_no_lock(THD *thd,
const char *db, const char *table_name,
HA_CREATE_INFO *lex_create_info,
List<create_field> &fields,
@@ -3239,7 +3244,7 @@ bool mysql_create_table_internal(THD *thd,
HA_CREATE_INFO *create_info;
handler *file;
bool error= TRUE;
DBUG_ENTER("mysql_create_table_internal");
DBUG_ENTER("mysql_create_table_no_lock");
DBUG_PRINT("enter", ("db: '%s' table: '%s' tmp: %d",
db, table_name, internal_tmp_table));
@@ -3581,7 +3586,7 @@ warn:
/*
Database locking aware wrapper for mysql_create_table_internal(),
Database and name-locking aware wrapper for mysql_create_table_no_lock(),
*/
bool mysql_create_table(THD *thd, const char *db, const char *table_name,
@@ -3591,6 +3596,7 @@ bool mysql_create_table(THD *thd, const char *db, const char *table_name,
uint select_field_count,
bool use_copy_create_info)
{
TABLE *name_lock= 0;
bool result;
DBUG_ENTER("mysql_create_table");
@@ -3611,11 +3617,44 @@ bool mysql_create_table(THD *thd, const char *db, const char *table_name,
creating_table++;
pthread_mutex_unlock(&LOCK_lock_db);
result= mysql_create_table_internal(thd, db, table_name, create_info,
fields, keys, internal_tmp_table,
select_field_count,
use_copy_create_info);
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
{
if (lock_table_name_if_not_cached(thd, db, table_name, &name_lock))
{
result= TRUE;
goto unlock;
}
if (!name_lock)
{
if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
table_name);
create_info->table_existed= 1;
result= FALSE;
}
else
{
my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
result= TRUE;
}
goto unlock;
}
}
result= mysql_create_table_no_lock(thd, db, table_name, create_info,
fields, keys, internal_tmp_table,
select_field_count,
use_copy_create_info);
unlock:
if (name_lock)
{
pthread_mutex_lock(&LOCK_open);
unlink_open_table(thd, name_lock, FALSE);
pthread_mutex_unlock(&LOCK_open);
}
pthread_mutex_lock(&LOCK_lock_db);
if (!--creating_table && creating_database)
pthread_cond_signal(&COND_refresh);
@@ -3818,7 +3857,7 @@ void close_cached_table(THD *thd, TABLE *table)
thd->lock=0; // Start locked threads
}
/* Close all copies of 'table'. This also frees all LOCK TABLES lock */
thd->open_tables=unlink_open_table(thd,thd->open_tables,table);
unlink_open_table(thd, table, TRUE);
/* When lock on LOCK_open is freed other threads can continue */
broadcast_refresh();
@@ -3894,7 +3933,7 @@ static int prepare_for_restore(THD* thd, TABLE_LIST* table,
to finish the restore in the handler later on
*/
pthread_mutex_lock(&LOCK_open);
if (reopen_name_locked_table(thd, table))
if (reopen_name_locked_table(thd, table, TRUE))
{
unlock_table_name(thd, table);
pthread_mutex_unlock(&LOCK_open);
@@ -4026,7 +4065,7 @@ static int prepare_for_repair(THD *thd, TABLE_LIST *table_list,
to finish the repair in the handler later on.
*/
pthread_mutex_lock(&LOCK_open);
if (reopen_name_locked_table(thd, table_list))
if (reopen_name_locked_table(thd, table_list, TRUE))
{
unlock_table_name(thd, table_list);
pthread_mutex_unlock(&LOCK_open);
@@ -4632,7 +4671,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
HA_CREATE_INFO *lex_create_info,
Table_ident *table_ident)
{
TABLE *tmp_table;
TABLE *tmp_table, *name_lock= 0;
char src_path[FN_REFLEN], dst_path[FN_REFLEN];
char src_table_name_buff[FN_REFLEN], src_db_name_buff[FN_REFLEN];
uint dst_path_length;
@@ -4641,14 +4680,14 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
char *src_db;
char *src_table= table_ident->table.str;
int err;
bool res= TRUE, unlock_dst_table= FALSE;
bool res= TRUE;
enum legacy_db_type not_used;
HA_CREATE_INFO *create_info;
#ifdef WITH_PARTITION_STORAGE_ENGINE
char tmp_path[FN_REFLEN];
#endif
char ts_name[FN_LEN];
TABLE_LIST src_tables_list, dst_tables_list;
TABLE_LIST src_tables_list;
DBUG_ENTER("mysql_create_like_table");
if (!(create_info= copy_create_info(lex_create_info)))
@@ -4756,6 +4795,10 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
}
else
{
if (lock_table_name_if_not_cached(thd, db, table_name, &name_lock))
goto err;
if (!name_lock)
goto table_exists;
dst_path_length= build_table_filename(dst_path, sizeof(dst_path),
db, table_name, reg_ext, 0);
if (!access(dst_path, F_OK))
@@ -4843,28 +4886,21 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
char buf[2048];
String query(buf, sizeof(buf), system_charset_info);
query.length(0); // Have to zero it since constructor doesn't
uint counter;
/*
Here we open the destination table. This is needed for
store_create_info() to work. The table will be closed
by close_thread_tables() at the end of the statement.
Here we open the destination table, on which we already have
name-lock. This is needed for store_create_info() to work.
The table will be closed by unlink_open_table() at the end
of this function.
*/
if (open_tables(thd, &table, &counter, 0))
table->table= name_lock;
VOID(pthread_mutex_lock(&LOCK_open));
if (reopen_name_locked_table(thd, table, FALSE))
{
VOID(pthread_mutex_unlock(&LOCK_open));
goto err;
bzero((gptr)&dst_tables_list, sizeof(dst_tables_list));
dst_tables_list.db= table->db;
dst_tables_list.table_name= table->table_name;
/*
lock destination table name, to make sure that nobody
can drop/alter the table while we execute store_create_info()
*/
if (lock_and_wait_for_table_name(thd, &dst_tables_list))
goto err;
else
unlock_dst_table= TRUE;
}
VOID(pthread_mutex_unlock(&LOCK_open));
IF_DBUG(int result=) store_create_info(thd, table, &query,
create_info);
@@ -4899,10 +4935,10 @@ table_exists:
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
err:
if (unlock_dst_table)
if (name_lock)
{
pthread_mutex_lock(&LOCK_open);
unlock_table_name(thd, &dst_tables_list);
unlink_open_table(thd, name_lock, FALSE);
pthread_mutex_unlock(&LOCK_open);
}
DBUG_RETURN(res);
@@ -5349,7 +5385,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
uint order_num, ORDER *order, bool ignore,
ALTER_INFO *alter_info, bool do_send_ok)
{
TABLE *table,*new_table=0;
TABLE *table, *new_table= 0, *name_lock= 0;
int error= 0;
char tmp_name[80],old_name[32],new_name_buff[FN_REFLEN];
char new_alias_buff[FN_REFLEN], *table_name, *db, *new_alias, *alias;
@@ -5486,6 +5522,18 @@ view_err:
DBUG_RETURN(TRUE);
table->use_all_columns();
List_iterator<Alter_drop> drop_it(alter_info->drop_list);
List_iterator<create_field> def_it(fields);
List_iterator<Alter_column> alter_it(alter_info->alter_list);
List<create_field> create_list; // Add new fields here
List<Key> key_list; // Add new keys here
List_iterator<create_field> find_it(create_list);
List_iterator<Key> key_it(keys);
List_iterator<create_field> field_it(create_list);
List<key_part_spec> key_parts;
KEY *key_info=table->key_info;
/* Check that we are not trying to rename to an existing table */
if (new_name)
{
@@ -5522,13 +5570,21 @@ view_err:
}
else
{
if (lock_table_name_if_not_cached(thd, new_db, new_name, &name_lock))
DBUG_RETURN(TRUE);
if (!name_lock)
{
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
DBUG_RETURN(TRUE);
}
build_table_filename(new_name_buff, sizeof(new_name_buff),
new_db, new_name_buff, reg_ext, 0);
if (!access(new_name_buff, F_OK))
{
/* Table will be closed in do_command() */
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
DBUG_RETURN(TRUE);
goto err;
}
}
}
@@ -5563,12 +5619,10 @@ view_err:
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (prep_alter_part_table(thd, table, alter_info, create_info, old_db_type,
&partition_changed, &fast_alter_partition))
{
DBUG_RETURN(TRUE);
}
goto err;
#endif
if (check_engine(thd, new_name, create_info))
DBUG_RETURN(TRUE);
goto err;
new_db_type= create_info->db_type;
if (create_info->row_type == ROW_TYPE_NOT_USED)
create_info->row_type= table->s->row_type;
@@ -5581,7 +5635,7 @@ view_err:
{
DBUG_PRINT("info", ("doesn't support alter"));
my_error(ER_ILLEGAL_HA, MYF(0), table_name);
DBUG_RETURN(TRUE);
goto err;
}
thd->proc_info="setup";
@@ -5640,7 +5694,19 @@ view_err:
if (!error && (new_name != table_name || new_db != db))
{
thd->proc_info="rename";
/* Then do a 'simple' rename of the table */
/*
Then do a 'simple' rename of the table. First we need to close all
instances of 'source' table.
*/
close_cached_table(thd, table);
/*
Then, we want check once again that target table does not exist.
Actually the order of these two steps does not matter since
earlier we took name-lock on the target table, so we do them
in this particular order only to be consistent with 5.0, in which
we don't take this name-lock and where this order really matters.
TODO: Investigate if we need this access() check at all.
*/
if (!access(new_name_buff,F_OK))
{
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name);
@@ -5649,8 +5715,6 @@ view_err:
else
{
*fn_ext(new_name)=0;
table->s->version= 0; // Force removal of table def
close_cached_table(thd, table);
if (mysql_rename_table(old_db_type,db,table_name,new_db,new_alias, 0))
error= -1;
else if (Table_triggers_list::change_table_name(thd, db, table_name,
@@ -5682,6 +5746,8 @@ view_err:
table->file->print_error(error, MYF(0));
error= -1;
}
if (name_lock)
unlink_open_table(thd, name_lock, FALSE);
VOID(pthread_mutex_unlock(&LOCK_open));
table_list->table= NULL; // For query cache
query_cache_invalidate3(thd, table_list, 0);
@@ -5718,11 +5784,6 @@ view_err:
create_info->tablespace= tablespace;
}
restore_record(table, s->default_values); // Empty record for DEFAULT
List_iterator<Alter_drop> drop_it(alter_info->drop_list);
List_iterator<create_field> def_it(fields);
List_iterator<Alter_column> alter_it(alter_info->alter_list);
List<create_field> create_list; // Add new fields here
List<Key> key_list; // Add new keys here
create_field *def;
/*
@@ -5793,7 +5854,7 @@ view_err:
if (def->sql_type == MYSQL_TYPE_BLOB)
{
my_error(ER_BLOB_CANT_HAVE_DEFAULT, MYF(0), def->change);
DBUG_RETURN(TRUE);
goto err;
}
if ((def->def=alter->def)) // Use new default
def->flags&= ~NO_DEFAULT_VALUE_FLAG;
@@ -5804,13 +5865,12 @@ view_err:
}
}
def_it.rewind();
List_iterator<create_field> find_it(create_list);
while ((def=def_it++)) // Add new columns
{
if (def->change && ! def->field)
{
my_error(ER_BAD_FIELD_ERROR, MYF(0), def->change, table_name);
DBUG_RETURN(TRUE);
goto err;
}
if (!def->after)
create_list.push_back(def);
@@ -5828,7 +5888,7 @@ view_err:
if (!find)
{
my_error(ER_BAD_FIELD_ERROR, MYF(0), def->after, table_name);
DBUG_RETURN(TRUE);
goto err;
}
find_it.after(def); // Put element after this
}
@@ -5837,13 +5897,13 @@ view_err:
{
my_error(ER_BAD_FIELD_ERROR, MYF(0),
alter_info->alter_list.head()->name, table_name);
DBUG_RETURN(TRUE);
goto err;
}
if (!create_list.elements)
{
my_message(ER_CANT_REMOVE_ALL_FIELDS, ER(ER_CANT_REMOVE_ALL_FIELDS),
MYF(0));
DBUG_RETURN(TRUE);
goto err;
}
/*
@@ -5851,11 +5911,6 @@ view_err:
for which some fields exists.
*/
List_iterator<Key> key_it(keys);
List_iterator<create_field> field_it(create_list);
List<key_part_spec> key_parts;
KEY *key_info=table->key_info;
for (uint i=0 ; i < table->s->keys ; i++,key_info++)
{
char *key_name= key_info->name;
@@ -5959,7 +6014,7 @@ view_err:
!my_strcasecmp(system_charset_info,key->name,primary_key_name))
{
my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->name);
DBUG_RETURN(TRUE);
goto err;
}
}
}
@@ -6201,6 +6256,7 @@ view_err:
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (fast_alter_partition)
{
DBUG_ASSERT(!name_lock);
DBUG_RETURN(fast_alter_partition_table(thd, table, alter_info,
create_info, table_list,
&create_list, &key_list,
@@ -6261,11 +6317,12 @@ view_err:
We don't log the statement, it will be logged later.
*/
tmp_disable_binlog(thd);
error= mysql_create_table(thd, new_db, tmp_name,
create_info,create_list,key_list,1,0,0);
error= mysql_create_table_no_lock(thd, new_db, tmp_name,
create_info, create_list,
key_list, 1, 0, 0);
reenable_binlog(thd);
if (error)
DBUG_RETURN(error);
goto err;
/* Open the table if we need to copy the data. */
if (need_copy_table)
@@ -6526,17 +6583,6 @@ view_err:
current_pid, thd->thread_id);
if (lower_case_table_names)
my_casedn_str(files_charset_info, old_name);
if (new_name != table_name || new_db != db)
{
if (!access(new_name_buff,F_OK))
{
error=1;
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name_buff);
VOID(quick_rm_table(new_db_type, new_db, tmp_name, FN_IS_TMP));
VOID(pthread_mutex_unlock(&LOCK_open));
goto err;
}
}
#if !defined( __WIN__)
if (table->file->has_transactions())
@@ -6546,7 +6592,6 @@ view_err:
Win32 and InnoDB can't drop a table that is in use, so we must
close the original table before doing the rename
*/
table->s->version= 0; // Force removal of table def
close_cached_table(thd, table);
table=0; // Marker that table is closed
no_table_reopen= TRUE;
@@ -6556,6 +6601,21 @@ view_err:
table->file->extra(HA_EXTRA_FORCE_REOPEN); // Don't use this file anymore
#endif
if (new_name != table_name || new_db != db)
{
/*
Check that there is no table with target name. See the
comment describing code for 'simple' ALTER TABLE ... RENAME.
*/
if (!access(new_name_buff,F_OK))
{
error=1;
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name_buff);
VOID(quick_rm_table(new_db_type, new_db, tmp_name, FN_IS_TMP));
VOID(pthread_mutex_unlock(&LOCK_open));
goto err;
}
}
error=0;
save_old_db_type= old_db_type;
@@ -6606,7 +6666,6 @@ view_err:
*/
if (table)
{
table->s->version= 0; // Force removal of table def
close_cached_table(thd,table);
}
VOID(pthread_mutex_unlock(&LOCK_open));
@@ -6647,7 +6706,6 @@ view_err:
*/
if (table)
{
table->s->version= 0; // Force removal of table def
close_cached_table(thd,table);
}
VOID(quick_rm_table(old_db_type, db, old_name, FN_IS_TMP));
@@ -6673,7 +6731,6 @@ view_err:
{ // This shouldn't happen
if (table)
{
table->s->version= 0; // Force removal of table def
close_cached_table(thd,table); // Remove lock for table
}
VOID(pthread_mutex_unlock(&LOCK_open));
@@ -6730,6 +6787,13 @@ view_err:
table_list->table=0; // For query cache
query_cache_invalidate3(thd, table_list, 0);
if (name_lock)
{
pthread_mutex_lock(&LOCK_open);
unlink_open_table(thd, name_lock, FALSE);
pthread_mutex_unlock(&LOCK_open);
}
end_temporary:
my_snprintf(tmp_name, sizeof(tmp_name), ER(ER_INSERT_INFO),
(ulong) (copied + deleted), (ulong) deleted,
@@ -6749,6 +6813,12 @@ err1:
VOID(quick_rm_table(new_db_type, new_db, tmp_name, FN_IS_TMP));
err:
if (name_lock)
{
pthread_mutex_lock(&LOCK_open);
unlink_open_table(thd, name_lock, FALSE);
pthread_mutex_unlock(&LOCK_open);
}
DBUG_RETURN(TRUE);
}
/* mysql_alter_table */