mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
MDEV-34705: Binlog-in-engine: Implement SHOW BINLOG EVENTS
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
@@ -17,6 +17,24 @@ NONE
|
||||
SHOW MASTER STATUS;
|
||||
File Position Binlog_Do_DB Binlog_Ignore_DB
|
||||
binlog-000000.ibb 767
|
||||
SHOW BINLOG EVENTS IN "binlog-000000.ibb";
|
||||
Log_name Pos Event_type Server_id End_log_pos Info
|
||||
binlog-000000.ibb 0 Gtid 1 0 GTID 0-1-1
|
||||
binlog-000000.ibb 0 Query 1 0 use `test`; CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB
|
||||
binlog-000000.ibb 0 Gtid 1 0 BEGIN GTID 0-1-2
|
||||
binlog-000000.ibb 0 Query 1 0 use `test`; INSERT INTO t1 VALUES (1)
|
||||
binlog-000000.ibb 0 Xid 1 0 COMMIT /* xid=34 */
|
||||
binlog-000000.ibb 0 Gtid 1 0 BEGIN GTID 0-1-3
|
||||
binlog-000000.ibb 0 Query 1 0 use `test`; INSERT INTO t1 VALUES (2)
|
||||
binlog-000000.ibb 0 Query 1 0 use `test`; INSERT INTO t1 VALUES (3)
|
||||
binlog-000000.ibb 0 Xid 1 0 COMMIT /* xid=36 */
|
||||
binlog-000000.ibb 0 Gtid 1 0 GTID 0-1-4
|
||||
binlog-000000.ibb 0 Query 1 0 use `test`; DROP TABLE `t1` /* generated by server */
|
||||
SHOW BINLOG EVENTS LIMIT 2, 3;
|
||||
Log_name Pos Event_type Server_id End_log_pos Info
|
||||
binlog-000000.ibb 0 Gtid 1 0 BEGIN GTID 0-1-2
|
||||
binlog-000000.ibb 0 Query 1 0 use `test`; INSERT INTO t1 VALUES (1)
|
||||
binlog-000000.ibb 0 Xid 1 0 COMMIT /* xid=34 */
|
||||
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB;
|
||||
SET SESSION binlog_format= ROW;
|
||||
*** Do 1500 transactions ...
|
||||
|
@@ -28,6 +28,8 @@ SELECT @@GLOBAL.binlog_checksum;
|
||||
# checks that the reported file and position is within some reasonable range
|
||||
# of the value left by current code.
|
||||
SHOW MASTER STATUS;
|
||||
SHOW BINLOG EVENTS IN "binlog-000000.ibb";
|
||||
SHOW BINLOG EVENTS LIMIT 2, 3;
|
||||
|
||||
CREATE TABLE t2 (a INT PRIMARY KEY, b VARCHAR(2048)) ENGINE=InnoDB;
|
||||
|
||||
|
@@ -5927,6 +5927,13 @@ public:
|
||||
*/
|
||||
virtual int init_gtid_pos(slave_connection_state *pos,
|
||||
rpl_binlog_state_base *state) = 0;
|
||||
/*
|
||||
Initialize to a legacy-type position (filename, offset). This mostly to
|
||||
support legacy SHOW BINLOG EVENTS.
|
||||
*/
|
||||
virtual int init_legacy_pos(const char *filename, ulonglong offset) = 0;
|
||||
/* Get a binlog name from a file_no. */
|
||||
virtual void get_filename(char name[FN_REFLEN], uint64_t file_no) = 0;
|
||||
|
||||
int read_log_event(String *packet, uint32_t ev_offset, size_t max_allowed);
|
||||
};
|
||||
|
@@ -12302,3 +12302,5 @@ ER_ENGINE_BINLOG_NO_DELETE_DOMAIN
|
||||
eng "The binlog engine does not support DELETE_DOMAIN_ID"
|
||||
ER_BINLOG_CANNOT_READ_STATE
|
||||
eng "Error reading GTID state from the binlog"
|
||||
ER_BINLOG_POS_INVALID
|
||||
eng "The binlog offset %llu is invalid"
|
||||
|
@@ -4821,6 +4821,88 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
|
||||
}
|
||||
|
||||
|
||||
/* Version of mysql_show_binlog_events() for --binlog-storage-engine. */
|
||||
static bool
|
||||
show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi)
|
||||
{
|
||||
bool err= false;
|
||||
|
||||
DBUG_ASSERT(opt_binlog_engine_hton);
|
||||
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS);
|
||||
handler_binlog_reader *reader= (*opt_binlog_engine_hton->get_binlog_reader)();
|
||||
if (!reader)
|
||||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
||||
return true;
|
||||
}
|
||||
|
||||
if (reader->init_legacy_pos(lex_mi->log_file_name, lex_mi->pos))
|
||||
{
|
||||
err= true;
|
||||
goto end;
|
||||
}
|
||||
|
||||
{
|
||||
SELECT_LEX_UNIT *unit= &thd->lex->unit;
|
||||
unit->set_limit(thd->lex->current_select);
|
||||
uint64_t file_no= reader->cur_file_no;
|
||||
ha_rows limit= unit->lim.get_select_limit();
|
||||
String packet;
|
||||
Format_description_log_event fd(4);
|
||||
char name_buf[FN_REFLEN];
|
||||
reader->get_filename(name_buf, file_no);
|
||||
|
||||
for (ha_rows event_count= 0;
|
||||
reader->cur_file_no == file_no && event_count < limit;
|
||||
++event_count)
|
||||
{
|
||||
packet.length(0);
|
||||
int reader_error= reader->read_log_event(&packet, 0,
|
||||
thd->variables.max_allowed_packet + MAX_LOG_EVENT_HEADER);
|
||||
if (reader_error)
|
||||
{
|
||||
if (reader_error != LOG_READ_EOF)
|
||||
{
|
||||
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
|
||||
"SHOW BINLOG EVENTS", "error reading event data");
|
||||
err= true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (unit->lim.check_offset(event_count))
|
||||
continue;
|
||||
const char *errmsg;
|
||||
Log_event *ev= Log_event::read_log_event((const uchar *)packet.ptr(),
|
||||
(uint)packet.length(),
|
||||
&errmsg, &fd, false, false);
|
||||
if (!ev)
|
||||
{
|
||||
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
|
||||
"SHOW BINLOG EVENTS", errmsg);
|
||||
err= true;
|
||||
break;
|
||||
}
|
||||
int send_err= ev->net_send(protocol, name_buf, 0);
|
||||
delete ev;
|
||||
if (send_err)
|
||||
{
|
||||
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
|
||||
"SHOW BINLOG EVENTS", "Net error");
|
||||
err= true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
end:
|
||||
if (!err)
|
||||
my_eof(thd);
|
||||
delete reader;
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
Execute a SHOW BINLOG EVENTS statement.
|
||||
|
||||
@@ -4860,6 +4942,10 @@ bool mysql_show_binlog_events(THD* thd)
|
||||
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
|
||||
DBUG_RETURN(TRUE);
|
||||
|
||||
if (opt_binlog_engine_hton &&
|
||||
thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS)
|
||||
DBUG_RETURN(show_engine_binlog_events(thd, protocol, lex_mi));
|
||||
|
||||
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
|
||||
thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
|
||||
|
||||
|
@@ -243,6 +243,8 @@ public:
|
||||
virtual bool data_available() final;
|
||||
virtual int init_gtid_pos(slave_connection_state *pos,
|
||||
rpl_binlog_state_base *state) final;
|
||||
virtual int init_legacy_pos(const char *filename, ulonglong offset) final;
|
||||
virtual void get_filename(char name[FN_REFLEN], uint64_t file_no) final;
|
||||
};
|
||||
|
||||
|
||||
@@ -2162,6 +2164,50 @@ ha_innodb_binlog_reader::init_gtid_pos(slave_connection_state *pos,
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset)
|
||||
{
|
||||
uint64_t file_no;
|
||||
if (!filename)
|
||||
{
|
||||
mysql_mutex_lock(&purge_binlog_mutex);
|
||||
file_no= earliest_binlog_file_no;
|
||||
mysql_mutex_unlock(&purge_binlog_mutex);
|
||||
}
|
||||
else if (!is_binlog_name(filename, &file_no))
|
||||
{
|
||||
my_error(ER_UNKNOWN_TARGET_BINLOG, MYF(0));
|
||||
return -1;
|
||||
}
|
||||
if ((uint64_t)offset >= (uint64_t)(UINT32_MAX) << srv_page_size_shift)
|
||||
{
|
||||
my_error(ER_BINLOG_POS_INVALID, MYF(0), offset);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
ToDo: Here, we could start at the beginning of the page containing the
|
||||
requested position. Then read forwards until the requested position is
|
||||
reached. This way we avoid reading garbaga data for invalid request
|
||||
offset.
|
||||
*/
|
||||
chunk_rd.seek(file_no, (uint64_t)offset);
|
||||
chunk_rd.skip_partial(true);
|
||||
cur_file_no= chunk_rd.current_file_no();
|
||||
cur_file_pos= chunk_rd.current_pos();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ha_innodb_binlog_reader::get_filename(char name[FN_REFLEN], uint64_t file_no)
|
||||
{
|
||||
static_assert(BINLOG_NAME_MAX_LEN <= FN_REFLEN,
|
||||
"FN_REFLEN too shot to hold InnoDB binlog name");
|
||||
binlog_name_make_short(name, file_no);
|
||||
}
|
||||
|
||||
|
||||
extern "C" void binlog_get_cache(THD *, IO_CACHE **,
|
||||
handler_binlog_event_group_info **,
|
||||
const rpl_gtid **);
|
||||
|
Reference in New Issue
Block a user