mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-24 10:42:31 +03:00
Fixes to SR rollback:
* Enable codepath to BF abort high priority SR applier * Pass ws_handle, ws_meta to high priority service rollback call to allow total ordering of rollback process
This commit is contained in:
@ -46,9 +46,10 @@ int db::high_priority_service::apply_toi(
|
||||
throw wsrep::not_implemented_error();
|
||||
}
|
||||
|
||||
int db::high_priority_service::commit(const wsrep::ws_handle&,
|
||||
const wsrep::ws_meta&)
|
||||
int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
|
||||
int ret(client_.client_state_.before_commit());
|
||||
if (ret == 0) client_.se_trx_.commit();
|
||||
ret = ret || client_.client_state_.ordered_commit();
|
||||
@ -56,8 +57,10 @@ int db::high_priority_service::commit(const wsrep::ws_handle&,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int db::high_priority_service::rollback()
|
||||
int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
|
||||
int ret(client_.client_state_.before_rollback());
|
||||
assert(ret == 0);
|
||||
client_.se_trx_.rollback();
|
||||
|
@ -29,7 +29,7 @@ namespace db
|
||||
int remove_fragments(const wsrep::ws_meta&) override
|
||||
{ return 0; }
|
||||
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
|
||||
int rollback() override;
|
||||
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
|
||||
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override;
|
||||
void after_apply() override;
|
||||
void store_globals() override { }
|
||||
|
@ -98,13 +98,28 @@ namespace wsrep
|
||||
*
|
||||
* @param ws_handle Write set handle
|
||||
* @param ws_meta Write set meta
|
||||
*
|
||||
* @return Zero in case of success, non-zero in case of failure
|
||||
*/
|
||||
virtual int commit(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta) = 0;
|
||||
/**
|
||||
* Roll back a transaction
|
||||
*
|
||||
* An implementation must call
|
||||
* wsrep::client_state::prepare_for_ordering() to set
|
||||
* the ws_handle and ws_meta before the rollback if
|
||||
* the rollback process will go through client state
|
||||
* rollback processing. Otherwise the implementation
|
||||
* must release commit order explicitly via provider.
|
||||
*
|
||||
* @param ws_handle Write set handle
|
||||
* @param ws_meta Write set meta
|
||||
*
|
||||
* @return Zero in case of success, non-zero in case of failure
|
||||
*/
|
||||
virtual int rollback() = 0;
|
||||
virtual int rollback(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta) = 0;
|
||||
|
||||
/**
|
||||
* Apply a TOI operation.
|
||||
|
@ -95,7 +95,9 @@ static int rollback_fragment(wsrep::server_state& server_state,
|
||||
{
|
||||
wsrep::high_priority_switch ws(
|
||||
high_priority_service, *streaming_applier);
|
||||
streaming_applier->rollback();
|
||||
// Streaming applier rolls back out of order. Fragment
|
||||
// removal grabs commit order below.
|
||||
streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
|
||||
streaming_applier->after_apply();
|
||||
}
|
||||
server_state.stop_streaming_applier(
|
||||
@ -132,7 +134,7 @@ static int apply_write_set(wsrep::server_state& server_state,
|
||||
high_priority_service.commit(ws_handle, ws_meta);
|
||||
if (ret)
|
||||
{
|
||||
high_priority_service.rollback();
|
||||
high_priority_service.rollback(ws_handle, ws_meta);
|
||||
}
|
||||
high_priority_service.after_apply();
|
||||
}
|
||||
|
@ -561,8 +561,12 @@ int wsrep::transaction::before_rollback()
|
||||
}
|
||||
break;
|
||||
case wsrep::client_state::m_high_priority:
|
||||
assert(state_ == s_executing);
|
||||
state(lock, s_aborting);
|
||||
// Rollback by rollback write set or BF abort
|
||||
assert(state_ == s_executing || state_ == s_aborting);
|
||||
if (state_ != s_aborting)
|
||||
{
|
||||
state(lock, s_aborting);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
@ -809,9 +813,13 @@ bool wsrep::transaction::bf_abort(
|
||||
if (ret)
|
||||
{
|
||||
bf_abort_client_state_ = client_state_.state();
|
||||
if (client_state_.state() == wsrep::client_state::s_idle &&
|
||||
client_state_.server_state().rollback_mode() ==
|
||||
wsrep::server_state::rm_sync)
|
||||
if ((client_state_.state() == wsrep::client_state::s_idle &&
|
||||
client_state_.server_state().rollback_mode() ==
|
||||
wsrep::server_state::rm_sync) // locally processing idle
|
||||
||
|
||||
// high priority streaming
|
||||
(client_state_.mode() == wsrep::client_state::m_high_priority &&
|
||||
is_streaming()))
|
||||
{
|
||||
// We need to change the state to aborting under the
|
||||
// lock protection to avoid a race between client thread,
|
||||
@ -821,7 +829,6 @@ bool wsrep::transaction::bf_abort(
|
||||
state(lock, wsrep::transaction::s_aborting);
|
||||
if (client_state_.mode() == wsrep::client_state::m_high_priority)
|
||||
{
|
||||
assert(is_streaming());
|
||||
client_state_.server_state().stop_streaming_applier(
|
||||
server_id_, id_);
|
||||
}
|
||||
@ -878,6 +885,10 @@ void wsrep::transaction::state(
|
||||
if (allowed[state_][next_state])
|
||||
{
|
||||
state_hist_.push_back(state_);
|
||||
if (state_hist_.size() == 12)
|
||||
{
|
||||
state_hist_.erase(state_hist_.begin());
|
||||
}
|
||||
state_ = next_state;
|
||||
}
|
||||
else
|
||||
@ -1006,8 +1017,13 @@ int wsrep::transaction::certify_fragment(
|
||||
}
|
||||
client_state_.override_error(wsrep::e_deadlock_error);
|
||||
}
|
||||
else if (state_ == s_must_abort)
|
||||
{
|
||||
client_state_.override_error(wsrep::e_deadlock_error);
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(state_ == s_certifying);
|
||||
state(lock, s_executing);
|
||||
flags(flags() & ~wsrep::provider::flag::start_transaction);
|
||||
}
|
||||
@ -1221,6 +1237,7 @@ void wsrep::transaction::streaming_rollback()
|
||||
sa->adopt_transaction(*this);
|
||||
streaming_context_.cleanup();
|
||||
streaming_context_.rolled_back(id_);
|
||||
client_service_.debug_sync("wsrep_before_SR_rollback");
|
||||
enum wsrep::provider::status ret;
|
||||
if ((ret = provider().rollback(id_)))
|
||||
{
|
||||
|
@ -51,8 +51,11 @@ int wsrep::mock_high_priority_service::commit(
|
||||
client_state_->after_commit());
|
||||
}
|
||||
|
||||
int wsrep::mock_high_priority_service::rollback()
|
||||
int wsrep::mock_high_priority_service::rollback(
|
||||
const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
client_state_->prepare_for_ordering(ws_handle, ws_meta, false);
|
||||
return (client_state_->before_rollback() ||
|
||||
client_state_->after_rollback());
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ namespace wsrep
|
||||
{ return 0; }
|
||||
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&)
|
||||
WSREP_OVERRIDE;
|
||||
int rollback() WSREP_OVERRIDE;
|
||||
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE;
|
||||
int apply_toi(const wsrep::ws_meta&,
|
||||
const wsrep::const_buffer&) WSREP_OVERRIDE;
|
||||
void after_apply() WSREP_OVERRIDE;
|
||||
|
Reference in New Issue
Block a user