From 7d6641764b162089d34666721e7f54fbf9bbe6fc Mon Sep 17 00:00:00 2001
From: Daniele Sciascia
Date: Thu, 12 Aug 2021 10:56:00 +0200
Subject: [PATCH] Replay prepared transactions found BFed after prepare

Handle the case where prepare is BF aborted after it has replicated
a fragment, but before the command finishes, in the
after_command_before_result() and after_command_after_result() hooks.
In that case the transaction is now rolled back and replayed through
the new transaction::after_command_must_abort() helper, which
delegates to xa_replay().
---
 include/wsrep/transaction.hpp |   4 ++
 src/client_state.cpp          |  14 ++---
 src/transaction.cpp           | 100 ++++++++++++++++++++++++----------
 test/transaction_test_xa.cpp  |  57 ++++++++++++++++++-
 4 files changed, 135 insertions(+), 40 deletions(-)

diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp
index e476377..76835fd 100644
--- a/include/wsrep/transaction.hpp
+++ b/include/wsrep/transaction.hpp
@@ -194,6 +194,8 @@ namespace wsrep
 
         int after_statement();
 
+        void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&);
+
         void after_applying();
 
         bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
@@ -251,6 +253,8 @@ namespace wsrep
         int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
         void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
         int replay(wsrep::unique_lock<wsrep::mutex>&);
+        void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&);
+        int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&);
         void cleanup();
         void debug_log_state(const char*) const;
         void debug_log_key_append(const wsrep::key& key) const;
diff --git a/src/client_state.cpp b/src/client_state.cpp
index 6857325..994d106 100644
--- a/src/client_state.cpp
+++ b/src/client_state.cpp
@@ -177,20 +177,19 @@ void wsrep::client_state::after_command_before_result()
     if (transaction_.active() &&
         transaction_.state() == wsrep::transaction::s_must_abort)
     {
-        override_error(wsrep::e_deadlock_error);
-        lock.unlock();
-        client_service_.bf_rollback();
+        transaction_.after_command_must_abort(lock);
         // If keep current error is set, the result will be propagated
         // back to client with some future command, so keep the transaction
         // open here so that error handling can happen in before_command()
         // hook.
         if (not keep_command_error_)
         {
+            lock.unlock();
             (void)transaction_.after_statement();
+            lock.lock();
         }
-        lock.lock();
+
         assert(transaction_.state() == wsrep::transaction::s_aborted);
-        assert(current_error() != wsrep::e_success);
     }
     state(lock, s_result);
     debug_log_state("after_command_before_result: leave");
@@ -205,11 +204,8 @@ void wsrep::client_state::after_command_after_result()
     if (transaction_.active() &&
         transaction_.state() == wsrep::transaction::s_must_abort)
     {
-        lock.unlock();
-        client_service_.bf_rollback();
-        lock.lock();
+        transaction_.after_command_must_abort(lock);
         assert(transaction_.state() == wsrep::transaction::s_aborted);
-        override_error(wsrep::e_deadlock_error);
     }
     else if (transaction_.active() == false && not keep_command_error_)
     {
diff --git a/src/transaction.cpp b/src/transaction.cpp
index 4a63fa8..385326e 100644
--- a/src/transaction.cpp
+++ b/src/transaction.cpp
@@ -835,7 +835,13 @@ int wsrep::transaction::after_statement()
         break;
     case s_must_abort:
     case s_cert_failed:
-        client_state_.override_error(wsrep::e_deadlock_error);
+        // Error may be set already. For example, if fragment size
+        // exceeded the maximum size in certify_fragment(), then
+        // we already have wsrep::e_error_during_commit
+        if (client_state_.current_error() == wsrep::e_success)
+        {
+            client_state_.override_error(wsrep::e_deadlock_error);
+        }
         lock.unlock();
         ret = client_service_.bf_rollback();
         lock.lock();
@@ -849,7 +855,7 @@ int wsrep::transaction::after_statement()
     {
         if (is_xa() && !ordered())
         {
-            ret = xa_replay(lock);
+            ret = xa_replay_commit(lock);
         }
         else
         {
@@ -900,6 +906,34 @@ int wsrep::transaction::after_statement()
     return ret;
 }
 
+void wsrep::transaction::after_command_must_abort(
+    wsrep::unique_lock<wsrep::mutex>& lock)
+{
+    debug_log_state("after_command_must_abort enter");
+    assert(active());
+    assert(state_ == s_must_abort);
+
+    if (is_xa() && is_streaming())
+    {
+        state(lock, s_must_replay);
+    }
+
+    lock.unlock();
+    client_service_.bf_rollback();
+    lock.lock();
+
+    if (is_xa() && is_streaming())
+    {
+        xa_replay(lock);
+    }
+    else
+    {
+        client_state_.override_error(wsrep::e_deadlock_error);
+    }
+
+    debug_log_state("after_command_must_abort leave");
+}
+
 void wsrep::transaction::after_applying()
 {
     wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
@@ -1173,9 +1207,8 @@ void wsrep::transaction::xa_detach()
     debug_log_state("xa_detach leave");
 }
 
-int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
+void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock)
 {
-    debug_log_state("xa_replay enter");
     assert(lock.owns_lock());
     assert(is_xa());
     assert(is_streaming());
@@ -1197,42 +1230,49 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
     {
         client_service_.emergency_shutdown();
     }
+}
+int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+    debug_log_state("xa_replay enter");
+    xa_replay_common(lock);
+    state(lock, s_aborted);
+    streaming_context_.cleanup();
+    provider().release(ws_handle_);
+    cleanup();
+    client_service_.signal_replayed();
+    debug_log_state("xa_replay leave");
+    return 0;
+}
+
+int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+    debug_log_state("xa_replay_commit enter");
+    xa_replay_common(lock);
+    lock.unlock();
+    enum wsrep::provider::status status(client_service_.commit_by_xid());
+    lock.lock();
 
     int ret(1);
