1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

- Initialize member cluster ID only on connection to cluster and forget

it on disconnect.
 - Don't rely on own index from the view because the view may come from
   another member (IST/SST), instead always determine own index from own ID.

Refs codership/wsrep-lib#13
This commit is contained in:
Alexey Yurchenko
2018-11-06 15:21:48 +02:00
parent a942811ce7
commit ea9971d54b
13 changed files with 185 additions and 52 deletions

View File

@ -27,7 +27,6 @@
db::server::server(simulator& simulator, db::server::server(simulator& simulator,
const std::string& name, const std::string& name,
const std::string& server_id,
const std::string& address) const std::string& address)
: simulator_(simulator) : simulator_(simulator)
, storage_engine_(simulator_.params()) , storage_engine_(simulator_.params())
@ -35,7 +34,7 @@ db::server::server(simulator& simulator,
, cond_() , cond_()
, server_service_(*this) , server_service_(*this)
, server_state_(*this, server_service_, , server_state_(*this, server_service_,
name, server_id, address, "dbsim_" + name + "_data") name, address, "dbsim_" + name + "_data")
, last_client_id_(0) , last_client_id_(0)
, last_transaction_id_(0) , last_transaction_id_(0)
, appliers_() , appliers_()

View File

@ -41,7 +41,6 @@ namespace db
public: public:
server(simulator& simulator, server(simulator& simulator,
const std::string& name, const std::string& name,
const std::string& id,
const std::string& address); const std::string& address);
void applier_thread(); void applier_thread();
void start_applier(); void start_applier();

View File

@ -35,7 +35,6 @@ namespace db
server_state(db::server& server, server_state(db::server& server,
wsrep::server_service& server_service, wsrep::server_service& server_service,
const std::string& name, const std::string& name,
const std::string& server_id,
const std::string& address, const std::string& address,
const std::string& working_dir) const std::string& working_dir)
: wsrep::server_state( : wsrep::server_state(
@ -43,7 +42,6 @@ namespace db
cond_, cond_,
server_service, server_service,
name, name,
server_id,
"", "",
address, address,
working_dir, working_dir,

View File

@ -109,7 +109,6 @@ void db::simulator::start()
std::make_unique<db::server>( std::make_unique<db::server>(
*this, *this,
name_os.str(), name_os.str(),
id_os.str(),
address_os.str())))); address_os.str()))));
if (it.second == false) if (it.second == false)
{ {

View File

@ -296,7 +296,7 @@ namespace wsrep
* A method which will be called when the server * A method which will be called when the server
* has been joined to the cluster * has been joined to the cluster
*/ */
void on_connect(const wsrep::gtid& gtid); void on_connect(const wsrep::view& view);
/** /**
* A method which will be called when a view * A method which will be called when a view
@ -540,7 +540,6 @@ namespace wsrep
wsrep::condition_variable& cond, wsrep::condition_variable& cond,
wsrep::server_service& server_service, wsrep::server_service& server_service,
const std::string& name, const std::string& name,
const std::string& id,
const std::string& incoming_address, const std::string& incoming_address,
const std::string& address, const std::string& address,
const std::string& working_dir, const std::string& working_dir,
@ -565,7 +564,7 @@ namespace wsrep
, streaming_appliers_() , streaming_appliers_()
, provider_() , provider_()
, name_(name) , name_(name)
, id_(id) , id_(wsrep::id::undefined())
, incoming_address_(incoming_address) , incoming_address_(incoming_address)
, address_(address) , address_(address)
, working_dir_(working_dir) , working_dir_(working_dir)

View File

@ -30,6 +30,7 @@
#include "seqno.hpp" #include "seqno.hpp"
#include "gtid.hpp" #include "gtid.hpp"
#include <vector> #include <vector>
#include <iostream>
namespace wsrep namespace wsrep
{ {
@ -111,6 +112,8 @@ namespace wsrep
return (members_.empty() && own_index_ == -1); return (members_.empty() && own_index_ == -1);
} }
void print(std::ostream& os) const;
private: private:
wsrep::gtid state_id_; wsrep::gtid state_id_;
wsrep::seqno view_seqno_; wsrep::seqno view_seqno_;
@ -120,6 +123,12 @@ namespace wsrep
int protocol_version_; int protocol_version_;
std::vector<wsrep::view::member> members_; std::vector<wsrep::view::member> members_;
}; };
static inline
std::ostream& operator<<(std::ostream& os, const wsrep::view& v)
{
v.print(os); return os;
}
} }
#endif // WSREP_VIEW #endif // WSREP_VIEW

View File

@ -11,6 +11,7 @@ add_library(wsrep-lib
logger.cpp logger.cpp
provider.cpp provider.cpp
seqno.cpp seqno.cpp
view.cpp
server_state.cpp server_state.cpp
transaction.cpp transaction.cpp
wsrep_provider_v26.cpp) wsrep_provider_v26.cpp)

View File

@ -565,14 +565,38 @@ wsrep::server_state::causal_read(int timeout) const
return provider_->causal_read(timeout); return provider_->causal_read(timeout);
} }
void wsrep::server_state::on_connect(const wsrep::gtid& gtid) void wsrep::server_state::on_connect(const wsrep::view& view)
{ {
// Sanity checks
if (id_.is_undefined() == false)
{
wsrep::log_warning() << "Unexpected connection in connected state. "
<< "Received view: " << view
<< "Previous ID: " << id_;
assert(0);
}
if (view.own_index() < 0 ||
size_t(view.own_index()) >= view.members().size())
{
std::ostringstream os;
os << "Invalid view on connect: own index out of range: " << view;
wsrep::log_error() << os.str();
assert(0);
throw wsrep::runtime_error(os.str());
}
id_ = view.members()[view.own_index()].id();
wsrep::log_info() << "Server " wsrep::log_info() << "Server "
<< name_ << name_
<< " connected to cluster at position " << " connected to cluster at position "
<< gtid; << view.state_id()
<< " with ID "
<< id_;
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
connected_gtid_ = gtid; connected_gtid_ = view.state_id();
state(lock, s_connected); state(lock, s_connected);
} }
@ -599,22 +623,7 @@ void wsrep::server_state::on_view(const wsrep::view& view,
if (view.status() == wsrep::view::primary) if (view.status() == wsrep::view::primary)
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (view.own_index() >= 0)
{
if (id_.is_undefined())
{
// No identifier was passed during server state initialization
// and the ID was generated by the provider.
id_ = view.members()[view.own_index()].id();
}
else
{
// Own identifier must not change between views.
// assert(id_ == view.members()[view.own_index()].id());
}
}
assert(view.final() == false); assert(view.final() == false);
// //
// Reached primary from connected state. This may mean the following // Reached primary from connected state. This may mean the following
// //
@ -703,6 +712,7 @@ void wsrep::server_state::on_view(const wsrep::view& view,
{ {
close_transactions_at_disconnect(*high_priority_service); close_transactions_at_disconnect(*high_priority_service);
} }
id_ = id::undefined();
state(lock, s_disconnected); state(lock, s_disconnected);
} }
else if (state_ != s_disconnecting) else if (state_ != s_disconnecting)

38
src/view.cpp Normal file
View File

@ -0,0 +1,38 @@
/*
* Copyright (C) 2018 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
#include "wsrep/view.hpp"
void wsrep::view::print(std::ostream& os) const
{
os << " id: " << state_id() << "\n"
<< " status: " << status() << "\n"
<< " prococol_version: " << protocol_version() << "\n"
<< " final: " << final() << "\n"
<< " own_index: " << own_index() << "\n"
<< " members(" << members().size() << "):\n";
for (std::vector<wsrep::view::member>::const_iterator i(members().begin());
i != members().end(); ++i)
{
os << "\t" << (i - members().begin()) /* ordinal index */
<< ") id: " << i->id()
<< ", name: " << i->name() << "\n";
}
}

View File

@ -287,7 +287,8 @@ namespace
{ {
return capabilities; return capabilities;
} }
wsrep::view view_from_native(const wsrep_view_info& view_info) wsrep::view view_from_native(const wsrep_view_info& view_info,
const wsrep::id& own_id)
{ {
std::vector<wsrep::view::member> members; std::vector<wsrep::view::member> members;
for (int i(0); i < view_info.memb_num; ++i) for (int i(0); i < view_info.memb_num; ++i)
@ -303,6 +304,25 @@ namespace
sizeof(view_info.members[i].incoming))); sizeof(view_info.members[i].incoming)));
members.push_back(wsrep::view::member(id, name, incoming)); members.push_back(wsrep::view::member(id, name, incoming));
} }
int own_idx(-1);
if (own_id.is_undefined())
{
// If own ID is undefined, obtain it from the view. This is
// the case on the initial connect to cluster.
own_idx = view_info.my_idx;
}
else
{
// If the node has already obtained its ID from cluster,
// its position in the view (or lack thereof) must be determined
// by the ID.
for (size_t i(0); i < members.size(); ++i)
{
if (own_id == members[i].id()) { own_idx = i; break; }
}
}
return wsrep::view( return wsrep::view(
wsrep::gtid( wsrep::gtid(
wsrep::id(view_info.state_id.uuid.data, wsrep::id(view_info.state_id.uuid.data,
@ -311,7 +331,7 @@ namespace
wsrep::seqno(view_info.view), wsrep::seqno(view_info.view),
map_view_status_from_native(view_info.status), map_view_status_from_native(view_info.status),
map_capabilities_from_native(view_info.capabilities), map_capabilities_from_native(view_info.capabilities),
view_info.my_idx, own_idx,
view_info.proto_ver, view_info.proto_ver,
members); members);
} }
@ -325,12 +345,14 @@ namespace
const wsrep_view_info_t* view_info) const wsrep_view_info_t* view_info)
{ {
assert(app_ctx); assert(app_ctx);
wsrep::view view(view_from_native(*view_info));
wsrep::server_state& server_state( wsrep::server_state& server_state(
*reinterpret_cast<wsrep::server_state*>(app_ctx)); *reinterpret_cast<wsrep::server_state*>(app_ctx));
assert(server_state.id().is_undefined());
wsrep::view view(view_from_native(*view_info, server_state.id()));
assert(view.own_index() >= 0);
try try
{ {
server_state.on_connect(view.state_id()); server_state.on_connect(view);
return WSREP_CB_SUCCESS; return WSREP_CB_SUCCESS;
} }
catch (const wsrep::runtime_error& e) catch (const wsrep::runtime_error& e)
@ -354,7 +376,7 @@ namespace
reinterpret_cast<wsrep::high_priority_service*>(recv_ctx)); reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
try try
{ {
wsrep::view view(view_from_native(*view_info)); wsrep::view view(view_from_native(*view_info, server_state.id()));
server_state.on_view(view, high_priority_service); server_state.on_view(view, high_priority_service);
return WSREP_CB_SUCCESS; return WSREP_CB_SUCCESS;
} }

View File

@ -32,11 +32,12 @@ namespace
{ {
replicating_client_fixture_sync_rm() replicating_client_fixture_sync_rm()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service) , sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
@ -54,11 +55,12 @@ namespace
{ {
replicating_client_fixture_async_rm() replicating_client_fixture_async_rm()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_async, server_service) , sc("s1", wsrep::server_state::rm_async, server_service)
, cc(sc, wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
@ -76,11 +78,12 @@ namespace
{ {
replicating_client_fixture_2pc() replicating_client_fixture_2pc()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service) , sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
cc.do_2pc_ = true; cc.do_2pc_ = true;
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
@ -99,11 +102,12 @@ namespace
{ {
replicating_client_fixture_autocommit() replicating_client_fixture_autocommit()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service) , sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, wsrep::client_id(1), , cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
cc.is_autocommit_ = true; cc.is_autocommit_ = true;
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
@ -122,13 +126,14 @@ namespace
{ {
applying_client_fixture() applying_client_fixture()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", , sc("s1",
wsrep::server_state::rm_async, server_service) wsrep::server_state::rm_async, server_service)
, cc(sc, , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority) wsrep::client_state::m_high_priority)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
@ -155,13 +160,14 @@ namespace
{ {
applying_client_fixture_2pc() applying_client_fixture_2pc()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", , sc("s1",
wsrep::server_state::rm_async, server_service) wsrep::server_state::rm_async, server_service)
, cc(sc, , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_high_priority) wsrep::client_state::m_high_priority)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
cc.do_2pc_ = true; cc.do_2pc_ = true;
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
@ -189,12 +195,13 @@ namespace
{ {
streaming_client_fixture_row() streaming_client_fixture_row()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service) , sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
@ -214,12 +221,13 @@ namespace
{ {
streaming_client_fixture_byte() streaming_client_fixture_byte()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service) , sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);
@ -238,12 +246,13 @@ namespace
{ {
streaming_client_fixture_statement() streaming_client_fixture_statement()
: server_service(sc) : server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service) , sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, , cc(sc,
wsrep::client_id(1), wsrep::client_id(1),
wsrep::client_state::m_local) wsrep::client_state::m_local)
, tc(cc.transaction()) , tc(cc.transaction())
{ {
sc.mock_connect();
cc.open(cc.id()); cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0); BOOST_REQUIRE(cc.before_statement() == 0);

View File

@ -176,11 +176,10 @@ namespace wsrep
{ {
public: public:
mock_server_state(const std::string& name, mock_server_state(const std::string& name,
const std::string& id,
enum wsrep::server_state::rollback_mode rollback_mode, enum wsrep::server_state::rollback_mode rollback_mode,
wsrep::server_service& server_service) wsrep::server_service& server_service)
: wsrep::server_state(mutex_, cond_, server_service, : wsrep::server_state(mutex_, cond_, server_service,
name, id, "", "", "./", name, "", "", "./",
wsrep::gtid::undefined(), wsrep::gtid::undefined(),
1, 1,
rollback_mode) rollback_mode)
@ -191,6 +190,48 @@ namespace wsrep
wsrep::mock_provider& provider() const WSREP_OVERRIDE wsrep::mock_provider& provider() const WSREP_OVERRIDE
{ return provider_; } { return provider_; }
// mock connected state for tests without overriding the connect()
// method.
int mock_connect(const std::string& own_id,
const std::string& cluster_name,
const std::string& cluster_address,
const std::string& state_donor,
bool bootstrap)
{
int const ret(server_state::connect(cluster_name,
cluster_address,
state_donor,
bootstrap));
if (0 == ret)
{
wsrep::id cluster_id("1");
wsrep::gtid state_id(cluster_id, wsrep::seqno(0));
std::vector<wsrep::view::member> members;
members.push_back(wsrep::view::member(wsrep::id(own_id),
"name", ""));
wsrep::view bootstrap_view(state_id,
wsrep::seqno(1),
wsrep::view::primary,
0,
0,
1,
members);
server_state::on_connect(bootstrap_view);
}
else
{
assert(0);
}
return ret;
}
int mock_connect()
{
return mock_connect(name(), "cluster", "local", "0", false);
}
private: private:
wsrep::default_mutex mutex_; wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_; wsrep::default_condition_variable cond_;

View File

@ -23,11 +23,11 @@
namespace namespace
{ {
struct applying_server_fixture struct server_fixture_base
{ {
applying_server_fixture() server_fixture_base()
: server_service(ss) : server_service(ss)
, ss("s1", "s1", , ss("s1",
wsrep::server_state::rm_sync, server_service) wsrep::server_state::rm_sync, server_service)
, cc(ss, , cc(ss,
wsrep::client_id(1), wsrep::client_id(1),
@ -52,19 +52,28 @@ namespace
wsrep::ws_meta ws_meta; wsrep::ws_meta ws_meta;
}; };
struct sst_first_server_fixture : applying_server_fixture struct applying_server_fixture : server_fixture_base
{
applying_server_fixture()
: server_fixture_base()
{
ss.mock_connect();
}
};
struct sst_first_server_fixture : server_fixture_base
{ {
sst_first_server_fixture() sst_first_server_fixture()
: applying_server_fixture() : server_fixture_base()
{ {
server_service.sst_before_init_ = true; server_service.sst_before_init_ = true;
} }
}; };
struct init_first_server_fixture : applying_server_fixture struct init_first_server_fixture : server_fixture_base
{ {
init_first_server_fixture() init_first_server_fixture()
: applying_server_fixture() : server_fixture_base()
{ {
server_service.sst_before_init_ = false; server_service.sst_before_init_ = false;
} }
@ -194,7 +203,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap,
1, 1,
members); members);
BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0); BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0);
ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0))); ss.on_connect(bootstrap_view);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
server_service.sync_point_enabled_ = "on_view_wait_initialized"; server_service.sync_point_enabled_ = "on_view_wait_initialized";
server_service.sync_point_action_ = server_service.spa_initialize; server_service.sync_point_action_ = server_service.spa_initialize;
@ -222,7 +231,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_init_first_boostrap,
ss.initialized(); ss.initialized();
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_initialized); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_initialized);
BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0); BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0);
ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0))); ss.on_connect(bootstrap_view);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
ss.on_view(bootstrap_view, &hps); ss.on_view(bootstrap_view, &hps);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined);