diff --git a/dbsim/dbms_simulator.cpp b/dbsim/dbms_simulator.cpp index edbafdb..b651396 100644 --- a/dbsim/dbms_simulator.cpp +++ b/dbsim/dbms_simulator.cpp @@ -388,7 +388,8 @@ private: } void will_replay(wsrep::transaction_context&) override { } - int replay(wsrep::transaction_context& txc) override + enum wsrep::provider::status + replay(wsrep::transaction_context& txc) override { // wsrep::log() << "replay: " << txc.id().get(); wsrep::client_applier_mode applier_mode(*this); @@ -490,6 +491,11 @@ private: if (err == 0) se_trx_.commit(); err = err || ordered_commit(); err = err || after_commit(); + + if (err) + { + rollback(); + } return err; }); diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 6127235..b6f2136 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -223,10 +223,10 @@ namespace wsrep return transaction_.start_transaction(wsh, meta); } - int start_replaying() + int start_replaying(const wsrep::ws_meta& ws_meta) { assert(mode_ == m_applier); - return transaction_.start_replaying(); + return transaction_.start_replaying(ws_meta); } void adopt_transaction(wsrep::transaction_context& transaction) @@ -483,7 +483,8 @@ namespace wsrep /*! * Replay the transaction. */ - virtual int replay(wsrep::transaction_context& tc) = 0; + virtual enum wsrep::provider::status + replay(wsrep::transaction_context& tc) = 0; /*! diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 89a253b..08860b6 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -243,7 +243,8 @@ namespace wsrep * * @return Zero in case of success, non-zero on failure. */ - virtual int replay(wsrep::ws_handle& ws_handle, void* applier_ctx) = 0; + virtual enum status replay( + wsrep::ws_handle& ws_handle, void* applier_ctx) = 0; virtual int sst_sent(const wsrep::gtid&, int) = 0; virtual int sst_received(const wsrep::gtid&, int) = 0; diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index 4953b48..1a8dbdd 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -85,7 +85,7 @@ namespace wsrep int start_transaction(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta); - int start_replaying(); + int start_replaying(const wsrep::ws_meta&); int append_key(const wsrep::key&); diff --git a/src/server_context.cpp b/src/server_context.cpp index d793410..5ecb619 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -137,7 +137,7 @@ int wsrep::server_context::on_apply( } else { - client_context.start_replaying(); + client_context.start_replaying(ws_meta); } if (client_context.apply(data)) @@ -251,7 +251,7 @@ int wsrep::server_context::on_apply( } else { - ret = client_context.start_replaying() || + ret = client_context.start_replaying(ws_meta) || client_context.apply(wsrep::const_buffer()) || client_context.commit(); } diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index c3bda95..5a7a439 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -80,8 +80,9 @@ int wsrep::transaction_context::start_transaction( return 0; } -int wsrep::transaction_context::start_replaying() +int wsrep::transaction_context::start_replaying(const wsrep::ws_meta& ws_meta) { + ws_meta_ = ws_meta; assert(ws_meta_.flags() & wsrep::provider::flag::commit); assert(active()); assert(client_context_.mode() == wsrep::client_context::m_applier); @@ -506,12 +507,32 @@ int wsrep::transaction_context::after_statement() // Continue to replay if rollback() changed the state to s_must_replay // Fall through case s_must_replay: + { state(lock, s_replaying); lock.unlock(); - ret = client_context_.replay(*this); + enum wsrep::provider::status replay_ret(client_context_.replay(*this)); + switch (replay_ret) + { + case wsrep::provider::success: + break; + case wsrep::provider::error_certification_failed: + client_context_.override_error( + wsrep::e_deadlock_error); + ret = 1; + break; + default: + client_context_.abort(); + break; + } lock.lock(); + if (ret) + { + wsrep::log_info() << "Replay ret " << replay_ret; + state(lock, s_aborted); + } provider_.release(ws_handle_); break; + } case s_aborted: break; default: @@ -650,7 +671,7 @@ void wsrep::transaction_context::state( { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ - { 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0} /* re */ + { 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ }; if (allowed[state_][next_state]) { @@ -852,7 +873,6 @@ int wsrep::transaction_context::certify_commit( // yet known. Therefore the transaction must roll back // and go through replay either to replay and commit the whole // transaction or to determine failed certification status. - assert(ordered()); client_context_.will_replay(*this); if (state() != s_must_abort) { diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 0b76c57..96ab19f 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -548,11 +548,13 @@ int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle) return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK); } -int wsrep::wsrep_provider_v26::replay(wsrep::ws_handle& ws_handle, - void* applier_ctx) +enum wsrep::provider::status +wsrep::wsrep_provider_v26::replay(wsrep::ws_handle& ws_handle, + void* applier_ctx) { mutable_ws_handle mwsh(ws_handle); - return (wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx) != WSREP_OK); + return map_return_value( + wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx)); } int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err) diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index db650db..c6e8020 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -41,7 +41,7 @@ namespace wsrep int commit_order_leave(const wsrep::ws_handle&, const wsrep::ws_meta&); int release(wsrep::ws_handle&); - int replay(wsrep::ws_handle&, void*); + enum wsrep::provider::status replay(wsrep::ws_handle&, void*); int sst_sent(const wsrep::gtid&,int); int sst_received(const wsrep::gtid& gtid, int); diff --git a/test/mock_client_context.hpp b/test/mock_client_context.hpp index 4ea534a..16c0823 100644 --- a/test/mock_client_context.hpp +++ b/test/mock_client_context.hpp @@ -55,9 +55,11 @@ namespace wsrep void remove_fragments(const wsrep::transaction_context& ) WSREP_OVERRIDE { } void will_replay(wsrep::transaction_context&) WSREP_OVERRIDE { } - int replay(wsrep::transaction_context& tc) WSREP_OVERRIDE + enum wsrep::provider::status + replay(wsrep::transaction_context& tc) WSREP_OVERRIDE { - int ret(provider().replay(tc.ws_handle(), this)); + enum wsrep::provider::status ret( + provider().replay(tc.ws_handle(), this)); ++replays_; return ret; } diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 991a591..6f552be 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -26,6 +26,7 @@ namespace wsrep , commit_order_enter_result_() , commit_order_leave_result_() , release_result_() + , replay_result_() , group_id_("1") , server_id_("1") , group_seqno_(0) @@ -138,14 +139,43 @@ namespace wsrep int release(wsrep::ws_handle&) { return release_result_; } - int replay(wsrep::ws_handle&, void* ctx) + enum wsrep::provider::status replay(wsrep::ws_handle&, void* ctx) { wsrep::mock_client_context& cc( *static_cast(ctx)); wsrep::client_applier_mode applier_mode(cc); const wsrep::transaction_context& tc(cc.transaction()); - return server_context_.on_apply(cc, tc.ws_handle(), tc.ws_meta(), - wsrep::const_buffer()); + wsrep::ws_meta ws_meta; + if (replay_result_ == wsrep::provider::success) + { + // If the ws_meta was not assigned yet, the certify + // returned early due to BF abort. + if (tc.ws_meta().seqno().nil()) + { + ++group_seqno_; + ws_meta = wsrep::ws_meta( + wsrep::gtid(group_id_, wsrep::seqno(group_seqno_)), + wsrep::stid(server_id_, tc.id(), cc.id()), + wsrep::seqno(group_seqno_ - 1), + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit); + } + else + { + ws_meta = tc.ws_meta(); + } + } + else + { + return replay_result_; + } + + if (server_context_.on_apply(cc, tc.ws_handle(), ws_meta, + wsrep::const_buffer())) + { + return wsrep::provider::error_fatal; + } + return wsrep::provider::success; } int sst_sent(const wsrep::gtid&, int) { return 0; } @@ -183,6 +213,7 @@ namespace wsrep enum wsrep::provider::status commit_order_enter_result_; enum wsrep::provider::status commit_order_leave_result_; enum wsrep::provider::status release_result_; + enum wsrep::provider::status replay_result_; size_t start_fragments() const { return start_fragments_; } size_t fragments() const { return fragments_; } diff --git a/test/transaction_context_test.cpp b/test/transaction_context_test.cpp index bfcc3c1..9c3de8c 100644 --- a/test/transaction_context_test.cpp +++ b/test/transaction_context_test.cpp @@ -307,6 +307,45 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(cc.current_error()); } +// +// Test a transaction which gets BF aborted inside provider before +// certification result is known. Replaying will be successful +// +BOOST_FIXTURE_TEST_CASE( + transaction_context_bf_before_cert_result_replay_success, + replicating_client_fixture_sync_rm) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + sc.provider().certify_result_ = wsrep::provider::error_bf_abort; + sc.provider().replay_result_ = wsrep::provider::success; + + BOOST_REQUIRE(cc.before_commit()); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_must_replay); + BOOST_REQUIRE(cc.after_statement() == 0); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_committed); + BOOST_REQUIRE(cc.current_error() == wsrep::e_success); +} + +// +// Test a transaction which gets BF aborted inside provider before +// certification result is known. Replaying will fail because of +// certification failure. +// +BOOST_FIXTURE_TEST_CASE( + transaction_context_bf_before_cert_result_replay_cert_fail, + replicating_client_fixture_sync_rm) +{ + BOOST_REQUIRE(cc.start_transaction(1) == 0); + sc.provider().certify_result_ = wsrep::provider::error_bf_abort; + sc.provider().replay_result_ = wsrep::provider::error_certification_failed; + + BOOST_REQUIRE(cc.before_commit()); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_must_replay); + BOOST_REQUIRE(cc.after_statement() == wsrep::client_context::asr_error); + BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_aborted); + BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); + BOOST_REQUIRE(tc.active() == false); +} // // Test a 1PC transaction which gets BF aborted during before_commit via