mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-03 16:22:35 +03:00
Transaction not immune after becoming ordered for commit
The transaction state is set to s_ordered_commit in ordered_commit(). However, this is too late for making the transaction immune for BF aborts after commit order has been established, which happens in before_commit(). Moving the state change into before_commit() would be the right thing to do, but that would require too many fixes to existing applications which are using the lib. In order to make the transaction immune for BF abort after it has been ordered to commit, introduce additional boolean flag which is set to true at the end of before_commit() and is taken into account in bf_abort().
This commit is contained in:
@ -29,6 +29,7 @@
|
||||
#include "buffer.hpp"
|
||||
#include "xid.hpp"
|
||||
|
||||
#include <iosfwd>
|
||||
#include <vector>
|
||||
|
||||
namespace wsrep
|
||||
@ -281,6 +282,14 @@ namespace wsrep
|
||||
wsrep::mutable_buffer apply_error_buf_;
|
||||
wsrep::xid xid_;
|
||||
bool streaming_rollback_in_progress_;
|
||||
/* This flag tells that the transaction has become immutable
|
||||
against BF aborts. Ideally this would be deduced from transaction
|
||||
state, i.e. s_ordered_commit or s_committed, but the current
|
||||
version of wsrep-lib changes the transaction state to
|
||||
s_ordered_commit too late. Fixing this appears to require
|
||||
too many changes to application using the lib, so boolean flag
|
||||
must do. */
|
||||
bool is_bf_immutable_;
|
||||
};
|
||||
|
||||
static inline const char* to_c_string(enum wsrep::transaction::state state)
|
||||
@ -308,6 +317,8 @@ namespace wsrep
|
||||
return to_c_string(state);
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, enum wsrep::transaction::state);
|
||||
|
||||
}
|
||||
|
||||
#endif // WSREP_TRANSACTION_HPP
|
||||
|
@ -112,6 +112,7 @@ wsrep::transaction::transaction(
|
||||
, apply_error_buf_()
|
||||
, xid_()
|
||||
, streaming_rollback_in_progress_(false)
|
||||
, is_bf_immutable_(false)
|
||||
{ }
|
||||
|
||||
|
||||
@ -555,6 +556,12 @@ int wsrep::transaction::before_commit()
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret == 0 && state() == s_committing)
|
||||
{
|
||||
is_bf_immutable_ = true;
|
||||
}
|
||||
|
||||
debug_log_state("before_commit_leave");
|
||||
return ret;
|
||||
}
|
||||
@ -564,6 +571,7 @@ int wsrep::transaction::ordered_commit()
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
||||
debug_log_state("ordered_commit_enter");
|
||||
assert(state() == s_committing);
|
||||
assert(is_bf_immutable_);
|
||||
assert(ordered());
|
||||
client_service_.debug_sync("wsrep_before_commit_order_leave");
|
||||
int ret(provider().commit_order_leave(ws_handle_, ws_meta_,
|
||||
@ -602,6 +610,7 @@ int wsrep::transaction::after_commit()
|
||||
int ret(0);
|
||||
|
||||
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
||||
assert(is_bf_immutable_);
|
||||
debug_log_state("after_commit_enter");
|
||||
assert(state() == s_ordered_commit);
|
||||
|
||||
@ -639,7 +648,7 @@ int wsrep::transaction::after_commit()
|
||||
}
|
||||
assert(ret == 0);
|
||||
state(lock, s_committed);
|
||||
|
||||
is_bf_immutable_ = false;
|
||||
debug_log_state("after_commit_leave");
|
||||
return ret;
|
||||
}
|
||||
@ -819,6 +828,7 @@ int wsrep::transaction::after_statement()
|
||||
state() == s_must_abort ||
|
||||
state() == s_cert_failed ||
|
||||
state() == s_must_replay);
|
||||
assert(not is_bf_immutable_);
|
||||
|
||||
if (state() == s_executing &&
|
||||
streaming_context_.fragment_size() &&
|
||||
@ -981,6 +991,12 @@ bool wsrep::transaction::bf_abort(
|
||||
wsrep::log::debug_level_transaction,
|
||||
"Transaction not active, skipping bf abort");
|
||||
}
|
||||
else if (is_bf_immutable_)
|
||||
{
|
||||
WSREP_LOG_DEBUG(client_state_.debug_log_level(),
|
||||
wsrep::log::debug_level_transaction,
|
||||
"Transaction has become immutable for BF abort");
|
||||
}
|
||||
else
|
||||
{
|
||||
switch (state_at_enter)
|
||||
@ -2139,3 +2155,9 @@ void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const
|
||||
<< int64_t(id().get())
|
||||
<< " append key:\n" << key);
|
||||
}
|
||||
|
||||
std::ostream& wsrep::operator<<(std::ostream& os,
|
||||
enum wsrep::transaction::state state)
|
||||
{
|
||||
return (os << to_c_string(state));
|
||||
}
|
||||
|
@ -289,6 +289,32 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE(
|
||||
BOOST_REQUIRE(not cc.current_error());
|
||||
}
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(
|
||||
transaction_1pc_bf_after_before_commit,
|
||||
replicating_client_fixture_async_rm)
|
||||
{
|
||||
// Start a new transaction with ID 1
|
||||
cc.start_transaction(wsrep::transaction_id(1));
|
||||
BOOST_REQUIRE(tc.active());
|
||||
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
|
||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
|
||||
|
||||
// Run before commit
|
||||
BOOST_REQUIRE(cc.before_commit() == 0);
|
||||
BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_committing);
|
||||
BOOST_REQUIRE(tc.certified() == true);
|
||||
BOOST_REQUIRE(tc.ordered() == true);
|
||||
|
||||
wsrep_test::bf_abort_ordered(cc);
|
||||
BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_committing);
|
||||
|
||||
// Clean up
|
||||
cc.ordered_commit();
|
||||
cc.after_commit();
|
||||
cc.after_statement();
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// Test a 1PC transaction for which prepare data fails
|
||||
//
|
||||
@ -1491,16 +1517,17 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_commit,
|
||||
//
|
||||
// Test streaming BF abort after succesful certification
|
||||
//
|
||||
BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_bf_abort_committing,
|
||||
BOOST_FIXTURE_TEST_CASE(
|
||||
transaction_row_streaming_bf_abort_during_commit_order_enter,
|
||||
streaming_client_fixture_row)
|
||||
{
|
||||
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
|
||||
BOOST_REQUIRE(cc.after_row() == 0);
|
||||
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
|
||||
BOOST_REQUIRE(cc.before_commit() == 0);
|
||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing);
|
||||
wsrep_test::bf_abort_ordered(cc);
|
||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_abort);
|
||||
sc.provider().commit_order_enter_result_ = wsrep::provider::error_bf_abort;
|
||||
BOOST_REQUIRE(cc.before_commit());
|
||||
BOOST_REQUIRE_EQUAL(tc.state(), wsrep::transaction::s_must_replay);
|
||||
sc.provider().commit_order_enter_result_ = wsrep::provider::success;
|
||||
BOOST_REQUIRE(cc.before_rollback() == 0);
|
||||
BOOST_REQUIRE(cc.after_rollback() == 0);
|
||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_replay);
|
||||
|
Reference in New Issue
Block a user