1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

s_prepared state for XA transactions

After the XA PREPARE, the XA transactions stay s_prepared until
commit/rollback
This commit is contained in:
Leandro Pacheco
2019-02-21 17:02:18 +01:00
committed by Daniele Sciascia
parent b73df49cff
commit a9987aa970
4 changed files with 55 additions and 41 deletions

View File

@ -47,6 +47,7 @@ namespace wsrep
{ {
s_executing, s_executing,
s_preparing, s_preparing,
s_prepared,
s_certifying, s_certifying,
s_committing, s_committing,
s_ordered_commit, s_ordered_commit,
@ -269,6 +270,7 @@ namespace wsrep
{ {
case wsrep::transaction::s_executing: return "executing"; case wsrep::transaction::s_executing: return "executing";
case wsrep::transaction::s_preparing: return "preparing"; case wsrep::transaction::s_preparing: return "preparing";
case wsrep::transaction::s_prepared: return "prepared";
case wsrep::transaction::s_certifying: return "certifying"; case wsrep::transaction::s_certifying: return "certifying";
case wsrep::transaction::s_committing: return "committing"; case wsrep::transaction::s_committing: return "committing";
case wsrep::transaction::s_ordered_commit: return "ordered_commit"; case wsrep::transaction::s_ordered_commit: return "ordered_commit";

View File

@ -109,6 +109,7 @@ int wsrep::client_state::before_command()
// just before BF abort happened. // just before BF abort happened.
assert(transaction_.active() == false || assert(transaction_.active() == false ||
(transaction_.state() == wsrep::transaction::s_executing || (transaction_.state() == wsrep::transaction::s_executing ||
transaction_.state() == wsrep::transaction::s_prepared ||
transaction_.state() == wsrep::transaction::s_aborted || transaction_.state() == wsrep::transaction::s_aborted ||
transaction_.state() == wsrep::transaction::s_must_abort)); transaction_.state() == wsrep::transaction::s_must_abort));

View File

@ -326,6 +326,7 @@ int wsrep::transaction::before_prepare(
if (ret == 0) if (ret == 0)
{ {
assert(state() == s_executing); assert(state() == s_executing);
state(lock, s_preparing);
} }
} }
else else
@ -353,10 +354,7 @@ int wsrep::transaction::before_prepare(
{ {
assert(state() == s_executing); assert(state() == s_executing);
} }
else state(lock, s_preparing);
{
state(lock, s_preparing);
}
break; break;
default: default:
assert(0); assert(0);
@ -364,7 +362,6 @@ int wsrep::transaction::before_prepare(
} }
assert(state() == s_preparing || assert(state() == s_preparing ||
(state() == s_executing && is_xa()) ||
(ret && (state() == s_must_abort || (ret && (state() == s_must_abort ||
state() == s_must_replay || state() == s_must_replay ||
state() == s_cert_failed || state() == s_cert_failed ||
@ -381,10 +378,10 @@ int wsrep::transaction::after_prepare(
debug_log_state("after_prepare_enter"); debug_log_state("after_prepare_enter");
if (is_xa()) if (is_xa())
{ {
// TODO XA consider adding state s_prepared assert(state() == s_preparing);
assert(state() == s_executing);
assert(client_state_.mode() == wsrep::client_state::m_local || assert(client_state_.mode() == wsrep::client_state::m_local ||
(certified() && ordered())); (certified() && ordered()));
state(lock, s_prepared);
} }
else else
{ {
@ -411,6 +408,7 @@ int wsrep::transaction::before_commit()
debug_log_state("before_commit_enter"); debug_log_state("before_commit_enter");
assert(client_state_.mode() != wsrep::client_state::m_toi); assert(client_state_.mode() != wsrep::client_state::m_toi);
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_prepared ||
state() == s_committing || state() == s_committing ||
state() == s_must_abort || state() == s_must_abort ||
state() == s_replaying); state() == s_replaying);
@ -420,17 +418,14 @@ int wsrep::transaction::before_commit()
switch (client_state_.mode()) switch (client_state_.mode())
{ {
case wsrep::client_state::m_local: case wsrep::client_state::m_local:
if (state() == s_executing && is_xa()) if (state() == s_prepared)
{ {
assert(is_xa());
ret = certify_commit(lock); ret = certify_commit(lock);
assert((ret == 0 && state() == s_preparing) || assert((ret == 0 && state() == s_committing) ||
(state() == s_must_abort || (state() == s_must_abort ||
state() == s_must_replay || state() == s_must_replay ||
state() == s_cert_failed)); state() == s_cert_failed));
if (ret == 0)
{
state(lock, s_committing);
}
} }
else if (state() == s_executing) else if (state() == s_executing)
{ {
@ -492,9 +487,7 @@ int wsrep::transaction::before_commit()
assert(ordered()); assert(ordered());
if (is_xa()) if (is_xa())
{ {
assert(state() == s_executing); assert(state() == s_prepared);
// one more reason to add prepared state?
state(lock, s_preparing);
state(lock, s_committing); state(lock, s_committing);
} }
@ -626,6 +619,7 @@ int wsrep::transaction::before_rollback()
debug_log_state("before_rollback_enter"); debug_log_state("before_rollback_enter");
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_preparing || state() == s_preparing ||
state() == s_prepared ||
state() == s_must_abort || state() == s_must_abort ||
// Background rollbacker or rollback initiated from SE // Background rollbacker or rollback initiated from SE
state() == s_aborting || state() == s_aborting ||
@ -645,6 +639,8 @@ int wsrep::transaction::before_rollback()
// Error detected during prepare phase // Error detected during prepare phase
state(lock, s_must_abort); state(lock, s_must_abort);
// fall through // fall through
case s_prepared:
// fall through
case s_executing: case s_executing:
// Voluntary rollback // Voluntary rollback
if (is_streaming()) if (is_streaming())
@ -689,7 +685,7 @@ int wsrep::transaction::before_rollback()
break; break;
case wsrep::client_state::m_high_priority: case wsrep::client_state::m_high_priority:
// Rollback by rollback write set or BF abort // Rollback by rollback write set or BF abort
assert(state_ == s_executing || state_ == s_aborting); assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting);
if (state_ != s_aborting) if (state_ != s_aborting)
{ {
state(lock, s_aborting); state(lock, s_aborting);
@ -768,6 +764,7 @@ int wsrep::transaction::after_statement()
debug_log_state("after_statement_enter"); debug_log_state("after_statement_enter");
assert(client_state_.mode() == wsrep::client_state::m_local); assert(client_state_.mode() == wsrep::client_state::m_local);
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_prepared ||
state() == s_committed || state() == s_committed ||
state() == s_aborted || state() == s_aborted ||
state() == s_must_abort || state() == s_must_abort ||
@ -786,6 +783,9 @@ int wsrep::transaction::after_statement()
case s_executing: case s_executing:
// ? // ?
break; break;
case s_prepared:
assert(is_xa());
break;
case s_committed: case s_committed:
assert(is_streaming() == false); assert(is_streaming() == false);
break; break;
@ -850,6 +850,7 @@ int wsrep::transaction::after_statement()
} }
assert(state() == s_executing || assert(state() == s_executing ||
state() == s_prepared ||
state() == s_committed || state() == s_committed ||
state() == s_aborted || state() == s_aborted ||
state() == s_must_replay); state() == s_must_replay);
@ -863,7 +864,7 @@ int wsrep::transaction::after_statement()
provider().release(ws_handle_); provider().release(ws_handle_);
} }
if (state() != s_executing) if (state() != s_executing && state() != s_prepared)
{ {
cleanup(); cleanup();
} }
@ -878,15 +879,17 @@ void wsrep::transaction::after_applying()
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
debug_log_state("after_applying enter"); debug_log_state("after_applying enter");
assert(state_ == s_executing || assert(state_ == s_executing ||
state_ == s_prepared ||
state_ == s_committed || state_ == s_committed ||
state_ == s_aborted); state_ == s_aborted);
if (state_ != s_executing)
if (state_ != s_executing && state_ != s_prepared)
{ {
cleanup(); cleanup();
} }
else else
{ {
// State remains executing, so this is a streaming applier. // State remains executing or prepared, so this is a streaming applier.
// Reset the meta data to avoid releasing commit order // Reset the meta data to avoid releasing commit order
// critical section above if the next fragment is rollback // critical section above if the next fragment is rollback
// fragment. Rollback fragment ordering will be handled by // fragment. Rollback fragment ordering will be handled by
@ -1060,19 +1063,20 @@ void wsrep::transaction::state(
next_state == s_aborting); next_state == s_aborting);
static const char allowed[n_states][n_states] = static const char allowed[n_states][n_states] =
{ /* ex pr ce co oc ct cf ma ab ad mr re */ { /* ex pg pd ce co oc ct cf ma ab ad mr re */
{ 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
{ 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */ { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */
{ 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ { 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0}, /* pd */
{ 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
{ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
{ 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
{ 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
{ 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
{ 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */
}; };
if (!allowed[state_][next_state]) if (!allowed[state_][next_state])
@ -1235,7 +1239,7 @@ int wsrep::transaction::certify_fragment(
flags(flags() | wsrep::provider::flag::implicit_deps); flags(flags() | wsrep::provider::flag::implicit_deps);
} }
if (is_xa()) if (is_xa_prepare())
{ {
flags(flags() | wsrep::provider::flag::prepare); flags(flags() | wsrep::provider::flag::prepare);
} }
@ -1466,7 +1470,14 @@ int wsrep::transaction::certify_commit(
switch (state()) switch (state())
{ {
case s_certifying: case s_certifying:
state(lock, s_preparing); if (is_xa())
{
state(lock, s_committing);
}
else
{
state(lock, s_preparing);
}
ret = 0; ret = 0;
break; break;
case s_must_abort: case s_must_abort:

View File

@ -17,13 +17,13 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa,
cc.is_xa_prepare_ = true; cc.is_xa_prepare_ = true;
BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
// certified() only after the last fragment // certified() only after the last fragment
BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
// XA START + PREPARE fragment // XA START + PREPARE fragment
BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().fragments() == 1); BOOST_REQUIRE(sc.provider().fragments() == 1);
@ -60,12 +60,12 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying,
cc.is_xa_prepare_ = true; cc.is_xa_prepare_ = true;
BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
BOOST_REQUIRE(tc.ordered()); BOOST_REQUIRE(tc.ordered());
BOOST_REQUIRE(tc.certified()); BOOST_REQUIRE(tc.certified());
BOOST_REQUIRE(tc.ws_meta().gtid().is_undefined() == false); BOOST_REQUIRE(tc.ws_meta().gtid().is_undefined() == false);
BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared);
cc.is_xa_prepare_ = false; cc.is_xa_prepare_ = false;
@ -106,11 +106,11 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr,
cc.is_xa_prepare_ = true; cc.is_xa_prepare_ = true;
BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.certified() == false);
BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared);
// XA PREPARE fragment // XA PREPARE fragment
BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().fragments() == 2);