mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-06-14 15:02:27 +03:00
Fixes required to 1PC streaming replay to pass.
This commit is contained in:
@ -164,9 +164,23 @@ int wsrep::server_context::on_apply(
|
|||||||
assert(ret ||
|
assert(ret ||
|
||||||
txc.state() == wsrep::transaction_context::s_committed);
|
txc.state() == wsrep::transaction_context::s_committed);
|
||||||
}
|
}
|
||||||
|
else if (commits_transaction(ws_meta.flags()))
|
||||||
|
{
|
||||||
|
if (not_replaying)
|
||||||
|
{
|
||||||
|
// SR commit not implemented yet
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ret = client_context.start_replaying() ||
|
||||||
|
client_context.apply(wsrep::const_buffer()) ||
|
||||||
|
client_context.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// SR not implemented yet
|
// SR fragment applying not implemented yet
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
if (not_replaying)
|
if (not_replaying)
|
||||||
|
@ -168,6 +168,7 @@ int wsrep::transaction_context::before_prepare(
|
|||||||
break;
|
break;
|
||||||
case wsrep::client_context::m_local:
|
case wsrep::client_context::m_local:
|
||||||
case wsrep::client_context::m_applier:
|
case wsrep::client_context::m_applier:
|
||||||
|
client_context_.remove_fragments(*this);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
assert(0);
|
assert(0);
|
||||||
@ -294,6 +295,10 @@ int wsrep::transaction_context::before_commit()
|
|||||||
case wsrep::client_context::m_applier:
|
case wsrep::client_context::m_applier:
|
||||||
assert(certified());
|
assert(certified());
|
||||||
assert(ordered());
|
assert(ordered());
|
||||||
|
if (client_context_.do_2pc() == false)
|
||||||
|
{
|
||||||
|
client_context_.remove_fragments(*this);
|
||||||
|
}
|
||||||
ret = provider_.commit_order_enter(ws_handle_, ws_meta_);
|
ret = provider_.commit_order_enter(ws_handle_, ws_meta_);
|
||||||
if (ret)
|
if (ret)
|
||||||
{
|
{
|
||||||
@ -343,16 +348,19 @@ int wsrep::transaction_context::after_commit()
|
|||||||
debug_log_state("after_commit_enter");
|
debug_log_state("after_commit_enter");
|
||||||
assert(state() == s_ordered_commit);
|
assert(state() == s_ordered_commit);
|
||||||
|
|
||||||
|
if (is_streaming())
|
||||||
|
{
|
||||||
|
assert(client_context_.mode() == wsrep::client_context::m_replicating ||
|
||||||
|
client_context_.mode() == wsrep::client_context::m_applier);
|
||||||
|
clear_fragments();
|
||||||
|
}
|
||||||
|
|
||||||
switch (client_context_.mode())
|
switch (client_context_.mode())
|
||||||
{
|
{
|
||||||
case wsrep::client_context::m_local:
|
case wsrep::client_context::m_local:
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
break;
|
break;
|
||||||
case wsrep::client_context::m_replicating:
|
case wsrep::client_context::m_replicating:
|
||||||
if (is_streaming())
|
|
||||||
{
|
|
||||||
clear_fragments();
|
|
||||||
}
|
|
||||||
ret = provider_.release(ws_handle_);
|
ret = provider_.release(ws_handle_);
|
||||||
break;
|
break;
|
||||||
case wsrep::client_context::m_applier:
|
case wsrep::client_context::m_applier:
|
||||||
|
@ -1020,7 +1020,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_row_streaming_bf_abort_committing,
|
|||||||
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_committed);
|
BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_committed);
|
||||||
BOOST_REQUIRE(sc.provider().fragments() == 2);
|
BOOST_REQUIRE(sc.provider().fragments() == 2);
|
||||||
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
|
BOOST_REQUIRE(sc.provider().start_fragments() == 1);
|
||||||
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
|
BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user