From 22d7a31d81d448273cd92b772fc9c69279263c72 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 12 Jul 2018 18:00:52 +0300 Subject: [PATCH] Fixes to SR rollback: * Enable codepath to BF abort high priority SR applier * Pass ws_handle, ws_meta to high priority service rollback call to allow total ordering of rollback process --- dbsim/db_high_priority_service.cpp | 9 +++++--- dbsim/db_high_priority_service.hpp | 2 +- include/wsrep/high_priority_service.hpp | 17 ++++++++++++++- src/server_state.cpp | 6 +++-- src/transaction.cpp | 29 ++++++++++++++++++++----- test/mock_high_priority_service.cpp | 5 ++++- test/mock_high_priority_service.hpp | 2 +- 7 files changed, 55 insertions(+), 15 deletions(-) diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index e3788ab..4a75fdc 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -46,9 +46,10 @@ int db::high_priority_service::apply_toi( throw wsrep::not_implemented_error(); } -int db::high_priority_service::commit(const wsrep::ws_handle&, - const wsrep::ws_meta&) +int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) { + client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); int ret(client_.client_state_.before_commit()); if (ret == 0) client_.se_trx_.commit(); ret = ret || client_.client_state_.ordered_commit(); @@ -56,8 +57,10 @@ int db::high_priority_service::commit(const wsrep::ws_handle&, return ret; } -int db::high_priority_service::rollback() +int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) { + client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false); int ret(client_.client_state_.before_rollback()); assert(ret == 0); client_.se_trx_.rollback(); diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index aeaca90..db9abce 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -29,7 +29,7 @@ namespace db int remove_fragments(const wsrep::ws_meta&) override { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int rollback() override; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; void after_apply() override; void store_globals() override { } diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 12fc49c..24e7206 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -98,13 +98,28 @@ namespace wsrep * * @param ws_handle Write set handle * @param ws_meta Write set meta + * + * @return Zero in case of success, non-zero in case of failure */ virtual int commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) = 0; /** * Roll back a transaction + * + * An implementation must call + * wsrep::client_state::prepare_for_ordering() to set + * the ws_handle and ws_meta before the rollback if + * the rollback process will go through client state + * rollback processing. Otherwise the implementation + * must release commit order explicitly via provider. + * + * @param ws_handle Write set handle + * @param ws_meta Write set meta + * + * @return Zero in case of success, non-zero in case of failure */ - virtual int rollback() = 0; + virtual int rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) = 0; /** * Apply a TOI operation. diff --git a/src/server_state.cpp b/src/server_state.cpp index 70a7804..b297c4c 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -95,7 +95,9 @@ static int rollback_fragment(wsrep::server_state& server_state, { wsrep::high_priority_switch ws( high_priority_service, *streaming_applier); - streaming_applier->rollback(); + // Streaming applier rolls back out of order. Fragment + // removal grabs commit order below. + streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta()); streaming_applier->after_apply(); } server_state.stop_streaming_applier( @@ -132,7 +134,7 @@ static int apply_write_set(wsrep::server_state& server_state, high_priority_service.commit(ws_handle, ws_meta); if (ret) { - high_priority_service.rollback(); + high_priority_service.rollback(ws_handle, ws_meta); } high_priority_service.after_apply(); } diff --git a/src/transaction.cpp b/src/transaction.cpp index faf7389..5555d74 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -561,8 +561,12 @@ int wsrep::transaction::before_rollback() } break; case wsrep::client_state::m_high_priority: - assert(state_ == s_executing); - state(lock, s_aborting); + // Rollback by rollback write set or BF abort + assert(state_ == s_executing || state_ == s_aborting); + if (state_ != s_aborting) + { + state(lock, s_aborting); + } break; default: assert(0); @@ -809,9 +813,13 @@ bool wsrep::transaction::bf_abort( if (ret) { bf_abort_client_state_ = client_state_.state(); - if (client_state_.state() == wsrep::client_state::s_idle && - client_state_.server_state().rollback_mode() == - wsrep::server_state::rm_sync) + if ((client_state_.state() == wsrep::client_state::s_idle && + client_state_.server_state().rollback_mode() == + wsrep::server_state::rm_sync) // locally processing idle + || + // high priority streaming + (client_state_.mode() == wsrep::client_state::m_high_priority && + is_streaming())) { // We need to change the state to aborting under the // lock protection to avoid a race between client thread, @@ -821,7 +829,6 @@ bool wsrep::transaction::bf_abort( state(lock, wsrep::transaction::s_aborting); if (client_state_.mode() == wsrep::client_state::m_high_priority) { - assert(is_streaming()); client_state_.server_state().stop_streaming_applier( server_id_, id_); } @@ -878,6 +885,10 @@ void wsrep::transaction::state( if (allowed[state_][next_state]) { state_hist_.push_back(state_); + if (state_hist_.size() == 12) + { + state_hist_.erase(state_hist_.begin()); + } state_ = next_state; } else @@ -1006,8 +1017,13 @@ int wsrep::transaction::certify_fragment( } client_state_.override_error(wsrep::e_deadlock_error); } + else if (state_ == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + } else { + assert(state_ == s_certifying); state(lock, s_executing); flags(flags() & ~wsrep::provider::flag::start_transaction); } @@ -1221,6 +1237,7 @@ void wsrep::transaction::streaming_rollback() sa->adopt_transaction(*this); streaming_context_.cleanup(); streaming_context_.rolled_back(id_); + client_service_.debug_sync("wsrep_before_SR_rollback"); enum wsrep::provider::status ret; if ((ret = provider().rollback(id_))) { diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 483e037..3232582 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -51,8 +51,11 @@ int wsrep::mock_high_priority_service::commit( client_state_->after_commit()); } -int wsrep::mock_high_priority_service::rollback() +int wsrep::mock_high_priority_service::rollback( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) { + client_state_->prepare_for_ordering(ws_handle, ws_meta, false); return (client_state_->before_rollback() || client_state_->after_rollback()); } diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index fc7ad15..f4f91ec 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -45,7 +45,7 @@ namespace wsrep { return 0; } int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; - int rollback() WSREP_OVERRIDE; + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE; void after_apply() WSREP_OVERRIDE;