From 3c334fbc875dbabbd3ba213b361ebe2d93de35f8 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Sun, 29 Apr 2018 18:42:36 +0300 Subject: [PATCH] More strict server context state management. --- src/client_context.cpp | 2 +- src/client_context.hpp | 2 + src/dbms_simulator.cpp | 10 ++--- src/server_context.cpp | 79 ++++++++++++++++++++++++++++++++++--- src/server_context.hpp | 40 +++++++++++++++---- src/transaction_context.hpp | 2 +- src/view.hpp | 2 +- 7 files changed, 115 insertions(+), 22 deletions(-) diff --git a/src/client_context.cpp b/src/client_context.cpp index ef6e258..ed3c393 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -89,7 +89,7 @@ void trrep::client_context::state( enum trrep::client_context::state state) { assert(lock.owns_lock()); - char allowed[state_max_][state_max_] = + static const char allowed[state_max_][state_max_] = { /* idle exec quit */ { 0, 1, 1}, /* idle */ diff --git a/src/client_context.hpp b/src/client_context.hpp index 3f0c6d3..56d17fd 100644 --- a/src/client_context.hpp +++ b/src/client_context.hpp @@ -206,11 +206,13 @@ namespace trrep assert(state_ == s_exec); return transaction_.before_prepare(); } + int after_prepare() { assert(state_ == s_exec); return transaction_.after_prepare(); } + int before_commit() { assert(state_ == s_exec); diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index 94bad25..0e65fbb 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -316,7 +316,7 @@ void dbms_server::applier_thread() dbms_client applier(*this, client_id, trrep::client_context::m_applier, 0); wsrep_status_t ret(provider().run_applier(&applier)); - std::cerr << "Applier thread exited with error code " << ret << "\n"; + std::cout << "Applier thread exited with error code " << ret << "\n"; } trrep::client_context* dbms_server::local_client_context() @@ -391,12 +391,12 @@ void dbms_simulator::start() dbms_server& server(*it.first->second); std::string server_options(params_.wsrep_provider_options); - // server_options += "; base_port=" + server_port(i); + if (server.load_provider(params_.wsrep_provider, server_options)) { throw trrep::runtime_error("Failed to load provider"); } - if (server.provider().connect("sim_cluster", cluster_address, "", + if (server.connect("sim_cluster", cluster_address, "", i == 0)) { throw trrep::runtime_error("Failed to connect"); @@ -427,7 +427,7 @@ void dbms_simulator::stop() std::cout << stats(); std::cout << "######## Stats ############\n"; // REMOVEME: Temporary shortcut - exit(0); + // exit(0); for (auto& i : servers_) { dbms_server& server(*i.second); @@ -439,7 +439,7 @@ void dbms_simulator::stop() std::cout << sv.name() << " = " << sv.value() << "\n"; }); - server.provider().disconnect(); + server.disconnect(); server.wait_until_state(trrep::server_context::s_disconnected); server.stop_applier(); } diff --git a/src/server_context.cpp b/src/server_context.cpp index d832482..907d2f8 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -6,6 +6,7 @@ #include "client_context.hpp" #include "transaction_context.hpp" #include "view.hpp" +#include "compiler.hpp" // Todo: refactor into provider factory #include "mock_provider.hpp" @@ -15,6 +16,8 @@ #include +#include + namespace { @@ -206,6 +209,24 @@ int trrep::server_context::load_provider(const std::string& provider_spec, return 0; } +int trrep::server_context::connect(const std::string& cluster_name, + const std::string& cluster_address, + const std::string& state_donor, + bool bootstrap) +{ + return provider().connect(cluster_name, cluster_address, state_donor, + bootstrap); +} + +int trrep::server_context::disconnect() +{ + { + trrep::unique_lock lock(mutex_); + state(lock, s_disconnecting); + } + return provider().disconnect(); +} + trrep::server_context::~server_context() { delete provider_; @@ -220,18 +241,20 @@ void trrep::server_context::wait_until_state( enum trrep::server_context::state state) const { trrep::unique_lock lock(mutex_); + ++state_waiters_[state]; while (state_ != state) { cond_.wait(lock); } + --state_waiters_[state]; + cond_.notify_all(); } void trrep::server_context::on_connect() { std::cout << "Server " << name_ << " connected to cluster" << "\n"; trrep::unique_lock lock(mutex_); - state_ = s_connected; - cond_.notify_all(); + state(lock, s_connected); } void trrep::server_context::on_view(const trrep::view& view) @@ -253,8 +276,7 @@ void trrep::server_context::on_view(const trrep::view& view) trrep::unique_lock lock(mutex_); if (view.final()) { - state_ = s_disconnected; - cond_.notify_all(); + state(lock, s_disconnected); } } @@ -262,8 +284,10 @@ void trrep::server_context::on_sync() { std::cout << "Synced with group" << "\n"; trrep::unique_lock lock(mutex_); - state_ = s_synced; - cond_.notify_all(); + if (state_ != s_synced) + { + state(lock, s_synced); + } } int trrep::server_context::on_apply( @@ -312,3 +336,46 @@ bool trrep::server_context::statement_allowed_for_streaming( /* Streaming not implemented yet. */ return false; } + +// Private + +void trrep::server_context::state( + trrep::unique_lock& lock TRREP_UNUSED, + enum trrep::server_context::state state) +{ + assert(lock.owns_lock()); + static const char allowed[n_states_][n_states_] = + { + /* dis, ing, ized, cted, jer, jed, dor, sed, ding */ + { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */ + { 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* ing */ + { 0, 0, 0, 1, 0, 1, 0, 0, 0}, /* ized */ + { 0, 0, 0, 0, 1, 0, 0, 1, 0}, /* cted */ + { 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* jer */ + { 0, 0, 0, 0, 0, 0, 0, 1, 1}, /* jed */ + { 0, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */ + { 0, 0, 0, 0, 0, 1, 1, 0, 1}, /* sed */ + { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ + }; + + if (allowed[state_][state]) + { + std::cout << "server " << name_ << " state change: " + << state_ << " -> " << state; + state_ = state; + cond_.notify_all(); + while (state_waiters_[state_]) + { + cond_.wait(lock); + } + } + else + { + std::ostringstream os; + os << "server: " << name_ << " unallowed state transition: " + << trrep::to_string(state_) << " -> " << trrep::to_string(state); + std::cerr << os.str() << "\n"; + ::abort(); + // throw trrep::runtime_error(os.str()); + } +} diff --git a/src/server_context.hpp b/src/server_context.hpp index f6db5c4..85b700e 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -67,6 +67,7 @@ #include "wsrep_api.h" #include +#include namespace trrep { @@ -79,7 +80,7 @@ namespace trrep /*! \class Server Context * - * + * */ class server_context { @@ -135,6 +136,9 @@ namespace trrep /*! Server is disconnecting from group */ s_disconnecting }; + + static const int n_states_ = s_disconnecting + 1; + /*! * Rollback Mode enumeration */ @@ -215,8 +219,12 @@ namespace trrep return *provider_; } + int connect(const std::string& cluster_name, + const std::string& cluster_address, + const std::string& state_donor, + bool bootstrap); - + int disconnect(); /*! * Virtual method which will be called when the server * has been joined to the cluster. Must be provided by @@ -248,12 +256,6 @@ namespace trrep /*! * Wait until server reaches given state. - * - * \todo Waiting for transitional states may not be reliable. - * Should introduce array of waiter counts per state, - * on state change the state changing thread is - * not allowed to proceed until all waiters have - * woken up. */ void wait_until_state(trrep::server_context::state) const; @@ -356,6 +358,7 @@ namespace trrep : mutex_(mutex) , cond_(cond) , state_(s_disconnected) + , state_waiters_(n_states_) , provider_() , name_(name) , id_(id) @@ -369,9 +372,12 @@ namespace trrep server_context(const server_context&); server_context& operator=(const server_context&); + void state(trrep::unique_lock&, enum state); + trrep::mutex& mutex_; trrep::condition_variable& cond_; enum state state_; + mutable std::vector state_waiters_; trrep::provider* provider_; std::string name_; std::string id_; @@ -379,6 +385,24 @@ namespace trrep std::string working_dir_; enum rollback_mode rollback_mode_; }; + + static inline std::string to_string(enum trrep::server_context::state state) + { + switch (state) + { + case trrep::server_context::s_disconnected: return "disconnected"; + case trrep::server_context::s_initializing: return "initilizing"; + case trrep::server_context::s_initialized: return "initilized"; + case trrep::server_context::s_connected: return "connected"; + case trrep::server_context::s_joiner: return "joiner"; + case trrep::server_context::s_joined: return "joined"; + case trrep::server_context::s_donor: return "donor"; + case trrep::server_context::s_synced: return "synced"; + case trrep::server_context::s_disconnecting: return "disconnecting"; + } + return "unknown"; + } + } #endif // TRREP_SERVER_CONTEXT_HPP diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index 7f92484..172c8ef 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -170,8 +170,8 @@ namespace trrep case trrep::transaction_context::s_aborted: return "aborted"; case trrep::transaction_context::s_must_replay: return "must_replay"; case trrep::transaction_context::s_replaying: return "replaying"; - default: return "unknown"; } + return "unknown"; } } diff --git a/src/view.hpp b/src/view.hpp index 3c7c8cc..ca50fe7 100644 --- a/src/view.hpp +++ b/src/view.hpp @@ -73,7 +73,7 @@ namespace trrep // bool final() const { - return (my_idx_ == -1); + return (members_.empty() && my_idx_ == -1); } private: