1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-08 11:22:35 +03:00

MDEV-34705: Binlog-in-engine: Implement PURGE BINARY LOGS

Still ToDo: is to restrict auto-purge so that it does not purge any binlog
file with out-of-band data that might still be needed by a connected slave.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2025-02-04 21:22:12 +00:00
parent d4b37fcc85
commit 0671add213
15 changed files with 1040 additions and 88 deletions

View File

@@ -612,9 +612,7 @@ static my_bool adjust_callback(THD *thd, my_off_t *purge_offset)
we just started reading the index file. In that case
we have nothing to adjust
*/
if (linfo->index_file_offset < *purge_offset)
linfo->fatal= (linfo->index_file_offset != 0);
else
if (linfo->index_file_offset >= *purge_offset)
linfo->index_file_offset-= *purge_offset;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
@@ -654,7 +652,7 @@ static my_bool log_in_use_callback(THD *thd, st_log_in_use *arg)
/*
Check if a log is in use.
Check if a log is in use (legacy binlog).
@return 0 Not used
@return 1 A slave is reading from the log
@@ -676,6 +674,94 @@ int log_in_use(const char* log_name, uint min_connected)
}
struct st_engine_binlog_in_use {
uint64_t min_file_no;
uint count;
};
my_bool
engine_binlog_in_use_callback(THD *thd, st_engine_binlog_in_use *arg)
{
if (thd->current_linfo)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
if (LOG_INFO *linfo= thd->current_linfo)
{
uint64_t file_no= linfo->file_no.load(std::memory_order_relaxed);
if (file_no < arg->min_file_no)
arg->min_file_no= file_no;
if (file_no != ~(uint64_t)0)
++arg->count;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
return FALSE;
}
/*
Find earliest binlog file in use (--binlog-storage-engine).
Returns a pair of the earliest file_no binlog in use by a dump thread,
and the number of actively running dump threads.
*/
std::pair<uint64_t, uint>
engine_binlog_in_use()
{
DBUG_ASSERT(opt_binlog_engine_hton);
st_engine_binlog_in_use arg{~(uint64_t)0, 0};
server_threads.iterate(engine_binlog_in_use_callback, &arg);
return {arg.min_file_no, arg.count};
}
/*
Inform engine about server state relevant for automatic binlog purge.
Used by engines that implement --binlog-storage-engine.
Returns true if automatic purge should proceed with supplied information,
false if automatic purge is disabled due to
--slave-connections-needed-for-purge.
*/
bool
ha_binlog_purge_info(handler_binlog_purge_info *out_info)
{
auto p= engine_binlog_in_use();
out_info->limit_file_no= p.first;
uint num_dump_threads= p.second;
out_info->purge_by_name= false;
out_info->limit_name= nullptr;
if (binlog_expire_logs_seconds)
{
out_info->purge_by_date= true;
out_info->limit_date= my_time(0) - binlog_expire_logs_seconds;
}
else
out_info->purge_by_date= false;
if (binlog_space_limit)
{
out_info->purge_by_size= true;
out_info->limit_size= binlog_space_limit;
}
else
out_info->purge_by_size= false;
out_info->nonpurge_filename[0]= '\0';
if (num_dump_threads >= slave_connections_needed_for_purge)
{
out_info->nonpurge_reason= nullptr;
return true;
}
else
{
out_info->nonpurge_reason= "less than 'slave_connections_needed_for_purge' "
"slaves have processed it";
return false;
}
}
bool purge_error_message(THD* thd, int res)
{
uint errcode;
@@ -703,17 +789,53 @@ bool purge_error_message(THD* thd, int res)
*/
bool purge_master_logs(THD* thd, const char* to_log)
{
char search_file_name[FN_REFLEN];
if (!mysql_bin_log.is_open())
{
my_ok(thd);
return FALSE;
}
mysql_bin_log.make_log_name(search_file_name, to_log);
return purge_error_message(thd,
mysql_bin_log.purge_logs(thd, search_file_name,
0, 1, 1, 1, NULL));
int res;
if (!opt_binlog_engine_hton)
{
char search_file_name[FN_REFLEN];
mysql_bin_log.make_log_name(search_file_name, to_log);
res= mysql_bin_log.purge_logs(thd, search_file_name, 0, 1, 1, 1, NULL);
}
else
{
handler_binlog_purge_info purge_info;
auto p= engine_binlog_in_use();
purge_info.limit_file_no= p.first;
uint num_dump_threads= p.second;
if (num_dump_threads < slave_connections_needed_for_purge)
{
/*
Prevent purging any file.
We need to do it this way, since we have to call into the engine to let
it check if there are any files to potentially purge. If there are, we
want to give an error that purge was not possible. But if there were no
files to purge in any case, we do not want to give any error.
*/
purge_info.limit_file_no= 0;
purge_info.nonpurge_reason= "less than "
"'slave_connections_needed_for_purge' slaves have processed it";
}
else
purge_info.nonpurge_reason= nullptr;
purge_info.nonpurge_filename[0]= '\0';
purge_info.purge_by_date= false;
purge_info.limit_date= (time_t)0;
purge_info.purge_by_size= false;
purge_info.limit_size= 0;
purge_info.purge_by_name= true;
purge_info.limit_name= to_log;
res= (*opt_binlog_engine_hton->binlog_purge)(&purge_info);
if (res && purge_info.nonpurge_reason)
give_purge_note(purge_info.nonpurge_reason,
purge_info.nonpurge_filename, true);
}
return purge_error_message(thd, res);
}
@@ -735,10 +857,36 @@ bool purge_master_logs_before_date(THD* thd, time_t purge_time)
my_ok(thd);
return 0;
}
return purge_error_message(thd,
mysql_bin_log.purge_logs_before_date(thd,
purge_time,
1));
int res;
if (!opt_binlog_engine_hton)
res= mysql_bin_log.purge_logs_before_date(thd, purge_time, 1);
else
{
handler_binlog_purge_info purge_info;
auto p= engine_binlog_in_use();
purge_info.limit_file_no= p.first;
uint num_dump_threads= p.second;
if (num_dump_threads < slave_connections_needed_for_purge)
{
purge_info.limit_file_no= 0;
purge_info.nonpurge_reason= "less than "
"'slave_connections_needed_for_purge' slaves have processed it";
}
else
purge_info.nonpurge_reason= nullptr;
purge_info.nonpurge_filename[0]= '\0';
purge_info.purge_by_date= true;
purge_info.limit_date= purge_time;
purge_info.purge_by_size= false;
purge_info.limit_size= 0;
purge_info.purge_by_name= false;
purge_info.limit_name= nullptr;
res= (*opt_binlog_engine_hton->binlog_purge)(&purge_info);
if (res && purge_info.nonpurge_reason)
give_purge_note(purge_info.nonpurge_reason,
purge_info.nonpurge_filename, true);
}
return purge_error_message(thd, res);
}
void set_read_error(binlog_send_info *info, int error)
@@ -3169,7 +3317,7 @@ static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
* return 0 - OK
* else NOK
*/
static int send_engine_events(binlog_send_info *info)
static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo)
{
int error;
ulong ev_offset;
@@ -3191,6 +3339,10 @@ static int send_engine_events(binlog_send_info *info)
set_read_error(info, error);
return 1;
}
linfo->file_no.store(reader->cur_file_no, std::memory_order_relaxed);
linfo->pos= (my_off_t) reader->cur_file_pos;
if (error == LOG_READ_EOF)
{
PSI_stage_info old_stage;
@@ -3280,7 +3432,7 @@ static int send_one_binlog_file(binlog_send_info *info,
if (opt_binlog_engine_hton)
{
info->dirlen= 0;
if (send_engine_events(info))
if (send_engine_events(info, linfo))
return 1;
}
else
@@ -5105,7 +5257,8 @@ retry:
cur_link->name.str+= dir_len;
cur_link->name.length-= dir_len;
if (mysql_bin_log.get_reset_master_count() > expected_reset_masters)
if (!opt_binlog_engine_hton &&
mysql_bin_log.get_reset_master_count() > expected_reset_masters)
{
/*
Reset master was called after we cached filenames.
@@ -5115,7 +5268,8 @@ retry:
goto retry;
}
if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
if (!opt_binlog_engine_hton &&
!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
cur_link->size= cur.pos; /* The active log, use the active position */
else
{