diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 8e05e75..5925fa2 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -40,7 +40,7 @@ const wsrep::transaction& db::high_priority_service::transaction() const return client_.client_state().transaction(); } -void db::high_priority_service::adopt_transaction(const wsrep::transaction&) +int db::high_priority_service::adopt_transaction(const wsrep::transaction&) { throw wsrep::not_implemented_error(); } diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 07df781..279ac69 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -33,7 +33,7 @@ namespace db int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) override; const wsrep::transaction& transaction() const override; - void adopt_transaction(const wsrep::transaction&) override; + int adopt_transaction(const wsrep::transaction&) override; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) override; int append_fragment_and_commit( diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 34d9210..3989656 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -60,7 +60,7 @@ namespace wsrep /** * Adopt a transaction. */ - virtual void adopt_transaction(const wsrep::transaction&) = 0; + virtual int adopt_transaction(const wsrep::transaction&) = 0; /** * Apply a write set. diff --git a/src/server_state.cpp b/src/server_state.cpp index b7a13c1..b5a1af5 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -49,6 +49,17 @@ static bool is_bootstrap(const std::string& cluster_address, bool bootstrap) return (bootstrap || cluster_address == "gcomm://"); } +// Helper method to provide detailed error message if transaction +// adopt for fragment removal fails. +static void log_adopt_error(const wsrep::transaction& transaction) +{ + wsrep::log_warning() << "Adopting a transaction (" + << transaction.server_id() << "," << transaction.id() + << ") for rollback failed, " + << "this may leave stale entries to streaming log " + << "which may need to be removed manually."; +} + static int apply_fragment(wsrep::high_priority_service& high_priority_service, wsrep::high_priority_service& streaming_applier, const wsrep::ws_handle& ws_handle, @@ -114,11 +125,18 @@ static int rollback_fragment(wsrep::server_state& server_state, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - int ret= 0; + int ret(0); // Adopts transaction state and starts a transaction for // high priority service - high_priority_service.adopt_transaction( - streaming_applier->transaction()); + int adopt_error; + if ((adopt_error = high_priority_service.adopt_transaction( + streaming_applier->transaction()))) + { + log_adopt_error(streaming_applier->transaction()); + } + // Even if the adopt above fails we roll back the streaming transaction. + // Adopt failure will leave stale entries in streaming log which can + // be removed manually. { wsrep::high_priority_switch ws( high_priority_service, *streaming_applier); @@ -132,8 +150,11 @@ static int rollback_fragment(wsrep::server_state& server_state, server_state.server_service().release_high_priority_service( streaming_applier); - high_priority_service.remove_fragments(ws_meta); - high_priority_service.commit(ws_handle, ws_meta); + if (adopt_error == 0) + { + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(ws_handle, ws_meta); + } high_priority_service.after_apply(); return ret; } @@ -1014,7 +1035,13 @@ void wsrep::server_state::convert_streaming_client_to_applier( wsrep::high_priority_service* streaming_applier( server_service_.streaming_applier_service( client_state->client_service())); - streaming_applier->adopt_transaction(client_state->transaction()); + if (streaming_applier->adopt_transaction(client_state->transaction())) + { + log_adopt_error(client_state->transaction()); + streaming_applier->after_apply(); + server_service_.release_high_priority_service(streaming_applier); + return; + } if (streaming_appliers_.insert( std::make_pair( std::make_pair(client_state->transaction().server_id(), @@ -1275,8 +1302,15 @@ void wsrep::server_state::close_orphaned_sr_transactions( wsrep::id server_id(i->first.first); wsrep::transaction_id transaction_id(i->first.second); wsrep::high_priority_service* streaming_applier(i->second); - high_priority_service.adopt_transaction( - streaming_applier->transaction()); + int adopt_error; + if ((adopt_error = high_priority_service.adopt_transaction( + streaming_applier->transaction()))) + { + log_adopt_error(streaming_applier->transaction()); + } + // Even if the transaction adopt above fails, we roll back + // the transaction. Adopt error will leave stale entries + // in the streaming log which can be removed manually. { wsrep::high_priority_switch sw(high_priority_service, *streaming_applier); @@ -1292,9 +1326,12 @@ void wsrep::server_state::close_orphaned_sr_transactions( wsrep::stid(server_id, transaction_id, wsrep::client_id()), wsrep::seqno::undefined(), 0); lock.unlock(); - high_priority_service.remove_fragments(ws_meta); - high_priority_service.commit(wsrep::ws_handle(transaction_id, 0), - ws_meta); + if (adopt_error == 0) + { + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(wsrep::ws_handle(transaction_id, 0), + ws_meta); + } high_priority_service.after_apply(); lock.lock(); } diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index b3b18dc..54cb20a 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -26,10 +26,11 @@ int wsrep::mock_high_priority_service::start_transaction( return client_state_->start_transaction(ws_handle, ws_meta); } -void wsrep::mock_high_priority_service::adopt_transaction( +int wsrep::mock_high_priority_service::adopt_transaction( const wsrep::transaction& transaction) { client_state_->adopt_transaction(transaction); + return 0; } int wsrep::mock_high_priority_service::apply_write_set( diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index 545de8e..5e204cd 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -45,7 +45,7 @@ namespace wsrep const wsrep::transaction& transaction() const WSREP_OVERRIDE { return client_state_->transaction(); } - void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; + int adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) WSREP_OVERRIDE; int append_fragment_and_commit(