1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-03 16:22:35 +03:00

* Return provider status from provider connect

* Call to get server status variables along with provider variables
* Deal with intermediate non-prims
This commit is contained in:
Teemu Ollakka
2018-06-27 15:36:52 +03:00
parent d659c6bebd
commit fd9cf87141
9 changed files with 171 additions and 35 deletions

View File

@ -223,7 +223,7 @@ namespace wsrep
{ } { }
virtual ~provider() { } virtual ~provider() { }
// Provider state management // Provider state management
virtual int connect(const std::string& cluster_name, virtual enum status connect(const std::string& cluster_name,
const std::string& cluster_url, const std::string& cluster_url,
const std::string& state_donor, const std::string& state_donor,
bool bootstrap) = 0; bool bootstrap) = 0;

View File

@ -439,6 +439,10 @@ namespace wsrep
assert(lock.owns_lock()); assert(lock.owns_lock());
return state_; return state_;
} }
std::vector<wsrep::provider::status_variable> status() const;
/** /**
* Set server wide wsrep debug logging level. * Set server wide wsrep debug logging level.
* *
@ -459,6 +463,7 @@ namespace wsrep
void debug_log_filter(const std::string&); void debug_log_filter(const std::string&);
wsrep::mutex& mutex() { return mutex_; } wsrep::mutex& mutex() { return mutex_; }
protected: protected:
/** Server state constructor /** Server state constructor
* *

View File

@ -15,6 +15,52 @@
namespace namespace
{ {
std::string cluster_status_string(enum wsrep::server_state::state state)
{
switch (state)
{
case wsrep::server_state::s_joined:
case wsrep::server_state::s_synced:
return "Primary";
default:
return "non-Primary";
}
}
std::string cluster_size_string(enum wsrep::server_state::state state,
const wsrep::view& current_view)
{
std::ostringstream oss;
switch (state)
{
case wsrep::server_state::s_joined:
case wsrep::server_state::s_synced:
oss << current_view.members().size();
break;
default:
oss << 0;
break;
}
return oss.str();
}
std::string local_index_string(enum wsrep::server_state::state state,
const wsrep::view& current_view)
{
std::ostringstream oss;
switch (state)
{
case wsrep::server_state::s_joined:
case wsrep::server_state::s_synced:
oss << current_view.own_index();
break;
default:
oss << -1;
break;
}
return oss.str();
}
int apply_write_set(wsrep::server_state& server_state, int apply_write_set(wsrep::server_state& server_state,
wsrep::client_state& client_state, wsrep::client_state& client_state,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
@ -249,6 +295,20 @@ wsrep::server_state::~server_state()
delete provider_; delete provider_;
} }
std::vector<wsrep::provider::status_variable>
wsrep::server_state::status() const
{
typedef wsrep::provider::status_variable sv;
std::vector<sv> ret(provider_->status());
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
ret.push_back(sv("cluster_status", cluster_status_string(state_)));
ret.push_back(sv("cluster_size",
cluster_size_string(state_, current_view_)));
ret.push_back(sv("local_index",
local_index_string(state_, current_view_)));
return ret;
}
wsrep::seqno wsrep::server_state::pause() wsrep::seqno wsrep::server_state::pause()
{ {
@ -339,12 +399,26 @@ void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error)
void wsrep::server_state::sst_transferred(const wsrep::gtid& gtid) void wsrep::server_state::sst_transferred(const wsrep::gtid& gtid)
{ {
wsrep::log_info() << "SST transferred"; wsrep::log_info() << "SST transferred: " << gtid;
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
sst_gtid_ = gtid; sst_gtid_ = gtid;
if (server_service_.sst_before_init()) if (server_service_.sst_before_init())
{ {
state(lock, s_initializing); state(lock, s_initializing);
// Incremental state transfer was received
// TODO: Sanity checks for gtid continuity
if (init_initialized_)
{
state(lock, s_initialized);
state(lock, s_joined);
lock.unlock();
// TODO: This should not be here, sst_received() should be
// called instead
if (provider().sst_received(sst_gtid_, 0))
{
throw wsrep::runtime_error("SST received failed");
}
}
} }
else else
{ {
@ -462,7 +536,9 @@ void wsrep::server_state::on_view(const wsrep::view& view)
if (view.status() == wsrep::view::primary) if (view.status() == wsrep::view::primary)
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(view.final() == false);
current_view_ = view; current_view_ = view;
// Cluster was bootstrapped
if (state_ == s_connected && view.members().size() == 1) if (state_ == s_connected && view.members().size() == 1)
{ {
state(lock, s_joiner); state(lock, s_joiner);
@ -485,11 +561,32 @@ void wsrep::server_state::on_view(const wsrep::view& view)
state(lock, s_synced); state(lock, s_synced);
} }
} }
}
else if (view.status() == wsrep::view::non_primary)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
wsrep::log_info() << "Non-primary view";
if (state_ == s_disconnecting)
{
if (view.final()) if (view.final())
{ {
state(lock, s_disconnected); state(lock, s_disconnected);
} }
else
{
wsrep::log_debug() << "Ignoring non-prim while disconnecting";
}
}
else if (state_ != s_connected)
{
state(lock, s_connected);
}
}
else
{
assert(view.final());
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
state(lock, s_disconnected);
} }
server_service_.log_view(view); server_service_.log_view(view);
} }
@ -499,7 +596,8 @@ void wsrep::server_state::on_sync()
wsrep::log_info() << "Server " << name_ << " synced with group"; wsrep::log_info() << "Server " << name_ << " synced with group";
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (server_service_.sst_before_init()) // Initial sync
if (server_service_.sst_before_init() && init_synced_ == false)
{ {
switch (state_) switch (state_)
{ {
@ -519,6 +617,10 @@ void wsrep::server_state::on_sync()
state(lock, s_synced); state(lock, s_synced);
}; };
} }
else
{
state(lock, s_synced);
}
init_synced_ = true; init_synced_ = true;
} }
@ -634,11 +736,11 @@ void wsrep::server_state::state(
{ 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */ { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */
{ 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* ing */ { 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* ing */
{ 0, 0, 0, 1, 0, 1, 0, 0, 0}, /* ized */ { 0, 0, 0, 1, 0, 1, 0, 0, 0}, /* ized */
{ 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* cted */ { 0, 0, 0, 0, 1, 0, 0, 1, 0}, /* cted */
{ 0, 1, 0, 0, 0, 1, 0, 0, 0}, /* jer */ { 0, 1, 0, 0, 0, 1, 0, 0, 0}, /* jer */
{ 0, 0, 0, 0, 0, 0, 0, 1, 1}, /* jed */ { 0, 0, 0, 0, 0, 0, 0, 1, 1}, /* jed */
{ 0, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */ { 0, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */
{ 0, 0, 0, 0, 0, 1, 1, 0, 1}, /* sed */ { 0, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */
{ 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */
}; };

View File

@ -967,7 +967,7 @@ void wsrep::transaction::debug_log_state(
const char* context) const const char* context) const
{ {
WSREP_TC_LOG_DEBUG( WSREP_TC_LOG_DEBUG(
0, context 1, context
<< ": server: " << client_state_.server_state().name() << ": server: " << client_state_.server_state().name()
<< " client: " << client_state_.id().get() << " client: " << client_state_.id().get()
<< " trx: " << int64_t(id_.get()) << " trx: " << int64_t(id_.get())

View File

@ -500,25 +500,17 @@ wsrep::wsrep_provider_v26::~wsrep_provider_v26()
wsrep_unload(wsrep_); wsrep_unload(wsrep_);
} }
int wsrep::wsrep_provider_v26::connect( enum wsrep::provider::status wsrep::wsrep_provider_v26::connect(
const std::string& cluster_name, const std::string& cluster_name,
const std::string& cluster_url, const std::string& cluster_url,
const std::string& state_donor, const std::string& state_donor,
bool bootstrap) bool bootstrap)
{ {
int ret(0); return map_return_value(wsrep_->connect(wsrep_,
wsrep_status_t wret;
if ((wret = wsrep_->connect(wsrep_,
cluster_name.c_str(), cluster_name.c_str(),
cluster_url.c_str(), cluster_url.c_str(),
state_donor.c_str(), state_donor.c_str(),
bootstrap)) != WSREP_OK) bootstrap));
{
std::cerr << "Failed to connect cluster: "
<< wret << "\n";
ret = 1;
}
return ret;
} }
int wsrep::wsrep_provider_v26::disconnect() int wsrep::wsrep_provider_v26::disconnect()

View File

@ -18,7 +18,8 @@ namespace wsrep
wsrep_provider_v26(wsrep::server_state&, const std::string&, wsrep_provider_v26(wsrep::server_state&, const std::string&,
const std::string&); const std::string&);
~wsrep_provider_v26(); ~wsrep_provider_v26();
int connect(const std::string&, const std::string&, const std::string&, enum wsrep::provider::status
connect(const std::string&, const std::string&, const std::string&,
bool); bool);
int disconnect(); int disconnect();
int capabilities() const; int capabilities() const;

View File

@ -39,9 +39,10 @@ namespace wsrep
, rollback_fragments_() , rollback_fragments_()
{ } { }
int connect(const std::string&, const std::string&, const std::string&, enum wsrep::provider::status
connect(const std::string&, const std::string&, const std::string&,
bool) bool)
{ return 0; } { return wsrep::provider::success; }
int disconnect() { return 0; } int disconnect() { return 0; }
int capabilities() const { return 0; } int capabilities() const { return 0; }
int desync() { return 0; } int desync() { return 0; }

View File

@ -23,6 +23,7 @@ namespace wsrep
enum wsrep::server_state::rollback_mode rollback_mode) enum wsrep::server_state::rollback_mode rollback_mode)
: wsrep::server_state(mutex_, cond_, *this, : wsrep::server_state(mutex_, cond_, *this,
name, id, "", "./", 1, rollback_mode) name, id, "", "./", 1, rollback_mode)
, sst_before_init_()
, mutex_() , mutex_()
, cond_() , cond_()
, provider_(*this) , provider_(*this)
@ -63,11 +64,12 @@ namespace wsrep
} }
void log_view(const wsrep::view&) { } void log_view(const wsrep::view&) { }
void on_connect() WSREP_OVERRIDE { } // void on_connect(const wsrep::gtid& ) WSREP_OVERRIDE { }
void wait_until_connected() WSREP_OVERRIDE { } // void wait_until_connected() WSREP_OVERRIDE { }
void on_view(const wsrep::view&) WSREP_OVERRIDE { } // void on_view(const wsrep::view&) WSREP_OVERRIDE { }
void on_sync() WSREP_OVERRIDE { } // void on_sync() WSREP_OVERRIDE { }
bool sst_before_init() const WSREP_OVERRIDE { return false; } bool sst_before_init() const WSREP_OVERRIDE
{ return sst_before_init_; }
std::string sst_request() WSREP_OVERRIDE { return ""; } std::string sst_request() WSREP_OVERRIDE { return ""; }
int start_sst(const std::string&, int start_sst(const std::string&,
const wsrep::gtid&, const wsrep::gtid&,
@ -78,6 +80,9 @@ namespace wsrep
client_state.before_rollback(); client_state.before_rollback();
client_state.after_rollback(); client_state.after_rollback();
} }
bool sst_before_init_;
private: private:
wsrep::default_mutex mutex_; wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_; wsrep::default_condition_variable cond_;

View File

@ -30,6 +30,15 @@ namespace
wsrep::ws_handle ws_handle; wsrep::ws_handle ws_handle;
wsrep::ws_meta ws_meta; wsrep::ws_meta ws_meta;
}; };
struct sst_first_server_fixture : applying_server_fixture
{
sst_first_server_fixture()
: applying_server_fixture()
{
sc.sst_before_init_ = true;
}
};
} }
// Test on_apply() method for 1pc // Test on_apply() method for 1pc
@ -140,3 +149,24 @@ BOOST_AUTO_TEST_CASE(server_state_state_strings)
BOOST_REQUIRE(wsrep::to_string( BOOST_REQUIRE(wsrep::to_string(
static_cast<enum wsrep::server_state::state>(0xff)) == "unknown"); static_cast<enum wsrep::server_state::state>(0xff)) == "unknown");
} }
BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap,
sst_first_server_fixture)
{
wsrep::id cluster_id("1");
wsrep::gtid state_id(cluster_id, wsrep::seqno(0));
std::vector<wsrep::view::member> members;
members.push_back(wsrep::view::member(wsrep::id("1"), "name", ""));
wsrep::view bootstrap_view(state_id,
wsrep::seqno(1),
wsrep::view::primary,
0,
0,
1,
members);
BOOST_REQUIRE(sc.connect("cluster", "local", "0", false) == 0);
sc.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0)));
// @todo Blocks on state wait, need to figure out a way to avoid
// that.
// sc.on_view(bootstrap_view);
}