1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-21 12:22:06 +03:00

Add provider position field to ws_meta and view

Provider position is needed in coordinated recovery
between application and provider. Pass the position
info from provider to application to allow making
it durable.
This commit is contained in:
Teemu Ollakka
2022-03-22 17:43:52 +02:00
parent 51e162d6f4
commit 7498ed424a
13 changed files with 123 additions and 86 deletions

View File

@ -134,7 +134,8 @@ wsrep::view db::server_service::get_view(wsrep::client_service&,
stored_view.capabilities(),
my_idx,
stored_view.protocol_version(),
stored_view.members()
stored_view.members(),
0
);
return my_view;
}

View File

@ -124,21 +124,25 @@ namespace wsrep
, stid_()
, depends_on_()
, flags_()
, provider_position_()
{ }
ws_meta(const wsrep::gtid& gtid,
const wsrep::stid& stid,
wsrep::seqno depends_on,
int flags)
int flags,
int64_t provider_position)
: gtid_(gtid)
, stid_(stid)
, depends_on_(depends_on)
, flags_(flags)
, provider_position_(provider_position)
{ }
ws_meta(const wsrep::stid& stid)
: gtid_()
, stid_(stid)
, depends_on_()
, flags_()
, provider_position_()
{ }
const wsrep::gtid& gtid() const { return gtid_; }
const wsrep::id& group_id() const
@ -170,6 +174,8 @@ namespace wsrep
wsrep::seqno depends_on() const { return depends_on_; }
int64_t provider_position() const { return provider_position_; }
int flags() const { return flags_; }
bool operator==(const ws_meta& other) const
@ -186,6 +192,8 @@ namespace wsrep
wsrep::stid stid_;
wsrep::seqno depends_on_;
int flags_;
/** Field reserved for provider to report its internal position. */
int64_t provider_position_;
};
std::string flags_to_string(int flags);

View File

@ -70,6 +70,7 @@ namespace wsrep
, capabilities_()
, own_index_(-1)
, protocol_version_(0)
, provider_position_()
, members_()
{ }
view(const wsrep::gtid& state_id,
@ -78,13 +79,15 @@ namespace wsrep
int capabilities,
ssize_t own_index,
int protocol_version,
const std::vector<wsrep::view::member>& members)
const std::vector<wsrep::view::member>& members,
int64_t provider_position)
: state_id_(state_id)
, view_seqno_(view_seqno)
, status_(status)
, capabilities_(capabilities)
, own_index_(own_index)
, protocol_version_(protocol_version)
, provider_position_(provider_position)
, members_(members)
{ }
@ -111,6 +114,9 @@ namespace wsrep
int protocol_version() const
{ return protocol_version_; }
int64_t provider_position() const
{ return provider_position_; }
const std::vector<member>& members() const
{ return members_; }
@ -147,6 +153,8 @@ namespace wsrep
int capabilities_;
ssize_t own_index_;
int protocol_version_;
/** Field reserved for provider to report its internal position. */
int64_t provider_position_;
std::vector<wsrep::view::member> members_;
};

View File

@ -1519,7 +1519,7 @@ void wsrep::server_state::close_orphaned_sr_transactions(
wsrep::ws_meta ws_meta(
wsrep::gtid(),
wsrep::stid(server_id, transaction_id, wsrep::client_id()),
wsrep::seqno::undefined(), 0);
wsrep::seqno::undefined(), 0, 0);
lock.unlock();
if (adopt_error == 0)
{

View File

@ -241,7 +241,7 @@ namespace
sizeof(trx_meta_.stid.node.data)),
wsrep::transaction_id(trx_meta_.stid.trx),
wsrep::client_id(trx_meta_.stid.conn)),
seqno_from_native(trx_meta_.depends_on), flags_);
seqno_from_native(trx_meta_.depends_on), flags_, 0);
}
wsrep_trx_meta* native() { return &trx_meta_; }
@ -346,7 +346,7 @@ namespace
map_capabilities_from_native(view_info.capabilities),
own_idx,
view_info.proto_ver,
members);
members, 0);
}
/////////////////////////////////////////////////////////////////////
@ -500,7 +500,7 @@ namespace
wsrep::transaction_id(meta->stid.trx),
wsrep::client_id(meta->stid.conn)),
wsrep::seqno(seqno_from_native(meta->depends_on)),
map_flags_from_native(flags));
map_flags_from_native(flags), 0);
try
{
if (high_priority_service->apply(ws_handle, ws_meta, data))

View File

@ -171,13 +171,13 @@ namespace
wsrep::seqno seqno)
{
wsrep::ws_handle ws_handle(id, (void*)1);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), seqno),
wsrep::stid(sc.id(),
wsrep::transaction_id(1),
cc.id()),
wsrep::ws_meta ws_meta(
wsrep::gtid(wsrep::id("1"), seqno),
wsrep::stid(sc.id(), wsrep::transaction_id(1), cc.id()),
wsrep::seqno(0),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit);
wsrep::provider::flag::start_transaction
| wsrep::provider::flag::commit,
0);
BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0);
BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true);
@ -207,13 +207,13 @@ namespace
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
wsrep::ws_handle ws_handle(wsrep::transaction_id(1), (void*)1);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(sc.id(),
wsrep::transaction_id(1),
cc.id()),
wsrep::ws_meta ws_meta(
wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(sc.id(), wsrep::transaction_id(1), cc.id()),
wsrep::seqno(0),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit);
wsrep::provider::flag::start_transaction
| wsrep::provider::flag::commit,
0);
BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0);
BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true);

