1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

More strict server context state management.

This commit is contained in:
Teemu Ollakka
2018-04-29 18:42:36 +03:00
parent c301c0896b
commit 3c334fbc87
7 changed files with 115 additions and 22 deletions

View File

@ -89,7 +89,7 @@ void trrep::client_context::state(
enum trrep::client_context::state state) enum trrep::client_context::state state)
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
char allowed[state_max_][state_max_] = static const char allowed[state_max_][state_max_] =
{ {
/* idle exec quit */ /* idle exec quit */
{ 0, 1, 1}, /* idle */ { 0, 1, 1}, /* idle */

View File

@ -206,11 +206,13 @@ namespace trrep
assert(state_ == s_exec); assert(state_ == s_exec);
return transaction_.before_prepare(); return transaction_.before_prepare();
} }
int after_prepare() int after_prepare()
{ {
assert(state_ == s_exec); assert(state_ == s_exec);
return transaction_.after_prepare(); return transaction_.after_prepare();
} }
int before_commit() int before_commit()
{ {
assert(state_ == s_exec); assert(state_ == s_exec);

View File

@ -316,7 +316,7 @@ void dbms_server::applier_thread()
dbms_client applier(*this, client_id, dbms_client applier(*this, client_id,
trrep::client_context::m_applier, 0); trrep::client_context::m_applier, 0);
wsrep_status_t ret(provider().run_applier(&applier)); wsrep_status_t ret(provider().run_applier(&applier));
std::cerr << "Applier thread exited with error code " << ret << "\n"; std::cout << "Applier thread exited with error code " << ret << "\n";
} }
trrep::client_context* dbms_server::local_client_context() trrep::client_context* dbms_server::local_client_context()
@ -391,12 +391,12 @@ void dbms_simulator::start()
dbms_server& server(*it.first->second); dbms_server& server(*it.first->second);
std::string server_options(params_.wsrep_provider_options); std::string server_options(params_.wsrep_provider_options);
// server_options += "; base_port=" + server_port(i);
if (server.load_provider(params_.wsrep_provider, server_options)) if (server.load_provider(params_.wsrep_provider, server_options))
{ {
throw trrep::runtime_error("Failed to load provider"); throw trrep::runtime_error("Failed to load provider");
} }
if (server.provider().connect("sim_cluster", cluster_address, "", if (server.connect("sim_cluster", cluster_address, "",
i == 0)) i == 0))
{ {
throw trrep::runtime_error("Failed to connect"); throw trrep::runtime_error("Failed to connect");
@ -427,7 +427,7 @@ void dbms_simulator::stop()
std::cout << stats(); std::cout << stats();
std::cout << "######## Stats ############\n"; std::cout << "######## Stats ############\n";
// REMOVEME: Temporary shortcut // REMOVEME: Temporary shortcut
exit(0); // exit(0);
for (auto& i : servers_) for (auto& i : servers_)
{ {
dbms_server& server(*i.second); dbms_server& server(*i.second);
@ -439,7 +439,7 @@ void dbms_simulator::stop()
std::cout << sv.name() << " = " << sv.value() << "\n"; std::cout << sv.name() << " = " << sv.value() << "\n";
}); });
server.provider().disconnect(); server.disconnect();
server.wait_until_state(trrep::server_context::s_disconnected); server.wait_until_state(trrep::server_context::s_disconnected);
server.stop_applier(); server.stop_applier();
} }

View File

