From 19eaed587c35c28ce3c62922e5c97c0d90eec2d2 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 2 Mar 2023 08:24:44 +0200 Subject: [PATCH] Fix total order BF abort Streaming rollback for total order BF abort used regular BF abort codepath, which was not correct because the streaming rollback must fully complete before total order operation executes. Fixed this by adjusting bf_aborted_in_total_order_ before streaming_rollback() gets called. --- src/transaction.cpp | 17 +++++++++++++++-- test/test_utils.cpp | 5 +++++ test/test_utils.hpp | 3 +++ test/transaction_test.cpp | 24 ++++++++++++++++++++++-- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/transaction.cpp b/src/transaction.cpp index f91c72c..f050a1a 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -749,6 +749,12 @@ int wsrep::transaction::after_rollback() assert(state() == s_aborting || state() == s_must_replay); + // Note that it would be technically more correct to + // remove fragments after TOI BF abort in before_rollback(), + // it seems to cause deadlocks and is done here instead. + // We assume that the application does not let the TOI + // to proceed until this method returns, e.g. by holding + // MDL locks. It is not clear how to enforce that though. if (is_streaming() && bf_aborted_in_total_order_) { lock.unlock(); @@ -1083,10 +1089,15 @@ bool wsrep::transaction::total_order_bf_abort( wsrep::unique_lock& lock WSREP_UNUSED, wsrep::seqno bf_seqno) { + /* We must set this flag before entering bf_abort() in order + * to streaming_rollback() work correctly. The flag will be + * unset if BF abort was not allowed. Note that we rely in + * bf_abort() not to release lock if the BF abort is not allowed. */ + bf_aborted_in_total_order_ = true; bool ret(bf_abort(lock, bf_seqno)); - if (ret) + if (not ret) { - bf_aborted_in_total_order_ = true; + bf_aborted_in_total_order_ = false; } return ret; } @@ -1953,7 +1964,9 @@ void wsrep::transaction::streaming_rollback( if (bf_aborted_in_total_order_) { lock.unlock(); + server_service_.debug_sync("wsrep_streaming_rollback"); client_state_.server_state_.stop_streaming_client(&client_state_); + // Fragments are removed in after_rollback(). lock.lock(); } else diff --git a/test/test_utils.cpp b/test/test_utils.cpp index 3646e96..02e88cf 100644 --- a/test/test_utils.cpp +++ b/test/test_utils.cpp @@ -34,6 +34,11 @@ void wsrep_test::bf_abort_ordered(wsrep::client_state& cc) assert(cc.transaction().ordered()); cc.bf_abort(wsrep::seqno(0)); } + +void wsrep_test::bf_abort_in_total_order(wsrep::client_state& cc) +{ + cc.total_order_bf_abort(wsrep::seqno(0)); +} // BF abort method to abort transactions via provider void wsrep_test::bf_abort_provider(wsrep::mock_server_state& sc, const wsrep::transaction& tc, diff --git a/test/test_utils.hpp b/test/test_utils.hpp index 69ce700..7e4c896 100644 --- a/test/test_utils.hpp +++ b/test/test_utils.hpp @@ -44,6 +44,9 @@ namespace wsrep_test const wsrep::transaction& tc, wsrep::seqno bf_seqno); + // BF abort in total order + void bf_abort_in_total_order(wsrep::client_state&); + // Terminate streaming applier by applying rollback fragment. void terminate_streaming_applier( wsrep::mock_server_state& sc, diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 493bda8..a6f54e5 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -1406,6 +1406,28 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_executing, wsrep::transaction_id(1)); } + +BOOST_FIXTURE_TEST_CASE( + transaction_row_streaming_total_order_bf_abort_executing, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); + wsrep_test::bf_abort_in_total_order(cc); + BOOST_REQUIRE(tc.bf_aborted_in_total_order()); + // TO BF abort must not replicate rollback fragment, + // rollback must complete before TO is allowed to + // continue. + BOOST_REQUIRE(sc.provider().rollback_fragments() == 0); + BOOST_REQUIRE(tc.streaming_context().rolled_back()); + BOOST_REQUIRE(cc.before_rollback() == 0); + BOOST_REQUIRE(cc.after_rollback() == 0); + BOOST_REQUIRE(cc.after_statement()); + wsrep_test::terminate_streaming_applier(sc, sc.id(), + wsrep::transaction_id(1)); +} + // // Test streaming certification failure during fragment replication // @@ -1490,8 +1512,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_committing, BOOST_REQUIRE(sc.provider().commit_fragments() == 1); } - - BOOST_FIXTURE_TEST_CASE(transaction_byte_streaming_1pc_commit, streaming_client_fixture_byte) {