diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index a40a096..ae7fe64 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -491,6 +491,7 @@ namespace wsrep , init_synced_() , sst_gtid_() , desync_count_() + , pause_count_() , pause_seqno_() , desynced_on_pause_() , streaming_appliers_() @@ -527,6 +528,7 @@ namespace wsrep bool init_synced_; wsrep::gtid sst_gtid_; size_t desync_count_; + size_t pause_count_; wsrep::seqno pause_seqno_; bool desynced_on_pause_; typedef std::map, wsrep::client_state*> streaming_appliers_map; diff --git a/src/server_state.cpp b/src/server_state.cpp index bcaa0b6..f010c55 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -253,10 +253,14 @@ wsrep::server_state::~server_state() wsrep::seqno wsrep::server_state::pause() { wsrep::unique_lock lock(mutex_); - if (pause_seqno_.is_undefined() == false) + // Disallow concurrent calls to pause to in order to have non-concurrent + // access to desynced_on_pause_ which is checked in resume() call. + while (pause_count_ > 0) { - throw wsrep::runtime_error("Trying to pause already paused provider"); + cond_.wait(lock); } + ++pause_count_; + assert(pause_seqno_.is_undefined()); if (state_ == s_synced) { if (desync(lock)) @@ -265,10 +269,14 @@ wsrep::seqno wsrep::server_state::pause() } desynced_on_pause_ = true; } + lock.unlock(); pause_seqno_ = provider_->pause(); + lock.lock(); if (pause_seqno_.is_undefined()) { + --pause_count_; resync(); + desynced_on_pause_ = false; } return pause_seqno_; } @@ -277,6 +285,7 @@ void wsrep::server_state::resume() { wsrep::unique_lock lock(mutex_); assert(pause_seqno_.is_undefined() == false); + assert(pause_count_ == 1); if (desynced_on_pause_) { resync(lock); @@ -287,6 +296,8 @@ void wsrep::server_state::resume() throw wsrep::runtime_error("Failed to resume provider"); } pause_seqno_ = wsrep::seqno::undefined(); + --pause_count_; + cond_.notify_all(); } std::string wsrep::server_state::prepare_for_sst() @@ -492,6 +503,8 @@ void wsrep::server_state::on_sync() { switch (state_) { + case s_synced: + break; case s_connected: state(lock, s_joiner); // fall through @@ -588,24 +601,24 @@ int wsrep::server_state::desync(wsrep::unique_lock& lock) { assert(lock.owns_lock()); ++desync_count_; - if (state_ == s_synced) + lock.unlock(); + int ret(provider_->desync()); + lock.lock(); + if (ret) { - assert(desync_count_ == 0); - return provider_->desync(); + --desync_count_; } - return 0; + return ret; } void wsrep::server_state::resync(wsrep::unique_lock& lock) { assert(lock.owns_lock()); + assert(desync_count_ > 0); --desync_count_; - if (desync_count_ == 0) + if (provider_->resync()) { - if (provider_->resync()) - { - throw wsrep::runtime_error("Failed to resync"); - } + throw wsrep::runtime_error("Failed to resync"); } }