diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 722cea6..ac28df6 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -640,8 +640,6 @@ namespace wsrep /** * Clone enough state from another transaction so that replaing will * be possible with a transaction contained in this client state. - * Method after_replay() must be used to inject the state after - * replaying back to this client state. * * @param transaction Transaction which is to be replied in this * client state @@ -652,16 +650,6 @@ namespace wsrep transaction_.clone_for_replay(transaction); } - /** - * Copy state from another transaction context after replay. - * - * @param transaction Transaction which was used for replaying. - */ - void after_replay(const wsrep::transaction& transaction) - { - transaction_.after_replay(transaction); - } - /** @name Non-transactional operations */ /** @{*/ diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index bc81288..4e072b8 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -199,8 +199,6 @@ namespace wsrep void clone_for_replay(const wsrep::transaction& other); - void after_replay(const wsrep::transaction& other); - bool bf_aborted() const { return (bf_abort_client_state_ != 0); @@ -248,6 +246,7 @@ namespace wsrep int append_sr_keys_for_commit(); int release_commit_order(wsrep::unique_lock&); void streaming_rollback(wsrep::unique_lock&); + int replay(wsrep::unique_lock&); void clear_fragments(); void cleanup(); void debug_log_state(const char*) const; diff --git a/src/transaction.cpp b/src/transaction.cpp index 2922ccc..7130b19 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -808,37 +808,7 @@ int wsrep::transaction::after_statement() // Fall through case s_must_replay: { - state(lock, s_replaying); - // Need to remember streaming state before replay, entering - // after_commit() after succesful replay will clear - // fragments. - const bool was_streaming(is_streaming()); - lock.unlock(); - enum wsrep::provider::status replay_ret(client_service_.replay()); - if (was_streaming) - { - client_state_.server_state_.stop_streaming_client(&client_state_); - } - switch (replay_ret) - { - case wsrep::provider::success: - provider().release(ws_handle_); - break; - case wsrep::provider::error_certification_failed: - client_state_.override_error( - wsrep::e_deadlock_error); - ret = 1; - break; - default: - client_service_.emergency_shutdown(); - break; - } - lock.lock(); - if (ret) - { - wsrep::log_info() << "Replay ret " << replay_ret; - state(lock, s_aborted); - } + ret = replay(lock); break; } case s_aborted: @@ -886,10 +856,12 @@ void wsrep::transaction::after_applying() assert(state_ == s_executing || state_ == s_prepared || state_ == s_committed || - state_ == s_aborted); + state_ == s_aborted || + state_ == s_replaying); if (state_ != s_executing && state_ != s_prepared) { + if (state_ == s_replaying) state(lock, s_aborted); cleanup(); } else @@ -1032,14 +1004,6 @@ void wsrep::transaction::clone_for_replay(const wsrep::transaction& other) state_ = s_replaying; } -void wsrep::transaction::after_replay(const wsrep::transaction& other) -{ - // Other must have been terminated - assert(other.state() == s_committed || other.state() == s_aborted); - state_ = other.state(); - clear_fragments(); -} - int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) { wsrep::unique_lock lock(client_state_.mutex_); @@ -1154,7 +1118,7 @@ void wsrep::transaction::state( { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ - { 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ + { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0} /* re */ }; if (!allowed[state_][next_state]) @@ -1721,6 +1685,59 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lo debug_log_state("streaming_rollback leave"); } +int wsrep::transaction::replay(wsrep::unique_lock& lock) +{ + int ret(0); + state(lock, s_replaying); + // Need to remember streaming state before replay, entering + // after_commit() after succesful replay will clear + // fragments. + const bool was_streaming(is_streaming()); + lock.unlock(); + client_service_.debug_sync("wsrep_before_replay"); + enum wsrep::provider::status replay_ret(client_service_.replay()); + if (was_streaming) + { + client_state_.server_state_.stop_streaming_client(&client_state_); + } + lock.lock(); + switch (replay_ret) + { + case wsrep::provider::success: + if (state() == s_replaying) + { + // Replay was done by using different client state, adjust state + // to committed. + state(lock, s_committed); + } + if (is_streaming()) + { + clear_fragments(); + } + provider().release(ws_handle_); + break; + case wsrep::provider::error_certification_failed: + client_state_.override_error( + wsrep::e_deadlock_error); + if (is_streaming()) + { + client_service_.remove_fragments(); + clear_fragments(); + } + state(lock, s_aborted); + ret = 1; + break; + default: + client_service_.emergency_shutdown(); + break; + } + + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "replay returned" << replay_ret); + return ret; +} + void wsrep::transaction::clear_fragments() { streaming_context_.cleanup();