1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-24 10:42:31 +03:00

Deal with init before SST case.

This commit is contained in:
Teemu Ollakka
2018-06-29 17:02:35 +03:00
parent 0851970c53
commit c35f59cb5b
6 changed files with 139 additions and 36 deletions

View File

@ -67,3 +67,8 @@ void db::server_service::log_view(const wsrep::view&)
{
wsrep::log_info() << "View";
}
void db::server_service::debug_sync(const char*)
{
}

View File

@ -27,6 +27,7 @@ namespace db
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
override;
void log_view(const wsrep::view&) override;
void debug_sync(const char*) override;
private:
db::server& server_;
};

View File

@ -124,7 +124,10 @@ namespace wsrep
const wsrep::gtid& gtid,
bool bypass) = 0;
/**
* Provide a server level debug sync point for a caller.
*/
virtual void debug_sync(const char* sync_point) = 0;
};
}

View File

@ -443,17 +443,24 @@ void wsrep::server_state::initialized()
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
wsrep::log_info() << "Server initialized";
init_initialized_ = true;
state(lock, s_initialized);
if (sst_gtid_.is_undefined() == false &&
server_service_.sst_before_init())
if (server_service_.sst_before_init())
{
lock.unlock();
if (provider().sst_received(sst_gtid_, 0))
state(lock, s_initialized);
if (sst_gtid_.is_undefined() == false)
{
throw wsrep::runtime_error("SST received failed");
lock.unlock();
if (provider().sst_received(sst_gtid_, 0))
{
throw wsrep::runtime_error("SST received failed");
}
lock.lock();
sst_gtid_ = wsrep::gtid::undefined();
}
lock.lock();
sst_gtid_ = wsrep::gtid::undefined();
}
else
{
state(lock, s_initializing);
state(lock, s_initialized);
}
}
@ -545,14 +552,27 @@ void wsrep::server_state::on_view(const wsrep::view& view)
// all states leading to joined to notify possible state
// waiters in other threads.
//
if (state_ == s_connected)
if (server_service_.sst_before_init())
{
state(lock, s_joiner);
state(lock, s_initializing);
if (state_ == s_connected)
{
state(lock, s_joiner);
state(lock, s_initializing);
}
}
else
{
if (state_ == s_connected)
{
state(lock, s_joiner);
}
}
if (init_initialized_ == false)
{
lock.unlock();
server_service_.debug_sync("on_view_wait_initialized");
lock.lock();
wait_until_state(lock, s_initialized);
}
assert(init_initialized_);
@ -563,12 +583,26 @@ void wsrep::server_state::on_view(const wsrep::view& view)
bootstrap_ = false;
}
if (state_ == s_initialized)
if (server_service_.sst_before_init())
{
state(lock, s_joined);
if (init_synced_)
if (state_ == s_initialized)
{
state(lock, s_synced);
state(lock, s_joined);
if (init_synced_)
{
state(lock, s_synced);
}
}
}
else
{
if (state_ == s_joiner)
{
state(lock, s_joined);
if (init_synced_)
{
state(lock, s_synced);
}
}
}
}

View File

@ -23,6 +23,8 @@ namespace wsrep
enum wsrep::server_state::rollback_mode rollback_mode)
: wsrep::server_state(mutex_, cond_, *this,
name, id, "", "./", wsrep::gtid::undefined(), 1, rollback_mode)
, sync_point_enabled_()
, sync_point_action_()
, sst_before_init_()
, mutex_()
, cond_()
@ -82,6 +84,24 @@ namespace wsrep
client_state.after_rollback();
}
void debug_sync(const char* sync_point) WSREP_OVERRIDE
{
if (sync_point_enabled_ == sync_point)
{
switch (sync_point_action_)
{
case spa_initialize:
initialized();
break;
}
}
}
std::string sync_point_enabled_;
enum sync_point_action
{
spa_initialize
} sync_point_action_;
bool sst_before_init_;
private:

