1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-08-06 15:02:41 +03:00

Removed SR store implementation detail from wsrep-lib

The following was moved to application side implementation:

- Removed sr_store from streaming context.
- Removed sr_state from transaction.
- Removed get_binlog_cache() from client_service interface.

Other:
- Add SR applier reference to append_fragment_and_commit() to
  make it available for application.
- Add separate interface call to rollback SR transactions
  on disconnect. Rolling back SR transactions due to rollback
  fragment and rolling back SR transactions due to disconnect have
  different behaviors. Have separate calls for these different
  cases for clarity.
- Remove non-const transaction accessor, not needed anymore because
  SR state has been moved to application side.
- Remove unneeded set_fragments_from_table().
This commit is contained in:
Teemu Ollakka
2022-09-28 17:30:39 +03:00
parent a44484e461
commit f3c8392ea8
16 changed files with 54 additions and 169 deletions

View File

@@ -90,11 +90,6 @@ namespace db
void debug_sync(const char*) override { }
void debug_crash(const char*) override { }
void *get_binlog_cache() override
{
return (NULL);
}
int fragment_cache_remove_transaction(
const wsrep::id&,
wsrep::transaction_id) override

View File

@@ -88,8 +88,7 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
}
int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool)
const wsrep::ws_meta& ws_meta)
{
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
int ret(client_.client_state_.before_rollback());
@@ -100,6 +99,17 @@ int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
return ret;
}
int db::high_priority_service::rollback_sr_on_disconnect()
{
auto ret = client_.client_state_.before_rollback();
assert(ret == 0);
client_.se_trx_.rollback();
ret = client_.client_state_.after_rollback();
assert(ret == 0);
return ret;
}
void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
{
client_.client_state_.adopt_apply_error(err);

View File

@@ -39,17 +39,17 @@ namespace db
const wsrep::const_buffer&,
wsrep::mutable_buffer&) override;
int append_fragment_and_commit(
wsrep::high_priority_service&,
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&,
int,
const wsrep::xid&) override
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&,
bool skip_rollback = false) override;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
int rollback_sr_on_disconnect() override;
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) override;
int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&,

View File

@@ -35,17 +35,12 @@ namespace db
wsrep::transaction_id,
int,
const wsrep::const_buffer&,
int,
size_t,
const wsrep::xid&,
void *) override
const wsrep::xid&) override
{ throw wsrep::not_implemented_error(); }
int update_fragment_meta(const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int remove_fragments() override
{ throw wsrep::not_implemented_error(); }
int set_fragments_from_table() override
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override
{ throw wsrep::not_implemented_error(); }
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)

View File

@@ -218,12 +218,6 @@ namespace wsrep
*/
virtual void debug_crash(const char* crash_point) = 0;
/**
* Return the binlog cache for the currently executing
* transaction or a NULL pointer if no such cache exists.
*/
virtual void *get_binlog_cache() = 0;
/**
* Remove the given transaction from the fragment cache.
*/

View File

@@ -943,15 +943,6 @@ namespace wsrep
return 1;
}
/**
* Return a reference to the transaction associated
* with the client state.
*/
wsrep::transaction& transaction()
{
return transaction_;
}
const wsrep::ws_meta& toi_meta() const
{
return toi_meta_;

View File

@@ -93,12 +93,18 @@ namespace wsrep
*
* Note that the call is not done from streaming transaction
* context, but from applier context.
*
* @param sr_hps Object that is hosting the streaming transaction.
* @param ws_handle Write set handle corresponding to fragment.
* @param ws_meta Write set meta data corresponding to fragment.
* @param data Fragment data.
* @param xid XID corresponding to streaming transaction.
*/
virtual int append_fragment_and_commit(
high_priority_service& sr_hps,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
int sr_store,
const wsrep::xid& xid) = 0;
/**
@@ -146,8 +152,16 @@ namespace wsrep
* @return Zero in case of success, non-zero in case of failure
*/
virtual int rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool skip_rollback = false) = 0;
const wsrep::ws_meta& ws_meta) = 0;
/**
* Roll back a SR transaction when disconnecting from cluster.
*
* The implementation is supposed to roll back the transaction, but
* keep the fragments in fragment store intact to allow recovering
* the ongoing SR transactions on reconnect.
*/
virtual int rollback_sr_on_disconnect() = 0;
/**
* Apply a TOI operation.

View File

@@ -65,10 +65,7 @@ namespace wsrep
wsrep::transaction_id client_id,
int flags,
const wsrep::const_buffer& data,
int sr_store,
size_t offset,
const wsrep::xid& xid,
void *binlog_cache) = 0;
const wsrep::xid& xid) = 0;
/**
* Update fragment meta data after certification process.
*/
@@ -80,13 +77,6 @@ namespace wsrep
*/
virtual int remove_fragments() = 0;
/**
* Update the list of fragments in the streaming context by
* adding all fragments in the streaming log table for the given
* transaction.
*/
virtual int set_fragments_from_table() = 0;
/**
* Commit the transaction.
*/

