1
0
mirror of https://github.com/MariaDB/server.git synced 2026-01-06 05:22:24 +03:00

A 5.5 version of the fix for Bug #54360 "Deadlock DROP/ALTER/CREATE

DATABASE with open HANDLER"

Remove LOCK_create_db, database name locks, and use metadata locks instead.
This exposes CREATE/DROP/ALTER DATABASE statements to the graph-based
deadlock detector in MDL, and paves the way for a safe, deadlock-free
implementation of RENAME DATABASE.

Database DDL statements will now take exclusive metadata locks on
the database name, while table/view/routine DDL statements take
intention exclusive locks on the database name. This prevents race
conditions between database DDL and table/view/routine DDL.
(e.g. DROP DATABASE with concurrent CREATE/ALTER/DROP TABLE)

By adding database name locks, this patch implements
WL#4450 "DDL locking: CREATE/DROP DATABASE must use database locks" and
WL#4985 "DDL locking: namespace/hierarchical locks".

The patch also changes code to use init_one_table() where appropriate.
The new lock_table_names() function requires TABLE_LIST::db_length to
be set correctly, and this is taken care of by init_one_table().

This patch also adds a simple template to help work with 
the mysys HASH data structure.

Most of the patch was written by Konstantin Osipov.
This commit is contained in:
Jon Olav Hauglid
2010-07-01 15:53:46 +02:00
parent 29e9130d60
commit 9ff272fbbd
28 changed files with 1300 additions and 652 deletions

View File

@@ -58,106 +58,6 @@ static void mysql_change_db_impl(THD *thd,
CHARSET_INFO *new_db_charset);
/* Database lock hash */
HASH lock_db_cache;
mysql_mutex_t LOCK_lock_db;
int creating_database= 0; // how many database locks are made
/* Structure for database lock */
typedef struct my_dblock_st
{
char *name; /* Database name */
uint name_length; /* Database length name */
} my_dblock_t;
/*
lock_db key.
*/
extern "C" uchar* lock_db_get_key(my_dblock_t *, size_t *, my_bool not_used);
uchar* lock_db_get_key(my_dblock_t *ptr, size_t *length,
my_bool not_used __attribute__((unused)))
{
*length= ptr->name_length;
return (uchar*) ptr->name;
}
/*
Free lock_db hash element.
*/
extern "C" void lock_db_free_element(void *ptr);
void lock_db_free_element(void *ptr)
{
my_free(ptr, MYF(0));
}
/*
Put a database lock entry into the hash.
DESCRIPTION
Insert a database lock entry into hash.
LOCK_db_lock must be previously locked.
RETURN VALUES
0 on success.
1 on error.
*/
static my_bool lock_db_insert(const char *dbname, uint length)
{
my_dblock_t *opt;
my_bool error= 0;
DBUG_ENTER("lock_db_insert");
mysql_mutex_assert_owner(&LOCK_lock_db);
if (!(opt= (my_dblock_t*) my_hash_search(&lock_db_cache,
(uchar*) dbname, length)))
{
/* Db is not in the hash, insert it */
char *tmp_name;
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
&opt, (uint) sizeof(*opt), &tmp_name, (uint) length+1,
NullS))
{
error= 1;
goto end;
}
opt->name= tmp_name;
strmov(opt->name, dbname);
opt->name_length= length;
if ((error= my_hash_insert(&lock_db_cache, (uchar*) opt)))
my_free(opt, MYF(0));
}
end:
DBUG_RETURN(error);
}
/*
Delete a database lock entry from hash.
*/
void lock_db_delete(const char *name, uint length)
{
my_dblock_t *opt;
mysql_mutex_assert_owner(&LOCK_lock_db);
if ((opt= (my_dblock_t *)my_hash_search(&lock_db_cache,
(const uchar*) name, length)))
my_hash_delete(&lock_db_cache, (uchar*) opt);
}
/* Database options hash */
static HASH dboptions;
static my_bool dboptions_init= 0;
@@ -233,21 +133,16 @@ static void init_database_names_psi_keys(void)
}
#endif
/*
Initialize database option hash and locked database hash.
/**
Initialize database option cache.
SYNOPSIS
my_database_names()
@note Must be called before any other database function is called.
NOTES
Must be called before any other database function is called.
RETURN
0 ok
1 Fatal error
@retval 0 ok
@retval 1 Fatal error
*/
bool my_database_names_init(void)
bool my_dboptions_cache_init(void)
{
#ifdef HAVE_PSI_INTERFACE
init_database_names_psi_keys();
@@ -261,36 +156,30 @@ bool my_database_names_init(void)
error= my_hash_init(&dboptions, lower_case_table_names ?
&my_charset_bin : system_charset_info,
32, 0, 0, (my_hash_get_key) dboptions_get_key,
free_dbopt,0) ||
my_hash_init(&lock_db_cache, lower_case_table_names ?
&my_charset_bin : system_charset_info,
32, 0, 0, (my_hash_get_key) lock_db_get_key,
lock_db_free_element,0);
free_dbopt,0);
}
return error;
}
/*
/**
Free database option hash and locked databases hash.
*/
void my_database_names_free(void)
void my_dboptions_cache_free(void)
{
if (dboptions_init)
{
dboptions_init= 0;
my_hash_free(&dboptions);
mysql_rwlock_destroy(&LOCK_dboptions);
my_hash_free(&lock_db_cache);
}
}
/*
Cleanup cached options
/**
Cleanup cached options.
*/
void my_dbopt_cleanup(void)
@@ -395,7 +284,7 @@ end:
Deletes database options from the hash.
*/
void del_dbopt(const char *path)
static void del_dbopt(const char *path)
{
my_dbopt_t *opt;
mysql_rwlock_wrlock(&LOCK_dboptions);
@@ -664,25 +553,8 @@ int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
DBUG_RETURN(-1);
}
/*
Do not create database if another thread is holding read lock.
Wait for global read lock before acquiring LOCK_mysql_create_db.
After wait_if_global_read_lock() we have protection against another
global read lock. If we would acquire LOCK_mysql_create_db first,
another thread could step in and get the global read lock before we
reach wait_if_global_read_lock(). If this thread tries the same as we
(admin a db), it would then go and wait on LOCK_mysql_create_db...
Furthermore wait_if_global_read_lock() checks if the current thread
has the global read lock and refuses the operation with
ER_CANT_UPDATE_WITH_READLOCK if applicable.
*/
if (thd->global_read_lock.wait_if_global_read_lock(thd, FALSE, TRUE))
{
error= -1;
goto exit2;
}
mysql_mutex_lock(&LOCK_mysql_create_db);
if (lock_schema_name(thd, db))
DBUG_RETURN(-1);
/* Check directory */
path_len= build_table_filename(path, sizeof(path) - 1, db, "", "", 0);
@@ -786,7 +658,10 @@ not_silent:
qinfo.db = db;
qinfo.db_len = strlen(db);
/* These DDL methods and logging protected with LOCK_mysql_create_db */
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema
*/
if (mysql_bin_log.write(&qinfo))
{
error= -1;
@@ -797,9 +672,6 @@ not_silent:
}
exit:
mysql_mutex_unlock(&LOCK_mysql_create_db);
thd->global_read_lock.start_waiting_global_read_lock(thd);
exit2:
DBUG_RETURN(error);
}
@@ -813,22 +685,8 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
int error= 0;
DBUG_ENTER("mysql_alter_db");
/*
Do not alter database if another thread is holding read lock.
Wait for global read lock before acquiring LOCK_mysql_create_db.
After wait_if_global_read_lock() we have protection against another
global read lock. If we would acquire LOCK_mysql_create_db first,
another thread could step in and get the global read lock before we
reach wait_if_global_read_lock(). If this thread tries the same as we
(admin a db), it would then go and wait on LOCK_mysql_create_db...
Furthermore wait_if_global_read_lock() checks if the current thread
has the global read lock and refuses the operation with
ER_CANT_UPDATE_WITH_READLOCK if applicable.
*/
if ((error= thd->global_read_lock.wait_if_global_read_lock(thd, FALSE, TRUE)))
goto exit2;
mysql_mutex_lock(&LOCK_mysql_create_db);
if (lock_schema_name(thd, db))
DBUG_RETURN(TRUE);
/*
Recreate db options file: /dbpath/.db.opt
@@ -866,16 +724,16 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
qinfo.db = db;
qinfo.db_len = strlen(db);
/* These DDL methods and logging protected with LOCK_mysql_create_db */
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema.
*/
if ((error= mysql_bin_log.write(&qinfo)))
goto exit;
}
my_ok(thd, result);
exit:
mysql_mutex_unlock(&LOCK_mysql_create_db);
thd->global_read_lock.start_waiting_global_read_lock(thd);
exit2:
DBUG_RETURN(error);
}
@@ -907,25 +765,9 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
TABLE_LIST* dropped_tables= 0;
DBUG_ENTER("mysql_rm_db");
/*
Do not drop database if another thread is holding read lock.
Wait for global read lock before acquiring LOCK_mysql_create_db.
After wait_if_global_read_lock() we have protection against another
global read lock. If we would acquire LOCK_mysql_create_db first,
another thread could step in and get the global read lock before we
reach wait_if_global_read_lock(). If this thread tries the same as we
(admin a db), it would then go and wait on LOCK_mysql_create_db...
Furthermore wait_if_global_read_lock() checks if the current thread
has the global read lock and refuses the operation with
ER_CANT_UPDATE_WITH_READLOCK if applicable.
*/
if (thd->global_read_lock.wait_if_global_read_lock(thd, FALSE, TRUE))
{
error= -1;
goto exit2;
}
mysql_mutex_lock(&LOCK_mysql_create_db);
if (lock_schema_name(thd, db))
DBUG_RETURN(TRUE);
length= build_table_filename(path, sizeof(path) - 1, db, "", "", 0);
strmov(path+length, MY_DB_OPT_FILE); // Append db option file name
@@ -1013,7 +855,10 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
qinfo.db = db;
qinfo.db_len = strlen(db);
/* These DDL methods and logging protected with LOCK_mysql_create_db */
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema.
*/
if (mysql_bin_log.write(&qinfo))
{
error= -1;
@@ -1045,7 +890,10 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
tbl_name_len= strlen(tbl->table_name) + 3;
if (query_pos + tbl_name_len + 1 >= query_end)
{
/* These DDL methods and logging protected with LOCK_mysql_create_db */
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema.
*/
if (write_to_binlog(thd, query, query_pos -1 - query, db, db_len))
{
error= -1;
@@ -1062,7 +910,10 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
if (query_pos != query_data_start)
{
/* These DDL methods and logging protected with LOCK_mysql_create_db */
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema.
*/
if (write_to_binlog(thd, query, query_pos -1 - query, db, db_len))
{
error= -1;
@@ -1080,9 +931,6 @@ exit:
*/
if (thd->db && !strcmp(thd->db, db) && error == 0)
mysql_change_db_impl(thd, NULL, 0, thd->variables.collation_server);
mysql_mutex_unlock(&LOCK_mysql_create_db);
thd->global_read_lock.start_waiting_global_read_lock(thd);
exit2:
DBUG_RETURN(error);
}
@@ -1099,12 +947,12 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *db,
long deleted=0;
ulong found_other_files=0;
char filePath[FN_REFLEN];
TABLE_LIST *tot_list=0, **tot_list_next;
TABLE_LIST *tot_list=0, **tot_list_next_local, **tot_list_next_global;
List<String> raid_dirs;
DBUG_ENTER("mysql_rm_known_files");
DBUG_PRINT("enter",("path: %s", org_path));
tot_list_next= &tot_list;
tot_list_next_local= tot_list_next_global= &tot_list;
for (uint idx=0 ;
idx < (uint) dirp->number_off_files && !thd->killed ;
@@ -1192,23 +1040,28 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *db,
if (!table_list)
goto err;
table_list->db= (char*) (table_list+1);
table_list->table_name= strmov(table_list->db, db) + 1;
(void) filename_to_tablename(file->name, table_list->table_name,
MYSQL50_TABLE_NAME_PREFIX_LENGTH +
strlen(file->name) + 1);
table_list->db_length= strmov(table_list->db, db) - table_list->db;
table_list->table_name= table_list->db + table_list->db_length + 1;
table_list->table_name_length= filename_to_tablename(file->name,
table_list->table_name,
MYSQL50_TABLE_NAME_PREFIX_LENGTH +
strlen(file->name) + 1);
table_list->open_type= OT_BASE_ONLY;
/* To be able to correctly look up the table in the table cache. */
if (lower_case_table_names)
my_casedn_str(files_charset_info, table_list->table_name);
table_list->table_name_length= my_casedn_str(files_charset_info,
table_list->table_name);
table_list->alias= table_list->table_name; // If lower_case_table_names=2
table_list->internal_tmp_table= is_prefix(file->name, tmp_file_prefix);
table_list->mdl_request.init(MDL_key::TABLE, table_list->db,
table_list->table_name, MDL_EXCLUSIVE);
/* Link into list */
(*tot_list_next)= table_list;
tot_list_next= &table_list->next_local;
(*tot_list_next_local)= table_list;
(*tot_list_next_global)= table_list;
tot_list_next_local= &table_list->next_local;
tot_list_next_global= &table_list->next_global;
deleted++;
}
else
@@ -1771,60 +1624,6 @@ bool mysql_opt_change_db(THD *thd,
}
static int
lock_databases(THD *thd, const char *db1, uint length1,
const char *db2, uint length2)
{
mysql_mutex_lock(&LOCK_lock_db);
while (!thd->killed &&
(my_hash_search(&lock_db_cache,(uchar*) db1, length1) ||
my_hash_search(&lock_db_cache,(uchar*) db2, length2)))
{
wait_for_condition(thd, &LOCK_lock_db, &COND_refresh);
mysql_mutex_lock(&LOCK_lock_db);
}
if (thd->killed)
{
mysql_mutex_unlock(&LOCK_lock_db);
return 1;
}
lock_db_insert(db1, length1);
lock_db_insert(db2, length2);
creating_database++;
/*
Wait if a concurent thread is creating a table at the same time.
The assumption here is that it will not take too long until
there is a point in time when a table is not created.
*/
while (!thd->killed && creating_table)
{
wait_for_condition(thd, &LOCK_lock_db, &COND_refresh);
mysql_mutex_lock(&LOCK_lock_db);
}
if (thd->killed)
{
lock_db_delete(db1, length1);
lock_db_delete(db2, length2);
creating_database--;
mysql_mutex_unlock(&LOCK_lock_db);
mysql_cond_signal(&COND_refresh);
return(1);
}
/*
We can unlock now as the hash will protect against anyone creating a table
in the databases we are using
*/
mysql_mutex_unlock(&LOCK_lock_db);
return 0;
}
/**
Upgrade a 5.0 database.
This function is invoked whenever an ALTER DATABASE UPGRADE query is executed:
@@ -1866,9 +1665,9 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
new_db.str= old_db->str + MYSQL50_TABLE_NAME_PREFIX_LENGTH;
new_db.length= old_db->length - MYSQL50_TABLE_NAME_PREFIX_LENGTH;
if (lock_databases(thd, old_db->str, old_db->length,
new_db.str, new_db.length))
DBUG_RETURN(1);
/* Lock the old name, the new name will be locked by mysql_create_db().*/
if (lock_schema_name(thd, old_db->str))
DBUG_RETURN(-1);
/*
Let's remember if we should do "USE newdb" afterwards.
@@ -2035,15 +1834,6 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
error|= mysql_change_db(thd, & new_db, FALSE);
exit:
mysql_mutex_lock(&LOCK_lock_db);
/* Remove the databases from db lock cache */
lock_db_delete(old_db->str, old_db->length);
lock_db_delete(new_db.str, new_db.length);
creating_database--;
/* Signal waiting CREATE TABLE's to continue */
mysql_cond_signal(&COND_refresh);
mysql_mutex_unlock(&LOCK_lock_db);
DBUG_RETURN(error);
}