Mirror of https://github.com/MariaDB/server.git (synced 2025-07-29 05:21:33 +03:00)
MDEV-16329 [5/5] ALTER ONLINE TABLE
* Log rows in online_alter_binlog.
* Table online data is replicated within a dedicated binlog file.
* Cached data is written on commit.
* Versioning is fully supported.
* Works both with and without the binlog enabled.
* For now, setting up savepoints is forbidden while an ONLINE ALTER is in progress. Extra support is required: we could simply log the SAVEPOINT query events and replicate them together with the row events, but that is not implemented yet.
* Cache flipping: we want to address the possible bottleneck of online alter binlog reading/writing in advance. IO_CACHE does not provide anything better than sequential access, and only a single write is mutex-protected, which is not suitable, since a transaction must be written atomically. To solve this, a special layer on top of Event_log is implemented. There are two IO_CACHE files underneath: one for reading and one for writing. Once the read cache is empty, an exclusive lock is acquired (we may wait for a currently active transaction to finish writing), and flip() is issued, i.e. the write cache is reopened for reading, and the read cache is emptied and reopened for writing. This resembles the buffer flip used in accelerated graphics (DirectX/OpenGL/etc.). In this sense Cache_flip_event_log is considered non-blocking for a single reader and a single writer, with the only lock held by the reader during the flip (see the sketch after this list). An alternative approach, a fair concurrent circular buffer, is described in MDEV-24676.
* Cache managers: we have two cache sinks: statement and transactional. It is important that the changes are cached per-statement first and then per-transaction: if a statement fails, only the statement data is rolled back, while the transaction moves along. It turns out there is no guarantee that the TABLE will persist in thd->open_tables until the transaction commit; if an error occurs, the statement's tables are purged. Therefore we cannot store the caches in TABLE. Ideally they would live in the handlerton, but we cut the corner and store them in a list in THD.
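Below is a minimal, self-contained sketch of the cache-flipping idea, for illustration only; it is not the commit's Cache_flip_event_log API. It assumes plain std::string buffers in place of the two IO_CACHE files and a std::mutex in place of the server's synchronization primitives; the class and member names are invented for the example.

// Illustrative double-buffer "flip" log, not MariaDB code.
// Writers append one transaction at a time under the lock; the single
// reader drains its own buffer lock-free and takes the lock only to flip.
#include <mutex>
#include <string>
#include <utility>

class FlipLog
{
  std::mutex lock_;          // taken per transaction append, and by the reader only in flip()
  std::string write_buf_;    // committing transactions append here atomically
  std::string read_buf_;     // the reader consumes this without holding the lock
  size_t read_pos_= 0;

public:
  // One atomic append per committed transaction.
  void append(const std::string &trx_events)
  {
    std::lock_guard<std::mutex> guard(lock_);
    write_buf_+= trx_events;
  }

  // Called by the reader once read_buf_ is drained: swap the roles of the
  // two buffers under the lock, like a buffer flip in accelerated graphics.
  // Returns false if nothing new has been written.
  bool flip()
  {
    std::lock_guard<std::mutex> guard(lock_);
    if (write_buf_.empty())
      return false;
    read_buf_.clear();
    std::swap(read_buf_, write_buf_);   // write cache becomes the read cache
    read_pos_= 0;
    return true;
  }

  // Lock-free for the single reader: only touches read_buf_.
  bool read(std::string &out, size_t n)
  {
    if (read_pos_ >= read_buf_.size())
      return false;
    out= read_buf_.substr(read_pos_, n);
    read_pos_+= out.size();
    return true;
  }
};

int main()
{
  FlipLog log;
  log.append("trx1-events;");          // writer side: a committed transaction
  std::string chunk;
  if (log.flip())                      // reader side: swap buffers once drained
    while (log.read(chunk, 5))
    { /* apply the events from `chunk` to the new table */ }
  return 0;
}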
Committed by: Sergei Golubchik
Parent: d2d0995cf2
Commit: ab4bfad206
sql/sql_table.cc (224 changed lines)
@@ -55,6 +55,7 @@
#include "sql_audit.h"
#include "sql_sequence.h"
#include "tztime.h"
#include "rpl_rli.h"
#include "sql_insert.h"    // binlog_drop_table
#include "ddl_log.h"
#include "debug.h"         // debug_crash_here()
@@ -85,7 +86,7 @@ static int copy_data_between_tables(THD *, TABLE *,TABLE *,
                                    List<Create_field> &, bool, uint, ORDER *,
                                    ha_rows *, ha_rows *,
                                    Alter_info::enum_enable_or_disable,
                                    Alter_table_ctx *);
                                    Alter_table_ctx *, bool);
static int append_system_key_parts(THD *thd, HA_CREATE_INFO *create_info,
                                   Key *key);
static int mysql_prepare_create_table(THD *, HA_CREATE_INFO *, Alter_info *,
@@ -2000,7 +2001,7 @@ bool log_drop_table(THD *thd, const LEX_CSTRING *db_name,
      in the binary log. We log this for non temporary tables, as the slave
      may use a filter to ignore queries for a specific database.
    */
    error= thd->binlog_query(THD::STMT_QUERY_TYPE,
    error= thd->binlog_query(THD::STMT_QUERY_TYPE,
                             query.ptr(), query.length(),
                             FALSE, FALSE, temporary_table, 0) > 0;
  }
@@ -4324,7 +4325,7 @@ handler *mysql_create_frm_image(THD *thd, const LEX_CSTRING &db,
    {
      if (key->type == Key::FOREIGN_KEY)
      {
        my_error(ER_FEATURE_NOT_SUPPORTED_WITH_PARTITIONING, MYF(0),
        my_error(ER_FEATURE_NOT_SUPPORTED_WITH_PARTITIONING, MYF(0),
                 "FOREIGN KEY");
        goto err;
      }
@@ -9938,6 +9939,11 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
  MDL_request target_mdl_request;
  MDL_ticket *mdl_ticket= 0;
  Alter_table_prelocking_strategy alter_prelocking_strategy;
#ifdef HAVE_REPLICATION
  bool online= order == NULL && !opt_bootstrap;
#else
  bool online= false;
#endif
  TRIGGER_RENAME_PARAM trigger_param;

  /*
@@ -10019,6 +10025,19 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
    has been already processed.
  */
  table_list->required_type= TABLE_TYPE_NORMAL;


  if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_SHARED
      || alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE
      || thd->locked_tables_mode == LTM_LOCK_TABLES
      || thd->lex->sql_command == SQLCOM_OPTIMIZE
      || alter_info->algorithm(thd) == Alter_info::ALTER_TABLE_ALGORITHM_NOCOPY)
    online= false;

  if (online)
  {
    table_list->lock_type= TL_READ;
  }

  DEBUG_SYNC(thd, "alter_table_before_open_tables");

@@ -10050,6 +10069,8 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,

  table= table_list->table;
  bool is_reg_table= table->s->tmp_table == NO_TMP_TABLE;

  online= online && !table->s->tmp_table;

#ifdef WITH_WSREP
  if (WSREP(thd) &&
@@ -10877,7 +10898,8 @@ do_continue:;
  if (!table->s->tmp_table)
  {
    // COPY algorithm doesn't work with concurrent writes.
    if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_NONE)
    if (!online &&
        alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_NONE)
    {
      my_error(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON, MYF(0),
               "LOCK=NONE",
@@ -11020,7 +11042,7 @@ do_continue:;
                                  alter_info->create_list, ignore,
                                  order_num, order, &copied, &deleted,
                                  alter_info->keys_onoff,
                                  &alter_ctx))
                                  &alter_ctx, online))
      goto err_new_table_cleanup;
    }
    else
@@ -11514,6 +11536,58 @@ bool mysql_trans_commit_alter_copy_data(THD *thd)
  DBUG_RETURN(error);
}

#ifdef HAVE_REPLICATION
static int online_alter_read_from_binlog(THD *thd, rpl_group_info *rgi,
                                         Cache_flip_event_log *log)
{
  MEM_ROOT event_mem_root;
  Query_arena backup_arena;
  Query_arena event_arena(&event_mem_root, Query_arena::STMT_INITIALIZED);
  init_sql_alloc(key_memory_gdl, &event_mem_root,
                 MEM_ROOT_BLOCK_SIZE, 0, MYF(0));

  int error= 0;

  IO_CACHE *log_file= log->flip();

  thd_progress_report(thd, 0, my_b_write_tell(log_file));

  Abort_on_warning_instant_set old_abort_on_warning(thd, 0);
  do
  {
    const auto *descr_event= rgi->rli->relay_log.description_event_for_exec;
    auto *ev= Log_event::read_log_event(log_file, descr_event, false);
    if (!ev)
      break;

    ev->thd= thd;
    thd->set_n_backup_active_arena(&event_arena, &backup_arena);
    error= ev->apply_event(rgi);
    thd->restore_active_arena(&event_arena, &backup_arena);

    event_arena.free_items();
    free_root(&event_mem_root, MYF(MY_KEEP_PREALLOC));
    if (ev != rgi->rli->relay_log.description_event_for_exec)
      delete ev;
    thd_progress_report(thd, my_b_tell(log_file), thd->progress.max_counter);
    DEBUG_SYNC(thd, "alter_table_online_progress");
  } while(!error);

  return error;
}
#endif // HAVE_REPLICATION

static void online_alter_cleanup_binlog(THD *thd, TABLE_SHARE *s)
{
#ifdef HAVE_REPLICATION
  if (!s->online_alter_binlog)
    return;
  // s->online_alter_binlog->reset_logs(thd, false, NULL, 0, 0);
  s->online_alter_binlog->cleanup();
  s->online_alter_binlog->~Cache_flip_event_log();
  s->online_alter_binlog= NULL;
#endif
}

static int
copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
@@ -11521,7 +11595,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
                         uint order_num, ORDER *order,
                         ha_rows *copied, ha_rows *deleted,
                         Alter_info::enum_enable_or_disable keys_onoff,
                         Alter_table_ctx *alter_ctx)
                         Alter_table_ctx *alter_ctx, bool online)
{
  int error= 1;
  Copy_field *copy= NULL, *copy_end;
@@ -11546,14 +11620,64 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
  MYSQL_TIME query_start;
  DBUG_ENTER("copy_data_between_tables");

  /* Two or 3 stages; Sorting, copying data and update indexes */
  thd_progress_init(thd, 2 + MY_TEST(order));
  /*
    if ORDER BY: sorting
    always: copying, building indexes.
    if online: reading up the binlog (second binlog is being written)
               reading up the second binlog under exclusive lock
  */
  thd_progress_init(thd, MY_TEST(order) + 2 + 2 * MY_TEST(online));

#ifdef HAVE_REPLICATION
  if (online)
  {
    void *buf= alloc_root(thd->mem_root, sizeof (Cache_flip_event_log));

    from->s->online_alter_binlog= new (buf) Cache_flip_event_log();
    if (!from->s->online_alter_binlog)
      DBUG_RETURN(1);

    from->s->online_alter_binlog->init_pthread_objects();

    error= from->s->online_alter_binlog->open(WRITE_CACHE);

    DBUG_ASSERT(!error);

    if (!error)
    {
      /*
        Some engines (for example, InnoDB) might not create a read view
        until the first row is read. We need to be sure that we won't see any
        table changes after we enable replication and downgrade the MDL.
        So, we force the consistent snapshot to be created now.
      */
      handlerton *ht= from->s->db_type();
      if (ht->start_consistent_snapshot)
      {
        thd->tx_isolation= ISO_REPEATABLE_READ;
        from->file->open_read_view();
      }
    }

    if (error)
    {
      online_alter_cleanup_binlog(thd, from->s);
      DBUG_RETURN(1);
    }

    from->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
    DEBUG_SYNC(thd, "alter_table_online_downgraded");
  }
#else
  DBUG_ASSERT(!online);
#endif // HAVE_REPLICATION

  if (!(copy= new (thd->mem_root) Copy_field[to->s->fields]))
    DBUG_RETURN(-1);

  if (mysql_trans_prepare_alter_copy_data(thd))
  {
    online_alter_cleanup_binlog(thd, from->s);
    delete [] copy;
    DBUG_RETURN(-1);
  }
@@ -11561,6 +11685,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
  /* We need external lock before we can disable/enable keys */
  if (to->file->ha_external_lock(thd, F_WRLCK))
  {
    online_alter_cleanup_binlog(thd, from->s);
    /* Undo call to mysql_trans_prepare_alter_copy_data() */
    ha_enable_transaction(thd, TRUE);
    delete [] copy;
@@ -11587,6 +11712,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
  Create_field *def;
  copy_end=copy;
  to->s->default_fields= 0;
  error= 1;
  for (Field **ptr=to->field ; *ptr ; ptr++)
  {
    def=it++;
@@ -11705,6 +11831,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
  if (!ignore) /* for now, InnoDB needs the undo log for ALTER IGNORE */
    to->file->extra(HA_EXTRA_BEGIN_ALTER_COPY);

  DEBUG_SYNC(thd, "alter_table_copy_start");

  while (likely(!(error= info.read_record())))
  {
    if (unlikely(thd->killed))
@@ -11839,14 +11967,11 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
    thd->get_stmt_da()->inc_current_row_for_warning();
  }

  DEBUG_SYNC(thd, "alter_table_copy_end");

  THD_STAGE_INFO(thd, stage_enabling_keys);
  thd_progress_next_stage(thd);

  if (error > 0 && !from->s->tmp_table)
  {
    /* We are going to drop the temporary table */
    to->file->extra(HA_EXTRA_PREPARE_FOR_DROP);
  }
  if (unlikely(to->file->ha_end_bulk_insert()) && error <= 0)
  {
    /* Give error, if not already given */
@@ -11854,6 +11979,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
    to->file->print_error(my_errno,MYF(0));
    error= 1;
  }

  bulk_insert_started= 0;
  if (!ignore)
    to->file->extra(HA_EXTRA_END_ALTER_COPY);
@@ -11861,6 +11987,76 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
  cleanup_done= 1;
  to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);

#ifdef HAVE_REPLICATION
  if (likely(online && error < 0))
  {
    Ha_trx_info *trx_info_save= thd->transaction->all.ha_list;
    thd->transaction->all.ha_list = NULL;
    thd_progress_next_stage(thd);
    Table_map_log_event table_event(thd, from, from->s->table_map_id,
                                    from->file->has_transactions());
    Relay_log_info rli(false);
    rpl_group_info rgi(&rli);
    RPL_TABLE_LIST rpl_table(to, TL_WRITE, from, table_event.get_table_def(),
                             copy, copy_end);
    Cache_flip_event_log *binlog= from->s->online_alter_binlog;
    rgi.thd= thd;
    rgi.tables_to_lock= &rpl_table;

    rgi.m_table_map.set_table(from->s->table_map_id, to);

    DBUG_ASSERT(binlog->is_open());

    rli.relay_log.description_event_for_exec=
      new Format_description_log_event(4);

    // We restore bitmaps, because update event is going to mess up with them.
    to->default_column_bitmaps();

    error= online_alter_read_from_binlog(thd, &rgi, binlog);

    DEBUG_SYNC(thd, "alter_table_online_before_lock");

    int lock_error=
      thd->mdl_context.upgrade_shared_lock(from->mdl_ticket, MDL_EXCLUSIVE,
                                           (double)thd->variables.lock_wait_timeout);
    if (!error)
      error= lock_error;

    if (!error)
    {
      thd_progress_next_stage(thd);
      error= online_alter_read_from_binlog(thd, &rgi, binlog);
    }

    thd->transaction->all.ha_list = trx_info_save;
  }
  else if (unlikely(online)) // error was on copy stage
  {
    /*
      We need to issue a barrier to clean up gracefully.
      Without this, following possible:
      T1: ALTER TABLE starts
      T2: INSERT starts
      T1: ALTER TABLE fails with error (i.e. ER_DUP_KEY)
      T1: from->s->online_alter_binlog sets to NULL
      T2: INSERT committs
      T2: thd->online_alter_cache_list is not empty
      T2: binlog_commit: DBUG_ASSERT(binlog); is issued.
    */
    // Ignore the return result. We already have an error.
    thd->mdl_context.upgrade_shared_lock(from->mdl_ticket,
                                         MDL_SHARED_NO_WRITE,
                                         thd->variables.lock_wait_timeout);
  }
#endif

  if (error > 0 && !from->s->tmp_table)
  {
    /* We are going to drop the temporary table */
    to->file->extra(HA_EXTRA_PREPARE_FOR_DROP);
  }

  DEBUG_SYNC(thd, "copy_data_between_tables_before_reset_backup_lock");
  if (backup_reset_alter_copy_lock(thd))
    error= 1;
@@ -11873,6 +12069,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
  (void) to->file->ha_end_bulk_insert();

  /* Free resources */
  online_alter_cleanup_binlog(thd, from->s);

  if (init_read_record_done)
    end_read_record(&info);
  delete [] copy;