1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Call before_prepare()/after_prepare() for applier in before_commit()

if no 2PC is required.
This commit is contained in:
Teemu Ollakka
2018-06-13 15:34:49 +03:00
parent 4ccd1490d4
commit 4c1ea388f8
3 changed files with 81 additions and 16 deletions

View File

@ -132,7 +132,8 @@ int wsrep::transaction_context::before_prepare(
assert(lock.owns_lock());
int ret(0);
debug_log_state("before_prepare_enter");
assert(state() == s_executing || state() == s_must_abort);
assert(state() == s_executing || state() == s_must_abort ||
state() == s_replaying);
if (state() == s_must_abort)
{
@ -141,7 +142,10 @@ int wsrep::transaction_context::before_prepare(
return 1;
}
if (state() != s_replaying)
{
state(lock, s_preparing);
}
switch (client_context_.mode())
{
@ -168,14 +172,17 @@ int wsrep::transaction_context::before_prepare(
break;
case wsrep::client_context::m_local:
case wsrep::client_context::m_applier:
if (is_streaming())
{
client_context_.remove_fragments(*this);
}
break;
default:
assert(0);
break;
}
assert(state() == s_preparing);
assert(state() == s_preparing || state() == s_replaying);
debug_log_state("before_prepare_leave");
return ret;
}
@ -186,7 +193,8 @@ int wsrep::transaction_context::after_prepare(
assert(lock.owns_lock());
int ret(1);
debug_log_state("after_prepare_enter");
assert(state() == s_preparing || state() == s_must_abort);
assert(state() == s_preparing || state() == s_must_abort ||
state() == s_replaying);
if (state() == s_must_abort)
{
assert(client_context_.mode() == wsrep::client_context::m_replicating);
@ -205,8 +213,11 @@ int wsrep::transaction_context::after_prepare(
break;
case wsrep::client_context::m_local:
case wsrep::client_context::m_applier:
if (state() != s_replaying)
{
state(lock, s_certifying);
state(lock, s_committing);
}
ret = 0;
break;
default:
@ -241,6 +252,7 @@ int wsrep::transaction_context::before_commit()
case wsrep::client_context::m_replicating:
if (state() == s_executing)
{
assert(client_context_.do_2pc() == false);
ret = before_prepare(lock) || after_prepare(lock);
assert((ret == 0 && state() == s_committing)
||
@ -297,9 +309,13 @@ int wsrep::transaction_context::before_commit()
assert(ordered());
if (client_context_.do_2pc() == false)
{
client_context_.remove_fragments(*this);
ret = before_prepare(lock) || after_prepare(lock);
}
ret = provider_.commit_order_enter(ws_handle_, ws_meta_);
else
{
ret = 0;
}
ret = ret || provider_.commit_order_enter(ws_handle_, ws_meta_);
if (ret)
{
state(lock, s_must_abort);

View File

@ -51,6 +51,25 @@ namespace
const wsrep::transaction_context& tc;
};
struct replicating_client_fixture_2pc
{
replicating_client_fixture_2pc()
: sc("s1", "s1", wsrep::server_context::rm_sync)
, cc(sc, wsrep::client_id(1),
wsrep::client_context::m_replicating, false, true)
, tc(cc.transaction())
{
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing);
}
wsrep::fake_server_context sc;
wsrep::fake_client_context cc;
const wsrep::transaction_context& tc;
};
struct replicating_client_fixture_autocommit
{
replicating_client_fixture_autocommit()
@ -100,6 +119,36 @@ namespace
const wsrep::transaction_context& tc;
};
struct applying_client_fixture_2pc
{
applying_client_fixture_2pc()
: sc("s1", "s1",
wsrep::server_context::rm_async)
, cc(sc,
wsrep::client_id(1),
wsrep::client_context::m_applier, false, true)
, tc(cc.transaction())
{
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
wsrep::ws_handle ws_handle(1, (void*)1);
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
wsrep::stid(sc.id(), 1, cc.id()),
wsrep::seqno(0),
wsrep::provider::flag::start_transaction |
wsrep::provider::flag::commit);
BOOST_REQUIRE(cc.start_transaction(ws_handle, ws_meta) == 0);
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(cc.start_transaction() == 0);
BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true);
BOOST_REQUIRE(tc.ordered() == true);
}
wsrep::fake_server_context sc;
wsrep::fake_client_context cc;
const wsrep::transaction_context& tc;
};
struct streaming_client_fixture_row
{
streaming_client_fixture_row()

View File

@ -8,7 +8,7 @@
// Test a succesful 2PC transaction lifecycle
//
BOOST_FIXTURE_TEST_CASE(transaction_context_2pc,
replicating_client_fixture_sync_rm)
replicating_client_fixture_2pc)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
@ -36,7 +36,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_2pc,
//
BOOST_FIXTURE_TEST_CASE(
transaction_context_2pc_bf_before_before_prepare,
replicating_client_fixture_sync_rm)
replicating_client_fixture_2pc)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
@ -63,7 +63,7 @@ BOOST_FIXTURE_TEST_CASE(
//
BOOST_FIXTURE_TEST_CASE(
transaction_context_2pc_bf_before_after_prepare,
replicating_client_fixture_sync_rm)
replicating_client_fixture_2pc)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
@ -93,7 +93,7 @@ BOOST_FIXTURE_TEST_CASE(
//
BOOST_FIXTURE_TEST_CASE(
transaction_context_2pc_bf_after_after_prepare,
replicating_client_fixture_sync_rm)
replicating_client_fixture_2pc)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
@ -121,7 +121,7 @@ BOOST_FIXTURE_TEST_CASE(
//
BOOST_FIXTURE_TEST_CASE(
transaction_context_2pc_bf_before_before_commit,
replicating_client_fixture_sync_rm)
replicating_client_fixture_2pc)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
@ -154,7 +154,7 @@ BOOST_FIXTURE_TEST_CASE(
//
BOOST_FIXTURE_TEST_CASE(
transaction_context_2pc_bf_during_commit_order_enter,
replicating_client_fixture_sync_rm)
replicating_client_fixture_2pc)
{
cc.start_transaction(1);
BOOST_REQUIRE(tc.active());
@ -227,7 +227,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_streaming_2pc_commit_two_statements,
///////////////////////////////////////////////////////////////////////////////
BOOST_FIXTURE_TEST_CASE(transaction_context_2pc_applying,
applying_client_fixture)
applying_client_fixture_2pc)
{
BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_preparing);