@ -6,6 +6,7 @@
#include "client_context.hpp" #include "client_context.hpp"
#include "transaction_context.hpp" #include "transaction_context.hpp"
#include "view.hpp" #include "view.hpp"
#include "compiler.hpp"
// Todo: refactor into provider factory // Todo: refactor into provider factory
#include "mock_provider.hpp" #include "mock_provider.hpp"
@ -15,6 +16,8 @@
#include <cassert> #include <cassert>
#include <sstream>
namespace namespace
{ {
@ -206,6 +209,24 @@ int trrep::server_context::load_provider(const std::string& provider_spec,
return 0; return 0;
} }
int trrep::server_context::connect(const std::string& cluster_name,
const std::string& cluster_address,
const std::string& state_donor,
bool bootstrap)
{
return provider().connect(cluster_name, cluster_address, state_donor,
bootstrap);
}
int trrep::server_context::disconnect()
{
{
trrep::unique_lock<trrep::mutex> lock(mutex_);
state(lock, s_disconnecting);
}
return provider().disconnect();
}
trrep::server_context::~server_context() trrep::server_context::~server_context()
{ {
delete provider_; delete provider_;
@ -220,18 +241,20 @@ void trrep::server_context::wait_until_state(
enum trrep::server_context::state state) const enum trrep::server_context::state state) const
{ {
trrep::unique_lock<trrep::mutex> lock(mutex_); trrep::unique_lock<trrep::mutex> lock(mutex_);
++state_waiters_[state];
while (state_ != state) while (state_ != state)
{ {
cond_.wait(lock); cond_.wait(lock);
} }
--state_waiters_[state];
cond_.notify_all();
} }
void trrep::server_context::on_connect() void trrep::server_context::on_connect()
{ {
std::cout << "Server " << name_ << " connected to cluster" << "\n"; std::cout << "Server " << name_ << " connected to cluster" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_); trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_connected; state(lock, s_connected);
cond_.notify_all();
} }
void trrep::server_context::on_view(const trrep::view& view) void trrep::server_context::on_view(const trrep::view& view)
@ -253,8 +276,7 @@ void trrep::server_context::on_view(const trrep::view& view)
trrep::unique_lock<trrep::mutex> lock(mutex_); trrep::unique_lock<trrep::mutex> lock(mutex_);
if (view.final()) if (view.final())
{ {
state_ = s_disconnected; state(lock, s_disconnected);
cond_.notify_all();
} }
} }
@ -262,8 +284,10 @@ void trrep::server_context::on_sync()
{ {
std::cout << "Synced with group" << "\n"; std::cout << "Synced with group" << "\n";
trrep::unique_lock<trrep::mutex> lock(mutex_); trrep::unique_lock<trrep::mutex> lock(mutex_);
state_ = s_synced; if (state_ != s_synced)
cond_.notify_all(); {
state(lock, s_synced);
}
} }
int trrep::server_context::on_apply( int trrep::server_context::on_apply(
@ -312,3 +336,46 @@ bool trrep::server_context::statement_allowed_for_streaming(
/* Streaming not implemented yet. */ /* Streaming not implemented yet. */
return false; return false;
} }
// Private
void trrep::server_context::state(
trrep::unique_lock<trrep::mutex>& lock TRREP_UNUSED,
enum trrep::server_context::state state)
{
assert(lock.owns_lock());
static const char allowed[n_states_][n_states_] =
{
/* dis, ing, ized, cted, jer, jed, dor, sed, ding */
{ 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */
{ 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* ing */
{ 0, 0, 0, 1, 0, 1, 0, 0, 0}, /* ized */
{ 0, 0, 0, 0, 1, 0, 0, 1, 0}, /* cted */
{ 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* jer */
{ 0, 0, 0, 0, 0, 0, 0, 1, 1}, /* jed */
{ 0, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */
{ 0, 0, 0, 0, 0, 1, 1, 0, 1}, /* sed */
{ 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */
};
if (allowed[state_][state])
{
std::cout << "server " << name_ << " state change: "
<< state_ << " -> " << state;
state_ = state;
cond_.notify_all();
while (state_waiters_[state_])
{
cond_.wait(lock);
}
}
else
{
std::ostringstream os;
os << "server: " << name_ << " unallowed state transition: "
<< trrep::to_string(state_) << " -> " << trrep::to_string(state);
std::cerr << os.str() << "\n";
::abort();
// throw trrep::runtime_error(os.str());
}
}

View File

@ -67,6 +67,7 @@
#include "wsrep_api.h" #include "wsrep_api.h"
#include <string> #include <string>
#include <vector>
namespace trrep namespace trrep
{ {
@ -79,7 +80,7 @@ namespace trrep
/*! \class Server Context /*! \class Server Context
* *
* *
*/ */
class server_context class server_context
{ {
@ -135,6 +136,9 @@ namespace trrep
/*! Server is disconnecting from group */ /*! Server is disconnecting from group */
s_disconnecting s_disconnecting
}; };
static const int n_states_ = s_disconnecting + 1;
/*! /*!
* Rollback Mode enumeration * Rollback Mode enumeration
*/ */
@ -215,8 +219,12 @@ namespace trrep
return *provider_; return *provider_;
} }
int connect(const std::string& cluster_name,
const std::string& cluster_address,
const std::string& state_donor,
bool bootstrap);
int disconnect();
/*! /*!
* Virtual method which will be called when the server * Virtual method which will be called when the server
* has been joined to the cluster. Must be provided by * has been joined to the cluster. Must be provided by
@ -248,12 +256,6 @@ namespace trrep
/*! /*!
* Wait until server reaches given state. * Wait until server reaches given state.
*
* \todo Waiting for transitional states may not be reliable.
* Should introduce array of waiter counts per state,
* on state change the state changing thread is
* not allowed to proceed until all waiters have
* woken up.
*/ */
void wait_until_state(trrep::server_context::state) const; void wait_until_state(trrep::server_context::state) const;
@ -356,6 +358,7 @@ namespace trrep
: mutex_(mutex) : mutex_(mutex)
, cond_(cond) , cond_(cond)
, state_(s_disconnected) , state_(s_disconnected)
, state_waiters_(n_states_)
, provider_() , provider_()
, name_(name) , name_(name)
, id_(id) , id_(id)
@ -369,9 +372,12 @@ namespace trrep
server_context(const server_context&); server_context(const server_context&);
server_context& operator=(const server_context&); server_context& operator=(const server_context&);
void state(trrep::unique_lock<trrep::mutex>&, enum state);
trrep::mutex& mutex_; trrep::mutex& mutex_;
trrep::condition_variable& cond_; trrep::condition_variable& cond_;
enum state state_; enum state state_;
mutable std::vector<int> state_waiters_;
trrep::provider* provider_; trrep::provider* provider_;
std::string name_; std::string name_;
std::string id_; std::string id_;
@ -379,6 +385,24 @@ namespace trrep
std::string working_dir_; std::string working_dir_;
enum rollback_mode rollback_mode_; enum rollback_mode rollback_mode_;
}; };
static inline std::string to_string(enum trrep::server_context::state state)
{
switch (state)
{
case trrep::server_context::s_disconnected: return "disconnected";
case trrep::server_context::s_initializing: return "initilizing";
case trrep::server_context::s_initialized: return "initilized";
case trrep::server_context::s_connected: return "connected";
case trrep::server_context::s_joiner: return "joiner";
case trrep::server_context::s_joined: return "joined";
case trrep::server_context::s_donor: return "donor";
case trrep::server_context::s_synced: return "synced";
case trrep::server_context::s_disconnecting: return "disconnecting";
}
return "unknown";
}
} }
#endif // TRREP_SERVER_CONTEXT_HPP #endif // TRREP_SERVER_CONTEXT_HPP

View File

@ -170,8 +170,8 @@ namespace trrep
case trrep::transaction_context::s_aborted: return "aborted"; case trrep::transaction_context::s_aborted: return "aborted";
case trrep::transaction_context::s_must_replay: return "must_replay"; case trrep::transaction_context::s_must_replay: return "must_replay";
case trrep::transaction_context::s_replaying: return "replaying"; case trrep::transaction_context::s_replaying: return "replaying";
default: return "unknown";
} }
return "unknown";
} }
} }

View File

@ -73,7 +73,7 @@ namespace trrep
// //
bool final() const bool final() const
{ {
return (my_idx_ == -1); return (members_.empty() && my_idx_ == -1);
} }
private: private: