1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-31 18:24:25 +03:00

This is the first version of the SR speedup feature for MariaDB 10.7.

This commit is contained in:
Pekka Lampio
2021-05-17 10:36:17 +03:00
parent 344544df3e
commit a44484e461
16 changed files with 198 additions and 20 deletions

View File

@ -89,6 +89,19 @@ namespace db
void debug_sync(const char*) override { }
void debug_crash(const char*) override { }
void *get_binlog_cache() override
{
return (NULL);
}
int fragment_cache_remove_transaction(
const wsrep::id&,
wsrep::transaction_id) override
{
return (0);
}
private:
db::client& client_;
wsrep::client_state& client_state_;

View File

@ -88,7 +88,8 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
}
int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
const wsrep::ws_meta& ws_meta,
bool)
{
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
int ret(client_.client_state_.before_rollback());

View File

@ -42,12 +42,14 @@ namespace db
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&,
int,
const wsrep::xid&) override
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&,
bool skip_rollback = false) override;
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) override;
int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&,

View File

@ -35,12 +35,17 @@ namespace db
wsrep::transaction_id,
int,
const wsrep::const_buffer&,
const wsrep::xid&) override
int,
size_t,
const wsrep::xid&,
void *) override
{ throw wsrep::not_implemented_error(); }
int update_fragment_meta(const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int remove_fragments() override
{ throw wsrep::not_implemented_error(); }
int set_fragments_from_table() override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)

View File

@ -217,6 +217,20 @@ namespace wsrep
* been enabled.
*/
virtual void debug_crash(const char* crash_point) = 0;
/**
* Return the binlog cache for the currently executing
* transaction or a NULL pointer if no such cache exists.
*/
virtual void *get_binlog_cache() = 0;
/**
* Remove the given transaction from the fragment cache.
*/
virtual int fragment_cache_remove_transaction(
const wsrep::id& server_id,
wsrep::transaction_id transaction_id) = 0;
};
}

View File

@ -943,6 +943,15 @@ namespace wsrep
return 1;
}
/**
* Return a reference to the transaction associated
* with the client state.
*/
wsrep::transaction& transaction()
{
return transaction_;
}
const wsrep::ws_meta& toi_meta() const
{
return toi_meta_;

View File

@ -98,6 +98,7 @@ namespace wsrep
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
int sr_store,
const wsrep::xid& xid) = 0;
/**
@ -145,7 +146,8 @@ namespace wsrep
* @return Zero in case of success, non-zero in case of failure
*/
virtual int rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta) = 0;
const wsrep::ws_meta& ws_meta,
bool skip_rollback = false) = 0;
/**
* Apply a TOI operation.

View File

@ -65,8 +65,10 @@ namespace wsrep
wsrep::transaction_id client_id,
int flags,
const wsrep::const_buffer& data,
const wsrep::xid& xid) = 0;
int sr_store,
size_t offset,
const wsrep::xid& xid,
void *binlog_cache) = 0;
/**
* Update fragment meta data after certification process.
*/
@ -78,6 +80,13 @@ namespace wsrep
*/
virtual int remove_fragments() = 0;
/**
* Update the list of fragments in the streaming context by
* adding all fragments in the streaming log table for the given
* transaction.
*/
virtual int set_fragments_from_table() = 0;
/**
* Commit the transaction.
*/

View File

@ -47,6 +47,7 @@ namespace wsrep
, fragment_size_()
, unit_counter_()
, log_position_()
, sr_store_(0)
{ }
/**
@ -189,6 +190,17 @@ namespace wsrep
unit_counter_ = 0;
log_position_ = 0;
}
void set_sr_store(int store_type)
{
sr_store_ = store_type;
}
int get_sr_store() const
{
return (sr_store_);
}
private:
void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
@ -204,6 +216,7 @@ namespace wsrep
size_t fragment_size_;
size_t unit_counter_;
size_t log_position_;
int sr_store_;
};
}

View File

@ -142,6 +142,47 @@ namespace wsrep
void xa_detach();
int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
int fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id);
void *get_binlog_cache();
/* state of Streaming Replication Speedup feature for the
transaction. This describes the relationship of this WSREP
transaction and the underlying InnoDB transaction.
*/
enum sr_state
{
/* this is not an SR Speedup transaction */
sr_state_none,
/* this is an SR Speedup transaction, but SR XID is not set
for the underlying InnoDB transaction
*/
sr_state_require_xid,
/* this is an SR Speedup transaction, and SR XID is set
for the underlying InnoDB transaction
*/
sr_state_xid_set
};
static const int n_sr_states = sr_state_xid_set + 1;
enum sr_state sr_state() const
{ return sr_state_; }
void require_sr_xid()
{
if (sr_state_ == sr_state_none) {
sr_state_ = sr_state_require_xid;
}
}
void sr_xid_was_set()
{
sr_state_ = sr_state_xid_set;
}
bool sr_xid_is_required()
{
return sr_state_ == sr_state_require_xid;
}
bool sr_xid_is_set()
{
return sr_state_ == sr_state_xid_set;
}
bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); }
void pa_unsafe(bool pa_unsafe) {
@ -253,6 +294,7 @@ namespace wsrep
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
int replay(wsrep::unique_lock<wsrep::mutex>&);
void clear_fragments();
void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&);
int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&);
void cleanup();
@ -281,6 +323,7 @@ namespace wsrep
wsrep::mutable_buffer apply_error_buf_;
wsrep::xid xid_;
bool streaming_rollback_in_progress_;
enum sr_state sr_state_;
};
static inline const char* to_c_string(enum wsrep::transaction::state state)

