diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index 693592d..fd012f9 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -59,6 +59,7 @@ namespace db { } void start(); wsrep::client_state& client_state() { return client_state_; } + wsrep::client_service& client_service() { return client_service_; } bool do_2pc() const { return false; } private: friend class db::server_state; diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index 9bcf8a8..07b88fa 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -26,6 +26,7 @@ db::server_service::server_service(db::server& server) : server_(server) + , logged_view_() { } wsrep::storage_service* db::server_service::storage_service( @@ -106,9 +107,25 @@ void db::server_service::log_dummy_write_set( } void db::server_service::log_view(wsrep::high_priority_service*, - const wsrep::view&) + const wsrep::view& v) { - wsrep::log_info() << "View"; + wsrep::log_info() << "View:\n" << v; + logged_view_ = v; +} +wsrep::view db::server_service::get_view(wsrep::client_service&, + const wsrep::id& own_id) +{ + int const my_idx(logged_view_.member_index(own_id)); + wsrep::view my_view( + logged_view_.state_id(), + logged_view_.view_seqno(), + logged_view_.status(), + logged_view_.capabilities(), + my_idx, + logged_view_.protocol_version(), + logged_view_.members() + ); + return my_view; } void db::server_service::log_state_change( diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index bffda11..420becc 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -48,12 +48,15 @@ namespace db override; void log_view(wsrep::high_priority_service*, const wsrep::view&) override; + wsrep::view get_view(wsrep::client_service&, const wsrep::id&) + override; void log_state_change(enum wsrep::server_state::state, enum wsrep::server_state::state) override; int wait_committing_transactions(int) override; void debug_sync(const char*) override; private: db::server& server_; + wsrep::view logged_view_; }; } diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index 9f8fbea..990d1ac 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -18,6 +18,7 @@ */ #include "db_simulator.hpp" +#include "db_client.hpp" #include "wsrep/logger.hpp" @@ -62,7 +63,11 @@ void db::simulator::sst(db::server& server, << server.server_state().name() << " -> " << request; } - i->second->server_state().sst_received(gtid, 0); + + db::client dummy(*(i->second), wsrep::client_id(-1), + wsrep::client_state::m_local, params()); + + i->second->server_state().sst_received(dummy.client_service(), gtid, 0); server.server_state().sst_sent(gtid, 0); } diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 81cd4c6..d34e927 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -244,7 +244,11 @@ namespace wsrep static const int streaming = (1 << 15); static const int snapshot = (1 << 16); static const int nbo = (1 << 17); + + /** decipher capability bitmask */ + static std::string str(int); }; + provider(wsrep::server_state& server_state) : server_state_(server_state) { } @@ -328,7 +332,7 @@ namespace wsrep * @return Provider status indicating the result of the call. */ virtual std::pair - causal_read(int timeout) const = 0; + causal_read(int timeout) const = 0; virtual enum status wait_for_gtid(const wsrep::gtid&, int timeout) const = 0; /** * Return last committed GTID. diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index ec33831..df2cbc8 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -130,6 +130,17 @@ namespace wsrep wsrep::high_priority_service* high_priority_service, const wsrep::view& view) = 0; + /** + * Recover a cluster view change event. + * The method takes own node ID. + * + * @param client_service Reference to client_service + * @param own_id this node ID obtained on connection to cluster + */ + virtual wsrep::view get_view( + wsrep::client_service& client_service, + const wsrep::id& own_id) = 0; + /** * Log a state change event. * diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 9d29d10..729d863 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -103,6 +103,7 @@ namespace wsrep class transaction; class const_buffer; class server_service; + class client_service; /** @class Server Context * @@ -446,9 +447,12 @@ namespace wsrep * initialized, the call will shift the state to initializing * and will wait until the initialization is complete. * + * @param client_service * @param gtid GTID provided by the SST transfer + * @param error code of the SST operation */ - void sst_received(const wsrep::gtid& gtid, int error); + void sst_received(wsrep::client_service& cs, + const wsrep::gtid& gtid, int error); /** * This method must be called after the server initialization @@ -587,7 +591,7 @@ namespace wsrep void wait_until_state(wsrep::unique_lock&, enum state) const; // Close SR transcations whose origin is outside of current // cluster view. - void close_foreign_sr_transactions( + void close_orphaned_sr_transactions( wsrep::unique_lock&, wsrep::high_priority_service&); diff --git a/include/wsrep/view.hpp b/include/wsrep/view.hpp index 99d18b7..ecac3ee 100644 --- a/include/wsrep/view.hpp +++ b/include/wsrep/view.hpp @@ -97,6 +97,9 @@ namespace wsrep wsrep::view::status status() const { return status_; } + ssize_t capabilities() const + { return capabilities_; } + ssize_t own_index() const { return own_index_; } @@ -112,6 +115,11 @@ namespace wsrep return (members_.empty() && own_index_ == -1); } + /** + * Return member index in the view + */ + int member_index(const wsrep::id& member_id) const; + void print(std::ostream& os) const; private: diff --git a/src/provider.cpp b/src/provider.cpp index e526d56..abe78c7 100644 --- a/src/provider.cpp +++ b/src/provider.cpp @@ -49,6 +49,48 @@ wsrep::provider* wsrep::provider::make_provider( return 0; } +std::string wsrep::provider::capability::str(int caps) +{ + std::ostringstream os; + +#define WSREP_PRINT_CAPABILITY(cap_value, cap_string) \ + if (caps & cap_value) { \ + os << cap_string ", "; \ + caps &= ~cap_value; \ + } + + WSREP_PRINT_CAPABILITY(multi_master, "MULTI-MASTER"); + WSREP_PRINT_CAPABILITY(certification, "CERTIFICATION"); + WSREP_PRINT_CAPABILITY(parallel_applying, "PARALLEL_APPLYING"); + WSREP_PRINT_CAPABILITY(transaction_replay, "REPLAY"); + WSREP_PRINT_CAPABILITY(isolation, "ISOLATION"); + WSREP_PRINT_CAPABILITY(pause, "PAUSE"); + WSREP_PRINT_CAPABILITY(causal_reads, "CAUSAL_READ"); + WSREP_PRINT_CAPABILITY(causal_transaction, "CAUSAL_TRX"); + WSREP_PRINT_CAPABILITY(incremental_writeset, "INCREMENTAL_WS"); + WSREP_PRINT_CAPABILITY(session_locks, "SESSION_LOCK"); + WSREP_PRINT_CAPABILITY(distributed_locks, "DISTRIBUTED_LOCK"); + WSREP_PRINT_CAPABILITY(consistency_check, "CONSISTENCY_CHECK"); + WSREP_PRINT_CAPABILITY(unordered, "UNORDERED"); + WSREP_PRINT_CAPABILITY(annotation, "ANNOTATION"); + WSREP_PRINT_CAPABILITY(preordered, "PREORDERED"); + WSREP_PRINT_CAPABILITY(streaming, "STREAMING"); + WSREP_PRINT_CAPABILITY(snapshot, "SNAPSHOT"); + WSREP_PRINT_CAPABILITY(nbo, "NBO"); + +#undef WSREP_PRINT_CAPABILITY + + if (caps) + { + assert(caps == 0); // to catch missed capabilities + os << "UNKNOWN(" << caps << ") "; + } + + std::string ret(os.str()); + if (ret.size() > 2) ret.erase(ret.size() - 2); + return ret; +} + std::string wsrep::flags_to_string(int flags) { std::ostringstream oss; diff --git a/src/server_state.cpp b/src/server_state.cpp index 45c7d60..3998d03 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -491,11 +491,13 @@ void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error) } } -void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error) +void wsrep::server_state::sst_received(wsrep::client_service& cs, + const wsrep::gtid& gtid, int error) { wsrep::log_info() << "SST received: " << gtid; wsrep::unique_lock lock(mutex_); assert(state_ == s_joiner || state_ == s_initialized); + if (server_service_.sst_before_init()) { if (init_initialized_ == false) @@ -504,20 +506,40 @@ void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error) wait_until_state(lock, s_initialized); assert(init_initialized_); } - state(lock, s_joined); - lock.unlock(); - if (provider().sst_received(gtid, error)) - { - throw wsrep::runtime_error("SST received failed"); - } } - else + state(lock, s_joined); + lock.unlock(); + + if (id_.is_undefined()) { - state(lock, s_joined); - if (provider().sst_received(gtid, error)) - { - throw wsrep::runtime_error("SST received failed"); - } + assert(0); + throw wsrep::runtime_error( + "wsrep::sst_received() called before connection to cluster"); + } + + wsrep::view const v(server_service_.get_view(cs, id_)); + wsrep::log_info() << "Recovered view from SST:\n" << v; + + if (v.state_id().id() != gtid.id() || + v.state_id().seqno() > gtid.seqno()) + { + /* Since IN GENERAL we may not be able to recover SST GTID from + * the state data, we have to rely on SST script passing the GTID + * value explicitly. + * Here we check if the passed GTID makes any sense: it should + * have the same UUID and greater or equal seqno than the last + * logged view. */ + std::ostringstream msg; + msg << "SST script passed bogus GTID: " << gtid + << ". Preceeding view GTID: " << v.state_id(); + throw wsrep::runtime_error(msg.str()); + } + + current_view_ = v; + + if (provider().sst_received(gtid, error)) + { + throw wsrep::runtime_error("wsrep::sst_received() failed"); } } @@ -676,7 +698,7 @@ void wsrep::server_state::on_view(const wsrep::view& view, assert(high_priority_service); if (high_priority_service) { - close_foreign_sr_transactions(lock, *high_priority_service); + close_orphaned_sr_transactions(lock, *high_priority_service); } if (server_service_.sst_before_init()) { @@ -1009,7 +1031,7 @@ void wsrep::server_state::wait_until_state( cond_.notify_all(); } -void wsrep::server_state::close_foreign_sr_transactions( +void wsrep::server_state::close_orphaned_sr_transactions( wsrep::unique_lock& lock, wsrep::high_priority_service& high_priority_service) { diff --git a/src/view.cpp b/src/view.cpp index 1680c40..362b7ac 100644 --- a/src/view.cpp +++ b/src/view.cpp @@ -18,13 +18,45 @@ */ #include "wsrep/view.hpp" +#include "wsrep/provider.hpp" + +int wsrep::view::member_index(const wsrep::id& member_id) const +{ + // first, quick guess + if (own_index_ >= 0 && members_[own_index_].id() == member_id) + { + return own_index_; + } + + // guesing didn't work, scan the list + for (unsigned int i(0); i < members_.size(); ++i) + { + if (i != own_index_ && members_[i].id() == member_id) return i; + } + + return -1; +} + +static const char* view_status_str(enum wsrep::view::status s) +{ + switch(s) + { + case wsrep::view::primary: return "PRIMARY"; + case wsrep::view::non_primary: return "NON-PRIMARY"; + case wsrep::view::disconnected: return "DISCONNECTED"; + } + + assert(0); + return "invalid status"; +} void wsrep::view::print(std::ostream& os) const { os << " id: " << state_id() << "\n" - << " status: " << status() << "\n" + << " status: " << view_status_str(status()) << "\n" << " prococol_version: " << protocol_version() << "\n" - << " final: " << final() << "\n" + << " capabilities: " << provider::capability::str(capabilities())<<"\n" + << " final: " << (final() ? "yes" : "no") << "\n" << " own_index: " << own_index() << "\n" << " members(" << members().size() << "):\n"; @@ -32,7 +64,7 @@ void wsrep::view::print(std::ostream& os) const i != members().end(); ++i) { os << "\t" << (i - members().begin()) /* ordinal index */ - << ") id: " << i->id() - << ", name: " << i->name() << "\n"; + << ": " << i->id() + << ", " << i->name() << "\n"; } } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 05060be..04375f2 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -41,6 +41,7 @@ namespace wsrep , server_state_(server_state) , last_client_id_(0) , last_transaction_id_(0) + , logged_view_() { } wsrep::storage_service* storage_service(wsrep::client_service&) @@ -120,8 +121,28 @@ namespace wsrep WSREP_OVERRIDE { } - void log_view(wsrep::high_priority_service*, const wsrep::view&) - WSREP_OVERRIDE { } + void log_view(wsrep::high_priority_service*, const wsrep::view& view) + WSREP_OVERRIDE + { + logged_view_ = view; + } + + wsrep::view get_view(wsrep::client_service&, const wsrep::id& own_id) + WSREP_OVERRIDE + { + int const my_idx(logged_view_.member_index(own_id)); + wsrep::view my_view( + logged_view_.state_id(), + logged_view_.view_seqno(), + logged_view_.status(), + logged_view_.capabilities(), + my_idx, + logged_view_.protocol_version(), + logged_view_.members() + ); + return my_view; + } + void log_state_change(enum wsrep::server_state::state, enum wsrep::server_state::state) { } @@ -169,6 +190,7 @@ namespace wsrep wsrep::server_state& server_state_; unsigned long long last_client_id_; unsigned long long last_transaction_id_; + wsrep::view logged_view_; };