From a6b38d2428ea73d6468235bb50f5a632c0dc4ad1 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Fri, 18 Jan 2019 14:02:58 +0200 Subject: [PATCH] codership/wsrep-lib#54 Service call to recover streaming appliers Introduced server_service recover_streaming_appliers() interface call which will be called in total order whenever streaming appliers must be recovered. The call comes with two overloads, one which can be called from client context (e.g. after SST has been received) and the other from high priority context (e.g. view event handling). The client context overload should be eventually be deprecated once there is a mechanism to make provider signal that it has joined to the cluster and will start applying events. --- dbsim/db_server_service.cpp | 11 +++++++++++ dbsim/db_server_service.hpp | 2 ++ include/wsrep/server_service.hpp | 29 +++++++++++++++++++++++++++++ include/wsrep/server_state.hpp | 8 ++++++++ include/wsrep/streaming_context.hpp | 12 ++++++++++-- src/server_state.cpp | 23 +++++++++++++++++++++++ test/mock_server_state.hpp | 8 ++++++++ 7 files changed, 91 insertions(+), 2 deletions(-) diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index 58ffdf3..b919299 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -112,6 +112,17 @@ void db::server_service::log_view(wsrep::high_priority_service*, wsrep::log_info() << "View:\n" << v; logged_view_ = v; } + +void db::server_service::recover_streaming_appliers( + wsrep::client_service&) +{ +} + +void db::server_service::recover_streaming_appliers( + wsrep::high_priority_service&) +{ +} + wsrep::view db::server_service::get_view(wsrep::client_service&, const wsrep::id& own_id) { diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index a21c245..c979be7 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -48,6 +48,8 @@ namespace db override; void log_view(wsrep::high_priority_service*, const wsrep::view&) override; + void recover_streaming_appliers(wsrep::client_service&) override; + void recover_streaming_appliers(wsrep::high_priority_service&) override; wsrep::view get_view(wsrep::client_service&, const wsrep::id&) override; wsrep::gtid get_position(wsrep::client_service&) override; diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 8c513b5..aa318c4 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -130,6 +130,35 @@ namespace wsrep wsrep::high_priority_service* high_priority_service, const wsrep::view& view) = 0; + /** + * Recover streaming appliers from the streaming log. + * The implementation must scan through log of stored streaming + * fragments and reconstruct the streaming applier service + * objects. + * + * This is overload for calls which are done from client context, + * e.g. after SST has been received. + * + * @param client_service Reference to client service object + */ + virtual void recover_streaming_appliers( + wsrep::client_service& client_service) = 0; + + /** + * Recover streaming appliers from the streaming log. + * The implementation must scan through log of stored streaming + * fragments and reconstruct the streaming applier service + * objects. + * + * This is overload for calls which are done from high priority + * context, e.g. when handling cluster view change events. + * + * @param high_priority_service Reference to high priority service + * object. + */ + virtual void recover_streaming_appliers( + wsrep::high_priority_service& high_priority_service) = 0; + /** * Recover a cluster view change event. * The method takes own node ID. diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 46f686a..0a359a4 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -577,6 +577,7 @@ namespace wsrep , pause_seqno_() , streaming_clients_() , streaming_appliers_() + , streaming_appliers_recovered_() , provider_() , name_(name) , id_(wsrep::id::undefined()) @@ -602,6 +603,12 @@ namespace wsrep void wait_until_state(wsrep::unique_lock&, enum state) const; // Interrupt all threads which are waiting for state void interrupt_state_waiters(wsrep::unique_lock&); + + // Recover streaming appliers if not already recoverd + template + void recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock&, C&); + // Close SR transcations whose origin is outside of current // cluster view. void close_orphaned_sr_transactions( @@ -658,6 +665,7 @@ namespace wsrep }; streaming_appliers_map streaming_appliers_; + bool streaming_appliers_recovered_; wsrep::provider* provider_; std::string name_; wsrep::id id_; diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 499ec44..a129f41 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -20,6 +20,7 @@ #ifndef WSREP_STREAMING_CONTEXT_HPP #define WSREP_STREAMING_CONTEXT_HPP +#include "compiler.hpp" #include "logger.hpp" #include "seqno.hpp" #include "transaction_id.hpp" @@ -80,7 +81,7 @@ namespace wsrep void stored(wsrep::seqno seqno) { - assert(seqno.is_undefined() == false); + check_fragment_seqno(seqno); fragments_.push_back(seqno); } @@ -91,7 +92,7 @@ namespace wsrep void applied(wsrep::seqno seqno) { - assert(seqno.is_undefined() == false); + check_fragment_seqno(seqno); ++fragments_certified_; fragments_.push_back(seqno); } @@ -152,6 +153,13 @@ namespace wsrep unit_counter_ = 0; } private: + + void check_fragment_seqno(wsrep::seqno seqno WSREP_UNUSED) + { + assert(seqno.is_undefined() == false); + assert(fragments_.empty() || fragments_.back() < seqno); + } + size_t fragments_certified_; std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; diff --git a/src/server_state.cpp b/src/server_state.cpp index 584da8e..3ee042f 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -580,6 +580,10 @@ void wsrep::server_state::sst_received(wsrep::client_service& cs, current_view_ = v; server_service_.log_view(NULL /* this view is stored already */, v); + + lock.lock(); + recover_streaming_appliers_if_not_recovered(lock, cs); + lock.unlock(); } if (provider().sst_received(gtid, error)) @@ -747,10 +751,14 @@ void wsrep::server_state::on_primary_view( } assert(high_priority_service); + if (high_priority_service) { + recover_streaming_appliers_if_not_recovered(lock, + *high_priority_service); close_orphaned_sr_transactions(lock, *high_priority_service); } + if (server_service_.sst_before_init()) { if (state_ == s_initialized) @@ -1143,6 +1151,20 @@ void wsrep::server_state::interrupt_state_waiters( cond_.notify_all(); } +template +void wsrep::server_state::recover_streaming_appliers_if_not_recovered( + wsrep::unique_lock& lock, C& c) +{ + assert(lock.owns_lock()); + if (streaming_appliers_recovered_ == false) + { + lock.unlock(); + server_service_.recover_streaming_appliers(c); + lock.lock(); + } + streaming_appliers_recovered_ = true; +} + void wsrep::server_state::close_orphaned_sr_transactions( wsrep::unique_lock& lock, wsrep::high_priority_service& high_priority_service) @@ -1242,4 +1264,5 @@ void wsrep::server_state::close_transactions_at_disconnect( streaming_appliers_.erase(i++); server_service_.release_high_priority_service(streaming_applier); } + streaming_appliers_recovered_ = false; } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 11396fe..4283dab 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -128,6 +128,14 @@ namespace wsrep logged_view_ = view; } + void recover_streaming_appliers(wsrep::client_service&) + WSREP_OVERRIDE + { } + + void recover_streaming_appliers(wsrep::high_priority_service&) + WSREP_OVERRIDE + { } + wsrep::view get_view(wsrep::client_service&, const wsrep::id& own_id) WSREP_OVERRIDE {