diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 7fb30c9..f1682d9 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -574,6 +574,9 @@ namespace wsrep wsrep::unique_lock&, wsrep::high_priority_service&); + // Close transactions when handling disconnect from the group. + void close_transactions_at_disconnect(wsrep::high_priority_service&); + wsrep::mutex& mutex_; wsrep::condition_variable& cond_; wsrep::server_service& server_service_; diff --git a/src/server_state.cpp b/src/server_state.cpp index 15ebb0a..f14a3ac 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -688,6 +688,11 @@ void wsrep::server_state::on_view(const wsrep::view& view, wsrep::log_info() << "Non-primary view"; if (view.final()) { + assert(high_priority_service); + if (high_priority_service) + { + close_transactions_at_disconnect(*high_priority_service); + } state(lock, s_disconnected); } else if (state_ != s_disconnecting) @@ -698,6 +703,11 @@ void wsrep::server_state::on_view(const wsrep::view& view, else { assert(view.final()); + assert(high_priority_service); + if (high_priority_service) + { + close_transactions_at_disconnect(*high_priority_service); + } wsrep::unique_lock lock(mutex_); state(lock, s_disconnected); id_ = wsrep::id::undefined(); @@ -1054,3 +1064,25 @@ void wsrep::server_state::close_foreign_sr_transactions( } } } + +void wsrep::server_state::close_transactions_at_disconnect( + wsrep::high_priority_service& high_priority_service) +{ + // Close streaming applier without removing fragments + // from fragment storage. When the server is started again, + // it must be able to recover ongoing streaming transactions. + streaming_appliers_map::iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* streaming_applier(i->second); + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + streaming_applier->rollback( + wsrep::ws_handle(), wsrep::ws_meta()); + streaming_applier->after_apply(); + } + streaming_appliers_.erase(i++); + server_service_.release_high_priority_service(streaming_applier); + } +} diff --git a/src/transaction.cpp b/src/transaction.cpp index 318101f..e20a58f 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -609,6 +609,17 @@ int wsrep::transaction::before_rollback() break; } + debug_log_state("before_rollback_leave"); + return 0; +} + +int wsrep::transaction::after_rollback() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("after_rollback_enter"); + assert(state() == s_aborting || + state() == s_must_replay); + if (is_streaming() && bf_aborted_in_total_order_) { lock.unlock(); @@ -626,17 +637,6 @@ int wsrep::transaction::before_rollback() streaming_context_.cleanup(); } - debug_log_state("before_rollback_leave"); - return 0; -} - -int wsrep::transaction::after_rollback() -{ - wsrep::unique_lock lock(client_state_.mutex()); - debug_log_state("after_rollback_enter"); - assert(state() == s_aborting || - state() == s_must_replay); - if (is_streaming() && state() != s_must_replay) { clear_fragments();