View File

@@ -47,7 +47,6 @@ namespace wsrep
, fragment_size_()
, unit_counter_()
, log_position_()
, sr_store_(0)
{ }
/**
@@ -190,17 +189,6 @@ namespace wsrep
unit_counter_ = 0;
log_position_ = 0;
}
void set_sr_store(int store_type)
{
sr_store_ = store_type;
}
int get_sr_store() const
{
return (sr_store_);
}
private:
void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED)
@@ -216,7 +204,6 @@ namespace wsrep
size_t fragment_size_;
size_t unit_counter_;
size_t log_position_;
int sr_store_;
};
}

View File

@@ -142,48 +142,6 @@ namespace wsrep
void xa_detach();
int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
int fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id);
void *get_binlog_cache();
/* state of Streaming Replication Speedup feature for the
transaction. This describes the relationship of this WSREP
transaction and the underlying InnoDB transaction.
*/
enum sr_state
{
/* this is not an SR Speedup transaction */
sr_state_none,
/* this is an SR Speedup transaction, but SR XID is not set
for the underlying InnoDB transaction
*/
sr_state_require_xid,
/* this is an SR Speedup transaction, and SR XID is set
for the underlying InnoDB transaction
*/
sr_state_xid_set
};
static const int n_sr_states = sr_state_xid_set + 1;
enum sr_state sr_state() const
{ return sr_state_; }
void require_sr_xid()
{
if (sr_state_ == sr_state_none) {
sr_state_ = sr_state_require_xid;
}
}
void sr_xid_was_set()
{
sr_state_ = sr_state_xid_set;
}
bool sr_xid_is_required()
{
return sr_state_ == sr_state_require_xid;
}
bool sr_xid_is_set()
{
return sr_state_ == sr_state_xid_set;
}
bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); }
void pa_unsafe(bool pa_unsafe) {
if (pa_unsafe) {
@@ -323,7 +281,6 @@ namespace wsrep
wsrep::mutable_buffer apply_error_buf_;
wsrep::xid xid_;
bool streaming_rollback_in_progress_;
enum sr_state sr_state_;
};
static inline const char* to_c_string(enum wsrep::transaction::state state)

View File

@@ -92,8 +92,6 @@ static int apply_fragment(wsrep::server_state& server_state,
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
@@ -134,7 +132,7 @@ static int apply_fragment(wsrep::server_state& server_state,
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
const wsrep::xid xid(streaming_applier->transaction().xid());
ret = high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data, sr_store, xid);
*streaming_applier, ws_handle, ws_meta, data, xid);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}
@@ -1555,10 +1553,7 @@ void wsrep::server_state::close_transactions_at_disconnect(
{
wsrep::high_priority_switch sw(high_priority_service,
*streaming_applier);
int sr_store = streaming_applier->transaction().streaming_context().
get_sr_store();
streaming_applier->rollback(
wsrep::ws_handle(), wsrep::ws_meta(), sr_store != 0);
streaming_applier->rollback_sr_on_disconnect();
streaming_applier->after_apply();
}
streaming_appliers_.erase(i++);

View File

@@ -109,7 +109,6 @@ wsrep::transaction::transaction(
, apply_error_buf_()
, xid_()
, streaming_rollback_in_progress_(false)
, sr_state_(sr_state_none)
{ }
@@ -127,7 +126,6 @@ int wsrep::transaction::start_transaction(
server_id_ = client_state_.server_state().id();
id_ = id;
state_ = s_executing;
sr_state_ = sr_state_none;
state_hist_.clear();
ws_handle_ = wsrep::ws_handle(id);
flags(wsrep::provider::flag::start_transaction);
@@ -1282,26 +1280,6 @@ int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
return ret;
}
int wsrep::transaction::fragment_cache_remove_transaction(
const wsrep::id& server_id, wsrep::transaction_id transaction_id)
{
int rcode = client_service_.fragment_cache_remove_transaction(
server_id, transaction_id);
return (rcode);
}
void *wsrep::transaction::get_binlog_cache()
{
void *cache = client_service_.get_binlog_cache();
assert(cache);
return (cache);
}
////////////////////////////////////////////////////////////////////////////////
// Private //
////////////////////////////////////////////////////////////////////////////////
@@ -1468,8 +1446,6 @@ int wsrep::transaction::certify_fragment(
assert(streaming_context_.rolled_back() == false ||
state() == s_must_abort);
int sr_store = streaming_context_.get_sr_store();
client_service_.wait_for_replayers(lock);
if (abort_or_interrupt(lock))
{
@@ -1571,24 +1547,11 @@ int wsrep::transaction::certify_fragment(
error = wsrep::e_append_fragment_error;
}
if (ret == 0 &&
(storage_service.start_transaction(ws_handle_) ||
(sr_store == 0 ?
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
0, 0, xid(), nullptr)
:
storage_service.append_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size()),
streaming_context_.get_sr_store(),
log_position - data.size(),
xid(), get_binlog_cache()))))
if (ret == 0
&& (storage_service.start_transaction(ws_handle_)
|| storage_service.append_fragment(
server_id, id(), flags(),
wsrep::const_buffer(data.data(), data.size()), xid())))
{
ret = 1;
error = wsrep::e_append_fragment_error;
@@ -1719,9 +1682,6 @@ int wsrep::transaction::certify_fragment(
flags(flags() & ~wsrep::provider::flag::start_transaction);
flags(flags() & ~wsrep::provider::flag::pa_unsafe);
}
if (sr_store != 0) {
require_sr_xid();
}
return ret;
}
@@ -2082,7 +2042,7 @@ int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
void wsrep::transaction::clear_fragments()
{
streaming_context_.cleanup();
fragment_cache_remove_transaction(server_id_, id_);
client_service_.fragment_cache_remove_transaction(server_id_, id_);
}
void wsrep::transaction::cleanup()

View File

@@ -198,11 +198,6 @@ namespace wsrep
// Not going to do this while unit testing
}
void *get_binlog_cache() WSREP_OVERRIDE
{
return (NULL);
}
int fragment_cache_remove_transaction(
const wsrep::id&,
wsrep::transaction_id) WSREP_OVERRIDE