-    if (bf_abort_client_state_ == wsrep::client_state::s_idle)
+    switch (status)
     {
-        state(lock, s_aborted);
+    case wsrep::provider::success:
+        state(lock, s_committed);
         streaming_context_.cleanup();
         provider().release(ws_handle_);
         cleanup();
         ret = 0;
-    }
-    else
-    {
-        lock.unlock();
-        enum wsrep::provider::status status(client_service_.commit_by_xid());
-        lock.lock();
-        switch (status)
-        {
-        case wsrep::provider::success:
-            state(lock, s_committed);
-            streaming_context_.cleanup();
-            provider().release(ws_handle_);
-            cleanup();
-            ret = 0;
-            break;
-        default:
-            log_warning() << "Failed to commit by xid during replay";
-            // Commit by xid failed, return a commit
-            // error and let the client retry
-            state(lock, s_preparing);
-            state(lock, s_prepared);
-            client_state_.override_error(wsrep::e_error_during_commit, status);
-        }
+        break;
+    default:
+        log_warning() << "Failed to commit by xid during replay";
+        // Commit by xid failed, return a commit
+        // error and let the client retry
+        state(lock, s_preparing);
+        state(lock, s_prepared);
+        client_state_.override_error(wsrep::e_error_during_commit, status);
     }
 
     client_service_.signal_replayed();
-    debug_log_state("xa_replay leave");
+    debug_log_state("xa_replay_commit leave");
     return ret;
 }
 
diff --git a/test/transaction_test_xa.cpp b/test/transaction_test_xa.cpp
index 82106c4..d9fbad2 100644
--- a/test/transaction_test_xa.cpp
+++ b/test/transaction_test_xa.cpp
@@ -145,7 +145,62 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_replay,
 
     BOOST_REQUIRE(cc.unordered_replays() == 1);
 
-    // xa_replay() createa a streaming applier, clean it up
+    // xa_replay() creates a streaming applier, clean it up
+    wsrep::mock_high_priority_service* hps(
+        static_cast<wsrep::mock_high_priority_service*>(
+            sc.find_streaming_applier(sc.id(), wsrep::transaction_id(1))));
+    BOOST_REQUIRE(hps);
+    hps->rollback(wsrep::ws_handle(), wsrep::ws_meta());
+    hps->after_apply();
+    sc.stop_streaming_applier(sc.id(), wsrep::transaction_id(1));
+    server_service.release_high_priority_service(hps);
+}
+
+BOOST_FIXTURE_TEST_CASE(transaction_xa_replay_after_command_before_result,
+                        replicating_client_fixture_sync_rm)
+{
+    wsrep::xid xid(1, 1, 1, "id");
+
+    cc.start_transaction(wsrep::transaction_id(1));
+    cc.assign_xid(xid);
+    cc.before_prepare();
+    cc.after_prepare();
+    BOOST_REQUIRE(cc.state() == wsrep::client_state::s_exec);
+    wsrep_test::bf_abort_unordered(cc);
+    cc.after_command_before_result();
+    cc.after_command_after_result();
+
+    BOOST_REQUIRE(cc.unordered_replays() == 1);
+
+    // xa_replay() creates a streaming applier, clean it up
+    wsrep::mock_high_priority_service* hps(
+        static_cast<wsrep::mock_high_priority_service*>(
+            sc.find_streaming_applier(sc.id(), wsrep::transaction_id(1))));
+    BOOST_REQUIRE(hps);
+    hps->rollback(wsrep::ws_handle(), wsrep::ws_meta());
+    hps->after_apply();
+    sc.stop_streaming_applier(sc.id(), wsrep::transaction_id(1));
+    server_service.release_high_priority_service(hps);
+}
+
+BOOST_FIXTURE_TEST_CASE(transaction_xa_replay_after_command_after_result,
+                        replicating_client_fixture_sync_rm)
+{
+    wsrep::xid xid(1, 1, 1, "id");
+
+    cc.start_transaction(wsrep::transaction_id(1));
+    cc.assign_xid(xid);
+    cc.before_prepare();
+    cc.after_prepare();
+    cc.after_command_before_result();
+    BOOST_REQUIRE(cc.state() == wsrep::client_state::s_result);
+    wsrep_test::bf_abort_unordered(cc);
+
+    cc.after_command_after_result();
+
+    BOOST_REQUIRE(cc.unordered_replays() == 1);
+
+    // xa_replay() creates a streaming applier, clean it up
     wsrep::mock_high_priority_service* hps(
         static_cast<wsrep::mock_high_priority_service*>(
             sc.find_streaming_applier(sc.id(), wsrep::transaction_id(1))));