1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-24576 Atomic CREATE TABLE

There are a few different cases to consider

Logging of CREATE TABLE and CREATE TABLE ... LIKE
- If REPLACE is used and there was an existing table, DDL log the drop of
  the table.
- If discovery of table is to be done
    - DDL LOG create table
  else
    - DDL log create table (with engine type)
    - create the table
- If table was created
  - Log entry to binary log with xid
  - Mark DDL log completed

Crash recovery:
- If query was in binary log do nothing and exit
- If discoverted table
   - Delete the .frm file
-else
   - Drop created table and frm file
- If table was dropped, write a DROP TABLE statement in binary log

CREATE TABLE ... SELECT required a little more work as when one is using
statement logging the query is written to the binary log before commit is
done.
This was fixed by adding a DROP TABLE to the binary log during crash
recovery if the ddl log entry was not closed. In this case the binary log
will contain:
CREATE TABLE xxx ... SELECT ....
DROP TABLE xxx;

Other things:
- Added debug_crash_here() functionality to Aria to be able to test
  crash in create table between the creation of the .MAI and the .MAD files.
This commit is contained in:
Monty
2021-01-17 16:06:43 +02:00
committed by Sergei Golubchik
parent 7a588c30b1
commit 6aa9a552c2
28 changed files with 1239 additions and 314 deletions

View File

