diff --git a/src/transaction.cpp b/src/transaction.cpp index f91c72c..f050a1a 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -749,6 +749,12 @@ int wsrep::transaction::after_rollback() assert(state() == s_aborting || state() == s_must_replay); + // Note that it would be technically more correct to + // remove fragments after TOI BF abort in before_rollback(), + // it seems to cause deadlocks and is done here instead. + // We assume that the application does not let the TOI + // to proceed until this method returns, e.g. by holding + // MDL locks. It is not clear how to enforce that though. if (is_streaming() && bf_aborted_in_total_order_) { lock.unlock(); @@ -1083,10 +1089,15 @@ bool wsrep::transaction::total_order_bf_abort( wsrep::unique_lock& lock WSREP_UNUSED, wsrep::seqno bf_seqno) { + /* We must set this flag before entering bf_abort() in order + * to streaming_rollback() work correctly. The flag will be + * unset if BF abort was not allowed. Note that we rely in + * bf_abort() not to release lock if the BF abort is not allowed. */ + bf_aborted_in_total_order_ = true; bool ret(bf_abort(lock, bf_seqno)); - if (ret) + if (not ret) { - bf_aborted_in_total_order_ = true; + bf_aborted_in_total_order_ = false; } return ret; } @@ -1953,7 +1964,9 @@ void wsrep::transaction::streaming_rollback( if (bf_aborted_in_total_order_) { lock.unlock(); + server_service_.debug_sync("wsrep_streaming_rollback"); client_state_.server_state_.stop_streaming_client(&client_state_); + // Fragments are removed in after_rollback(). lock.lock(); } else diff --git a/test/test_utils.cpp b/test/test_utils.cpp index 3646e96..02e88cf 100644 --- a/test/test_utils.cpp +++ b/test/test_utils.cpp @@ -34,6 +34,11 @@ void wsrep_test::bf_abort_ordered(wsrep::client_state& cc) assert(cc.transaction().ordered()); cc.bf_abort(wsrep::seqno(0)); } + +void wsrep_test::bf_abort_in_total_order(wsrep::client_state& cc) +{ + cc.total_order_bf_abort(wsrep::seqno(0)); +} // BF abort method to abort transactions via provider void wsrep_test::bf_abort_provider(wsrep::mock_server_state& sc, const wsrep::transaction& tc, diff --git a/test/test_utils.hpp b/test/test_utils.hpp index 69ce700..7e4c896 100644 --- a/test/test_utils.hpp +++ b/test/test_utils.hpp @@ -44,6 +44,9 @@ namespace wsrep_test const wsrep::transaction& tc, wsrep::seqno bf_seqno); + // BF abort in total order + void bf_abort_in_total_order(wsrep::client_state&); + // Terminate streaming applier by applying rollback fragment. void terminate_streaming_applier( wsrep::mock_server_state& sc, diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 493bda8..a6f54e5 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -1406,6 +1406,28 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_executing, wsrep::transaction_id(1)); } + +BOOST_FIXTURE_TEST_CASE( + transaction_row_streaming_total_order_bf_abort_executing, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); + wsrep_test::bf_abort_in_total_order(cc); + BOOST_REQUIRE(tc.bf_aborted_in_total_order()); + // TO BF abort must not replicate rollback fragment, + // rollback must complete before TO is allowed to + // continue. + BOOST_REQUIRE(sc.provider().rollback_fragments() == 0); + BOOST_REQUIRE(tc.streaming_context().rolled_back()); + BOOST_REQUIRE(cc.before_rollback() == 0); + BOOST_REQUIRE(cc.after_rollback() == 0); + BOOST_REQUIRE(cc.after_statement()); + wsrep_test::terminate_streaming_applier(sc, sc.id(), + wsrep::transaction_id(1)); +} + // // Test streaming certification failure during fragment replication // @@ -1490,8 +1512,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_committing, BOOST_REQUIRE(sc.provider().commit_fragments() == 1); } - - BOOST_FIXTURE_TEST_CASE(transaction_byte_streaming_1pc_commit, streaming_client_fixture_byte) {