View File

@ -116,9 +116,8 @@ namespace wsrep
{
++group_seqno_;
wsrep::gtid gtid(group_id_, wsrep::seqno(group_seqno_));
ws_meta = wsrep::ws_meta(gtid, stid,
wsrep::seqno(group_seqno_ - 1),
flags);
ws_meta = wsrep::ws_meta(
gtid, stid, wsrep::seqno(group_seqno_ - 1), flags, 0);
return wsrep::provider::success;
}
else
@ -127,16 +126,15 @@ namespace wsrep
if (it->second.is_undefined())
{
ws_meta = wsrep::ws_meta(wsrep::gtid(), wsrep::stid(),
wsrep::seqno::undefined(), 0);
wsrep::seqno::undefined(), 0, 0);
ret = wsrep::provider::error_certification_failed;
}
else
{
++group_seqno_;
wsrep::gtid gtid(group_id_, wsrep::seqno(group_seqno_));
ws_meta = wsrep::ws_meta(gtid, stid,
wsrep::seqno(group_seqno_ - 1),
flags);
ws_meta = wsrep::ws_meta(
gtid, stid, wsrep::seqno(group_seqno_ - 1), flags, 0);
ret = wsrep::provider::error_bf_abort;
}
bf_abort_map_.erase(it);
@ -215,8 +213,9 @@ namespace wsrep
wsrep::gtid(group_id_, wsrep::seqno(group_seqno_)),
wsrep::stid(server_id_, tc.id(), cc.id()),
wsrep::seqno(group_seqno_ - 1),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit);
wsrep::provider::flag::start_transaction
| wsrep::provider::flag::commit,
0);
}
else
{
@ -245,12 +244,10 @@ namespace wsrep
{
++group_seqno_;
wsrep::gtid gtid(group_id_, wsrep::seqno(group_seqno_));
wsrep::stid stid(server_id_,
wsrep::transaction_id::undefined(),
wsrep::stid stid(server_id_, wsrep::transaction_id::undefined(),
client_id);
toi_meta = wsrep::ws_meta(gtid, stid,
wsrep::seqno(group_seqno_ - 1),
flags);
wsrep::seqno(group_seqno_ - 1), flags, 0);
++toi_write_sets_;
if (flags & wsrep::provider::flag::start_transaction)
++toi_start_transaction_;

View File

@ -147,7 +147,8 @@ namespace wsrep
logged_view_.capabilities(),
my_idx,
logged_view_.protocol_version(),
logged_view_.members()
logged_view_.members(),
0
);
return my_view;
}
@ -292,7 +293,8 @@ namespace wsrep
0,
0,
1,
members);
members,
0);
server_state::on_connect(bootstrap_view);
}
else

View File

@ -103,13 +103,11 @@ BOOST_FIXTURE_TEST_CASE(test_applying_nbo,
wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1));
const int nbo_begin_flags(wsrep::provider::flag::start_transaction |
wsrep::provider::flag::isolation);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"),
wsrep::seqno(1)),
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), wsrep::seqno(1)),
wsrep::stid(wsrep::id("s1"),
wsrep::transaction_id::undefined(),
wsrep::client_id(1)),
wsrep::seqno(0),
nbo_begin_flags);
wsrep::seqno(0), nbo_begin_flags, 0);
std::string nbo_begin("nbo_begin");
BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer(nbo_begin.data(),
@ -156,13 +154,11 @@ BOOST_FIXTURE_TEST_CASE(test_applying_nbo_fail,
wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1));
const int nbo_begin_flags(wsrep::provider::flag::start_transaction |
wsrep::provider::flag::isolation);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"),
wsrep::seqno(1)),
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), wsrep::seqno(1)),
wsrep::stid(wsrep::id("s1"),
wsrep::transaction_id::undefined(),
wsrep::client_id(1)),
wsrep::seqno(0),
nbo_begin_flags);
wsrep::seqno(0), nbo_begin_flags, 0);
std::string nbo_begin("nbo_begin");
hps.fail_next_toi_ = true;
BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta,