View File

@ -11,9 +11,9 @@ namespace
struct applying_server_fixture
{
applying_server_fixture()
: sc("s1", "s1",
: ss("s1", "s1",
wsrep::server_state::rm_sync)
, cc(sc,
, cc(ss,
wsrep::client_id(1),
wsrep::client_state::m_high_priority)
, ws_handle(1, (void*)1)
@ -25,7 +25,7 @@ namespace
{
cc.open(cc.id());
}
wsrep::mock_server_state sc;
wsrep::mock_server_state ss;
wsrep::mock_client cc;
wsrep::ws_handle ws_handle;
wsrep::ws_meta ws_meta;
@ -36,9 +36,19 @@ namespace
sst_first_server_fixture()
: applying_server_fixture()
{
sc.sst_before_init_ = true;
ss.sst_before_init_ = true;
}
};
struct init_first_server_fixture : applying_server_fixture
{
init_first_server_fixture()
: applying_server_fixture()
{
ss.sst_before_init_ = false;
}
};
}
// Test on_apply() method for 1pc
@ -46,7 +56,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc,
applying_server_fixture)
{
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 0);
const wsrep::transaction& txc(cc.transaction());
// ::abort();
@ -60,7 +70,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc,
applying_server_fixture)
{
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 0);
const wsrep::transaction& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction::s_committed);
@ -73,7 +83,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback,
{
cc.fail_next_applying_ = true;
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 1);
const wsrep::transaction& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted);
@ -86,7 +96,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback,
{
cc.fail_next_applying_ = true;
char buf[1] = { 1 };
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer(buf, 1)) == 1);
const wsrep::transaction& txc(cc.transaction());
BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted);
@ -94,9 +104,9 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback,
BOOST_AUTO_TEST_CASE(server_state_streaming)
{
wsrep::mock_server_state sc("s1", "s1",
wsrep::mock_server_state ss("s1", "s1",
wsrep::server_state::rm_sync);
wsrep::mock_client cc(sc,
wsrep::mock_client cc(ss,
wsrep::client_id(1),
wsrep::client_state::m_high_priority);
wsrep::ws_handle ws_handle(1, (void*)1);
@ -105,23 +115,23 @@ BOOST_AUTO_TEST_CASE(server_state_streaming)
wsrep::seqno(0),
wsrep::provider::flag::start_transaction);
cc.open(cc.id());
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(sc.find_streaming_applier(
BOOST_REQUIRE(ss.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(1),
0);
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(wsrep::id("1"), 1, 1),
wsrep::seqno(1),
wsrep::provider::flag::commit);
BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta,
BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(sc.find_streaming_applier(
BOOST_REQUIRE(ss.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
}
@ -164,9 +174,39 @@ BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap,
0,
1,
members);
BOOST_REQUIRE(sc.connect("cluster", "local", "0", false) == 0);
sc.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0)));
// @todo Blocks on state wait, need to figure out a way to avoid
// that.
// sc.on_view(bootstrap_view);
BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0);
ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0)));
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
ss.sync_point_enabled_ = "on_view_wait_initialized";
ss.sync_point_action_ = ss.spa_initialize;
ss.on_view(bootstrap_view);
ss.sync_point_enabled_ = "";
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined);
ss.on_sync();
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_synced);
}
BOOST_FIXTURE_TEST_CASE(server_state_init_first_boostrap,
init_first_server_fixture)
{
wsrep::id cluster_id("1");
wsrep::gtid state_id(cluster_id, wsrep::seqno(0));
std::vector<wsrep::view::member> members;
members.push_back(wsrep::view::member(wsrep::id("1"), "name", ""));
wsrep::view bootstrap_view(state_id,
wsrep::seqno(1),
wsrep::view::primary,
0,
0,
1,
members);
ss.initialized();
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_initialized);
BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0);
ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0)));
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
ss.on_view(bootstrap_view);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined);
ss.on_sync();
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_synced);
}