@@ -1078,6 +1078,7 @@ static uint32 get_comment(THD *thd, uint32 comment_pos,
const uchar *query_end= (uchar*) query + thd->query_length();
const uchar *const state_map= thd->charset()->state_map;
*comment_start= ""; // Ensure it points to something
for (; query < query_end; query++)
{
if (state_map[static_cast<uchar>(*query)] == MY_LEX_SKIP)
@@ -4131,10 +4132,13 @@ err:
*/
static
int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
int create_table_impl(THD *thd,
DDL_LOG_STATE *ddl_log_state_create,
DDL_LOG_STATE *ddl_log_state_rm,
const LEX_CSTRING &orig_db,
const LEX_CSTRING &orig_table_name,
const LEX_CSTRING &db, const LEX_CSTRING &table_name,
const char *path, const DDL_options_st options,
const LEX_CSTRING &path, const DDL_options_st options,
HA_CREATE_INFO *create_info, Alter_info *alter_info,
int create_table_mode, bool *is_trans, KEY **key_info,
uint *key_count, LEX_CUSTRING *frm)
@@ -4145,9 +4149,16 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
bool frm_only= create_table_mode == C_ALTER_TABLE_FRM_ONLY;
bool internal_tmp_table= create_table_mode == C_ALTER_TABLE || frm_only;
handlerton *exists_hton;
DBUG_ENTER("mysql_create_table_no_lock");
DBUG_ENTER("create_table_impl");
DBUG_PRINT("enter", ("db: '%s' table: '%s' tmp: %d path: %s",
db.str, table_name.str, internal_tmp_table, path));
db.str, table_name.str, internal_tmp_table, path.str));
/* Easy check for ddl logging if we are creating a temporary table */
if (create_info->tmp_table())
{
ddl_log_state_create= 0;
ddl_log_state_rm= 0;
}
if (fix_constraints_names(thd, &alter_info->check_constraint_list,
create_info))
@@ -4261,10 +4272,12 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
(void) trans_rollback_stmt(thd);
/* Remove normal table without logging. Keep tables locked */
if (mysql_rm_table_no_locks(thd, &table_list, &thd->db,
(DDL_LOG_STATE*) 0,
ddl_log_state_rm,
0, 0, 0, 0, 1, 1))
goto err;
debug_crash_here("ddl_log_create_after_drop");
/*
We have to log this query, even if it failed later to ensure the
drop is done.
@@ -4327,7 +4340,7 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
goto err;
}
init_tmp_table_share(thd, &share, db.str, 0, table_name.str, path);
init_tmp_table_share(thd, &share, db.str, 0, table_name.str, path.str);
/* prepare everything for discovery */
share.field= &no_fields;
@@ -4338,6 +4351,14 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
if (parse_engine_table_options(thd, hton, &share))
goto err;
/*
Log that we are going to do discovery. If things fails, any generated
.frm files are deleted
*/
if (ddl_log_state_create)
ddl_log_create_table(thd, ddl_log_state_create, (handlerton*) 0, &path,
&db, &table_name, 1);
ha_err= hton->discover_table_structure(hton, thd, &share, create_info);
/*
@@ -4359,45 +4380,56 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
}
else
{
if (ddl_log_state_create)
ddl_log_create_table(thd, ddl_log_state_create, create_info->db_type,
&path, &db, &table_name, frm_only);
debug_crash_here("ddl_log_create_before_create_frm");
file= mysql_create_frm_image(thd, orig_db, orig_table_name, create_info,
alter_info, create_table_mode, key_info,
key_count, frm);
/*
TODO: remove this check of thd->is_error() (now it intercept
errors in some val_*() methoids and bring some single place to
such error interception).
TODO: remove this check of thd->is_error() (now it intercept
errors in some val_*() methods and bring some single place to
such error interception).
*/
if (!file || thd->is_error())
{
if (!file)
deletefrm(path.str);
goto err;
}
if (thd->variables.keep_files_on_create)
create_info->options|= HA_CREATE_KEEP_FILES;
if (file->ha_create_partitioning_metadata(path, NULL, CHF_CREATE_FLAG))
if (file->ha_create_partitioning_metadata(path.str, NULL, CHF_CREATE_FLAG))
goto err;
if (!frm_only)
{
if (ha_create_table(thd, path, db.str, table_name.str, create_info,
debug_crash_here("ddl_log_create_before_create_table");
if (ha_create_table(thd, path.str, db.str, table_name.str, create_info,
frm, 0))
{
file->ha_create_partitioning_metadata(path, NULL, CHF_DELETE_FLAG);
deletefrm(path);
file->ha_create_partitioning_metadata(path.str, NULL, CHF_DELETE_FLAG);
deletefrm(path.str);
goto err;
}
debug_crash_here("ddl_log_create_after_create_table");
}
}
create_info->table= 0;
if (!frm_only && create_info->tmp_table())
{
TABLE *table= thd->create_and_open_tmp_table(frm, path, db.str,
TABLE *table= thd->create_and_open_tmp_table(frm, path.str, db.str,
table_name.str,
false);
if (!table)
{
(void) thd->rm_temporary_table(create_info->db_type, path);
(void) thd->rm_temporary_table(create_info->db_type, path.str);
goto err;
}
@@ -4410,6 +4442,12 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
error= 0;
err:
if (unlikely(error) && ddl_log_state_create)
{
/* Table was never created, so we can ignore the ddl log entry */
ddl_log_complete(ddl_log_state_create);
}
THD_STAGE_INFO(thd, stage_after_create);
delete file;
DBUG_PRINT("exit", ("return: %d", error));
@@ -4435,7 +4473,10 @@ warn:
-1 Table was used with IF NOT EXISTS and table existed (warning, not error)
*/
int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db,
int mysql_create_table_no_lock(THD *thd,
DDL_LOG_STATE *ddl_log_state_create,
DDL_LOG_STATE *ddl_log_state_rm,
const LEX_CSTRING *db,
const LEX_CSTRING *table_name,
Table_specification_st *create_info,
Alter_info *alter_info, bool *is_trans,
@@ -4444,26 +4485,31 @@ int mysql_create_table_no_lock(THD *thd, const LEX_CSTRING *db,
KEY *not_used_1;
uint not_used_2;
int res;
uint path_length;
char path[FN_REFLEN + 1];
LEX_CSTRING cpath;
LEX_CUSTRING frm= {0,0};
if (create_info->tmp_table())
build_tmptable_filename(thd, path, sizeof(path));
path_length= build_tmptable_filename(thd, path, sizeof(path));
else
{
int length;
const LEX_CSTRING *alias= table_case_name(create_info, table_name);
length= build_table_filename(path, sizeof(path) - 1, db->str, alias->str, "", 0);
path_length= build_table_filename(path, sizeof(path) - 1, db->str,
alias->str,
"", 0);
// Check if we hit FN_REFLEN bytes along with file extension.
if (length+reg_ext_length > FN_REFLEN)
if (path_length+reg_ext_length > FN_REFLEN)
{
my_error(ER_IDENT_CAUSES_TOO_LONG_PATH, MYF(0), (int) sizeof(path)-1,
path);
return true;
}
}
lex_string_set3(&cpath, path, path_length);
res= create_table_impl(thd, *db, *table_name, *db, *table_name, path,
res= create_table_impl(thd, ddl_log_state_create, ddl_log_state_rm,
*db, *table_name, *db, *table_name, cpath,
*create_info, create_info,
alter_info, create_table_mode,
is_trans, &not_used_1, &not_used_2, &frm);
@@ -4516,17 +4562,22 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
Table_specification_st *create_info,
Alter_info *alter_info)
{
bool is_trans= FALSE;
bool result;
int create_table_mode;
TABLE_LIST *pos_in_locked_tables= 0;
MDL_ticket *mdl_ticket= 0;
DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm;
int create_table_mode;
uint save_thd_create_info_options;
bool is_trans= FALSE;
bool result;
DBUG_ENTER("mysql_create_table");
DBUG_ASSERT(create_table == thd->lex->query_tables);
bzero(&ddl_log_state_create, sizeof(ddl_log_state_create));
bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm));
/* Copy temporarily the statement flags to thd for lock_table_names() */
uint save_thd_create_info_options= thd->lex->create_info.options;
save_thd_create_info_options= thd->lex->create_info.options;
thd->lex->create_info.options|= create_info->options;
/* Open or obtain an exclusive metadata lock on table being created */
@@ -4569,7 +4620,8 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
/* We can abort create table for any table type */
thd->abort_on_warning= thd->is_strict_mode();
if (mysql_create_table_no_lock(thd, &create_table->db,
if (mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm,
&create_table->db,
&create_table->table_name, create_info,
alter_info,
&is_trans, create_table_mode,
@@ -4641,10 +4693,19 @@ err:
*/
create_info->table->s->table_creation_was_logged= 1;
}
thd->binlog_xid= thd->query_id;
ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
if (ddl_log_state_rm.is_active())
ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
debug_crash_here("ddl_log_create_before_binlog");
if (unlikely(write_bin_log(thd, result ? FALSE : TRUE, thd->query(),
thd->query_length(), is_trans)))
result= 1;
debug_crash_here("ddl_log_create_after_binlog");
thd->binlog_xid= 0;
}
ddl_log_complete(&ddl_log_state_rm);
ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(result);
}
@@ -4921,7 +4982,8 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
Table_specification_st local_create_info;
TABLE_LIST *pos_in_locked_tables= 0;
Alter_info local_alter_info;
Alter_table_ctx local_alter_ctx; // Not used
Alter_table_ctx local_alter_ctx; // Not used
DDL_LOG_STATE ddl_log_state_create, ddl_log_state_rm;
int res= 1;
bool is_trans= FALSE;
bool do_logging= FALSE;
@@ -4930,6 +4992,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
int create_res;
DBUG_ENTER("mysql_create_like_table");
bzero(&ddl_log_state_create, sizeof(ddl_log_state_create));
bzero(&ddl_log_state_rm, sizeof(ddl_log_state_rm));
#ifdef WITH_WSREP
if (WSREP(thd) && !thd->wsrep_applier &&
wsrep_create_like_table(thd, table, src_table, create_info))
@@ -5023,7 +5088,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
pos_in_locked_tables= local_create_info.table->pos_in_locked_tables;
res= ((create_res=
mysql_create_table_no_lock(thd, &table->db, &table->table_name,
mysql_create_table_no_lock(thd,
&ddl_log_state_create, &ddl_log_state_rm,
&table->db, &table->table_name,
&local_create_info, &local_alter_info,
&is_trans, C_ORDINARY_CREATE,
table)) > 0);
@@ -5240,22 +5307,33 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
err:
if (do_logging)
{
thd->binlog_xid= thd->query_id;
ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
if (ddl_log_state_rm.is_active())
ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
debug_crash_here("ddl_log_create_before_binlog");
if (res && create_info->table_was_deleted)
{
/*
Table was not deleted. Original table was deleted.
We have to log it.
*/
log_drop_table(thd, &table->db, &table->table_name, create_info->tmp_table());
DBUG_ASSERT(ddl_log_state_rm.is_active());
log_drop_table(thd, &table->db, &table->table_name,
create_info->tmp_table());
}
else if (res != 2) // Table was not dropped
{
if (write_bin_log(thd, res ? FALSE : TRUE, thd->query(),
thd->query_length(), is_trans))
res= 1;
res= 1;
}
debug_crash_here("ddl_log_create_after_binlog");
thd->binlog_xid= 0;
}
ddl_log_complete(&ddl_log_state_rm);
ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(res != 0);
}
@@ -9662,9 +9740,10 @@ do_continue:;
tmp_disable_binlog(thd);
create_info->options|=HA_CREATE_TMP_ALTER;
create_info->alias= alter_ctx.table_name;
error= create_table_impl(thd, alter_ctx.db, alter_ctx.table_name,
error= create_table_impl(thd, (DDL_LOG_STATE*) 0, (DDL_LOG_STATE*) 0,
alter_ctx.db, alter_ctx.table_name,
alter_ctx.new_db, alter_ctx.tmp_name,
alter_ctx.get_tmp_path(),
alter_ctx.get_tmp_cstring_path(),
thd->lex->create_info, create_info, alter_info,
C_ALTER_TABLE_FRM_ONLY, NULL,
&key_info, &key_count, &frm);