From b76e94f84aa83c0dcd16a4e2e2fd82b856ff52e3 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Tue, 25 Apr 2023 08:45:46 +0300 Subject: [PATCH] Transaction not immune after becoming ordered for commit The transaction state is set to s_ordered_commit in ordered_commit(). However, this is too late for making the transaction immune for BF aborts after commit order has been established, which happens in before_commit(). Moving the state change into before_commit() would be the right thing to do, but that would require too many fixes to existing applications which are using the lib. In order to make the transaction immune for BF abort after it has been ordered to commit, introduce additional boolean flag which is set to true at the end of before_commit() and is taken into account in bf_abort(). --- include/wsrep/transaction.hpp | 11 ++++++++++ src/transaction.cpp | 24 ++++++++++++++++++++- test/transaction_test.cpp | 39 +++++++++++++++++++++++++++++------ 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 26fff7b..6b72736 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -29,6 +29,7 @@ #include "buffer.hpp" #include "xid.hpp" +#include #include namespace wsrep @@ -281,6 +282,14 @@ namespace wsrep wsrep::mutable_buffer apply_error_buf_; wsrep::xid xid_; bool streaming_rollback_in_progress_; + /* This flag tells that the transaction has become immutable + against BF aborts. Ideally this would be deduced from transaction + state, i.e. s_ordered_commit or s_committed, but the current + version of wsrep-lib changes the transaction state to + s_ordered_commit too late. Fixing this appears to require + too many changes to application using the lib, so boolean flag + must do. */ + bool is_bf_immutable_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) @@ -308,6 +317,8 @@ namespace wsrep return to_c_string(state); } + std::ostream& operator<<(std::ostream& os, enum wsrep::transaction::state); + } #endif // WSREP_TRANSACTION_HPP diff --git a/src/transaction.cpp b/src/transaction.cpp index 9843c18..df76110 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -112,6 +112,7 @@ wsrep::transaction::transaction( , apply_error_buf_() , xid_() , streaming_rollback_in_progress_(false) + , is_bf_immutable_(false) { } @@ -555,6 +556,12 @@ int wsrep::transaction::before_commit() assert(0); break; } + + if (ret == 0 && state() == s_committing) + { + is_bf_immutable_ = true; + } + debug_log_state("before_commit_leave"); return ret; } @@ -564,6 +571,7 @@ int wsrep::transaction::ordered_commit() wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("ordered_commit_enter"); assert(state() == s_committing); + assert(is_bf_immutable_); assert(ordered()); client_service_.debug_sync("wsrep_before_commit_order_leave"); int ret(provider().commit_order_leave(ws_handle_, ws_meta_, @@ -602,6 +610,7 @@ int wsrep::transaction::after_commit() int ret(0); wsrep::unique_lock lock(client_state_.mutex()); + assert(is_bf_immutable_); debug_log_state("after_commit_enter"); assert(state() == s_ordered_commit); @@ -639,7 +648,7 @@ int wsrep::transaction::after_commit() } assert(ret == 0); state(lock, s_committed); - + is_bf_immutable_ = false; debug_log_state("after_commit_leave"); return ret; } @@ -819,6 +828,7 @@ int wsrep::transaction::after_statement() state() == s_must_abort || state() == s_cert_failed || state() == s_must_replay); + assert(not is_bf_immutable_); if (state() == s_executing && streaming_context_.fragment_size() && @@ -981,6 +991,12 @@ bool wsrep::transaction::bf_abort( wsrep::log::debug_level_transaction, "Transaction not active, skipping bf abort"); } + else if (is_bf_immutable_) + { + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Transaction has become immutable for BF abort"); + } else { switch (state_at_enter) @@ -2139,3 +2155,9 @@ void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const << int64_t(id().get()) << " append key:\n" << key); } + +std::ostream& wsrep::operator<<(std::ostream& os, + enum wsrep::transaction::state state) +{ + return (os << to_c_string(state)); +} diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index a6f54e5..151ed8e 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -289,6 +289,32 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(not cc.current_error()); } +BOOST_FIXTURE_TEST_CASE( + transaction_1pc_bf_after_before_commit, + replicating_client_fixture_async_rm) +{ + // Start a new transaction with ID 1 + cc.start_transaction(wsrep::transaction_id(1)); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + + // Run before commit + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_committing); + BOOST_REQUIRE(tc.certified() == true); + BOOST_REQUIRE(tc.ordered() == true); + + wsrep_test::bf_abort_ordered(cc); + BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_committing); + + // Clean up + cc.ordered_commit(); + cc.after_commit(); + cc.after_statement(); +} + + // // Test a 1PC transaction for which prepare data fails // @@ -1491,16 +1517,17 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_commit, // // Test streaming BF abort after succesful certification // -BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_committing, - streaming_client_fixture_row) +BOOST_FIXTURE_TEST_CASE( + transaction_row_streaming_bf_abort_during_commit_order_enter, + streaming_client_fixture_row) { BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); - BOOST_REQUIRE(cc.before_commit() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); - wsrep_test::bf_abort_ordered(cc); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_abort); + sc.provider().commit_order_enter_result_ = wsrep::provider::error_bf_abort; + BOOST_REQUIRE(cc.before_commit()); + BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_must_replay); + sc.provider().commit_order_enter_result_ = wsrep::provider::success; BOOST_REQUIRE(cc.before_rollback() == 0); BOOST_REQUIRE(cc.after_rollback() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_replay);