1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

remove handler::open_read_view()

ht->start_consistent_snapshot() is also not a way,
because some engines (e.g. rocksdb) only do it readonly.
instead, downgrade the lock after reading the first row
(which implicitly opens a read view).
This commit is contained in:
Sergei Golubchik
2022-05-25 19:20:30 +02:00
parent 0b67af5a81
commit d767ed5c89
4 changed files with 144 additions and 177 deletions

View File

@ -11623,52 +11623,11 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
*/
thd_progress_init(thd, MY_TEST(order) + 2 + 2 * MY_TEST(online));
#ifdef HAVE_REPLICATION
if (online)
{
from->s->online_alter_binlog= new (thd->mem_root) Cache_flip_event_log();
if (!from->s->online_alter_binlog)
DBUG_RETURN(1);
from->s->online_alter_binlog->init_pthread_objects();
error= from->s->online_alter_binlog->open(WRITE_CACHE);
if (!error)
{
/*
Some engines (for example, InnoDB) might not create a read view
until the first row is read. We need to be sure that we won't see any
table changes after we enable replication and downgrade the MDL.
So, we force the consistent snapshot to be created now.
*/
handlerton *ht= from->s->db_type();
if (ht->start_consistent_snapshot)
{
thd->tx_isolation= ISO_REPEATABLE_READ;
from->file->open_read_view();
}
}
if (error)
{
online_alter_cleanup_binlog(thd, from->s);
DBUG_RETURN(1);
}
from->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
DEBUG_SYNC(thd, "alter_table_online_downgraded");
}
#else
DBUG_ASSERT(!online);
#endif // HAVE_REPLICATION
if (!(copy= new (thd->mem_root) Copy_field[to->s->fields]))
DBUG_RETURN(-1);
if (mysql_trans_prepare_alter_copy_data(thd))
{
online_alter_cleanup_binlog(thd, from->s);
delete [] copy;
DBUG_RETURN(-1);
}
@ -11676,7 +11635,6 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
/* We need external lock before we can disable/enable keys */
if (to->file->ha_external_lock(thd, F_WRLCK))
{
online_alter_cleanup_binlog(thd, from->s);
/* Undo call to mysql_trans_prepare_alter_copy_data() */
ha_enable_transaction(thd, TRUE);
delete [] copy;
@ -11822,141 +11780,166 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
if (!ignore) /* for now, InnoDB needs the undo log for ALTER IGNORE */
to->file->extra(HA_EXTRA_BEGIN_ALTER_COPY);
DEBUG_SYNC(thd, "alter_table_copy_start");
while (likely(!(error= info.read_record())))
if (!(error= info.read_record()))
{
if (unlikely(thd->killed))
#ifdef HAVE_REPLICATION
if (online)
{
thd->send_kill_message();
error= 1;
break;
}
from->s->online_alter_binlog= new (thd->mem_root) Cache_flip_event_log();
if (!from->s->online_alter_binlog)
DBUG_RETURN(1);
from->s->online_alter_binlog->init_pthread_objects();
error= from->s->online_alter_binlog->open(WRITE_CACHE);
if (make_unversioned)
{
if (!from_row_end->is_max())
continue; // Drop history rows.
}
if (unlikely(++thd->progress.counter >= time_to_report_progress))
{
time_to_report_progress+= MY_HOW_OFTEN_TO_WRITE/10;
thd_progress_report(thd, thd->progress.counter,
thd->progress.max_counter);
}
/* Return error if source table isn't empty. */
if (unlikely(alter_ctx->error_if_not_empty))
{
error= 1;
break;
}
for (Copy_field *copy_ptr=copy ; copy_ptr != copy_end ; copy_ptr++)
{
copy_ptr->do_copy(copy_ptr);
}
if (make_versioned)
{
to_row_start->set_notnull();
to_row_start->store_time(&query_start);
to_row_end->set_max();
}
prev_insert_id= to->file->next_insert_id;
if (to->default_field)
to->update_default_fields(ignore);
if (to->vfield)
to->update_virtual_fields(to->file, VCOL_UPDATE_FOR_WRITE);
/* This will set thd->is_error() if fatal failure */
if (to->verify_constraints(ignore) == VIEW_CHECK_SKIP)
continue;
if (unlikely(thd->is_error()))
{
error= 1;
break;
}
if (keep_versioned && to->versioned(VERS_TRX_ID))
to->vers_write= false;
if (to->next_number_field)
{
if (auto_increment_field_copied)
to->auto_increment_field_not_null= TRUE;
else
to->next_number_field->reset();
}
error= to->file->ha_write_row(to->record[0]);
to->auto_increment_field_not_null= FALSE;
if (unlikely(error))
{
if (to->file->is_fatal_error(error, HA_CHECK_DUP))
if (error)
{
/* Not a duplicate key error. */
to->file->print_error(error, MYF(0));
error= 1;
break;
online_alter_cleanup_binlog(thd, from->s);
goto err;
}
else
{
/* Duplicate key error. */
if (unlikely(alter_ctx->fk_error_if_delete_row))
{
/*
We are trying to omit a row from the table which serves as parent
in a foreign key. This might have broken referential integrity so
emit an error. Note that we can't ignore this error even if we are
executing ALTER IGNORE TABLE. IGNORE allows to skip rows, but
doesn't allow to break unique or foreign key constraints,
*/
my_error(ER_FK_CANNOT_DELETE_PARENT, MYF(0),
alter_ctx->fk_error_id,
alter_ctx->fk_error_table);
break;
}
if (ignore)
from->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
DEBUG_SYNC(thd, "alter_table_online_downgraded");
}
#else
DBUG_ASSERT(!online);
#endif // HAVE_REPLICATION
do
{
if (unlikely(thd->killed))
{
thd->send_kill_message();
error= 1;
break;
}
if (make_unversioned)
{
if (!from_row_end->is_max())
continue; // Drop history rows.
}
if (unlikely(++thd->progress.counter >= time_to_report_progress))
{
time_to_report_progress+= MY_HOW_OFTEN_TO_WRITE/10;
thd_progress_report(thd, thd->progress.counter,
thd->progress.max_counter);
}
/* Return error if source table isn't empty. */
if (unlikely(alter_ctx->error_if_not_empty))
{
error= 1;
break;
}
for (Copy_field *copy_ptr=copy ; copy_ptr != copy_end ; copy_ptr++)
{
copy_ptr->do_copy(copy_ptr);
}
if (make_versioned)
{
to_row_start->set_notnull();
to_row_start->store_time(&query_start);
to_row_end->set_max();
}
prev_insert_id= to->file->next_insert_id;
if (to->default_field)
to->update_default_fields(ignore);
if (to->vfield)
to->update_virtual_fields(to->file, VCOL_UPDATE_FOR_WRITE);
/* This will set thd->is_error() if fatal failure */
if (to->verify_constraints(ignore) == VIEW_CHECK_SKIP)
continue;
if (unlikely(thd->is_error()))
{
error= 1;
break;
}
if (keep_versioned && to->versioned(VERS_TRX_ID))
to->vers_write= false;
if (to->next_number_field)
{
if (auto_increment_field_copied)
to->auto_increment_field_not_null= TRUE;
else
to->next_number_field->reset();
}
error= to->file->ha_write_row(to->record[0]);
to->auto_increment_field_not_null= FALSE;
if (unlikely(error))
{
if (to->file->is_fatal_error(error, HA_CHECK_DUP))
{
/* This ALTER IGNORE TABLE. Simply skip row and continue. */
to->file->restore_auto_increment(prev_insert_id);
delete_count++;
/* Not a duplicate key error. */
to->file->print_error(error, MYF(0));
error= 1;
break;
}
else
{
/* Ordinary ALTER TABLE. Report duplicate key error. */
uint key_nr= to->file->get_dup_key(error);
if ((int) key_nr >= 0)
/* Duplicate key error. */
if (unlikely(alter_ctx->fk_error_if_delete_row))
{
const char *err_msg= ER_THD(thd, ER_DUP_ENTRY_WITH_KEY_NAME);
if (key_nr == 0 && to->s->keys > 0 &&
(to->key_info[0].key_part[0].field->flags &
AUTO_INCREMENT_FLAG))
err_msg= ER_THD(thd, ER_DUP_ENTRY_AUTOINCREMENT_CASE);
print_keydup_error(to,
key_nr >= to->s->keys ? NULL :
&to->key_info[key_nr],
err_msg, MYF(0));
/*
We are trying to omit a row from the table which serves as parent
in a foreign key. This might have broken referential integrity so
emit an error. Note that we can't ignore this error even if we are
executing ALTER IGNORE TABLE. IGNORE allows to skip rows, but
doesn't allow to break unique or foreign key constraints,
*/
my_error(ER_FK_CANNOT_DELETE_PARENT, MYF(0),
alter_ctx->fk_error_id,
alter_ctx->fk_error_table);
break;
}
if (ignore)
{
/* This ALTER IGNORE TABLE. Simply skip row and continue. */
to->file->restore_auto_increment(prev_insert_id);
delete_count++;
}
else
to->file->print_error(error, MYF(0));
break;
{
/* Ordinary ALTER TABLE. Report duplicate key error. */
uint key_nr= to->file->get_dup_key(error);
if ((int) key_nr >= 0)
{
const char *err_msg= ER_THD(thd, ER_DUP_ENTRY_WITH_KEY_NAME);
if (key_nr == 0 && to->s->keys > 0 &&
(to->key_info[0].key_part[0].field->flags &
AUTO_INCREMENT_FLAG))
err_msg= ER_THD(thd, ER_DUP_ENTRY_AUTOINCREMENT_CASE);
print_keydup_error(to,
key_nr >= to->s->keys ? NULL :
&to->key_info[key_nr],
err_msg, MYF(0));
}
else
to->file->print_error(error, MYF(0));
break;
}
}
}
}
else
{
/* In case of alter ignore, notify the engine about it. */
if (ignore)
to->file->extra(HA_EXTRA_IGNORE_INSERT);
DEBUG_SYNC(thd, "copy_data_between_tables_before");
found_count++;
mysql_stage_set_work_completed(thd->m_stage_progress_psi, found_count);
}
thd->get_stmt_da()->inc_current_row_for_warning();
else
{
/* In case of alter ignore, notify the engine about it. */
if (ignore)
to->file->extra(HA_EXTRA_IGNORE_INSERT);
DEBUG_SYNC(thd, "copy_data_between_tables_before");
found_count++;
mysql_stage_set_work_completed(thd->m_stage_progress_psi, found_count);
}
thd->get_stmt_da()->inc_current_row_for_warning();
} while (!(error= info.read_record()));
}
else
online= false;
DEBUG_SYNC(thd, "alter_table_copy_end");