View File

@@ -106,14 +106,19 @@ int wsrep::mock_high_priority_service::commit(
int wsrep::mock_high_priority_service::rollback(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool)
const wsrep::ws_meta& ws_meta)
{
client_state_->prepare_for_ordering(ws_handle, ws_meta, false);
return (client_state_->before_rollback() ||
client_state_->after_rollback());
}
int wsrep::mock_high_priority_service::rollback_sr_on_disconnect()
{
return (client_state_->before_rollback()
|| client_state_->after_rollback());
}
int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&,
const wsrep::const_buffer&,
wsrep::mutable_buffer&)

View File

@@ -57,18 +57,19 @@ namespace wsrep
const wsrep::const_buffer&,
wsrep::mutable_buffer&) WSREP_OVERRIDE;
int append_fragment_and_commit(
wsrep::high_priority_service&,
const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&,
int,
const wsrep::xid&) WSREP_OVERRIDE
{ return 0; }
int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE
{ return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&,
bool skip_rollback = false) WSREP_OVERRIDE;
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;
int rollback_sr_on_disconnect() WSREP_OVERRIDE;
int apply_toi(const wsrep::ws_meta&,
const wsrep::const_buffer&,
wsrep::mutable_buffer&) WSREP_OVERRIDE;

View File

@@ -40,16 +40,12 @@ class mock_server_state;
wsrep::transaction_id,
int,
const wsrep::const_buffer&,
int,
size_t,
const wsrep::xid&,
void *) WSREP_OVERRIDE
const wsrep::xid&) WSREP_OVERRIDE
{ return 0; }
int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE
{ return 0; }
int remove_fragments() WSREP_OVERRIDE { return 0; }
int set_fragments_from_table() WSREP_OVERRIDE { return 0; }
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE;