diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index b4ebaa1..d9d4365 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -27,7 +27,6 @@ db::server::server(simulator& simulator, const std::string& name, - const std::string& server_id, const std::string& address) : simulator_(simulator) , storage_engine_(simulator_.params()) @@ -35,7 +34,7 @@ db::server::server(simulator& simulator, , cond_() , server_service_(*this) , server_state_(*this, server_service_, - name, server_id, address, "dbsim_" + name + "_data") + name, address, "dbsim_" + name + "_data") , last_client_id_(0) , last_transaction_id_(0) , appliers_() diff --git a/dbsim/db_server.hpp b/dbsim/db_server.hpp index 59103b3..b64947c 100644 --- a/dbsim/db_server.hpp +++ b/dbsim/db_server.hpp @@ -41,7 +41,6 @@ namespace db public: server(simulator& simulator, const std::string& name, - const std::string& id, const std::string& address); void applier_thread(); void start_applier(); diff --git a/dbsim/db_server_state.hpp b/dbsim/db_server_state.hpp index d3034df..bab13d3 100644 --- a/dbsim/db_server_state.hpp +++ b/dbsim/db_server_state.hpp @@ -35,7 +35,6 @@ namespace db server_state(db::server& server, wsrep::server_service& server_service, const std::string& name, - const std::string& server_id, const std::string& address, const std::string& working_dir) : wsrep::server_state( @@ -43,7 +42,6 @@ namespace db cond_, server_service, name, - server_id, "", address, working_dir, diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index b2a631a..1e700f6 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -109,7 +109,6 @@ void db::simulator::start() std::make_unique( *this, name_os.str(), - id_os.str(), address_os.str())))); if (it.second == false) { diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 43b4823..9d29d10 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -296,7 +296,7 @@ namespace wsrep * A method which will be called when the server * has been joined to the cluster */ - void on_connect(const wsrep::gtid& gtid); + void on_connect(const wsrep::view& view); /** * A method which will be called when a view @@ -540,7 +540,6 @@ namespace wsrep wsrep::condition_variable& cond, wsrep::server_service& server_service, const std::string& name, - const std::string& id, const std::string& incoming_address, const std::string& address, const std::string& working_dir, @@ -565,7 +564,7 @@ namespace wsrep , streaming_appliers_() , provider_() , name_(name) - , id_(id) + , id_(wsrep::id::undefined()) , incoming_address_(incoming_address) , address_(address) , working_dir_(working_dir) diff --git a/include/wsrep/view.hpp b/include/wsrep/view.hpp index 8571d70..99d18b7 100644 --- a/include/wsrep/view.hpp +++ b/include/wsrep/view.hpp @@ -30,6 +30,7 @@ #include "seqno.hpp" #include "gtid.hpp" #include +#include namespace wsrep { @@ -111,6 +112,8 @@ namespace wsrep return (members_.empty() && own_index_ == -1); } + void print(std::ostream& os) const; + private: wsrep::gtid state_id_; wsrep::seqno view_seqno_; @@ -120,6 +123,12 @@ namespace wsrep int protocol_version_; std::vector members_; }; + + static inline + std::ostream& operator<<(std::ostream& os, const wsrep::view& v) + { + v.print(os); return os; + } } #endif // WSREP_VIEW diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ad1cba1..6f24b3a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(wsrep-lib logger.cpp provider.cpp seqno.cpp + view.cpp server_state.cpp transaction.cpp wsrep_provider_v26.cpp) diff --git a/src/server_state.cpp b/src/server_state.cpp index 8aafbaf..45c7d60 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -565,14 +565,38 @@ wsrep::server_state::causal_read(int timeout) const return provider_->causal_read(timeout); } -void wsrep::server_state::on_connect(const wsrep::gtid& gtid) +void wsrep::server_state::on_connect(const wsrep::view& view) { + // Sanity checks + if (id_.is_undefined() == false) + { + wsrep::log_warning() << "Unexpected connection in connected state. " + << "Received view: " << view + << "Previous ID: " << id_; + assert(0); + } + + if (view.own_index() < 0 || + size_t(view.own_index()) >= view.members().size()) + { + std::ostringstream os; + os << "Invalid view on connect: own index out of range: " << view; + wsrep::log_error() << os.str(); + assert(0); + throw wsrep::runtime_error(os.str()); + } + + id_ = view.members()[view.own_index()].id(); + wsrep::log_info() << "Server " << name_ << " connected to cluster at position " - << gtid; + << view.state_id() + << " with ID " + << id_; + wsrep::unique_lock lock(mutex_); - connected_gtid_ = gtid; + connected_gtid_ = view.state_id(); state(lock, s_connected); } @@ -599,22 +623,7 @@ void wsrep::server_state::on_view(const wsrep::view& view, if (view.status() == wsrep::view::primary) { wsrep::unique_lock lock(mutex_); - if (view.own_index() >= 0) - { - if (id_.is_undefined()) - { - // No identifier was passed during server state initialization - // and the ID was generated by the provider. - id_ = view.members()[view.own_index()].id(); - } - else - { - // Own identifier must not change between views. - // assert(id_ == view.members()[view.own_index()].id()); - } - } assert(view.final() == false); - // // Reached primary from connected state. This may mean the following // @@ -703,6 +712,7 @@ void wsrep::server_state::on_view(const wsrep::view& view, { close_transactions_at_disconnect(*high_priority_service); } + id_ = id::undefined(); state(lock, s_disconnected); } else if (state_ != s_disconnecting) diff --git a/src/view.cpp b/src/view.cpp new file mode 100644 index 0000000..1680c40 --- /dev/null +++ b/src/view.cpp @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2018 Codership Oy + * + * This file is part of wsrep-lib. + * + * Wsrep-lib is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-lib is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-lib. If not, see . + */ + +#include "wsrep/view.hpp" + +void wsrep::view::print(std::ostream& os) const +{ + os << " id: " << state_id() << "\n" + << " status: " << status() << "\n" + << " prococol_version: " << protocol_version() << "\n" + << " final: " << final() << "\n" + << " own_index: " << own_index() << "\n" + << " members(" << members().size() << "):\n"; + + for (std::vector::const_iterator i(members().begin()); + i != members().end(); ++i) + { + os << "\t" << (i - members().begin()) /* ordinal index */ + << ") id: " << i->id() + << ", name: " << i->name() << "\n"; + } +} diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index eb00263..afc5c3b 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -287,7 +287,8 @@ namespace { return capabilities; } - wsrep::view view_from_native(const wsrep_view_info& view_info) + wsrep::view view_from_native(const wsrep_view_info& view_info, + const wsrep::id& own_id) { std::vector members; for (int i(0); i < view_info.memb_num; ++i) @@ -303,6 +304,25 @@ namespace sizeof(view_info.members[i].incoming))); members.push_back(wsrep::view::member(id, name, incoming)); } + + int own_idx(-1); + if (own_id.is_undefined()) + { + // If own ID is undefined, obtain it from the view. This is + // the case on the initial connect to cluster. + own_idx = view_info.my_idx; + } + else + { + // If the node has already obtained its ID from cluster, + // its position in the view (or lack thereof) must be determined + // by the ID. + for (size_t i(0); i < members.size(); ++i) + { + if (own_id == members[i].id()) { own_idx = i; break; } + } + } + return wsrep::view( wsrep::gtid( wsrep::id(view_info.state_id.uuid.data, @@ -311,7 +331,7 @@ namespace wsrep::seqno(view_info.view), map_view_status_from_native(view_info.status), map_capabilities_from_native(view_info.capabilities), - view_info.my_idx, + own_idx, view_info.proto_ver, members); } @@ -325,12 +345,14 @@ namespace const wsrep_view_info_t* view_info) { assert(app_ctx); - wsrep::view view(view_from_native(*view_info)); wsrep::server_state& server_state( *reinterpret_cast(app_ctx)); + assert(server_state.id().is_undefined()); + wsrep::view view(view_from_native(*view_info, server_state.id())); + assert(view.own_index() >= 0); try { - server_state.on_connect(view.state_id()); + server_state.on_connect(view); return WSREP_CB_SUCCESS; } catch (const wsrep::runtime_error& e) @@ -354,7 +376,7 @@ namespace reinterpret_cast(recv_ctx)); try { - wsrep::view view(view_from_native(*view_info)); + wsrep::view view(view_from_native(*view_info, server_state.id())); server_state.on_view(view, high_priority_service); return WSREP_CB_SUCCESS; } diff --git a/test/client_state_fixture.hpp b/test/client_state_fixture.hpp index 6073d25..44fca34 100644 --- a/test/client_state_fixture.hpp +++ b/test/client_state_fixture.hpp @@ -32,11 +32,12 @@ namespace { replicating_client_fixture_sync_rm() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_sync, server_service) + , sc("s1", wsrep::server_state::rm_sync, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -54,11 +55,12 @@ namespace { replicating_client_fixture_async_rm() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_async, server_service) + , sc("s1", wsrep::server_state::rm_async, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -76,11 +78,12 @@ namespace { replicating_client_fixture_2pc() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_sync, server_service) + , sc("s1", wsrep::server_state::rm_sync, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); cc.do_2pc_ = true; BOOST_REQUIRE(cc.before_command() == 0); @@ -99,11 +102,12 @@ namespace { replicating_client_fixture_autocommit() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_sync, server_service) + , sc("s1", wsrep::server_state::rm_sync, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); cc.is_autocommit_ = true; BOOST_REQUIRE(cc.before_command() == 0); @@ -122,13 +126,14 @@ namespace { applying_client_fixture() : server_service(sc) - , sc("s1", "s1", + , sc("s1", wsrep::server_state::rm_async, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_high_priority) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -155,13 +160,14 @@ namespace { applying_client_fixture_2pc() : server_service(sc) - , sc("s1", "s1", + , sc("s1", wsrep::server_state::rm_async, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_high_priority) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); cc.do_2pc_ = true; BOOST_REQUIRE(cc.before_command() == 0); @@ -189,12 +195,13 @@ namespace { streaming_client_fixture_row() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_sync, server_service) + , sc("s1", wsrep::server_state::rm_sync, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -214,12 +221,13 @@ namespace { streaming_client_fixture_byte() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_sync, server_service) + , sc("s1", wsrep::server_state::rm_sync, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); @@ -238,12 +246,13 @@ namespace { streaming_client_fixture_statement() : server_service(sc) - , sc("s1", "s1", wsrep::server_state::rm_sync, server_service) + , sc("s1", wsrep::server_state::rm_sync, server_service) , cc(sc, wsrep::client_id(1), wsrep::client_state::m_local) , tc(cc.transaction()) { + sc.mock_connect(); cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 0f8b481..05060be 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -176,11 +176,10 @@ namespace wsrep { public: mock_server_state(const std::string& name, - const std::string& id, enum wsrep::server_state::rollback_mode rollback_mode, wsrep::server_service& server_service) : wsrep::server_state(mutex_, cond_, server_service, - name, id, "", "", "./", + name, "", "", "./", wsrep::gtid::undefined(), 1, rollback_mode) @@ -191,6 +190,48 @@ namespace wsrep wsrep::mock_provider& provider() const WSREP_OVERRIDE { return provider_; } + + // mock connected state for tests without overriding the connect() + // method. + int mock_connect(const std::string& own_id, + const std::string& cluster_name, + const std::string& cluster_address, + const std::string& state_donor, + bool bootstrap) + { + int const ret(server_state::connect(cluster_name, + cluster_address, + state_donor, + bootstrap)); + if (0 == ret) + { + wsrep::id cluster_id("1"); + wsrep::gtid state_id(cluster_id, wsrep::seqno(0)); + std::vector members; + members.push_back(wsrep::view::member(wsrep::id(own_id), + "name", "")); + wsrep::view bootstrap_view(state_id, + wsrep::seqno(1), + wsrep::view::primary, + 0, + 0, + 1, + members); + server_state::on_connect(bootstrap_view); + } + else + { + assert(0); + } + + return ret; + } + + int mock_connect() + { + return mock_connect(name(), "cluster", "local", "0", false); + } + private: wsrep::default_mutex mutex_; wsrep::default_condition_variable cond_; diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 18e504d..f8340b9 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -23,11 +23,11 @@ namespace { - struct applying_server_fixture + struct server_fixture_base { - applying_server_fixture() + server_fixture_base() : server_service(ss) - , ss("s1", "s1", + , ss("s1", wsrep::server_state::rm_sync, server_service) , cc(ss, wsrep::client_id(1), @@ -52,19 +52,28 @@ namespace wsrep::ws_meta ws_meta; }; - struct sst_first_server_fixture : applying_server_fixture + struct applying_server_fixture : server_fixture_base + { + applying_server_fixture() + : server_fixture_base() + { + ss.mock_connect(); + } + }; + + struct sst_first_server_fixture : server_fixture_base { sst_first_server_fixture() - : applying_server_fixture() + : server_fixture_base() { server_service.sst_before_init_ = true; } }; - struct init_first_server_fixture : applying_server_fixture + struct init_first_server_fixture : server_fixture_base { init_first_server_fixture() - : applying_server_fixture() + : server_fixture_base() { server_service.sst_before_init_ = false; } @@ -194,7 +203,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap, 1, members); BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0); - ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0))); + ss.on_connect(bootstrap_view); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); server_service.sync_point_enabled_ = "on_view_wait_initialized"; server_service.sync_point_action_ = server_service.spa_initialize; @@ -222,7 +231,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_init_first_boostrap, ss.initialized(); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_initialized); BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0); - ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0))); + ss.on_connect(bootstrap_view); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); ss.on_view(bootstrap_view, &hps); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined);