1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

MDEV-34705: Binlog in Engine

Initial code for reading events from the InnoDB binlog in the binlog dump thread.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen
2024-08-07 16:12:15 +02:00
parent 219f643ba0
commit 75c334a9f8
8 changed files with 181 additions and 19 deletions

View File

@@ -25,4 +25,6 @@ while ($i < $num_trx) {
--enable_query_log --enable_query_log
SET SESSION binlog_format= MIXED; SET SESSION binlog_format= MIXED;
--exec $MYSQL_BINLOG --read-from-remote-server --user=root --host=127.0.0.1 --port=$MASTER_MYPORT master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.txt
DROP TABLE t2; DROP TABLE t2;

View File

@@ -9158,3 +9158,67 @@ void handler::set_optimizer_costs(THD *thd)
optimizer_where_cost= thd->variables.optimizer_where_cost; optimizer_where_cost= thd->variables.optimizer_where_cost;
optimizer_scan_setup_cost= thd->variables.optimizer_scan_setup_cost; optimizer_scan_setup_cost= thd->variables.optimizer_scan_setup_cost;
} }
/**
  Read one complete binlog event from the engine binlog into a packet.

  Bytes are fetched through read_binlog_data() into the reader's internal
  buffer and appended to the packet until a whole event has been collected.
  Buffered bytes that lie beyond the end of the event are left in the buffer
  for the next call.

  @param packet       String that the event bytes are appended to.
  @param ev_offset    Offset inside packet at which this event starts; used to
                      locate the event's length field.
  @param max_allowed  Largest event length that is accepted.

  @return 0 on success, otherwise one of
    LOG_READ_EOF        read_binlog_data() returned 0 (no more data)
    LOG_READ_IO         read_binlog_data() returned a negative value
    LOG_READ_MEM        appending to the packet failed
    LOG_READ_BOGUS      the length field is smaller than a minimal header
    LOG_READ_TOO_LARGE  the length field exceeds max_allowed
*/
int handler_binlog_reader::read_log_event(String *packet, uint32_t ev_offset,
                                          size_t max_allowed)
{
  uint32_t sofar= 0;
  bool header_read= false;
  /* First read just enough of the header to include the length field. */
  uint32_t target_size= EVENT_LEN_OFFSET + 4;
  int res;

  /*
    Loop, first reading the "length" field, and then continuing to read data
    until a full event has been placed in the packet.
  */
  for (;;)
  {
    if (buf_data_remain <= 0)
    {
      /* Internal buffer is drained; refill it from the engine. */
      res= read_binlog_data(buf, sizeof(buf));
      if (res <= 0)
      {
        /*
          NOTE(review): if this happens with sofar > 0, a partial event is
          left in the packet and its bytes are not re-read by a later call.
          Confirm that the engine never reports EOF in the middle of an event.
        */
        res= (res < 0 ? LOG_READ_IO : LOG_READ_EOF);
        goto err;
      }
      buf_data_pos= 0;
      buf_data_remain= res;
    }

    uint32_t amount= std::min(target_size - sofar, buf_data_remain);
    /*
      append() reports failure (out of memory) through its return value.
      Without this check the bytes would be counted as consumed although they
      never reached the packet, and the length field would later be read from
      a packet that does not contain it.
    */
    if (packet->append((char *)buf + buf_data_pos, amount))
    {
      res= LOG_READ_MEM;
      goto err;
    }
    buf_data_pos+= amount;
    buf_data_remain-= amount;
    sofar+= amount;

    if (target_size == sofar)
    {
      if (header_read)
        break;
      else
      {
        /* The length field is now available; switch target to the full event. */
        header_read= true;
        target_size= uint4korr(&((*packet)[EVENT_LEN_OFFSET + ev_offset]));
        if (target_size < LOG_EVENT_MINIMAL_HEADER_LEN)
        {
          res= LOG_READ_BOGUS;
          goto err;
        }
        else if (target_size > max_allowed)
        {
          res= LOG_READ_TOO_LARGE;
          goto err;
        }
        /*
          Note that here we rely on the fact that all valid events have more
          data after the length. This way we avoid conditional for the
          (useless) special case where we don't need to read anything more
          after having read the first part.
        */
        DBUG_ASSERT(LOG_EVENT_MINIMAL_HEADER_LEN > EVENT_LEN_OFFSET+4);
      }
    }
  }

  res= 0;                                       /* Success */
err:
  return res;
}

View File

@@ -5832,11 +5832,14 @@ public:
uchar buf[32768]; uchar buf[32768];
handler_binlog_reader() handler_binlog_reader()
: cur_file_no(~(uint64_t)0), cur_file_offset(0), cur_file(0), : cur_file_no(~(uint64_t)0), cur_file_offset(0), cur_file((File)-1),
buf_data_pos(0), buf_data_remain(0) buf_data_pos(0), buf_data_remain(0)
{ } { }
virtual ~handler_binlog_reader() { };
virtual int read_binlog_data(uchar *buf, uint32_t len) = 0; virtual int read_binlog_data(uchar *buf, uint32_t len) = 0;
int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed);
/* /*
cur_file_no -> implicitly gives file/tablespace cur_file_no -> implicitly gives file/tablespace
cur_file_offset -> implicitly gives page cur_file_offset -> implicitly gives page
@@ -5844,7 +5847,6 @@ public:
cur_chunk_len cur_chunk_len
cur_chunk_sofar cur_chunk_sofar
*/ */
virtual ~handler_binlog_reader() { };
}; };
#endif /* HANDLER_INCLUDED */ #endif /* HANDLER_INCLUDED */

View File

@@ -12292,3 +12292,5 @@ ER_VECTOR_FORMAT_INVALID
eng "Invalid vector format at offset: %d for '%-.100s'. Must be a valid JSON array of numbers." eng "Invalid vector format at offset: %d for '%-.100s'. Must be a valid JSON array of numbers."
ER_PSEUDO_THREAD_ID_OVERWRITE ER_PSEUDO_THREAD_ID_OVERWRITE
eng "Pseudo thread id should not be modified by the client as it will be overwritten" eng "Pseudo thread id should not be modified by the client as it will be overwritten"
ER_CANNOT_INIT_ENGINE_BINLOG_READER
eng "Cannot initialize binlog reader from storage engine %s"

View File

