diff --git a/src/transaction.cpp b/src/transaction.cpp index e17694d..fb0d920 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -816,10 +816,11 @@ void wsrep::transaction::after_applying() } bool wsrep::transaction::bf_abort( - wsrep::unique_lock& lock WSREP_UNUSED, + wsrep::unique_lock& lock, wsrep::seqno bf_seqno) { bool ret(false); + enum wsrep::transaction::state prev_state(state()); assert(lock.owns_lock()); if (active() == false) @@ -883,7 +884,7 @@ bool wsrep::transaction::bf_abort( // storage engine operations and streaming rollback will be // handled from before_rollback() call. if (client_state_.mode() == wsrep::client_state::m_local && - is_streaming() && state() == s_executing) + is_streaming() && prev_state == s_executing) { streaming_rollback(lock); } @@ -1461,15 +1462,19 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lo debug_log_state("streaming_rollback enter"); assert(state_ != s_must_replay); assert(is_streaming()); - if (streaming_context_.rolled_back() == false) { + // We must set rolled_back id before stopping streaming client + // or converting to applier. Accessing server_state requires + // releasing the client_state lock in order to avoid violating + // locking order, and this will open up a possibility for two + // threads accessing this block simultaneously. + streaming_context_.rolled_back(id_); if (bf_aborted_in_total_order_) { lock.unlock(); client_state_.server_state_.stop_streaming_client(&client_state_); lock.lock(); - streaming_context_.rolled_back(id_); } else { @@ -1483,11 +1488,14 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lo &client_state_); lock.lock(); streaming_context_.cleanup(); + // Cleanup cleans rolled_back_for from streaming context, but + // we want to preserve it to avoid executing this block + // more than once. streaming_context_.rolled_back(id_); enum wsrep::provider::status ret; if ((ret = provider().rollback(id_))) { - wsrep::log_warning() + wsrep::log_debug() << "Failed to replicate rollback fragment for " << id_ << ": " << ret; } diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 7e840ca..5ad1d96 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -1082,6 +1082,21 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_rollback, server_service.release_high_priority_service(hps); } +// +// Test streaming BF abort in executing state. +// +BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_executing, + streaming_client_fixture_row) +{ + BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); + BOOST_REQUIRE(cc.after_row() == 0); + BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); + wsrep_test::bf_abort_unordered(cc); + BOOST_REQUIRE(tc.streaming_context().rolled_back()); + BOOST_REQUIRE(cc.before_rollback() == 0); + BOOST_REQUIRE(cc.after_rollback() == 0); + BOOST_REQUIRE(cc.after_statement()); +} // // Test streaming certification failure during fragment replication //