View File

@ -39,7 +39,8 @@ namespace
wsrep::client_id(1)),
wsrep::seqno(0),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit)
wsrep::provider::flag::commit,
0)
, cluster_id("1")
, bootstrap_view()
, second_view()
@ -55,7 +56,8 @@ namespace
0, // capabilities
0, // own index
1, // protocol version
members);
members,
0);
members.push_back(wsrep::view::member(
wsrep::id("s2"), "s2", ""));
@ -65,7 +67,8 @@ namespace
0, // capabilities
1, // own index
1, // protocol version
members);
members,
0);
members.push_back(wsrep::view::member(
wsrep::id("s3"), "s3", ""));
@ -76,7 +79,8 @@ namespace
0, // capabilities
1, // own index
1, // protocol version
members);
members,
0);
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
@ -114,7 +118,8 @@ namespace
0, // capabilities
-1, // own_index
0, // protocol ver
std::vector<wsrep::view::member>() // members
std::vector<wsrep::view::member>(),// members
0
);
ss.on_view(view, &hps);
}
@ -272,7 +277,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_streaming, applying_server_fixture)
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(0),
wsrep::provider::flag::start_transaction);
wsrep::provider::flag::start_transaction,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(ss.find_streaming_applier(
@ -282,6 +288,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_streaming, applying_server_fixture)
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(1),
0,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
@ -290,7 +297,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_streaming, applying_server_fixture)
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(1),
wsrep::provider::flag::commit);
wsrep::provider::flag::commit,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta,
wsrep::const_buffer("1", 1)) == 0);
BOOST_REQUIRE(ss.find_streaming_applier(
@ -395,7 +403,8 @@ BOOST_FIXTURE_TEST_CASE(
0, // capabilities
0, // own index
1, // protocol version
members);
members,
0);
ss.on_connect(view);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
// As storage engines have been initialized, there should not be
@ -556,7 +565,8 @@ BOOST_FIXTURE_TEST_CASE(
0, // capabilities
0, // own index
1, // protocol version
members);
members,
0);
ss.on_connect(view);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
// As storage engines have been initialized, there should not be
@ -688,7 +698,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(1),
wsrep::provider::flag::start_transaction);
wsrep::provider::flag::start_transaction,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_s2,
wsrep::const_buffer("1", 1)) == 0);
@ -701,7 +712,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(2),
wsrep::provider::flag::start_transaction);
wsrep::provider::flag::start_transaction,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_s3,
wsrep::const_buffer("1", 1)) == 0);
@ -717,7 +729,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
0, // capabilities
0, // own index
1, // protocol version
members), &hps);
members,
0), &hps);
// transaction from s2 is still present
BOOST_REQUIRE(ss.find_streaming_applier(
@ -735,7 +748,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
0, // capabilities
0, // own index
1, // protocol version
members), &hps);
members, 0), &hps);
// no streaming appliers are closed on non-primary view,
// so transaction from s2 is still present
@ -750,7 +763,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
0, // capabilities
0, // own index
1, // protocol version
members), &hps);
members,
0), &hps);
// transaction s2 is still present after non-primary view
BOOST_REQUIRE(ss.find_streaming_applier(
@ -764,7 +778,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
0, // capabilities
0, // own index
1, // protocol version
members), &hps);
members,
0), &hps);
// finally, transaction from s2 is still present (part of primary view)
// and transaction from s3 is gone
@ -779,7 +794,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_close_orphaned_transactions,
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(3),
wsrep::provider::flag::commit);
wsrep::provider::flag::commit,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_commit_s2,
wsrep::const_buffer("1", 1)) == 0);
@ -806,7 +822,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_equal_consecutive_views,
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(1),
wsrep::provider::flag::start_transaction);
wsrep::provider::flag::start_transaction,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_s2,
wsrep::const_buffer("1", 1)) == 0);
@ -819,7 +836,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_equal_consecutive_views,
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(2),
wsrep::provider::flag::start_transaction);
wsrep::provider::flag::start_transaction,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_s3,
wsrep::const_buffer("1", 1)) == 0);
@ -833,7 +851,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_equal_consecutive_views,
0, // capabilities
0, // own index
1, // protocol version
ss.current_view().members()), &hps);
ss.current_view().members(),
0), &hps);
// transaction from s2 and s3 are gone
BOOST_REQUIRE(not ss.find_streaming_applier(
@ -862,7 +881,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_xa_not_orphaned,
wsrep::client_id(1)),
wsrep::seqno(1),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::prepare);
wsrep::provider::flag::prepare,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_s3,
wsrep::const_buffer("1", 1)) == 0);
@ -879,7 +899,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_xa_not_orphaned,
0, // capabilities
0, // own index
1, // protocol version
members), &hps);
members,
0), &hps);
// transaction from s3 is still present
BOOST_REQUIRE(ss.find_streaming_applier(
@ -893,7 +914,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_xa_not_orphaned,
0, // capabilities
0, // own index
1, // protocol version
members), &hps);
members,
0), &hps);
// transaction from s3 is still present
BOOST_REQUIRE(ss.find_streaming_applier(
@ -905,7 +927,8 @@ BOOST_FIXTURE_TEST_CASE(server_state_xa_not_orphaned,
wsrep::transaction_id(1),
wsrep::client_id(1)),
wsrep::seqno(3),
wsrep::provider::flag::commit);
wsrep::provider::flag::commit,
0);
BOOST_REQUIRE(ss.on_apply(hps, ws_handle, meta_commit_s3,
wsrep::const_buffer("1", 1)) == 0);

