diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index 58ffdf3..b919299 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -112,6 +112,17 @@ void db::server_service::log_view(wsrep::high_priority_service*, wsrep::log_info() << "View:\n" << v; logged_view_ = v; } + +void db::server_service::recover_streaming_appliers( + wsrep::client_service&) +{ +} + +void db::server_service::recover_streaming_appliers( + wsrep::high_priority_service&) +{ +} + wsrep::view db::server_service::get_view(wsrep::client_service&, const wsrep::id& own_id) { diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index a21c245..c979be7 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -48,6 +48,8 @@ namespace db override; void log_view(wsrep::high_priority_service*, const wsrep::view&) override; + void recover_streaming_appliers(wsrep::client_service&) override; + void recover_streaming_appliers(wsrep::high_priority_service&) override; wsrep::view get_view(wsrep::client_service&, const wsrep::id&) override; wsrep::gtid get_position(wsrep::client_service&) override; diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 8c513b5..aa318c4 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -130,6 +130,35 @@ namespace wsrep wsrep::high_priority_service* high_priority_service, const wsrep::view& view) = 0; + /** + * Recover streaming appliers from the streaming log. + * The implementation must scan through log of stored streaming + * fragments and reconstruct the streaming applier service + * objects. + * + * This is overload for calls which are done from client context, + * e.g. after SST has been received. + * + * @param client_service Reference to client service object + */ + virtual void recover_streaming_appliers( + wsrep::client_service& client_service) = 0; + + /** + * Recover streaming appliers from the streaming log. + * The implementation must scan through log of stored streaming + * fragments and reconstruct the streaming applier service + * objects. + * + * This is overload for calls which are done from high priority + * context, e.g. when handling cluster view change events. + * + * @param high_priority_service Reference to high priority service + * object. + */ + virtual void recover_streaming_appliers( + wsrep::high_priority_service& high_priority_service) = 0; + /** * Recover a cluster view change event. * The method takes own node ID. diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 46f686a..0a359a4 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -577,6 +577,7 @@ namespace wsrep , pause_seqno_() , streaming_clients_() , streaming_appliers_() + , streaming_appliers_recovered_() , provider_() , name_(name) , id_(wsrep::id::undefined()) @@ -602,6 +603,12 @@ namespace wsrep void wait_until_state(wsrep::unique_lock&, enum state) const; // Interrupt all threads which are waiting for state void interrupt_state_waiters(wsrep::unique_lock&); + + // Recover streaming appliers if not already recoverd + template + void recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock&, C&); + // Close SR transcations whose origin is outside of current // cluster view. void close_orphaned_sr_transactions( @@ -658,6 +665,7 @@ namespace wsrep }; streaming_appliers_map streaming_appliers_; + bool streaming_appliers_recovered_; wsrep::provider* provider_; std::string name_; wsrep::id id_; diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 499ec44..a129f41 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -20,6 +20,7 @@ #ifndef WSREP_STREAMING_CONTEXT_HPP #define WSREP_STREAMING_CONTEXT_HPP +#include "compiler.hpp" #include "logger.hpp" #include "seqno.hpp" #include "transaction_id.hpp" @@ -80,7 +81,7 @@ namespace wsrep void stored(wsrep::seqno seqno) { - assert(seqno.is_undefined() == false); + check_fragment_seqno(seqno); fragments_.push_back(seqno); } @@ -91,7 +92,7 @@ namespace wsrep void applied(wsrep::seqno seqno) { - assert(seqno.is_undefined() == false); + check_fragment_seqno(seqno); ++fragments_certified_; fragments_.push_back(seqno); } @@ -152,6 +153,13 @@ namespace wsrep unit_counter_ = 0; } private: + + void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) + { + assert(seqno.is_undefined() == false); + assert(fragments_.empty() || fragments_.back() < seqno); + } + size_t fragments_certified_; std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; diff --git a/src/server_state.cpp b/src/server_state.cpp index 584da8e..3ee042f 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -580,6 +580,10 @@ void wsrep::server_state::sst_received(wsrep::client_service& cs, current_view_ = v; server_service_.log_view(NULL /* this view is stored already */, v); + + lock.lock(); + recover_streaming_appliers_if_not_recovered(lock, cs); + lock.unlock(); } if (provider().sst_received(gtid, error)) @@ -747,10 +751,14 @@ void wsrep::server_state::on_primary_view( } assert(high_priority_service); + if (high_priority_service) { + recover_streaming_appliers_if_not_recovered(lock, + *high_priority_service); close_orphaned_sr_transactions(lock, *high_priority_service); } + if (server_service_.sst_before_init()) { if (state_ == s_initialized) @@ -1143,6 +1151,20 @@ void wsrep::server_state::interrupt_state_waiters( cond_.notify_all(); } +template +void wsrep::server_state::recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock& lock, C& c) +{ + assert(lock.owns_lock()); + if (streaming_appliers_recovered_ == false) + { + lock.unlock(); + server_service_.recover_streaming_appliers(c); + lock.lock(); + } + streaming_appliers_recovered_ = true; +} + void wsrep::server_state::close_orphaned_sr_transactions( wsrep::unique_lock& lock, wsrep::high_priority_service& high_priority_service) @@ -1242,4 +1264,5 @@ void wsrep::server_state::close_transactions_at_disconnect( streaming_appliers_.erase(i++); server_service_.release_high_priority_service(streaming_applier); } + streaming_appliers_recovered_ = false; } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 11396fe..4283dab 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -128,6 +128,14 @@ namespace wsrep logged_view_ = view; } + void recover_streaming_appliers(wsrep::client_service&) + WSREP_OVERRIDE + { } + + void recover_streaming_appliers(wsrep::high_priority_service&) + WSREP_OVERRIDE + { } + wsrep::view get_view(wsrep::client_service&, const wsrep::id& own_id) WSREP_OVERRIDE {