diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 26fff7b..6b72736 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -29,6 +29,7 @@ #include "buffer.hpp" #include "xid.hpp" +#include #include namespace wsrep @@ -281,6 +282,14 @@ namespace wsrep wsrep::mutable_buffer apply_error_buf_; wsrep::xid xid_; bool streaming_rollback_in_progress_; + /* This flag tells that the transaction has become immutable + against BF aborts. Ideally this would be deduced from transaction + state, i.e. s_ordered_commit or s_committed, but the current + version of wsrep-lib changes the transaction state to + s_ordered_commit too late. Fixing this appears to require + too many changes to application using the lib, so boolean flag + must do. */ + bool is_bf_immutable_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) @@ -308,6 +317,8 @@ namespace wsrep return to_c_string(state); } + std::ostream& operator<<(std::ostream& os, enum wsrep::transaction::state); + } #endif // WSREP_TRANSACTION_HPP diff --git a/src/transaction.cpp b/src/transaction.cpp index 9843c18..df76110 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -112,6 +112,7 @@ wsrep::transaction::transaction( , apply_error_buf_() , xid_() , streaming_rollback_in_progress_(false) + , is_bf_immutable_(false) { } @@ -555,6 +556,12 @@ int wsrep::transaction::before_commit() assert(0); break; } + + if (ret == 0 && state() == s_committing) + { + is_bf_immutable_ = true; + } + debug_log_state("before_commit_leave"); return ret; } @@ -564,6 +571,7 @@ int wsrep::transaction::ordered_commit() wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("ordered_commit_enter"); assert(state() == s_committing); + assert(is_bf_immutable_); assert(ordered()); client_service_.debug_sync("wsrep_before_commit_order_leave"); int ret(provider().commit_order_leave(ws_handle_, ws_meta_, @@ -602,6 +610,7 @@ int wsrep::transaction::after_commit() int ret(0); wsrep::unique_lock lock(client_state_.mutex()); + assert(is_bf_immutable_); debug_log_state("after_commit_enter"); assert(state() == s_ordered_commit); @@ -639,7 +648,7 @@ int wsrep::transaction::after_commit() } assert(ret == 0); state(lock, s_committed); - + is_bf_immutable_ = false; debug_log_state("after_commit_leave"); return ret; } @@ -819,6 +828,7 @@ int wsrep::transaction::after_statement() state() == s_must_abort || state() == s_cert_failed || state() == s_must_replay); + assert(not is_bf_immutable_); if (state() == s_executing && streaming_context_.fragment_size() && @@ -981,6 +991,12 @@ bool wsrep::transaction::bf_abort( wsrep::log::debug_level_transaction, "Transaction not active, skipping bf abort"); } + else if (is_bf_immutable_) + { + WSREP_LOG_DEBUG(client_state_.debug_log_level(), + wsrep::log::debug_level_transaction, + "Transaction has become immutable for BF abort"); + } else { switch (state_at_enter) @@ -2139,3 +2155,9 @@ void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const << int64_t(id().get()) << " append key:\n" << key); } + +std::ostream& wsrep::operator<<(std::ostream& os, + enum wsrep::transaction::state state) +{ + return (os << to_c_string(state)); +} diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index a6f54e5..151ed8e 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -289,6 +289,32 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(not cc.current_error()); } +BOOST_FIXTURE_TEST_CASE( + transaction_1pc_bf_after_before_commit, + replicating_client_fixture_async_rm) +{ + // Start a new transaction with ID 1 + cc.start_transaction(wsrep::transaction_id(1)); + BOOST_REQUIRE(tc.active()); + BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + + // Run before commit + BOOST_REQUIRE(cc.before_commit() == 0); + BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_committing); + BOOST_REQUIRE(tc.certified() == true); + BOOST_REQUIRE(tc.ordered() == true); + + wsrep_test::bf_abort_ordered(cc); + BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_committing); + + // Clean up + cc.ordered_commit(); + cc.after_commit(); + cc.after_statement(); +} + + // // Test a 1PC transaction for which prepare data fails // @@ -1491,16 +1517,17 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_commit, // // Test streaming BF abort after succesful certification // -BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_committing, - streaming_client_fixture_row) +BOOST_FIXTURE_TEST_CASE( + transaction_row_streaming_bf_abort_during_commit_order_enter, + streaming_client_fixture_row) { BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); - BOOST_REQUIRE(cc.before_commit() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); - wsrep_test::bf_abort_ordered(cc); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_abort); + sc.provider().commit_order_enter_result_ = wsrep::provider::error_bf_abort; + BOOST_REQUIRE(cc.before_commit()); + BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_must_replay); + sc.provider().commit_order_enter_result_ = wsrep::provider::success; BOOST_REQUIRE(cc.before_rollback() == 0); BOOST_REQUIRE(cc.after_rollback() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_replay);