diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index da646f5..ba16cc6 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -71,6 +71,7 @@ #include "condition_variable.hpp" #include "server_service.hpp" #include "id.hpp" +#include "view.hpp" #include "transaction_id.hpp" #include "provider.hpp" @@ -85,7 +86,6 @@ namespace wsrep class ws_meta; class client_state; class transaction; - class view; class const_buffer; /** @class Server Context @@ -278,8 +278,16 @@ namespace wsrep /** * Wait until server reaches given state. */ - void wait_until_state(wsrep::server_state::state) const; + void wait_until_state(enum state state) const + { + wsrep::unique_lock lock(mutex_); + wait_until_state(lock, state); + } + /** + * Return current view + */ + const wsrep::view& current_view() const { return current_view_; } /** * Set last committed GTID. */ @@ -311,6 +319,33 @@ namespace wsrep */ enum wsrep::provider::status causal_read(int timeout) const; + /** + * Desynchronize the server. + * + * If the server state is synced, this call will desynchronize + * the server from the cluster. + */ + int desync() + { + wsrep::unique_lock lock(mutex_); + return desync(lock); + } + + /** + * Resynchronize the server. + */ + void resync() + { + wsrep::unique_lock lock(mutex_); + resync(lock); + } + + wsrep::seqno pause(); + + wsrep::seqno pause_seqno() const { return pause_seqno_; } + + void resume(); + /** * Prepares server state for SST. * @@ -336,9 +371,18 @@ namespace wsrep */ void sst_sent(const wsrep::gtid& gtid, int error); + /** + * This method should be called on joiner after the + * SST has been transferred but before DBMS has been + * initialized. + */ + void sst_transferred(const wsrep::gtid& gtid); + /** * This method must be called by the joiner after the SST - * transfer has been received. + * transfer has been received and DBMS state has been completely + * initialized. This will signal the provider that it can + * start applying write sets. * * @param gtid GTID provided by the SST transfer */ @@ -375,6 +419,12 @@ namespace wsrep enum state state() const { wsrep::unique_lock lock(mutex_); + return state(lock); + } + + enum state state(wsrep::unique_lock& lock) const + { + assert(lock.owns_lock()); return state_; } /** @@ -396,6 +446,7 @@ namespace wsrep */ void debug_log_filter(const std::string&); + wsrep::mutex& mutex() { return mutex_; } protected: /** Server state constructor * @@ -421,7 +472,14 @@ namespace wsrep , cond_(cond) , server_service_(server_service) , state_(s_disconnected) + , state_hist_() , state_waiters_(n_states_) + , init_initialized_() + , init_synced_() + , sst_gtid_() + , desync_count_() + , pause_seqno_() + , desynced_on_pause_() , streaming_appliers_() , provider_() , name_(name) @@ -429,6 +487,7 @@ namespace wsrep , address_(address) , working_dir_(working_dir) , rollback_mode_(rollback_mode) + , current_view_() , last_committed_gtid_() , debug_log_level_(0) { } @@ -438,13 +497,23 @@ namespace wsrep server_state(const server_state&); server_state& operator=(const server_state&); + int desync(wsrep::unique_lock&); + void resync(wsrep::unique_lock&); void state(wsrep::unique_lock&, enum state); + void wait_until_state(wsrep::unique_lock&, enum state) const; wsrep::mutex& mutex_; wsrep::condition_variable& cond_; wsrep::server_service& server_service_; enum state state_; + std::vector state_hist_; mutable std::vector state_waiters_; + bool init_initialized_; + bool init_synced_; + wsrep::gtid sst_gtid_; + size_t desync_count_; + wsrep::seqno pause_seqno_; + bool desynced_on_pause_; typedef std::map, wsrep::client_state*> streaming_appliers_map; streaming_appliers_map streaming_appliers_; wsrep::provider* provider_; @@ -453,6 +522,7 @@ namespace wsrep std::string address_; std::string working_dir_; enum rollback_mode rollback_mode_; + wsrep::view current_view_; wsrep::gtid last_committed_gtid_; int debug_log_level_; }; @@ -471,13 +541,14 @@ namespace wsrep wsrep::server_service& server_service_; }; - static inline std::string to_string(enum wsrep::server_state::state state) + static inline const char* to_c_string( + enum wsrep::server_state::state state) { switch (state) { case wsrep::server_state::s_disconnected: return "disconnected"; - case wsrep::server_state::s_initializing: return "initilizing"; - case wsrep::server_state::s_initialized: return "initilized"; + case wsrep::server_state::s_initializing: return "initializing"; + case wsrep::server_state::s_initialized: return "initialized"; case wsrep::server_state::s_connected: return "connected"; case wsrep::server_state::s_joiner: return "joiner"; case wsrep::server_state::s_joined: return "joined"; @@ -488,6 +559,11 @@ namespace wsrep return "unknown"; } + static inline std::string to_string(enum wsrep::server_state::state state) + { + return (to_c_string(state)); + } + } #endif // WSREP_SERVER_STATE_HPP diff --git a/include/wsrep/view.hpp b/include/wsrep/view.hpp index a809669..3e46c94 100644 --- a/include/wsrep/view.hpp +++ b/include/wsrep/view.hpp @@ -12,6 +12,8 @@ #define WSREP_VIEW_HPP #include "id.hpp" +#include "seqno.hpp" +#include "gtid.hpp" #include namespace wsrep @@ -45,6 +47,15 @@ namespace wsrep std::string incoming_; }; + view() + : state_id_() + , view_seqno_() + , status_(disconnected) + , capabilities_() + , own_index_(-1) + , protocol_version_(0) + , members_() + { } view(const wsrep::gtid& state_id, wsrep::seqno view_seqno, enum wsrep::view::status status, diff --git a/src/client_state.cpp b/src/client_state.cpp index 58682a7..88b21f2 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -18,27 +18,33 @@ wsrep::provider& wsrep::client_state::provider() const void wsrep::client_state::open(wsrep::client_id id) { wsrep::unique_lock lock(mutex_); + debug_log_state("open: enter"); owning_thread_id_ = wsrep::this_thread::get_id(); current_thread_id_ = owning_thread_id_; state(lock, s_idle); id_ = id; + debug_log_state("open: leave"); } void wsrep::client_state::close() { wsrep::unique_lock lock(mutex_); + debug_log_state("close: enter"); state(lock, s_quitting); lock.unlock(); if (transaction_.active()) { client_service_.rollback(); } + debug_log_state("close: leave"); } void wsrep::client_state::cleanup() { wsrep::unique_lock lock(mutex_); + debug_log_state("cleanup: enter"); state(lock, s_none); + debug_log_state("cleanup: leave"); } void wsrep::client_state::override_error(enum wsrep::client_error error) @@ -281,6 +287,7 @@ void wsrep::client_state::debug_log_state(const char* context) const wsrep::log_debug() << "client_state: " << context << ": server: " << server_state_.name() << " client: " << id_.get() + << " state: " << to_c_string(state_) << " current_error: " << current_error_; } } diff --git a/src/server_state.cpp b/src/server_state.cpp index a9c6dbf..83a8912 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -55,6 +55,46 @@ wsrep::server_state::~server_state() delete provider_; } + +wsrep::seqno wsrep::server_state::pause() +{ + wsrep::unique_lock lock(mutex_); + if (pause_seqno_.is_undefined() == false) + { + throw wsrep::runtime_error("Trying to pause already paused provider"); + } + if (state_ == s_synced) + { + if (desync(lock)) + { + return wsrep::seqno::undefined(); + } + desynced_on_pause_ = true; + } + pause_seqno_ = provider_->pause(); + if (pause_seqno_.is_undefined()) + { + resync(); + } + return pause_seqno_; +} + +void wsrep::server_state::resume() +{ + wsrep::unique_lock lock(mutex_); + assert(pause_seqno_.is_undefined() == false); + if (desynced_on_pause_) + { + resync(lock); + desynced_on_pause_ = false; + } + if (provider_->resume()) + { + throw wsrep::runtime_error("Failed to resume provider"); + } + pause_seqno_ = wsrep::seqno::undefined(); +} + std::string wsrep::server_state::prepare_for_sst() { wsrep::unique_lock lock(mutex_); @@ -82,6 +122,7 @@ int wsrep::server_state::start_sst(const std::string& sst_request, void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error) { + wsrep::log_info() << "SST sent: " << gtid << ": " << error; if (provider_->sst_sent(gtid, error)) { server_service_.log_message(wsrep::log::warning, @@ -91,8 +132,24 @@ void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error) state(lock, s_joined); } +void wsrep::server_state::sst_transferred(const wsrep::gtid& gtid) +{ + wsrep::log_info() << "SST transferred"; + wsrep::unique_lock lock(mutex_); + sst_gtid_ = gtid; + if (server_service_.sst_before_init()) + { + state(lock, s_initializing); + } + else + { + state(lock, s_joined); + } +} + void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error) { + wsrep::log_info() << "SST received: " << gtid << ": " << error; if (provider_->sst_received(gtid, error)) { throw wsrep::runtime_error("SST received failed"); @@ -101,10 +158,29 @@ void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error) state(lock, s_joined); } -void wsrep::server_state::wait_until_state( - enum wsrep::server_state::state state) const +void wsrep::server_state::initialized() { wsrep::unique_lock lock(mutex_); + wsrep::log_info() << "Server initialized"; + init_initialized_ = true; + if (sst_gtid_.is_undefined() == false && + server_service_.sst_before_init()) + { + lock.unlock(); + if (provider().sst_received(sst_gtid_, 0)) + { + throw wsrep::runtime_error("SST received failed"); + } + lock.lock(); + sst_gtid_ = wsrep::gtid::undefined(); + } + state(lock, s_initialized); +} + +void wsrep::server_state::wait_until_state( + wsrep::unique_lock& lock, + enum wsrep::server_state::state state) const +{ ++state_waiters_[state]; while (state_ != state) { @@ -158,24 +234,52 @@ void wsrep::server_state::on_connect() void wsrep::server_state::on_view(const wsrep::view& view) { - wsrep::log_info() << "================================================\nView:\n" - << "id: " << view.state_id() << "\n" - << "status: " << view.status() << "\n" - << "own_index: " << view.own_index() << "\n" - << "final: " << view.final() << "\n" - << "members"; + wsrep::log_info() + << "================================================\nView:\n" + << " id: " << view.state_id() << "\n" + << " status: " << view.status() << "\n" + << " own_index: " << view.own_index() << "\n" + << " final: " << view.final() << "\n" + << " members"; const std::vector& members(view.members()); for (std::vector::const_iterator i(members.begin()); i != members.end(); ++i) { - wsrep::log_info() << "id: " << i->id() << " " + wsrep::log_info() << " id: " << i->id() << " " << "name: " << i->name(); } wsrep::log_info() << "================================================="; - wsrep::unique_lock lock(mutex_); - if (view.final()) + if (view.status() == wsrep::view::primary) { - state(lock, s_disconnected); + wsrep::unique_lock lock(mutex_); + if (state_ == s_connected && view.members().size() == 1) + { + state(lock, s_joiner); + state(lock, s_initializing); + } + + if (init_initialized_ == false) + { + // DBMS has not been initialized yet + wait_until_state(lock, s_initialized); + } + + assert(init_initialized_); + + if (state_ == s_initialized) + { + state(lock, s_joined); + if (init_synced_) + { + state(lock, s_synced); + } + } + + if (view.final()) + { + state(lock, s_disconnected); + } + current_view_ = view; } } @@ -183,21 +287,21 @@ void wsrep::server_state::on_sync() { wsrep::log_info() << "Server " << name_ << " synced with group"; wsrep::unique_lock lock(mutex_); - /* Follow the path joiner -> joined -> synced to notify possible - waiters. */ - switch (state_) + if (server_service_.sst_before_init()) { - case s_connected: - state(lock, s_joiner); - case s_joiner: - state(lock, s_joined); - case s_joined: - state(lock, s_synced); - break; - default: - /* State */ - state(lock, s_synced); - }; + switch (state_) + { + case s_connected: + state(lock, s_joiner); + case s_joiner: + state(lock, s_initializing); + break; + default: + /* State */ + state(lock, s_synced); + }; + } + init_synced_ = true; } int wsrep::server_state::on_apply( @@ -400,6 +504,33 @@ wsrep::client_state* wsrep::server_state::find_streaming_applier( // Private + +int wsrep::server_state::desync(wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + ++desync_count_; + if (state_ == s_synced) + { + assert(desync_count_ == 0); + return provider_->desync(); + } + return 0; +} + +void wsrep::server_state::resync(wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + --desync_count_; + if (desync_count_ == 0) + { + if (provider_->resync()) + { + throw wsrep::runtime_error("Failed to resync"); + } + } +} + + void wsrep::server_state::state( wsrep::unique_lock& lock WSREP_UNUSED, enum wsrep::server_state::state state) @@ -412,7 +543,7 @@ void wsrep::server_state::state( { 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* ing */ { 0, 0, 0, 1, 0, 1, 0, 0, 0}, /* ized */ { 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* cted */ - { 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* jer */ + { 0, 1, 0, 0, 0, 1, 0, 0, 0}, /* jer */ { 0, 0, 0, 0, 0, 0, 0, 1, 1}, /* jed */ { 0, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */ { 0, 0, 0, 0, 0, 1, 1, 0, 1}, /* sed */ @@ -422,7 +553,9 @@ void wsrep::server_state::state( if (allowed[state_][state]) { wsrep::log_info() << "server " << name_ << " state change: " - << state_ << " -> " << state; + << to_c_string(state_) << " -> " + << to_c_string(state); + state_hist_.push_back(state_); state_ = state; cond_.notify_all(); while (state_waiters_[state_]) diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 3eb5342..b2ef338 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -339,8 +339,9 @@ namespace try { std::string req(server_state.prepare_for_sst()); - *sst_req = ::strdup(req.c_str()); - *sst_req_len = strlen(req.c_str()); + *sst_req = ::malloc(req.size() + 1); + memcpy(*sst_req, req.data(), req.size() + 1); + *sst_req_len = req.size() + 1; return WSREP_CB_SUCCESS; } catch (const wsrep::runtime_error& e) diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 7c61ebf..9340ed1 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -122,9 +122,9 @@ BOOST_AUTO_TEST_CASE(server_state_state_strings) BOOST_REQUIRE(wsrep::to_string( wsrep::server_state::s_disconnected) == "disconnected"); BOOST_REQUIRE(wsrep::to_string( - wsrep::server_state::s_initializing) == "initilizing"); + wsrep::server_state::s_initializing) == "initializing"); BOOST_REQUIRE(wsrep::to_string( - wsrep::server_state::s_initialized) == "initilized"); + wsrep::server_state::s_initialized) == "initialized"); BOOST_REQUIRE(wsrep::to_string( wsrep::server_state::s_connected) == "connected"); BOOST_REQUIRE(wsrep::to_string(