mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-22 23:21:53 +03:00
Recover current view from state after SST.
When member joins the group and needs to receive an SST it won't receive the corresponding menbership view event because the SST happens after the event and will already include the effects of all events ordered before it. The view then must be recovered from the received state. Minor renames and cleanups. References codership/wsrep-lib#18
This commit is contained in:
@ -59,6 +59,7 @@ namespace db
|
||||
{ }
|
||||
void start();
|
||||
wsrep::client_state& client_state() { return client_state_; }
|
||||
wsrep::client_service& client_service() { return client_service_; }
|
||||
bool do_2pc() const { return false; }
|
||||
private:
|
||||
friend class db::server_state;
|
||||
|
@ -26,6 +26,7 @@
|
||||
|
||||
db::server_service::server_service(db::server& server)
|
||||
: server_(server)
|
||||
, logged_view_()
|
||||
{ }
|
||||
|
||||
wsrep::storage_service* db::server_service::storage_service(
|
||||
@ -106,9 +107,25 @@ void db::server_service::log_dummy_write_set(
|
||||
}
|
||||
|
||||
void db::server_service::log_view(wsrep::high_priority_service*,
|
||||
const wsrep::view&)
|
||||
const wsrep::view& v)
|
||||
{
|
||||
wsrep::log_info() << "View";
|
||||
wsrep::log_info() << "View:\n" << v;
|
||||
logged_view_ = v;
|
||||
}
|
||||
wsrep::view db::server_service::get_view(wsrep::client_service&,
|
||||
const wsrep::id& own_id)
|
||||
{
|
||||
int const my_idx(logged_view_.member_index(own_id));
|
||||
wsrep::view my_view(
|
||||
logged_view_.state_id(),
|
||||
logged_view_.view_seqno(),
|
||||
logged_view_.status(),
|
||||
logged_view_.capabilities(),
|
||||
my_idx,
|
||||
logged_view_.protocol_version(),
|
||||
logged_view_.members()
|
||||
);
|
||||
return my_view;
|
||||
}
|
||||
|
||||
void db::server_service::log_state_change(
|
||||
|
@ -48,12 +48,15 @@ namespace db
|
||||
override;
|
||||
void log_view(wsrep::high_priority_service*,
|
||||
const wsrep::view&) override;
|
||||
wsrep::view get_view(wsrep::client_service&, const wsrep::id&)
|
||||
override;
|
||||
void log_state_change(enum wsrep::server_state::state,
|
||||
enum wsrep::server_state::state) override;
|
||||
int wait_committing_transactions(int) override;
|
||||
void debug_sync(const char*) override;
|
||||
private:
|
||||
db::server& server_;
|
||||
wsrep::view logged_view_;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
*/
|
||||
|
||||
#include "db_simulator.hpp"
|
||||
#include "db_client.hpp"
|
||||
|
||||
#include "wsrep/logger.hpp"
|
||||
|
||||
@ -62,7 +63,11 @@ void db::simulator::sst(db::server& server,
|
||||
<< server.server_state().name()
|
||||
<< " -> " << request;
|
||||
}
|
||||
i->second->server_state().sst_received(gtid, 0);
|
||||
|
||||
db::client dummy(*(i->second), wsrep::client_id(-1),
|
||||
wsrep::client_state::m_local, params());
|
||||
|
||||
i->second->server_state().sst_received(dummy.client_service(), gtid, 0);
|
||||
server.server_state().sst_sent(gtid, 0);
|
||||
}
|
||||
|
||||
|
@ -244,7 +244,11 @@ namespace wsrep
|
||||
static const int streaming = (1 << 15);
|
||||
static const int snapshot = (1 << 16);
|
||||
static const int nbo = (1 << 17);
|
||||
|
||||
/** decipher capability bitmask */
|
||||
static std::string str(int);
|
||||
};
|
||||
|
||||
provider(wsrep::server_state& server_state)
|
||||
: server_state_(server_state)
|
||||
{ }
|
||||
@ -328,7 +332,7 @@ namespace wsrep
|
||||
* @return Provider status indicating the result of the call.
|
||||
*/
|
||||
virtual std::pair<wsrep::gtid, enum status>
|
||||
causal_read(int timeout) const = 0;
|
||||
causal_read(int timeout) const = 0;
|
||||
virtual enum status wait_for_gtid(const wsrep::gtid&, int timeout) const = 0;
|
||||
/**
|
||||
* Return last committed GTID.
|
||||
|
@ -130,6 +130,17 @@ namespace wsrep
|
||||
wsrep::high_priority_service* high_priority_service,
|
||||
const wsrep::view& view) = 0;
|
||||
|
||||
/**
|
||||
* Recover a cluster view change event.
|
||||
* The method takes own node ID.
|
||||
*
|
||||
* @param client_service Reference to client_service
|
||||
* @param own_id this node ID obtained on connection to cluster
|
||||
*/
|
||||
virtual wsrep::view get_view(
|
||||
wsrep::client_service& client_service,
|
||||
const wsrep::id& own_id) = 0;
|
||||
|
||||
/**
|
||||
* Log a state change event.
|
||||
*
|
||||
|
@ -103,6 +103,7 @@ namespace wsrep
|
||||
class transaction;
|
||||
class const_buffer;
|
||||
class server_service;
|
||||
class client_service;
|
||||
|
||||
/** @class Server Context
|
||||
*
|
||||
@ -446,9 +447,12 @@ namespace wsrep
|
||||
* initialized, the call will shift the state to initializing
|
||||
* and will wait until the initialization is complete.
|
||||
*
|
||||
* @param client_service
|
||||
* @param gtid GTID provided by the SST transfer
|
||||
* @param error code of the SST operation
|
||||
*/
|
||||
void sst_received(const wsrep::gtid& gtid, int error);
|
||||
void sst_received(wsrep::client_service& cs,
|
||||
const wsrep::gtid& gtid, int error);
|
||||
|
||||
/**
|
||||
* This method must be called after the server initialization
|
||||
@ -587,7 +591,7 @@ namespace wsrep
|
||||
void wait_until_state(wsrep::unique_lock<wsrep::mutex>&, enum state) const;
|
||||
// Close SR transcations whose origin is outside of current
|
||||
// cluster view.
|
||||
void close_foreign_sr_transactions(
|
||||
void close_orphaned_sr_transactions(
|
||||
wsrep::unique_lock<wsrep::mutex>&,
|
||||
wsrep::high_priority_service&);
|
||||
|
||||
|
@ -97,6 +97,9 @@ namespace wsrep
|
||||
wsrep::view::status status() const
|
||||
{ return status_; }
|
||||
|
||||
ssize_t capabilities() const
|
||||
{ return capabilities_; }
|
||||
|
||||
ssize_t own_index() const
|
||||
{ return own_index_; }
|
||||
|
||||
@ -112,6 +115,11 @@ namespace wsrep
|
||||
return (members_.empty() && own_index_ == -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return member index in the view
|
||||
*/
|
||||
int member_index(const wsrep::id& member_id) const;
|
||||
|
||||
void print(std::ostream& os) const;
|
||||
|
||||
private:
|
||||
|
@ -49,6 +49,48 @@ wsrep::provider* wsrep::provider::make_provider(
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string wsrep::provider::capability::str(int caps)
|
||||
{
|
||||
std::ostringstream os;
|
||||
|
||||
#define WSREP_PRINT_CAPABILITY(cap_value, cap_string) \
|
||||
if (caps & cap_value) { \
|
||||
os << cap_string ", "; \
|
||||
caps &= ~cap_value; \
|
||||
}
|
||||
|
||||
WSREP_PRINT_CAPABILITY(multi_master, "MULTI-MASTER");
|
||||
WSREP_PRINT_CAPABILITY(certification, "CERTIFICATION");
|
||||
WSREP_PRINT_CAPABILITY(parallel_applying, "PARALLEL_APPLYING");
|
||||
WSREP_PRINT_CAPABILITY(transaction_replay, "REPLAY");
|
||||
WSREP_PRINT_CAPABILITY(isolation, "ISOLATION");
|
||||
WSREP_PRINT_CAPABILITY(pause, "PAUSE");
|
||||
WSREP_PRINT_CAPABILITY(causal_reads, "CAUSAL_READ");
|
||||
WSREP_PRINT_CAPABILITY(causal_transaction, "CAUSAL_TRX");
|
||||
WSREP_PRINT_CAPABILITY(incremental_writeset, "INCREMENTAL_WS");
|
||||
WSREP_PRINT_CAPABILITY(session_locks, "SESSION_LOCK");
|
||||
WSREP_PRINT_CAPABILITY(distributed_locks, "DISTRIBUTED_LOCK");
|
||||
WSREP_PRINT_CAPABILITY(consistency_check, "CONSISTENCY_CHECK");
|
||||
WSREP_PRINT_CAPABILITY(unordered, "UNORDERED");
|
||||
WSREP_PRINT_CAPABILITY(annotation, "ANNOTATION");
|
||||
WSREP_PRINT_CAPABILITY(preordered, "PREORDERED");
|
||||
WSREP_PRINT_CAPABILITY(streaming, "STREAMING");
|
||||
WSREP_PRINT_CAPABILITY(snapshot, "SNAPSHOT");
|
||||
WSREP_PRINT_CAPABILITY(nbo, "NBO");
|
||||
|
||||
#undef WSREP_PRINT_CAPABILITY
|
||||
|
||||
if (caps)
|
||||
{
|
||||
assert(caps == 0); // to catch missed capabilities
|
||||
os << "UNKNOWN(" << caps << ") ";
|
||||
}
|
||||
|
||||
std::string ret(os.str());
|
||||
if (ret.size() > 2) ret.erase(ret.size() - 2);
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string wsrep::flags_to_string(int flags)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
@ -491,11 +491,13 @@ void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error)
|
||||
}
|
||||
}
|
||||
|
||||
void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error)
|
||||
void wsrep::server_state::sst_received(wsrep::client_service& cs,
|
||||
const wsrep::gtid& gtid, int error)
|
||||
{
|
||||
wsrep::log_info() << "SST received: " << gtid;
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
assert(state_ == s_joiner || state_ == s_initialized);
|
||||
|
||||
if (server_service_.sst_before_init())
|
||||
{
|
||||
if (init_initialized_ == false)
|
||||
@ -504,20 +506,40 @@ void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error)
|
||||
wait_until_state(lock, s_initialized);
|
||||
assert(init_initialized_);
|
||||
}
|
||||
state(lock, s_joined);
|
||||
lock.unlock();
|
||||
if (provider().sst_received(gtid, error))
|
||||
{
|
||||
throw wsrep::runtime_error("SST received failed");
|
||||
}
|
||||
}
|
||||
else
|
||||
state(lock, s_joined);
|
||||
lock.unlock();
|
||||
|
||||
if (id_.is_undefined())
|
||||
{
|
||||
state(lock, s_joined);
|
||||
if (provider().sst_received(gtid, error))
|
||||
{
|
||||
throw wsrep::runtime_error("SST received failed");
|
||||
}
|
||||
assert(0);
|
||||
throw wsrep::runtime_error(
|
||||
"wsrep::sst_received() called before connection to cluster");
|
||||
}
|
||||
|
||||
wsrep::view const v(server_service_.get_view(cs, id_));
|
||||
wsrep::log_info() << "Recovered view from SST:\n" << v;
|
||||
|
||||
if (v.state_id().id() != gtid.id() ||
|
||||
v.state_id().seqno() > gtid.seqno())
|
||||
{
|
||||
/* Since IN GENERAL we may not be able to recover SST GTID from
|
||||
* the state data, we have to rely on SST script passing the GTID
|
||||
* value explicitly.
|
||||
* Here we check if the passed GTID makes any sense: it should
|
||||
* have the same UUID and greater or equal seqno than the last
|
||||
* logged view. */
|
||||
std::ostringstream msg;
|
||||
msg << "SST script passed bogus GTID: " << gtid
|
||||
<< ". Preceeding view GTID: " << v.state_id();
|
||||
throw wsrep::runtime_error(msg.str());
|
||||
}
|
||||
|
||||
current_view_ = v;
|
||||
|
||||
if (provider().sst_received(gtid, error))
|
||||
{
|
||||
throw wsrep::runtime_error("wsrep::sst_received() failed");
|
||||
}
|
||||
}
|
||||
|
||||
@ -676,7 +698,7 @@ void wsrep::server_state::on_view(const wsrep::view& view,
|
||||
assert(high_priority_service);
|
||||
if (high_priority_service)
|
||||
{
|
||||
close_foreign_sr_transactions(lock, *high_priority_service);
|
||||
close_orphaned_sr_transactions(lock, *high_priority_service);
|
||||
}
|
||||
if (server_service_.sst_before_init())
|
||||
{
|
||||
@ -1009,7 +1031,7 @@ void wsrep::server_state::wait_until_state(
|
||||
cond_.notify_all();
|
||||
}
|
||||
|
||||
void wsrep::server_state::close_foreign_sr_transactions(
|
||||
void wsrep::server_state::close_orphaned_sr_transactions(
|
||||
wsrep::unique_lock<wsrep::mutex>& lock,
|
||||
wsrep::high_priority_service& high_priority_service)
|
||||
{
|
||||
|
40
src/view.cpp
40
src/view.cpp
@ -18,13 +18,45 @@
|
||||
*/
|
||||
|
||||
#include "wsrep/view.hpp"
|
||||
#include "wsrep/provider.hpp"
|
||||
|
||||
int wsrep::view::member_index(const wsrep::id& member_id) const
|
||||
{
|
||||
// first, quick guess
|
||||
if (own_index_ >= 0 && members_[own_index_].id() == member_id)
|
||||
{
|
||||
return own_index_;
|
||||
}
|
||||
|
||||
// guesing didn't work, scan the list
|
||||
for (unsigned int i(0); i < members_.size(); ++i)
|
||||
{
|
||||
if (i != own_index_ && members_[i].id() == member_id) return i;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static const char* view_status_str(enum wsrep::view::status s)
|
||||
{
|
||||
switch(s)
|
||||
{
|
||||
case wsrep::view::primary: return "PRIMARY";
|
||||
case wsrep::view::non_primary: return "NON-PRIMARY";
|
||||
case wsrep::view::disconnected: return "DISCONNECTED";
|
||||
}
|
||||
|
||||
assert(0);
|
||||
return "invalid status";
|
||||
}
|
||||
|
||||
void wsrep::view::print(std::ostream& os) const
|
||||
{
|
||||
os << " id: " << state_id() << "\n"
|
||||
<< " status: " << status() << "\n"
|
||||
<< " status: " << view_status_str(status()) << "\n"
|
||||
<< " prococol_version: " << protocol_version() << "\n"
|
||||
<< " final: " << final() << "\n"
|
||||
<< " capabilities: " << provider::capability::str(capabilities())<<"\n"
|
||||
<< " final: " << (final() ? "yes" : "no") << "\n"
|
||||
<< " own_index: " << own_index() << "\n"
|
||||
<< " members(" << members().size() << "):\n";
|
||||
|
||||
@ -32,7 +64,7 @@ void wsrep::view::print(std::ostream& os) const
|
||||
i != members().end(); ++i)
|
||||
{
|
||||
os << "\t" << (i - members().begin()) /* ordinal index */
|
||||
<< ") id: " << i->id()
|
||||
<< ", name: " << i->name() << "\n";
|
||||
<< ": " << i->id()
|
||||
<< ", " << i->name() << "\n";
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ namespace wsrep
|
||||
, server_state_(server_state)
|
||||
, last_client_id_(0)
|
||||
, last_transaction_id_(0)
|
||||
, logged_view_()
|
||||
{ }
|
||||
|
||||
wsrep::storage_service* storage_service(wsrep::client_service&)
|
||||
@ -120,8 +121,28 @@ namespace wsrep
|
||||
WSREP_OVERRIDE
|
||||
{
|
||||
}
|
||||
void log_view(wsrep::high_priority_service*, const wsrep::view&)
|
||||
WSREP_OVERRIDE { }
|
||||
void log_view(wsrep::high_priority_service*, const wsrep::view& view)
|
||||
WSREP_OVERRIDE
|
||||
{
|
||||
logged_view_ = view;
|
||||
}
|
||||
|
||||
wsrep::view get_view(wsrep::client_service&, const wsrep::id& own_id)
|
||||
WSREP_OVERRIDE
|
||||
{
|
||||
int const my_idx(logged_view_.member_index(own_id));
|
||||
wsrep::view my_view(
|
||||
logged_view_.state_id(),
|
||||
logged_view_.view_seqno(),
|
||||
logged_view_.status(),
|
||||
logged_view_.capabilities(),
|
||||
my_idx,
|
||||
logged_view_.protocol_version(),
|
||||
logged_view_.members()
|
||||
);
|
||||
return my_view;
|
||||
}
|
||||
|
||||
void log_state_change(enum wsrep::server_state::state,
|
||||
enum wsrep::server_state::state)
|
||||
{ }
|
||||
@ -169,6 +190,7 @@ namespace wsrep
|
||||
wsrep::server_state& server_state_;
|
||||
unsigned long long last_client_id_;
|
||||
unsigned long long last_transaction_id_;
|
||||
wsrep::view logged_view_;
|
||||
};
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user