View File

@ -92,6 +92,8 @@ static int apply_fragment(wsrep::server_state& server_state,
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
@ -132,7 +134,7 @@ static int apply_fragment(wsrep::server_state& server_state,
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
const wsrep::xid xid(streaming_applier->transaction().xid());
ret = high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data, xid);
ws_handle, ws_meta, data, sr_store, xid);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}
@ -1553,8 +1555,10 @@ void wsrep::server_state::close_transactions_at_disconnect(
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
streaming_applier->rollback(
wsrep::ws_handle(), wsrep::ws_meta());
wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0);
streaming_applier->after_apply();
}
streaming_appliers_.erase(i++);

View File

@ -109,6 +109,7 @@ wsrep::transaction::transaction(
, apply_error_buf_()
, xid_()
, streaming_rollback_in_progress_(false)
, sr_state_(sr_state_none)
{ }
@ -126,6 +127,7 @@ int wsrep::transaction::start_transaction(
server_id_ = client_state_.server_state().id();
id_ = id;
state_ = s_executing;
sr_state_ = sr_state_none;
state_hist_.clear();
ws_handle_ = wsrep::ws_handle(id);
flags(wsrep::provider::flag::start_transaction);
@ -631,7 +633,7 @@ int wsrep::transaction::after_commit()
client_state_.server_state_.stop_streaming_client(&client_state_);
lock.lock();
}
streaming_context_.cleanup();
clear_fragments();
}
switch (client_state_.mode())
@ -768,7 +770,7 @@ int wsrep::transaction::after_rollback()
if (is_streaming() && state() != s_must_replay)
{
streaming_context_.cleanup();
clear_fragments();
}
if (state() == s_aborting)
@ -1280,6 +1282,26 @@ int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
return ret;
}
int wsrep::transaction::fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id)
{
int rcode = client_service_.fragment_cache_remove_transaction(
server_id, transaction_id);
return (rcode);
}
void *wsrep::transaction::get_binlog_cache()
{
void *cache = client_service_.get_binlog_cache();
assert(cache);
return (cache);
}
////////////////////////////////////////////////////////////////////////////////
// Private //
////////////////////////////////////////////////////////////////////////////////
@ -1446,6 +1468,8 @@ int wsrep::transaction::certify_fragment(
assert(streaming_context_.rolled_back() == false ||
state() == s_must_abort);
int sr_store = streaming_context_.get_sr_store();
client_service_.wait_for_replayers(lock);
if (abort_or_interrupt(lock))
{
@ -1549,12 +1573,22 @@ int wsrep::transaction::certify_fragment(
if (ret == 0 &&
(storage_service.start_transaction(ws_handle_) ||
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
xid())))
(sr_store == 0 ?
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
0, 0, xid(), nullptr)
:
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
streaming_context_.get_sr_store(),
log_position - data.size(),
xid(), get_binlog_cache()))))
{
ret = 1;
error = wsrep::e_append_fragment_error;
@ -1685,6 +1719,10 @@ int wsrep::transaction::certify_fragment(
flags(flags() & ~wsrep::provider::flag::start_transaction);
flags(flags() & ~wsrep::provider::flag::pa_unsafe);
}
if (sr_store != 0) {
require_sr_xid();
}
return ret;
}
@ -2015,6 +2053,7 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
if (is_streaming())
{
streaming_context_.cleanup();
clear_fragments();
}
provider().release(ws_handle_);
break;
@ -2024,7 +2063,7 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
if (is_streaming())
{
client_service_.remove_fragments();
streaming_context_.cleanup();
clear_fragments();
}
state(lock, s_aborted);
ret = 1;
@ -2040,6 +2079,12 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
return ret;
}
void wsrep::transaction::clear_fragments()
{
streaming_context_.cleanup();
fragment_cache_remove_transaction(server_id_, id_);
}
void wsrep::transaction::cleanup()
{
debug_log_state("cleanup_enter");

View File

@ -198,6 +198,17 @@ namespace wsrep
// Not going to do this while unit testing
}
void *get_binlog_cache() WSREP_OVERRIDE
{
return (NULL);
}
int fragment_cache_remove_transaction(
const wsrep::id&,
wsrep::transaction_id) WSREP_OVERRIDE
{
return (0);
}
//
// Knobs to tune the behavior

View File

@ -106,7 +106,8 @@ int wsrep::mock_high_priority_service::commit(
int wsrep::mock_high_priority_service::rollback(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
const wsrep::ws_meta& ws_meta,
bool)
{
client_state_->prepare_for_ordering(ws_handle, ws_meta, false);
return (client_state_->before_rollback() ||

View File

@ -60,13 +60,15 @@ namespace wsrep
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&,
int,
const wsrep::xid&) WSREP_OVERRIDE
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&,
bool skip_rollback = false) WSREP_OVERRIDE;
int apply_toi(const wsrep::ws_meta&,
const wsrep::const_buffer&,
wsrep::mutable_buffer&) WSREP_OVERRIDE;

View File

@ -40,12 +40,16 @@ class mock_server_state;
wsrep::transaction_id,
int,
const wsrep::const_buffer&,
const wsrep::xid&) WSREP_OVERRIDE
int,
size_t,
const wsrep::xid&,
void *) WSREP_OVERRIDE
{ return 0; }
int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE
{ return 0; }
int remove_fragments() WSREP_OVERRIDE { return 0; }
int set_fragments_from_table() WSREP_OVERRIDE { return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;