View File

@ -58,13 +58,10 @@ void wsrep_test::terminate_streaming_applier(
mc.before_command();
wsrep::mock_high_priority_service hps(sc, &mc, false);
wsrep::ws_handle ws_handle(transaction_id, (void*)(1));
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"),
wsrep::seqno(100)),
wsrep::stid(server_id,
transaction_id,
wsrep::client_id(1)),
wsrep::seqno(0),
wsrep::provider::flag::rollback);
wsrep::ws_meta ws_meta(
wsrep::gtid(wsrep::id("cluster1"), wsrep::seqno(100)),
wsrep::stid(server_id, transaction_id, wsrep::client_id(1)),
wsrep::seqno(0), wsrep::provider::flag::rollback, 0);
wsrep::const_buffer data(0, 0);
sc.on_apply(hps, ws_handle, ws_meta, data);
}

View File

@ -50,13 +50,13 @@ BOOST_FIXTURE_TEST_CASE(test_toi_applying,
applying_client_fixture)
{
BOOST_REQUIRE(cc.toi_mode() == wsrep::client_state::m_undefined);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(sc.id(),
wsrep::transaction_id::undefined(),
cc.id()),
wsrep::ws_meta ws_meta(
wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
wsrep::stid(sc.id(), wsrep::transaction_id::undefined(), cc.id()),
wsrep::seqno(1),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit);
wsrep::provider::flag::start_transaction
| wsrep::provider::flag::commit,
0);
cc.enter_toi_mode(ws_meta);
BOOST_REQUIRE(cc.in_toi());
BOOST_REQUIRE(cc.toi_mode() == wsrep::client_state::m_high_priority);

View File

@ -33,7 +33,8 @@ BOOST_AUTO_TEST_CASE(view_test_member_index)
0,
1,
0,
members);
members,
0);
BOOST_REQUIRE(view.member_index(wsrep::id("1")) == 0);
BOOST_REQUIRE(view.member_index(wsrep::id("2")) == 1);
BOOST_REQUIRE(view.member_index(wsrep::id("3")) == 2);
@ -64,7 +65,8 @@ BOOST_AUTO_TEST_CASE(view_test_equal_membership)
0,
1,
0,
m1);
m1,
0);
wsrep::view v2(wsrep::gtid(wsrep::id("cluster"), wsrep::seqno(1)),
wsrep::seqno(1),
@ -72,7 +74,8 @@ BOOST_AUTO_TEST_CASE(view_test_equal_membership)
0,
1,
0,
m2);
m2,
0);
wsrep::view v3(wsrep::gtid(wsrep::id("cluster"), wsrep::seqno(1)),
wsrep::seqno(1),
@ -80,7 +83,8 @@ BOOST_AUTO_TEST_CASE(view_test_equal_membership)
0,
1,
0,
m3);
m3,
0);
BOOST_REQUIRE(v1.equal_membership(v2));
BOOST_REQUIRE(v2.equal_membership(v1));
@ -97,7 +101,8 @@ BOOST_AUTO_TEST_CASE(view_test_is_member)
1,
0,
{ wsrep::view::member(wsrep::id("1"), "", ""),
wsrep::view::member(wsrep::id("2"), "", "") });
wsrep::view::member(wsrep::id("2"), "", "") },
0);
BOOST_REQUIRE(view.is_member(wsrep::id("2")));
BOOST_REQUIRE(view.is_member(wsrep::id("1")));