mirror of
https://github.com/MariaDB/server.git
synced 2025-07-29 05:21:33 +03:00
MDEV-31646 Online alter applies binlog cache limit to cache writes
1. Make online disk writes unlimited, same as filesort does. 2. Make proper error handling -- in 32-bit build IO_CACHE capacity limit is 4GB, so it is quite possible to overfill there. 3. Event_log::write_cache complicated with event reparsing, and as it was proven by QA, contains some mistakes. Rewrite introbuce a simpler and much faster version, not featuring reparsing and therefore copying a whole buffer at once. This also disables checksums and crypto. 4. Handle read_log_event errors correctly: error returned is -1 (eof signal for alter table), and my_error is not called. Call my_error and always return 1. There's no test for this, since it shouldn't happen, see the next bullet. 5. An event could be written partially in case of error, if it's bigger than the IO_CACHE buffer. Restore the position where it was before the error was emitted. As a result, online alter is untied of several binlog variables, which was a second aim of this patch.
This commit is contained in:
committed by
Sergei Golubchik
parent
2cecb5a638
commit
d5e59c983f
@ -409,23 +409,23 @@ set debug_sync= 'now SIGNAL start_replication';
|
||||
set debug_sync= 'now WAIT_FOR applied';
|
||||
select stage, progress from INFORMATION_SCHEMA.PROCESSLIST where id = @con;
|
||||
stage progress
|
||||
3 51.220
|
||||
3 53.390
|
||||
set debug_sync= 'now SIGNAL proceed WAIT_FOR applied';
|
||||
select stage, progress from INFORMATION_SCHEMA.PROCESSLIST where id = @con;
|
||||
stage progress
|
||||
3 61.789
|
||||
3 63.559
|
||||
set debug_sync= 'now SIGNAL proceed WAIT_FOR applied';
|
||||
select stage, progress from INFORMATION_SCHEMA.PROCESSLIST where id = @con;
|
||||
stage progress
|
||||
3 70.325
|
||||
3 71.610
|
||||
set debug_sync= 'now SIGNAL proceed WAIT_FOR applied';
|
||||
select stage, progress from INFORMATION_SCHEMA.PROCESSLIST where id = @con;
|
||||
stage progress
|
||||
3 80.894
|
||||
3 81.780
|
||||
set debug_sync= 'now SIGNAL proceed WAIT_FOR applied';
|
||||
select stage, progress from INFORMATION_SCHEMA.PROCESSLIST where id = @con;
|
||||
stage progress
|
||||
3 89.431
|
||||
3 89.831
|
||||
set debug_sync= 'now SIGNAL proceed WAIT_FOR applied';
|
||||
select stage, progress from INFORMATION_SCHEMA.PROCESSLIST where id = @con;
|
||||
stage progress
|
||||
@ -1157,7 +1157,7 @@ a b
|
||||
30 30
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
set debug_dbug= @old_debug;
|
||||
set debug_dbug= @old_dbug;
|
||||
connection default;
|
||||
#
|
||||
# MDEV-30902 Server crash in LEX::first_lists_tables_same
|
||||
@ -1362,9 +1362,57 @@ b
|
||||
556
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
NOT FOUND /Slave SQL/ in mysqld.1.err
|
||||
#
|
||||
# MDEV-31646 Online alter applies binlog cache limit to cache writes
|
||||
#
|
||||
create table t (pk int primary key, a varchar(100)) engine=MyISAM;
|
||||
insert into t select seq, repeat('x', 100) from seq_1_to_500;
|
||||
set @cache.size= @@max_binlog_cache_size;
|
||||
set global max_binlog_cache_size= 4096;
|
||||
set debug_sync= 'now wait_for do_updates';
|
||||
connection con1;
|
||||
set debug_sync= 'alter_table_online_progress signal do_updates wait_for go';
|
||||
alter table t add b int, algorithm=copy, lock=none;
|
||||
connection default;
|
||||
update t set a = repeat('y', 100);
|
||||
show warnings;
|
||||
Level Code Message
|
||||
set debug_sync= 'now signal go';
|
||||
connection con1;
|
||||
show warnings;
|
||||
Level Code Message
|
||||
connection default;
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
set global max_binlog_cache_size= @cache.size;
|
||||
# Now make sure that smaller limits will be processed fine
|
||||
set @old_dbug=@@debug_dbug;
|
||||
create table t (pk int primary key, a text) engine=MyISAM;
|
||||
insert into t select seq, repeat('x', 1000) from seq_1_to_50;
|
||||
connection con1;
|
||||
set debug_sync= 'alter_table_online_progress signal do_updates wait_for go';
|
||||
alter table t add b int, algorithm=copy, lock=none;
|
||||
connection default;
|
||||
set debug_sync= 'now wait_for do_updates';
|
||||
set debug_dbug="+d,online_alter_small_cache";
|
||||
update t set a = repeat('y', 1000);
|
||||
ERROR HY000: Multi-row statements required more than 'max_binlog_stmt_cache_size' bytes of storage.
|
||||
show warnings;
|
||||
Level Code Message
|
||||
Error 1705 Multi-row statements required more than 'max_binlog_stmt_cache_size' bytes of storage.
|
||||
Error 1534 Writing one row to the row-based binary log failed
|
||||
Warning 1196 Some non-transactional changed tables couldn't be rolled back
|
||||
set debug_sync= 'now signal go';
|
||||
connection con1;
|
||||
show warnings;
|
||||
Level Code Message
|
||||
connection default;
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
set debug_dbug= @old_dbug;
|
||||
disconnect con1;
|
||||
disconnect con2;
|
||||
NOT FOUND /Slave SQL/ in mysqld.1.err
|
||||
#
|
||||
# End of 11.2 tests
|
||||
#
|
||||
|
@ -2,6 +2,7 @@
|
||||
--source include/not_embedded.inc
|
||||
--source include/binlog_combinations.inc
|
||||
--source include/have_innodb.inc
|
||||
--source include/have_sequence.inc
|
||||
--source include/have_partition.inc
|
||||
set default_storage_engine= innodb;
|
||||
|
||||
@ -1331,7 +1332,7 @@ select a, b from t;
|
||||
# Cleanup
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
set debug_dbug= @old_debug;
|
||||
set debug_dbug= @old_dbug;
|
||||
--connection default
|
||||
|
||||
--echo #
|
||||
@ -1551,11 +1552,73 @@ drop table t;
|
||||
|
||||
|
||||
set debug_sync= reset;
|
||||
--disconnect con1
|
||||
--disconnect con2
|
||||
let SEARCH_FILE= $MYSQLTEST_VARDIR/log/mysqld.1.err;
|
||||
let SEARCH_PATTERN= Slave SQL;
|
||||
--source include/search_pattern_in_file.inc
|
||||
|
||||
|
||||
--echo #
|
||||
--echo # MDEV-31646 Online alter applies binlog cache limit to cache writes
|
||||
--echo #
|
||||
create table t (pk int primary key, a varchar(100)) engine=MyISAM;
|
||||
insert into t select seq, repeat('x', 100) from seq_1_to_500;
|
||||
|
||||
set @cache.size= @@max_binlog_cache_size;
|
||||
set global max_binlog_cache_size= 4096;
|
||||
|
||||
send set debug_sync= 'now wait_for do_updates';
|
||||
|
||||
--connection con1
|
||||
set debug_sync= 'alter_table_online_progress signal do_updates wait_for go';
|
||||
send alter table t add b int, algorithm=copy, lock=none;
|
||||
|
||||
--connection default
|
||||
--reap
|
||||
update t set a = repeat('y', 100);
|
||||
show warnings;
|
||||
|
||||
set debug_sync= 'now signal go';
|
||||
|
||||
--connection con1
|
||||
--reap
|
||||
show warnings;
|
||||
|
||||
--connection default
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
set global max_binlog_cache_size= @cache.size;
|
||||
|
||||
--echo # Now make sure that smaller limits will be processed fine
|
||||
|
||||
set @old_dbug=@@debug_dbug;
|
||||
create table t (pk int primary key, a text) engine=MyISAM;
|
||||
insert into t select seq, repeat('x', 1000) from seq_1_to_50;
|
||||
|
||||
--connection con1
|
||||
set debug_sync= 'alter_table_online_progress signal do_updates wait_for go';
|
||||
--send
|
||||
alter table t add b int, algorithm=copy, lock=none;
|
||||
|
||||
--connection default
|
||||
set debug_sync= 'now wait_for do_updates';
|
||||
set debug_dbug="+d,online_alter_small_cache";
|
||||
--error ER_STMT_CACHE_FULL
|
||||
update t set a = repeat('y', 1000);
|
||||
show warnings;
|
||||
|
||||
set debug_sync= 'now signal go';
|
||||
|
||||
--connection con1
|
||||
--reap
|
||||
show warnings;
|
||||
|
||||
--connection default
|
||||
drop table t;
|
||||
set debug_sync= reset;
|
||||
set debug_dbug= @old_dbug;
|
||||
|
||||
--disconnect con1
|
||||
--disconnect con2
|
||||
--echo #
|
||||
--echo # End of 11.2 tests
|
||||
--echo #
|
||||
|
56
sql/log.cc
56
sql/log.cc
@ -2286,13 +2286,21 @@ int binlog_log_row_online_alter(TABLE* table, const uchar *before_record,
|
||||
MY_BITMAP *old_rpl_write_set= table->rpl_write_set;
|
||||
table->rpl_write_set= &table->s->all_set;
|
||||
|
||||
table->online_alter_cache->store_prev_position();
|
||||
int error= (*log_func)(thd, table, table->s->online_alter_binlog,
|
||||
table->online_alter_cache, true,
|
||||
table->online_alter_cache,
|
||||
table->file->has_transactions_and_rollback(),
|
||||
before_record, after_record);
|
||||
|
||||
table->rpl_write_set= old_rpl_write_set;
|
||||
|
||||
return unlikely(error) ? HA_ERR_RBR_LOGGING_FAILED : 0;
|
||||
if (unlikely(error))
|
||||
{
|
||||
table->online_alter_cache->restore_prev_position();
|
||||
return HA_ERR_RBR_LOGGING_FAILED;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
@ -3843,9 +3851,8 @@ bool Event_log::open(enum cache_type io_cache_type_arg)
|
||||
if (error)
|
||||
return error;
|
||||
|
||||
longlong bytes_written= write_description_event(
|
||||
(enum_binlog_checksum_alg)binlog_checksum_options,
|
||||
encrypt_binlog, true, false);
|
||||
longlong bytes_written= write_description_event(BINLOG_CHECKSUM_ALG_OFF,
|
||||
false, true, false);
|
||||
return bytes_written < 0;
|
||||
}
|
||||
|
||||
@ -6419,8 +6426,11 @@ write_err:
|
||||
|
||||
|
||||
#ifdef HAVE_REPLICATION
|
||||
static online_alter_cache_data *binlog_setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share)
|
||||
static online_alter_cache_data *
|
||||
online_alter_binlog_setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share)
|
||||
{
|
||||
static ulong online_alter_cache_use= 0, online_alter_cache_disk_use= 0;
|
||||
|
||||
auto cache= new (root) online_alter_cache_data();
|
||||
if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir,
|
||||
LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME)))
|
||||
@ -6432,8 +6442,13 @@ static online_alter_cache_data *binlog_setup_cache_data(MEM_ROOT *root, TABLE_SH
|
||||
share->online_alter_binlog->acquire();
|
||||
cache->hton= share->db_type();
|
||||
cache->sink_log= share->online_alter_binlog;
|
||||
cache->set_binlog_cache_info(max_binlog_cache_size, &binlog_cache_use,
|
||||
&binlog_cache_disk_use);
|
||||
|
||||
my_off_t binlog_max_size= SIZE_T_MAX; // maximum possible cache size
|
||||
DBUG_EXECUTE_IF("online_alter_small_cache", binlog_max_size= 4096;);
|
||||
|
||||
cache->set_binlog_cache_info(binlog_max_size,
|
||||
&online_alter_cache_use,
|
||||
&online_alter_cache_disk_use);
|
||||
cache->store_prev_position();
|
||||
return cache;
|
||||
}
|
||||
@ -6451,7 +6466,7 @@ online_alter_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *tab
|
||||
}
|
||||
|
||||
MEM_ROOT *root= &thd->transaction->mem_root;
|
||||
auto *new_cache_data= binlog_setup_cache_data(root, table->s);
|
||||
auto *new_cache_data= online_alter_binlog_setup_cache_data(root, table->s);
|
||||
list.push_back(*new_cache_data);
|
||||
|
||||
return new_cache_data;
|
||||
@ -7748,7 +7763,7 @@ static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit)
|
||||
{
|
||||
DBUG_ASSERT(cache.cache_log.type != READ_CACHE);
|
||||
mysql_mutex_lock(binlog->get_log_lock());
|
||||
error= binlog->write_cache(thd, &cache.cache_log);
|
||||
error= binlog->write_cache_raw(thd, &cache.cache_log);
|
||||
mysql_mutex_unlock(binlog->get_log_lock());
|
||||
}
|
||||
}
|
||||
@ -7838,6 +7853,27 @@ int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int Event_log::write_cache_raw(THD *thd, IO_CACHE *cache)
|
||||
{
|
||||
DBUG_ENTER("Event_log::write_cache_raw");
|
||||
mysql_mutex_assert_owner(&LOCK_log);
|
||||
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
||||
DBUG_RETURN(ER_ERROR_ON_WRITE);
|
||||
|
||||
IO_CACHE *file= get_log_file();
|
||||
IF_DBUG(size_t total= cache->end_of_file,);
|
||||
do
|
||||
{
|
||||
size_t read_len= cache->read_end - cache->read_pos;
|
||||
int res= my_b_safe_write(file, cache->read_pos, read_len);
|
||||
if (unlikely(res))
|
||||
DBUG_RETURN(res);
|
||||
IF_DBUG(total-= read_len,);
|
||||
} while (my_b_fill(cache));
|
||||
DBUG_ASSERT(total == 0);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
Write the contents of a cache to the binary log.
|
||||
|
||||
|
@ -403,6 +403,7 @@ public:
|
||||
void set_write_error(THD *thd, bool is_transactional);
|
||||
static bool check_write_error(THD *thd);
|
||||
int write_cache(THD *thd, IO_CACHE *cache);
|
||||
int write_cache_raw(THD *thd, IO_CACHE *cache);
|
||||
char* get_name() { return name; }
|
||||
void cleanup()
|
||||
{
|
||||
|
@ -11666,8 +11666,14 @@ static int online_alter_read_from_binlog(THD *thd, rpl_group_info *rgi,
|
||||
{
|
||||
const auto *descr_event= rgi->rli->relay_log.description_event_for_exec;
|
||||
auto *ev= Log_event::read_log_event(log_file, descr_event, false);
|
||||
if (!ev)
|
||||
error= log_file->error;
|
||||
if (unlikely(!ev))
|
||||
{
|
||||
if (error)
|
||||
my_error(ER_IO_READ_ERROR,MYF(0), (ulong)EIO, strerror(EIO), "");
|
||||
break;
|
||||
}
|
||||
DBUG_ASSERT(!error);
|
||||
|
||||
ev->thd= thd;
|
||||
error= ev->apply_event(rgi);
|
||||
@ -11680,7 +11686,7 @@ static int online_alter_read_from_binlog(THD *thd, rpl_group_info *rgi,
|
||||
} while(!error);
|
||||
thd->pop_internal_handler();
|
||||
|
||||
return error;
|
||||
return MY_TEST(error);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
Reference in New Issue
Block a user