@@ -130,6 +130,7 @@ struct binlog_send_info {
slave_connection_state *until_gtid_state; slave_connection_state *until_gtid_state;
slave_connection_state until_gtid_state_obj; slave_connection_state until_gtid_state_obj;
Format_description_log_event *fdev; Format_description_log_event *fdev;
handler_binlog_reader *engine_binlog_reader;
int mariadb_slave_capability; int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group; enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group; enum_gtid_until_state gtid_until_group;
@@ -169,6 +170,7 @@ struct binlog_send_info {
char *lfn) char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg), : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
engine_binlog_reader(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false), slave_gtid_strict_mode(false), send_fake_gtid_list(false),
@@ -188,6 +190,7 @@ struct binlog_send_info {
bzero(&error_gtid, sizeof(error_gtid)); bzero(&error_gtid, sizeof(error_gtid));
until_binlog_state.init(); until_binlog_state.init();
} }
~binlog_send_info() { delete engine_binlog_reader; }
}; };
// prototype // prototype
@@ -2332,6 +2335,16 @@ static int init_binlog_sender(binlog_send_info *info,
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
connect_gtid_state.length(0); connect_gtid_state.length(0);
/* ToDo: Need more complex logic here. I want to be able to at least switch from legacy to innodb binlog without having to RESET MASTER. So we need to be able to start reading from legacy and then switch over to the binlog in innodb. Also, we might want to pass the init GTID position in so that the binlog reader can find the place to start by itself? But probably still want to allocate the reader here like this. */
if (opt_binlog_engine_hton &&
!(info->engine_binlog_reader=
(*opt_binlog_engine_hton->get_binlog_reader)()))
{
LEX_CSTRING *engine_name= hton_name(opt_binlog_engine_hton);
my_error(ER_CANNOT_INIT_ENGINE_BINLOG_READER, MYF(0), engine_name->str);
return 1;
}
/** save start file/pos that was requested by slave */ /** save start file/pos that was requested by slave */
strmake(info->start_log_file_name, log_ident, strmake(info->start_log_file_name, log_ident,
sizeof(info->start_log_file_name)); sizeof(info->start_log_file_name));
@@ -2919,11 +2932,18 @@ static int send_events(binlog_send_info *info, IO_CACHE* log, LOG_INFO* linfo,
if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
return 1; return 1;
info->last_pos= linfo->pos; handler_binlog_reader *reader= info->engine_binlog_reader;
error= Log_event::read_log_event(log, packet, info->fdev, if (!reader)
opt_master_verify_checksum ? info->current_checksum_alg {
: BINLOG_CHECKSUM_ALG_OFF); info->last_pos= linfo->pos;
linfo->pos= my_b_tell(log); error= Log_event::read_log_event(log, packet, info->fdev,
opt_master_verify_checksum ? info->current_checksum_alg
: BINLOG_CHECKSUM_ALG_OFF);
linfo->pos= my_b_tell(log);
}
else
error= reader->read_log_event(packet, packet->length(),
info->thd->variables.max_allowed_packet);
if (unlikely(error)) if (unlikely(error))
{ {

View File

@@ -4335,6 +4335,25 @@ end:
} }
/**
  Close the binlog tablespaces that may still be open.

  Reads the current binlog_file_no and closes the tablespace of the preceding
  file (when the number is at least 2), followed by the tablespace of the
  current file (when the number is at least 1).
*/
void fsp_binlog_close()
{
  const uint64_t last_file_no= binlog_file_no.load(std::memory_order_relaxed);

  /*
    ToDo: this check needs to find another way to determine how many
    tablespaces are still open, once we no longer always start from 1.
  */
  if (last_file_no == 0)
    return;

  if (last_file_no > 1)
    fsp_binlog_tablespace_close(last_file_no - 1);
  fsp_binlog_tablespace_close(last_file_no);

  /*
    ToDo: This doesn't seem to free all memory. I'm still getting leaks in
    eg. --valgrind. Find out why and fix. Example:

    ==3464576== at 0x48407B4: malloc (vg_replace_malloc.c:381)
    ==3464576== by 0x15318CD: mem_strdup(char const*) (mem0mem.inl:452)
    ==3464576== by 0x15321DF: fil_space_t::add(char const*, pfs_os_file_t, unsigned int, bool, bool, unsigned int) (fil0fil.cc:306)
    ==3464576== by 0x1558445: fsp_binlog_tablespace_create(unsigned long) (fsp0fsp.cc:3900)
    ==3464576== by 0x1558C70: fsp_binlog_write_cache(st_io_cache*, unsigned long, mtr_t*) (fsp0fsp.cc:4013)
  */
}
/** Create a binlog tablespace file /** Create a binlog tablespace file
@param[in] file_no Index of the binlog tablespace @param[in] file_no Index of the binlog tablespace
@return DB_SUCCESS or error code */ @return DB_SUCCESS or error code */
@@ -4446,12 +4465,17 @@ void fsp_binlog_append(const uchar *data, uint32_t len, mtr_t *mtr)
void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr) void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
{ {
uint32_t page_size= (uint32_t)srv_page_size;
uint32_t page_size_shift= srv_page_size_shift;
fil_space_t *space= binlog_space; fil_space_t *space= binlog_space;
const uint32_t page_end= (uint32_t)srv_page_size - FIL_PAGE_DATA_END; const uint32_t page_end= page_size - FIL_PAGE_DATA_END;
uint32_t page_no= binlog_cur_page_no; uint32_t page_no= binlog_cur_page_no;
uint32_t page_offset= binlog_cur_page_offset; uint32_t page_offset= binlog_cur_page_offset;
/* ToDo: What is the lifetime of what's pointed to by binlog_cur_block, is there some locking needed around it or something? */ /* ToDo: What is the lifetime of what's pointed to by binlog_cur_block, is there some locking needed around it or something? */
buf_block_t *block= binlog_cur_block; buf_block_t *block= binlog_cur_block;
uint64_t file_no= binlog_file_no.load(std::memory_order_relaxed);
uint32_t idx= file_no & 1;
uint64_t pending_prev_end_offset= 0;
/* /*
Write out the event data in chunks of whatever size will fit in the current Write out the event data in chunks of whatever size will fit in the current
@@ -4481,17 +4505,17 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
to re-use tablespace ids between just two, SRV_SPACE_ID_BINLOG0 and to re-use tablespace ids between just two, SRV_SPACE_ID_BINLOG0 and
SRV_SPACE_ID_BINLOG1. SRV_SPACE_ID_BINLOG1.
*/ */
uint64_t file_no= binlog_file_no.load(std::memory_order_relaxed); pending_prev_end_offset= page_no << page_size_shift;
if (file_no >= 1) if (file_no >= 2)
fsp_binlog_tablespace_close(file_no - 1); fsp_binlog_tablespace_close(file_no - 1);
++file_no; ++file_no;
idx= idx ^ 1;
/* /*
Create a new binlog tablespace. Create a new binlog tablespace.
ToDo: pre-create the next tablespace in a background thread, avoiding ToDo: pre-create the next tablespace in a background thread, avoiding
a large stall here in the common case where pre-allocation is fast a large stall here in the common case where pre-allocation is fast
enough. enough.
*/ */
uint32_t idx= file_no & 1;
binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_written_offset[idx].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed); binlog_cur_end_offset[idx].store(0, std::memory_order_relaxed);
binlog_file_no.store(file_no, std::memory_order_release); binlog_file_no.store(file_no, std::memory_order_release);
@@ -4563,6 +4587,11 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
binlog_cur_block= block; binlog_cur_block= block;
binlog_cur_page_no= page_no; binlog_cur_page_no= page_no;
binlog_cur_page_offset= page_offset; binlog_cur_page_offset= page_offset;
if (UNIV_UNLIKELY(pending_prev_end_offset != 0))
binlog_cur_end_offset[idx ^ 1].store(pending_prev_end_offset,
std::memory_order_relaxed);
binlog_cur_end_offset[idx].store((page_no << page_size_shift) + page_offset,
std::memory_order_relaxed);
} }
@@ -4596,6 +4625,8 @@ void fsp_binlog_test(const uchar *data, uint32_t len)
class ha_innodb_binlog_reader : public handler_binlog_reader { class ha_innodb_binlog_reader : public handler_binlog_reader {
/* Buffer to hold a page read directly from the binlog file. */ /* Buffer to hold a page read directly from the binlog file. */
uchar *page_buf; uchar *page_buf;
/* Length of the currently open file (cur_file). */
uint64_t cur_file_length;
/* Used to keep track of partial chunk returned to reader. */ /* Used to keep track of partial chunk returned to reader. */
uint32_t chunk_pos; uint32_t chunk_pos;
uint32_t chunk_remain; uint32_t chunk_remain;
@@ -4617,11 +4648,16 @@ ha_innodb_binlog_reader::ha_innodb_binlog_reader()
: chunk_pos(0), chunk_remain(0) : chunk_pos(0), chunk_remain(0)
{ {
page_buf= (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, srv_page_size, MYF(0)); /* ToDo: InnoDB alloc function? */ page_buf= (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, srv_page_size, MYF(0)); /* ToDo: InnoDB alloc function? */
// ToDo: Need some mechanism to find where to start reading. This is just "start from 0" for early testing.
// And ToDo 2: we should be starting from binlog 000000, not 000001 as we do now.
cur_file_no= 1;
} }
ha_innodb_binlog_reader::~ha_innodb_binlog_reader() ha_innodb_binlog_reader::~ha_innodb_binlog_reader()
{ {
if (cur_file != (File)-1)
my_close(cur_file, MYF(0));
my_free(page_buf); /* ToDo: InnoDB alloc function? */ my_free(page_buf); /* ToDo: InnoDB alloc function? */
} }
@@ -4652,20 +4688,37 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len)
uint64_t offset= cur_file_offset; uint64_t offset= cur_file_offset;
uint64_t active_file_no= binlog_file_no.load(std::memory_order_acquire); uint64_t active_file_no= binlog_file_no.load(std::memory_order_acquire);
ut_ad(active_file_no >= file_no); ut_ad(active_file_no >= file_no);
if (active_file_no > file_no + 1) if (active_file_no > file_no + 1) {
// ToDo: I think there is a bug here: if we're at the end of active_file_no-2, we will be reading directly from active_file_no-1 without properly checking whether the buffer pool is needed instead.
return read_from_file(~(uint64_t)0, buf, len); return read_from_file(~(uint64_t)0, buf, len);
}
uint32_t idx= file_no & 1; uint32_t idx= file_no & 1;
uint64_t write_offset= uint64_t write_offset=
binlog_cur_written_offset[idx].load(std::memory_order_relaxed); binlog_cur_written_offset[idx].load(std::memory_order_relaxed);
// ToDo: I'm not 100% confident about this dirty read of the end_offset. I need to make sure it's not possible to end up using a wrong end offset when reading from a file. When reading from a file that is not the latest, active binlog file, the end offset should basically always come from the file size.
uint64_t end_offset= uint64_t end_offset=
binlog_cur_end_offset[idx].load(std::memory_order_relaxed); binlog_cur_end_offset[idx].load(std::memory_order_relaxed);
/* ToDo: Should I check end_offset? It might be stale and completely wrong? But on the other hand, I _must_ check it somehow, otherwise I might read not yet committed data (possibly never committed). But I should be able to read and check, it cannot be completely wrong, as I have the per-tablespace-id values. And in case of stale, I will then go to lock and wait and get the real value in a safe way, which also results in correct behaviour. And I can never read stale data from the file, data will not be written out until valid and synced in the redo log. */ /* ToDo: Should I check end_offset? It might be stale and completely wrong? But on the other hand, I _must_ check it somehow, otherwise I might read not yet committed data (possibly never committed). But I should be able to read and check, it cannot be completely wrong, as I have the per-tablespace-id values. And in case of stale, I will then go to lock and wait and get the real value in a safe way, which also results in correct behaviour. And I can never read stale data from the file, data will not be written out until valid and synced in the redo log. */
if (file_no == active_file_no) {
ut_ad(end_offset >= offset);
if (end_offset <= offset)
return 0;
} else { /* file_no == active_file_no - 1 */
if (offset >= end_offset) { // ToDo what if this end_pos is stale? Need somehow an extra check afterwards if we are now active-2, and then do a simple file read, not EOF on the stale end_offset.
/* Handle moving to the currently active file. */
cur_file_no= ++file_no;
cur_file_offset= offset= 0;
idx= file_no & 1;
write_offset=
binlog_cur_written_offset[idx].load(std::memory_order_relaxed);
end_offset=
binlog_cur_end_offset[idx].load(std::memory_order_relaxed);
}
}
if (write_offset > offset) if (write_offset > offset)
return read_from_file(std::min(write_offset, end_offset), buf, len); return read_from_file(std::min(write_offset, end_offset), buf, len);
ut_ad(end_offset >= offset);
if (end_offset <= offset)
return 0;
/* /*
The data we need may not yet been written and available to read from the The data we need may not yet been written and available to read from the
@@ -4728,15 +4781,30 @@ ha_innodb_binlog_reader::read_from_file(uint64_t end_offset,
{ {
uint64_t mask= ((uint64_t)1 << srv_page_size_shift) - 1; uint64_t mask= ((uint64_t)1 << srv_page_size_shift) - 1;
uint64_t offset= cur_file_offset; uint64_t offset= cur_file_offset;
uint64_t page_start_offset= offset & ~mask; uint64_t page_start_offset;
if (!cur_file) { if (cur_file < (File)0 || cur_file_offset >= cur_file_length) {
if (!(cur_file < (File)0)) {
my_close(cur_file, MYF(0));
++cur_file_no;
}
char filename[BINLOG_NAME_LEN]; char filename[BINLOG_NAME_LEN];
binlog_name_make(filename, cur_file_no); binlog_name_make(filename, cur_file_no);
if (!(cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME)))) if ((cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME))) < (File)0)
return -1; return -1;
/* ToDo: Handle closing the file when we reach the end. In fact, handle reaching the end of a file in the first place. */ /* ToDo: Handle closing the file when we reach the end. In fact, handle reaching the end of a file in the first place. */
MY_STAT stat_buf;
if (my_fstat(cur_file, &stat_buf, MYF(0))) {
my_error(ER_CANT_GET_STAT, MYF(0), filename, errno);
my_close(cur_file, MYF(0));
cur_file= (File)-1;
return -1;
}
cur_file_length= stat_buf.st_size;
cur_file_offset= offset= 0;
} }
page_start_offset= offset & ~mask;
size_t res= my_pread(cur_file, page_buf, srv_page_size, page_start_offset, size_t res= my_pread(cur_file, page_buf, srv_page_size, page_start_offset,
MYF(MY_WME)); MYF(MY_WME));
if (res == (size_t)-1) if (res == (size_t)-1)
@@ -4786,6 +4854,7 @@ ha_innodb_binlog_reader::read_from_page(uchar *page_ptr, uint64_t end_offset,
memcpy(buf, page_ptr + in_page_offset + chunk_pos, len); memcpy(buf, page_ptr + in_page_offset + chunk_pos, len);
chunk_pos+= len; chunk_pos+= len;
chunk_remain= sofar - len; chunk_remain= sofar - len;
cur_file_offset= offset + len;
return len; return len;
} }
} }
@@ -4801,7 +4870,7 @@ ha_innodb_binlog_reader::read_from_page(uchar *page_ptr, uint64_t end_offset,
} }
uint32_t size= uint32_t size=
page_ptr[in_page_offset + 1] + (uint32_t)(page_ptr[in_page_offset + 2] << 8); page_ptr[in_page_offset + 1] + (uint32_t)(page_ptr[in_page_offset + 2] << 8);
if (type != 1 /* ToDo FSP_BINLOG_TYPE_COMMIT */) { if ((type & 0x7f) != 1 /* ToDo FSP_BINLOG_TYPE_COMMIT */) {
/* Skip non-binlog-event record. */ /* Skip non-binlog-event record. */
in_page_offset += 3 + size; in_page_offset += 3 + size;
continue; continue;
@@ -4818,6 +4887,7 @@ ha_innodb_binlog_reader::read_from_page(uchar *page_ptr, uint64_t end_offset,
memcpy(buf + sofar, page_ptr + (in_page_offset + 3), rest); memcpy(buf + sofar, page_ptr + (in_page_offset + 3), rest);
chunk_pos= rest; chunk_pos= rest;
chunk_remain= size - rest; chunk_remain= size - rest;
sofar+= rest;
break; break;
} }

View File

@@ -560,6 +560,7 @@ extern void fsp_binlog_test(const uchar *data, uint32_t len);
extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr); extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr);
class handler_binlog_reader; class handler_binlog_reader;
extern handler_binlog_reader *innodb_get_binlog_reader(); extern handler_binlog_reader *innodb_get_binlog_reader();
extern void fsp_binlog_close();
#ifndef UNIV_DEBUG #ifndef UNIV_DEBUG
# define fsp_init_file_page(space, block, mtr) fsp_init_file_page(block, mtr) # define fsp_init_file_page(space, block, mtr) fsp_init_file_page(block, mtr)

View File

@@ -2044,6 +2044,7 @@ void innodb_shutdown()
logs_empty_and_mark_files_at_shutdown(); logs_empty_and_mark_files_at_shutdown();
} }
fsp_binlog_close();
os_aio_free(); os_aio_free();
fil_space_t::close_all(); fil_space_t::close_all();
/* Exit any remaining threads. */ /* Exit any remaining threads. */