mirror of
https://github.com/MariaDB/server.git
synced 2025-08-08 11:22:35 +03:00
MDEV-34705: Binlog in Engine: Searchability for GTID position
Every N bytes (hardcoded at 64k for now, to become a configurable setting), write the binlog GTID state into the binlog tablespace. This makes it possible to quickly find a given GTID position by binary searching for the prior GTID state in the tablespace and then doing a small linear scan from that point. The full binlog state is dumped at the start of the binlog file; the remaining states dumped are differential states containing only the changed (domain_id, server_id) pairs, to save space if binlog space is large. This commit only implements the writing of the binlog state to the tablespace at regular intervals. The binary search will be implemented in a subsequent commit. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
@@ -60,6 +60,7 @@ class Field_blob;
|
|||||||
class Column_definition;
|
class Column_definition;
|
||||||
class select_result;
|
class select_result;
|
||||||
class handler_binlog_reader;
|
class handler_binlog_reader;
|
||||||
|
struct rpl_gtid;
|
||||||
|
|
||||||
// the following is for checking tables
|
// the following is for checking tables
|
||||||
|
|
||||||
@@ -1531,7 +1532,8 @@ struct handlerton
|
|||||||
|
|
||||||
/* Optional implementation of binlog in the engine. */
|
/* Optional implementation of binlog in the engine. */
|
||||||
bool (*binlog_init)(size_t binlog_size);
|
bool (*binlog_init)(size_t binlog_size);
|
||||||
bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size);
|
bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size,
|
||||||
|
const rpl_gtid *gtid);
|
||||||
handler_binlog_reader * (*get_binlog_reader)();
|
handler_binlog_reader * (*get_binlog_reader)();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
24
sql/log.cc
24
sql/log.cc
@@ -1836,11 +1836,13 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
|
|||||||
|
|
||||||
extern "C"
|
extern "C"
|
||||||
void
|
void
|
||||||
binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size)
|
binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size,
|
||||||
|
const rpl_gtid **out_gtid)
|
||||||
{
|
{
|
||||||
IO_CACHE *cache= nullptr;
|
IO_CACHE *cache= nullptr;
|
||||||
size_t main_size= 0;
|
size_t main_size= 0;
|
||||||
binlog_cache_mngr *cache_mngr;
|
binlog_cache_mngr *cache_mngr;
|
||||||
|
const rpl_gtid *gtid= nullptr;
|
||||||
if (likely(!opt_bootstrap /* ToDo needed? */) &&
|
if (likely(!opt_bootstrap /* ToDo needed? */) &&
|
||||||
opt_binlog_engine_hton &&
|
opt_binlog_engine_hton &&
|
||||||
(cache_mngr= thd->binlog_get_cache_mngr()))
|
(cache_mngr= thd->binlog_get_cache_mngr()))
|
||||||
@@ -1848,9 +1850,11 @@ binlog_get_cache(THD *thd, IO_CACHE **out_cache, size_t *out_main_size)
|
|||||||
main_size= cache_mngr->gtid_cache_offset;
|
main_size= cache_mngr->gtid_cache_offset;
|
||||||
cache= !cache_mngr->trx_cache.empty() ?
|
cache= !cache_mngr->trx_cache.empty() ?
|
||||||
&cache_mngr->trx_cache.cache_log : &cache_mngr->stmt_cache.cache_log;
|
&cache_mngr->trx_cache.cache_log : &cache_mngr->stmt_cache.cache_log;
|
||||||
|
gtid= thd->get_last_commit_gtid();
|
||||||
}
|
}
|
||||||
*out_cache= cache;
|
*out_cache= cache;
|
||||||
*out_main_size= main_size;
|
*out_main_size= main_size;
|
||||||
|
*out_gtid= gtid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -7087,6 +7091,17 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* ToDo: Should this be a service? */
|
||||||
|
bool
|
||||||
|
load_global_binlog_state(rpl_binlog_state_base *state)
|
||||||
|
{
|
||||||
|
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
|
||||||
|
bool err= state->load_nolock(&rpl_global_gtid_binlog_state);
|
||||||
|
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
MYSQL_BIN_LOG::append_state_pos(String *str)
|
MYSQL_BIN_LOG::append_state_pos(String *str)
|
||||||
{
|
{
|
||||||
@@ -7471,7 +7486,8 @@ err:
|
|||||||
mysql_mutex_unlock(&LOCK_log);
|
mysql_mutex_unlock(&LOCK_log);
|
||||||
mysql_mutex_lock(&LOCK_commit_ordered);
|
mysql_mutex_lock(&LOCK_commit_ordered);
|
||||||
mysql_mutex_unlock(&LOCK_after_binlog_sync);
|
mysql_mutex_unlock(&LOCK_after_binlog_sync);
|
||||||
if ((*opt_binlog_engine_hton->binlog_write_direct)(file, binlog_main_bytes))
|
if ((*opt_binlog_engine_hton->binlog_write_direct)
|
||||||
|
(file, binlog_main_bytes, thd->get_last_commit_gtid()))
|
||||||
goto engine_fail;
|
goto engine_fail;
|
||||||
/* ToDo: Need to set last_commit_pos_offset here? */
|
/* ToDo: Need to set last_commit_pos_offset here? */
|
||||||
/* ToDo: Maybe binlog_write_direct() could return the coords. */
|
/* ToDo: Maybe binlog_write_direct() could return the coords. */
|
||||||
@@ -7576,14 +7592,14 @@ err:
|
|||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
MYSQL_BIN_LOG::update_gtid_index(uint32 offset, rpl_gtid gtid)
|
MYSQL_BIN_LOG::update_gtid_index(uint32 offset, const rpl_gtid *gtid)
|
||||||
{
|
{
|
||||||
if (!unlikely(gtid_index))
|
if (!unlikely(gtid_index))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
rpl_gtid *gtid_list;
|
rpl_gtid *gtid_list;
|
||||||
uint32 gtid_count;
|
uint32 gtid_count;
|
||||||
int err= gtid_index->process_gtid_check_batch(offset, &gtid,
|
int err= gtid_index->process_gtid_check_batch(offset, gtid,
|
||||||
&gtid_list, &gtid_count);
|
&gtid_list, &gtid_count);
|
||||||
if (err)
|
if (err)
|
||||||
return;
|
return;
|
||||||
|
@@ -600,6 +600,7 @@ class binlog_cache_mngr;
|
|||||||
class binlog_cache_data;
|
class binlog_cache_data;
|
||||||
struct rpl_gtid;
|
struct rpl_gtid;
|
||||||
struct wait_for_commit;
|
struct wait_for_commit;
|
||||||
|
struct rpl_binlog_state_base;
|
||||||
|
|
||||||
class MYSQL_BIN_LOG: public TC_LOG, private Event_log
|
class MYSQL_BIN_LOG: public TC_LOG, private Event_log
|
||||||
{
|
{
|
||||||
@@ -763,7 +764,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private Event_log
|
|||||||
bool write_transaction_to_binlog_events(group_commit_entry *entry);
|
bool write_transaction_to_binlog_events(group_commit_entry *entry);
|
||||||
void trx_group_commit_leader(group_commit_entry *leader);
|
void trx_group_commit_leader(group_commit_entry *leader);
|
||||||
bool is_xidlist_idle_nolock();
|
bool is_xidlist_idle_nolock();
|
||||||
void update_gtid_index(uint32 offset, rpl_gtid gtid);
|
void update_gtid_index(uint32 offset, const rpl_gtid *gtid);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void purge(bool all);
|
void purge(bool all);
|
||||||
@@ -1191,6 +1192,9 @@ public:
|
|||||||
char binlog_end_pos_file[FN_REFLEN];
|
char binlog_end_pos_file[FN_REFLEN];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
extern bool load_global_binlog_state(rpl_binlog_state_base *state);
|
||||||
|
|
||||||
|
|
||||||
class Log_event_handler
|
class Log_event_handler
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
@@ -5512,7 +5512,7 @@ private:
|
|||||||
rpl_gtid m_last_commit_gtid;
|
rpl_gtid m_last_commit_gtid;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
rpl_gtid get_last_commit_gtid() { return m_last_commit_gtid; }
|
const rpl_gtid *get_last_commit_gtid() { return &m_last_commit_gtid; }
|
||||||
void set_last_commit_gtid(rpl_gtid &gtid);
|
void set_last_commit_gtid(rpl_gtid &gtid);
|
||||||
|
|
||||||
|
|
||||||
|
@@ -2324,7 +2324,7 @@ Sys_var_last_gtid::session_value_ptr(THD *thd, const LEX_CSTRING *base) const
|
|||||||
bool first= true;
|
bool first= true;
|
||||||
|
|
||||||
str.length(0);
|
str.length(0);
|
||||||
rpl_gtid gtid= thd->get_last_commit_gtid();
|
rpl_gtid gtid= *thd->get_last_commit_gtid();
|
||||||
if ((gtid.seq_no > 0 &&
|
if ((gtid.seq_no > 0 &&
|
||||||
rpl_slave_state_tostring_helper(&str, &gtid, &first)) ||
|
rpl_slave_state_tostring_helper(&str, &gtid, &first)) ||
|
||||||
!(p= thd->strmake(str.ptr(), str.length())))
|
!(p= thd->strmake(str.ptr(), str.length())))
|
||||||
|
@@ -51,6 +51,7 @@ Created 11/29/1995 Heikki Tuuri
|
|||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
#include "trx0undo.h"
|
#include "trx0undo.h"
|
||||||
#include "trx0trx.h"
|
#include "trx0trx.h"
|
||||||
|
#include "ut0compr_int.h"
|
||||||
#include "rpl_gtid_base.h"
|
#include "rpl_gtid_base.h"
|
||||||
|
|
||||||
/** Returns the first extent descriptor for a segment.
|
/** Returns the first extent descriptor for a segment.
|
||||||
@@ -4277,6 +4278,24 @@ func_exit:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Binlog implementation in InnoDB.
|
||||||
|
ToDo: Move this somewhere reasonable, its own file(s) etc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
How often (in terms of bytes written) to dump a (differential) binlog state
|
||||||
|
at the start of the page, to speed up finding the initial GTID position for
|
||||||
|
a connecting slave.
|
||||||
|
|
||||||
|
Must be a power-of-two multiple of the page size.
|
||||||
|
|
||||||
|
ToDo: An InnoDB setting for this - which needs to be persisted in the
|
||||||
|
tablespace header or maybe initial full binlog state record.
|
||||||
|
ToDo: Default should be like 1M or so, this 64k is for testing.
|
||||||
|
*/
|
||||||
|
#define INNODB_BINLOG_STATE_INTERVAL (64*1024)
|
||||||
|
|
||||||
static uint32_t binlog_size_in_pages;
|
static uint32_t binlog_size_in_pages;
|
||||||
buf_block_t *binlog_cur_block;
|
buf_block_t *binlog_cur_block;
|
||||||
uint32_t binlog_cur_page_no;
|
uint32_t binlog_cur_page_no;
|
||||||
@@ -4309,6 +4328,12 @@ std::atomic<uint64_t> first_open_binlog_file_no;
|
|||||||
uint64_t last_created_binlog_file_no;
|
uint64_t last_created_binlog_file_no;
|
||||||
fil_space_t *last_created_binlog_space;
|
fil_space_t *last_created_binlog_space;
|
||||||
|
|
||||||
|
/*
|
||||||
|
Differential binlog state in the currently active binlog tablespace, relative
|
||||||
|
to the state at the start.
|
||||||
|
*/
|
||||||
|
static rpl_binlog_state_base binlog_diff_state;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Point at which it is guaranteed that all data has been written out to the
|
Point at which it is guaranteed that all data has been written out to the
|
||||||
binlog file (on the OS level; not necessarily fsync()'ed yet).
|
binlog file (on the OS level; not necessarily fsync()'ed yet).
|
||||||
@@ -4386,15 +4411,17 @@ fsp_binlog_tablespace_close(uint64_t file_no)
|
|||||||
/*
|
/*
|
||||||
Write out any remaining pages in the buffer pool to the binlog tablespace.
|
Write out any remaining pages in the buffer pool to the binlog tablespace.
|
||||||
Then flush the file to disk, and close the old tablespace.
|
Then flush the file to disk, and close the old tablespace.
|
||||||
|
|
||||||
|
ToDo: Will this turn into a busy-wait if some of the pages are still latched
|
||||||
|
in this tablespace, maybe because even though the tablespace has been
|
||||||
|
written full, the mtr that's ending in the next tablespace may still be
|
||||||
|
active? This will need fixing, no busy-wait should be done here.
|
||||||
*/
|
*/
|
||||||
while (buf_flush_list_space(space))
|
while (buf_flush_list_space(space))
|
||||||
;
|
;
|
||||||
os_aio_wait_until_no_pending_writes(false);
|
os_aio_wait_until_no_pending_writes(false);
|
||||||
space->flush<false>();
|
space->flush<false>();
|
||||||
mysql_mutex_lock(&fil_system.mutex);
|
fil_space_free(space_id, false);
|
||||||
fil_system.detach(space, false);
|
|
||||||
mysql_mutex_unlock(&fil_system.mutex);
|
|
||||||
|
|
||||||
res= DB_SUCCESS;
|
res= DB_SUCCESS;
|
||||||
end:
|
end:
|
||||||
return res;
|
return res;
|
||||||
@@ -4411,6 +4438,7 @@ fsp_binlog_init()
|
|||||||
{
|
{
|
||||||
mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr);
|
mysql_mutex_init(fsp_active_binlog_mutex_key, &active_binlog_mutex, nullptr);
|
||||||
pthread_cond_init(&active_binlog_cond, nullptr);
|
pthread_cond_init(&active_binlog_cond, nullptr);
|
||||||
|
binlog_diff_state.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -4849,6 +4877,7 @@ void fsp_binlog_close()
|
|||||||
==3464576== by 0x1558445: fsp_binlog_tablespace_create(unsigned long) (fsp0fsp.cc:3900)
|
==3464576== by 0x1558445: fsp_binlog_tablespace_create(unsigned long) (fsp0fsp.cc:3900)
|
||||||
==3464576== by 0x1558C70: fsp_binlog_write_cache(st_io_cache*, unsigned long, mtr_t*) (fsp0fsp.cc:4013)
|
==3464576== by 0x1558C70: fsp_binlog_write_cache(st_io_cache*, unsigned long, mtr_t*) (fsp0fsp.cc:4013)
|
||||||
*/
|
*/
|
||||||
|
binlog_diff_state.free();
|
||||||
pthread_cond_destroy(&active_binlog_cond);
|
pthread_cond_destroy(&active_binlog_cond);
|
||||||
mysql_mutex_destroy(&active_binlog_mutex);
|
mysql_mutex_destroy(&active_binlog_mutex);
|
||||||
}
|
}
|
||||||
@@ -4987,54 +5016,127 @@ fsp_binlog_prealloc_thread()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void fsp_binlog_write_start(uint32_t page_no,
|
__attribute__((noinline))
|
||||||
const uchar *data, uint32_t len, mtr_t *mtr)
|
static ssize_t
|
||||||
|
serialize_gtid_state(rpl_binlog_state_base *state, byte *buf, size_t buf_size,
|
||||||
|
bool is_first_page)
|
||||||
{
|
{
|
||||||
buf_block_t *block= fsp_page_create(active_binlog_space, page_no, mtr);
|
unsigned char *p= (unsigned char *)buf;
|
||||||
mtr->memcpy<mtr_t::MAYBE_NOP>(*block, FIL_PAGE_DATA + block->page.frame,
|
ut_ad(buf_size >= 2*COMPR_INT_MAX32 + 2*COMPR_INT_MAX64);
|
||||||
data, len);
|
if (is_first_page) {
|
||||||
binlog_cur_block= block;
|
/*
|
||||||
}
|
In the first page where we put the full state, include the value of the
|
||||||
|
setting for the interval at which differential states are binlogged, so
|
||||||
void fsp_binlog_write_offset(uint32_t page_no, uint32_t offset,
|
we know how to search them independent of how the setting changes.
|
||||||
const uchar *data, uint32_t len, mtr_t *mtr)
|
*/
|
||||||
{
|
p= compr_int_write(p, INNODB_BINLOG_STATE_INTERVAL);
|
||||||
dberr_t err;
|
|
||||||
/* ToDo: Is RW_SX_LATCH appropriate here? */
|
|
||||||
buf_block_t *block= buf_page_get_gen(page_id_t{active_binlog_space->id, page_no},
|
|
||||||
0, RW_SX_LATCH, binlog_cur_block,
|
|
||||||
BUF_GET, mtr, &err);
|
|
||||||
ut_a(err == DB_SUCCESS);
|
|
||||||
mtr->memcpy<mtr_t::MAYBE_NOP>(*block,
|
|
||||||
offset + block->page.frame,
|
|
||||||
data, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
void fsp_binlog_append(const uchar *data, uint32_t len, mtr_t *mtr)
|
|
||||||
{
|
|
||||||
ut_ad(binlog_cur_page_offset <= srv_page_size - FIL_PAGE_DATA_END);
|
|
||||||
uint32_t remain= ((uint32_t)srv_page_size - FIL_PAGE_DATA_END) -
|
|
||||||
binlog_cur_page_offset;
|
|
||||||
// ToDo: Some kind of mutex to protect binlog access.
|
|
||||||
while (len > 0) {
|
|
||||||
if (remain < 4) {
|
|
||||||
binlog_cur_page_offset= FIL_PAGE_DATA;
|
|
||||||
remain= ((uint32_t)srv_page_size - FIL_PAGE_DATA_END) -
|
|
||||||
binlog_cur_page_offset;
|
|
||||||
++binlog_cur_page_no;
|
|
||||||
}
|
|
||||||
uint32_t this_len= std::min<uint32_t>(len, remain);
|
|
||||||
if (binlog_cur_page_offset == FIL_PAGE_DATA)
|
|
||||||
fsp_binlog_write_start(binlog_cur_page_no, data, this_len, mtr);
|
|
||||||
else
|
|
||||||
fsp_binlog_write_offset(binlog_cur_page_no, binlog_cur_page_offset,
|
|
||||||
data, this_len, mtr);
|
|
||||||
len-= this_len;
|
|
||||||
data+= this_len;
|
|
||||||
binlog_cur_page_offset+= this_len;
|
|
||||||
}
|
}
|
||||||
|
unsigned char * const pmax=
|
||||||
|
p + (buf_size - (2*COMPR_INT_MAX32 + COMPR_INT_MAX64));
|
||||||
|
|
||||||
|
if (state->iterate(
|
||||||
|
[buf, buf_size, pmax, &p] (const rpl_gtid *gtid) {
|
||||||
|
if (UNIV_UNLIKELY(p > pmax))
|
||||||
|
return true;
|
||||||
|
p= compr_int_write(p, gtid->domain_id);
|
||||||
|
p= compr_int_write(p, gtid->server_id);
|
||||||
|
p= compr_int_write(p, gtid->seq_no);
|
||||||
|
return false;
|
||||||
|
}))
|
||||||
|
return -1;
|
||||||
|
else
|
||||||
|
return p - (unsigned char *)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
|
||||||
|
buf_block_t * &block, uint32_t &page_no,
|
||||||
|
uint32_t &page_offset, fil_space_t *space)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Use a small, efficient stack-allocated buffer by default, falling back to
|
||||||
|
malloc() if needed for large GTID state.
|
||||||
|
*/
|
||||||
|
byte small_buf[192];
|
||||||
|
byte *buf, *alloced_buf;
|
||||||
|
|
||||||
|
ssize_t used_bytes= serialize_gtid_state(state, small_buf, sizeof(small_buf),
|
||||||
|
page_no==0);
|
||||||
|
if (used_bytes >= 0)
|
||||||
|
{
|
||||||
|
buf= small_buf;
|
||||||
|
alloced_buf= nullptr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
size_t buf_size=
|
||||||
|
state->count_nolock() * (2*COMPR_INT_MAX32 + COMPR_INT_MAX64);
|
||||||
|
/* ToDo: InnoDB alloc function? */
|
||||||
|
alloced_buf= (byte *)my_malloc(PSI_INSTRUMENT_ME, buf_size, MYF(MY_WME));
|
||||||
|
if (UNIV_UNLIKELY(!alloced_buf))
|
||||||
|
return true;
|
||||||
|
buf= alloced_buf;
|
||||||
|
used_bytes= serialize_gtid_state(state, buf, buf_size, page_no==0);
|
||||||
|
if (UNIV_UNLIKELY(used_bytes < 0))
|
||||||
|
{
|
||||||
|
ut_ad(0 /* Shouldn't happen, as we allocated maximum needed size. */);
|
||||||
|
my_free(alloced_buf);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const uint32_t page_size= (uint32_t)srv_page_size;
|
||||||
|
const uint32_t page_room= page_size - (FIL_PAGE_DATA + FIL_PAGE_DATA_END);
|
||||||
|
uint32_t needed_pages= (uint32_t)((used_bytes + page_room - 1) / page_room);
|
||||||
|
|
||||||
|
/* For now, GTID state always at the start of a page. */
|
||||||
|
ut_ad(page_offset == FIL_PAGE_DATA);
|
||||||
|
|
||||||
|
/*
|
||||||
|
Only write the GTID state record if there is room for actual event data
|
||||||
|
afterwards. There is no point in using space to allow fast search to a
|
||||||
|
point if there is no data to search for after that point.
|
||||||
|
*/
|
||||||
|
if (page_no + needed_pages < space->size)
|
||||||
|
{
|
||||||
|
while (used_bytes > 0)
|
||||||
|
{
|
||||||
|
ut_ad(page_no < space->size);
|
||||||
|
block= fsp_page_create(space, page_no, mtr);
|
||||||
|
ut_a(block /* ToDo: error handling? */);
|
||||||
|
page_offset= FIL_PAGE_DATA;
|
||||||
|
byte *ptr= page_offset + block->page.frame;
|
||||||
|
ssize_t chunk= used_bytes;
|
||||||
|
if (chunk > page_room - 3) {
|
||||||
|
chunk= page_room - 3;
|
||||||
|
++page_no;
|
||||||
|
}
|
||||||
|
ptr[0]= 0x02 /* ToDo: FSP_BINLOG_TYPE_GTID_STATE */ | ((chunk < used_bytes) << 7);
|
||||||
|
ptr[1] = (byte)chunk & 0xff;
|
||||||
|
ptr[2] = (byte)(chunk >> 8);
|
||||||
|
ut_ad(chunk <= 0xffff);
|
||||||
|
memcpy(ptr+3, buf, chunk);
|
||||||
|
mtr->memcpy(*block, page_offset, chunk+3);
|
||||||
|
page_offset+= (uint32_t)(chunk+3);
|
||||||
|
buf+= chunk;
|
||||||
|
used_bytes-= chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (page_offset == FIL_PAGE_DATA_END) {
|
||||||
|
block= nullptr;
|
||||||
|
page_offset= FIL_PAGE_DATA;
|
||||||
|
++page_no;
|
||||||
|
block= fsp_page_create(space, page_no, mtr);
|
||||||
|
ut_a(block /* ToDo: error handling? */);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
my_free(alloced_buf);
|
||||||
|
return false; // No error
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
|
void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
|
||||||
{
|
{
|
||||||
uint32_t page_size= (uint32_t)srv_page_size;
|
uint32_t page_size= (uint32_t)srv_page_size;
|
||||||
@@ -5114,7 +5216,32 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
|
|||||||
mysql_mutex_unlock(&active_binlog_mutex);
|
mysql_mutex_unlock(&active_binlog_mutex);
|
||||||
binlog_cur_page_no= page_no= 0;
|
binlog_cur_page_no= page_no= 0;
|
||||||
}
|
}
|
||||||
block= fsp_page_create(space, page_no, mtr);
|
|
||||||
|
/* Must be a power of two and larger than page size. */
|
||||||
|
ut_ad(INNODB_BINLOG_STATE_INTERVAL > page_size);
|
||||||
|
ut_ad(INNODB_BINLOG_STATE_INTERVAL ==
|
||||||
|
(uint64_t)1 << (63 - nlz((uint64_t)INNODB_BINLOG_STATE_INTERVAL)));
|
||||||
|
|
||||||
|
if (0 == (page_no &
|
||||||
|
((INNODB_BINLOG_STATE_INTERVAL >> page_size_shift) - 1))) {
|
||||||
|
if (page_no == 0) {
|
||||||
|
rpl_binlog_state_base full_state;
|
||||||
|
full_state.init();
|
||||||
|
bool err= load_global_binlog_state(&full_state);
|
||||||
|
ut_a(!err /* ToDo error handling */);
|
||||||
|
err= binlog_gtid_state(&full_state, mtr, block, page_no,
|
||||||
|
page_offset, space);
|
||||||
|
ut_a(!err /* ToDo error handling */);
|
||||||
|
ut_ad(block);
|
||||||
|
full_state.free();
|
||||||
|
binlog_diff_state.reset_nolock();
|
||||||
|
} else {
|
||||||
|
bool err= binlog_gtid_state(&binlog_diff_state, mtr, block, page_no,
|
||||||
|
page_offset, space);
|
||||||
|
ut_a(!err /* ToDo error handling */);
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
block= fsp_page_create(space, page_no, mtr);
|
||||||
} else {
|
} else {
|
||||||
dberr_t err;
|
dberr_t err;
|
||||||
/* ToDo: Is RW_SX_LATCH appropriate here? */
|
/* ToDo: Is RW_SX_LATCH appropriate here? */
|
||||||
@@ -5185,30 +5312,23 @@ void fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
extern "C" void binlog_get_cache(THD *, IO_CACHE **, size_t *);
|
extern "C" void binlog_get_cache(THD *, IO_CACHE **, size_t *,
|
||||||
|
const rpl_gtid **);
|
||||||
|
|
||||||
void
|
void
|
||||||
fsp_binlog_trx(trx_t *trx, mtr_t *mtr)
|
fsp_binlog_trx(trx_t *trx, mtr_t *mtr)
|
||||||
{
|
{
|
||||||
IO_CACHE *cache;
|
IO_CACHE *cache;
|
||||||
size_t main_size;
|
size_t main_size;
|
||||||
|
const rpl_gtid *gtid;
|
||||||
|
|
||||||
if (!trx->mysql_thd)
|
if (!trx->mysql_thd)
|
||||||
return;
|
return;
|
||||||
binlog_get_cache(trx->mysql_thd, &cache, &main_size);
|
binlog_get_cache(trx->mysql_thd, &cache, &main_size, &gtid);
|
||||||
if (main_size)
|
if (main_size) {
|
||||||
|
binlog_diff_state.update_nolock(gtid);
|
||||||
fsp_binlog_write_cache(cache, main_size, mtr);
|
fsp_binlog_write_cache(cache, main_size, mtr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void fsp_binlog_test(const uchar *data, uint32_t len)
|
|
||||||
{
|
|
||||||
mtr_t mtr;
|
|
||||||
mtr.start();
|
|
||||||
if (!active_binlog_space)
|
|
||||||
fsp_binlog_tablespace_create(0, &active_binlog_space);
|
|
||||||
fsp_binlog_append(data, len, &mtr);
|
|
||||||
mtr.commit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -5276,7 +5396,9 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len)
|
|||||||
uint64_t file_no= cur_file_no;
|
uint64_t file_no= cur_file_no;
|
||||||
uint64_t offset= cur_file_offset;
|
uint64_t offset= cur_file_offset;
|
||||||
uint64_t active_file_no= active_binlog_file_no.load(std::memory_order_acquire);
|
uint64_t active_file_no= active_binlog_file_no.load(std::memory_order_acquire);
|
||||||
if (first_open_binlog_file_no.load(std::memory_order_relaxed) > file_no + /* Temporary hack to work-around the next line ToDo: comment */ (offset >= cur_file_length)) {
|
if (first_open_binlog_file_no.load(std::memory_order_relaxed) > file_no +
|
||||||
|
/* Temporary hack to work-around the next line ToDo: comment */
|
||||||
|
(cur_file != (File)-1 && offset >= cur_file_length)) {
|
||||||
// ToDo: I think there is a bug here, if we're at the end of active_file_no-2, we will be reading directly from active_file_no-1 without checking properly if buffer pool is needed instead. */
|
// ToDo: I think there is a bug here, if we're at the end of active_file_no-2, we will be reading directly from active_file_no-1 without checking properly if buffer pool is needed instead. */
|
||||||
return read_from_file(~(uint64_t)0, buf, len);
|
return read_from_file(~(uint64_t)0, buf, len);
|
||||||
}
|
}
|
||||||
@@ -5296,6 +5418,10 @@ int ha_innodb_binlog_reader::read_binlog_data(uchar *buf, uint32_t len)
|
|||||||
} else { /* file_no == active_file_no - 1 */
|
} else { /* file_no == active_file_no - 1 */
|
||||||
if (offset >= end_offset) { // ToDo what if this end_pos is stale? Need somehow an extra check afterwards if we are now active-2, and then do a simple file read, not EOF on the stale end_offset.
|
if (offset >= end_offset) { // ToDo what if this end_pos is stale? Need somehow an extra check afterwards if we are now active-2, and then do a simple file read, not EOF on the stale end_offset.
|
||||||
/* Handle moving to the currently active file. */
|
/* Handle moving to the currently active file. */
|
||||||
|
if (!(cur_file < (File)0)) {
|
||||||
|
my_close(cur_file, MYF(0));
|
||||||
|
cur_file= (File)-1;
|
||||||
|
}
|
||||||
cur_file_no= ++file_no;
|
cur_file_no= ++file_no;
|
||||||
cur_file_offset= offset= 0;
|
cur_file_offset= offset= 0;
|
||||||
idx= file_no & 1;
|
idx= file_no & 1;
|
||||||
@@ -5372,11 +5498,14 @@ ha_innodb_binlog_reader::read_from_file(uint64_t end_offset,
|
|||||||
uint64_t offset= cur_file_offset;
|
uint64_t offset= cur_file_offset;
|
||||||
uint64_t page_start_offset;
|
uint64_t page_start_offset;
|
||||||
|
|
||||||
if (cur_file < (File)0 || cur_file_offset >= cur_file_length) {
|
if (cur_file >= (File)0 && cur_file_offset >= cur_file_length) {
|
||||||
if (!(cur_file < (File)0)) {
|
/* At the end of currently open file, move to the next file. */
|
||||||
my_close(cur_file, MYF(0));
|
my_close(cur_file, MYF(0));
|
||||||
++cur_file_no;
|
cur_file= (File)-1;
|
||||||
}
|
++cur_file_no;
|
||||||
|
cur_file_offset= offset= 0;
|
||||||
|
}
|
||||||
|
if (cur_file < (File)0) {
|
||||||
char filename[BINLOG_NAME_LEN];
|
char filename[BINLOG_NAME_LEN];
|
||||||
binlog_name_make(filename, cur_file_no);
|
binlog_name_make(filename, cur_file_no);
|
||||||
if ((cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME))) < (File)0)
|
if ((cur_file= my_open(filename, O_RDONLY | O_BINARY, MYF(MY_WME))) < (File)0)
|
||||||
@@ -5390,7 +5519,6 @@ ha_innodb_binlog_reader::read_from_file(uint64_t end_offset,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
cur_file_length= stat_buf.st_size;
|
cur_file_length= stat_buf.st_size;
|
||||||
cur_file_offset= offset= 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
page_start_offset= offset & ~mask;
|
page_start_offset= offset & ~mask;
|
||||||
@@ -5501,9 +5629,12 @@ innodb_get_binlog_reader()
|
|||||||
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size)
|
innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size,
|
||||||
|
const rpl_gtid *gtid)
|
||||||
{
|
{
|
||||||
mtr_t mtr;
|
mtr_t mtr;
|
||||||
|
if (gtid)
|
||||||
|
binlog_diff_state.update_nolock(gtid);
|
||||||
mtr.start();
|
mtr.start();
|
||||||
fsp_binlog_write_cache(cache, main_size, &mtr);
|
fsp_binlog_write_cache(cache, main_size, &mtr);
|
||||||
mtr.commit();
|
mtr.commit();
|
||||||
|
@@ -556,10 +556,10 @@ void fsp_system_tablespace_truncate(bool shutdown);
|
|||||||
/** Truncate the temporary tablespace */
|
/** Truncate the temporary tablespace */
|
||||||
void fsp_shrink_temp_space();
|
void fsp_shrink_temp_space();
|
||||||
|
|
||||||
extern void fsp_binlog_test(const uchar *data, uint32_t len);
|
|
||||||
extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr);
|
extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr);
|
||||||
class handler_binlog_reader;
|
class handler_binlog_reader;
|
||||||
extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size);
|
extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size,
|
||||||
|
const struct rpl_gtid *gtid);
|
||||||
extern handler_binlog_reader *innodb_get_binlog_reader();
|
extern handler_binlog_reader *innodb_get_binlog_reader();
|
||||||
extern void fsp_binlog_init();
|
extern void fsp_binlog_init();
|
||||||
extern bool innodb_binlog_init(size_t binlog_size);
|
extern bool innodb_binlog_init(size_t binlog_size);
|
||||||
|
Reference in New Issue
Block a user