diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index e04f48d..f0701e8 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -47,6 +47,7 @@ namespace wsrep { s_executing, s_preparing, + s_prepared, s_certifying, s_committing, s_ordered_commit, @@ -269,6 +270,7 @@ namespace wsrep { case wsrep::transaction::s_executing: return "executing"; case wsrep::transaction::s_preparing: return "preparing"; + case wsrep::transaction::s_prepared: return "prepared"; case wsrep::transaction::s_certifying: return "certifying"; case wsrep::transaction::s_committing: return "committing"; case wsrep::transaction::s_ordered_commit: return "ordered_commit"; diff --git a/src/client_state.cpp b/src/client_state.cpp index 2b8ead9..f4bfb64 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -109,6 +109,7 @@ int wsrep::client_state::before_command() // just before BF abort happened. assert(transaction_.active() == false || (transaction_.state() == wsrep::transaction::s_executing || + transaction_.state() == wsrep::transaction::s_prepared || transaction_.state() == wsrep::transaction::s_aborted || transaction_.state() == wsrep::transaction::s_must_abort)); diff --git a/src/transaction.cpp b/src/transaction.cpp index df7869a..cebf4fc 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -326,6 +326,7 @@ int wsrep::transaction::before_prepare( if (ret == 0) { assert(state() == s_executing); + state(lock, s_preparing); } } else @@ -353,10 +354,7 @@ int wsrep::transaction::before_prepare( { assert(state() == s_executing); } - else - { - state(lock, s_preparing); - } + state(lock, s_preparing); break; default: assert(0); @@ -364,7 +362,6 @@ int wsrep::transaction::before_prepare( } assert(state() == s_preparing || - (state() == s_executing && is_xa()) || (ret && (state() == s_must_abort || state() == s_must_replay || state() == s_cert_failed || @@ -381,10 +378,10 @@ int wsrep::transaction::after_prepare( debug_log_state("after_prepare_enter"); if (is_xa()) { - // TODO XA consider adding state s_prepared - assert(state() == s_executing); + assert(state() == s_preparing); assert(client_state_.mode() == wsrep::client_state::m_local || (certified() && ordered())); + state(lock, s_prepared); } else { @@ -411,6 +408,7 @@ int wsrep::transaction::before_commit() debug_log_state("before_commit_enter"); assert(client_state_.mode() != wsrep::client_state::m_toi); assert(state() == s_executing || + state() == s_prepared || state() == s_committing || state() == s_must_abort || state() == s_replaying); @@ -420,17 +418,14 @@ int wsrep::transaction::before_commit() switch (client_state_.mode()) { case wsrep::client_state::m_local: - if (state() == s_executing && is_xa()) + if (state() == s_prepared) { + assert(is_xa()); ret = certify_commit(lock); - assert((ret == 0 && state() == s_preparing) || + assert((ret == 0 && state() == s_committing) || (state() == s_must_abort || state() == s_must_replay || state() == s_cert_failed)); - if (ret == 0) - { - state(lock, s_committing); - } } else if (state() == s_executing) { @@ -492,9 +487,7 @@ int wsrep::transaction::before_commit() assert(ordered()); if (is_xa()) { - assert(state() == s_executing); - // one more reason to add prepared state? - state(lock, s_preparing); + assert(state() == s_prepared); state(lock, s_committing); } @@ -626,6 +619,7 @@ int wsrep::transaction::before_rollback() debug_log_state("before_rollback_enter"); assert(state() == s_executing || state() == s_preparing || + state() == s_prepared || state() == s_must_abort || // Background rollbacker or rollback initiated from SE state() == s_aborting || @@ -645,6 +639,8 @@ int wsrep::transaction::before_rollback() // Error detected during prepare phase state(lock, s_must_abort); // fall through + case s_prepared: + // fall through case s_executing: // Voluntary rollback if (is_streaming()) @@ -689,7 +685,7 @@ int wsrep::transaction::before_rollback() break; case wsrep::client_state::m_high_priority: // Rollback by rollback write set or BF abort - assert(state_ == s_executing || state_ == s_aborting); + assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting); if (state_ != s_aborting) { state(lock, s_aborting); @@ -768,6 +764,7 @@ int wsrep::transaction::after_statement() debug_log_state("after_statement_enter"); assert(client_state_.mode() == wsrep::client_state::m_local); assert(state() == s_executing || + state() == s_prepared || state() == s_committed || state() == s_aborted || state() == s_must_abort || @@ -786,6 +783,9 @@ int wsrep::transaction::after_statement() case s_executing: // ? break; + case s_prepared: + assert(is_xa()); + break; case s_committed: assert(is_streaming() == false); break; @@ -850,6 +850,7 @@ int wsrep::transaction::after_statement() } assert(state() == s_executing || + state() == s_prepared || state() == s_committed || state() == s_aborted || state() == s_must_replay); @@ -863,7 +864,7 @@ int wsrep::transaction::after_statement() provider().release(ws_handle_); } - if (state() != s_executing) + if (state() != s_executing && state() != s_prepared) { cleanup(); } @@ -878,15 +879,17 @@ void wsrep::transaction::after_applying() wsrep::unique_lock lock(client_state_.mutex_); debug_log_state("after_applying enter"); assert(state_ == s_executing || + state_ == s_prepared || state_ == s_committed || state_ == s_aborted); - if (state_ != s_executing) + + if (state_ != s_executing && state_ != s_prepared) { cleanup(); } else { - // State remains executing, so this is a streaming applier. + // State remains executing or prepared, so this is a streaming applier. // Reset the meta data to avoid releasing commit order // critical section above if the next fragment is rollback // fragment. Rollback fragment ordering will be handled by @@ -1060,19 +1063,20 @@ void wsrep::transaction::state( next_state == s_aborting); static const char allowed[n_states][n_states] = - { /* ex pr ce co oc ct cf ma ab ad mr re */ - { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ - { 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */ - { 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ - { 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ - { 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ - { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ - { 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ - { 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ + { /* ex pg pd ce co oc ct cf ma ab ad mr re */ + { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ + { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */ + { 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0}, /* pd */ + { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ + { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ + { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ + { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ + { 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ }; if (!allowed[state_][next_state]) @@ -1235,7 +1239,7 @@ int wsrep::transaction::certify_fragment( flags(flags() | wsrep::provider::flag::implicit_deps); } - if (is_xa()) + if (is_xa_prepare()) { flags(flags() | wsrep::provider::flag::prepare); } @@ -1466,7 +1470,14 @@ int wsrep::transaction::certify_commit( switch (state()) { case s_certifying: - state(lock, s_preparing); + if (is_xa()) + { + state(lock, s_committing); + } + else + { + state(lock, s_preparing); + } ret = 0; break; case s_must_abort: diff --git a/test/transaction_test_xa.cpp b/test/transaction_test_xa.cpp index a5f961e..6165473 100644 --- a/test/transaction_test_xa.cpp +++ b/test/transaction_test_xa.cpp @@ -17,13 +17,13 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, cc.is_xa_prepare_ = true; BOOST_REQUIRE(cc.before_prepare() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.ordered() == false); // certified() only after the last fragment BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(cc.after_prepare() == 0); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); // XA START + PREPARE fragment BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().fragments() == 1); @@ -60,12 +60,12 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, cc.is_xa_prepare_ = true; BOOST_REQUIRE(cc.before_prepare() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.ordered()); BOOST_REQUIRE(tc.certified()); BOOST_REQUIRE(tc.ws_meta().gtid().is_undefined() == false); BOOST_REQUIRE(cc.after_prepare() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared); cc.is_xa_prepare_ = false; @@ -106,11 +106,11 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, cc.is_xa_prepare_ = true; BOOST_REQUIRE(cc.before_prepare() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(cc.after_prepare() == 0); - BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared); // XA PREPARE fragment BOOST_REQUIRE(sc.provider().fragments() == 2);