diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 38d4e6a..9a39a7e 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -44,18 +44,6 @@ namespace db return 0; } void cleanup_transaction() override { } - bool is_xa () const override - { - return false; - } - bool is_xa_prepare() const override - { - return false; - } - std::string xid() const override - { - return ""; - } size_t bytes_generated() const override { return 0; diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index ac0f845..d81d3a7 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -73,24 +73,6 @@ namespace wsrep */ virtual void cleanup_transaction() = 0; - // - // XA - // - /** - * Return true if the current transactions is XA - */ - virtual bool is_xa() const = 0; - - /** - * Return true if the current statement is XA PREPARE - */ - virtual bool is_xa_prepare() const = 0; - - /** - * Return a string representing the xid of the transaction - */ - virtual std::string xid() const = 0; - // // Streaming // diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 68870b3..688542a 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -305,36 +305,6 @@ namespace wsrep return transaction_.assign_read_view(gtid); } - /** - * Restores the client's transaction to prepared state - * - * The purpose of this method is to restore transaction state - * during recovery of a prepared XA transaction. - */ - int restore_prepared_transaction() - { - return transaction_.restore_to_prepared_state(); - } - - /** - * Terminate transaction with the given xid - * - * Sends a commit or rollback fragment to terminate the a - * transaction with the given xid. For the fragment to be - * sent, a streaming applier for the transaction must exist - * and the transaction must be in prepared state. - * - * @param xid the xid of the the transaction to terminate - * @param commit whether to send a commmit or rollback fragment - * - * @return Zero on success, non-zero on error. In case of error - * the client_state's current_error is set - */ - int commit_or_rollback_by_xid(const std::string& xid, bool commit) - { - return transaction_.commit_or_rollback_by_xid(xid, commit); - } - /** * Append a key into transaction write set. * @@ -552,6 +522,52 @@ namespace wsrep void wait_rollback_complete_and_acquire_ownership(); /** @} */ + // + // XA + // + /** + * Assign transaction external id. + * + * Other than storing the xid, the transaction is marked as XA. + * This should be called when XA transaction is started. + * + * @param xid transaction id + */ + void assign_xid(const std::string& xid) + { + transaction_.assign_xid(xid); + } + + /** + * Restores the client's transaction to prepared state + * + * The purpose of this method is to restore transaction state + * during recovery of a prepared XA transaction. + */ + int restore_xid(std::string& xid) + { + return transaction_.restore_to_prepared_state(xid); + } + + /** + * Terminate transaction with the given xid + * + * Sends a commit or rollback fragment to terminate a + * transaction with the given xid. For the fragment to be + * sent, a streaming applier for the transaction must exist + * and the transaction must be in prepared state. + * + * @param xid the xid of the the transaction to terminate + * @param commit whether to send a commmit or rollback fragment + * + * @return Zero on success, non-zero on error. In case of error + * the client_state's current_error is set + */ + int commit_or_rollback_by_xid(const std::string& xid, bool commit) + { + return transaction_.commit_or_rollback_by_xid(xid, commit); + } + // // BF aborting // diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 45d30a4..948ec94 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -124,23 +124,25 @@ namespace wsrep bool is_xa() const { - return client_service_.is_xa(); + return !xid_.empty(); } - bool is_xa_prepare() const + void assign_xid(const std::string& xid) { - return client_service_.is_xa_prepare(); + assert(active()); + assert(xid_.empty()); + xid_ = xid; } - int restore_to_prepared_state(); - - int commit_or_rollback_by_xid(const std::string& xid, bool commit); - const std::string xid() const { - return client_service_.xid(); + return xid_; } + int restore_to_prepared_state(const std::string& xid); + + int commit_or_rollback_by_xid(const std::string& xid, bool commit); + bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } @@ -239,7 +241,7 @@ namespace wsrep // The call will adjust transaction state and set client_state // error status accordingly. bool abort_or_interrupt(wsrep::unique_lock&); - int streaming_step(wsrep::unique_lock&); + int streaming_step(wsrep::unique_lock&, bool force = false); int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); int append_sr_keys_for_commit(); @@ -271,7 +273,7 @@ namespace wsrep wsrep::streaming_context streaming_context_; wsrep::sr_key_set sr_keys_; wsrep::mutable_buffer apply_error_buf_; - bool is_recovered_xa_; + std::string xid_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/transaction.cpp b/src/transaction.cpp index 1c7a0e2..ca6fd06 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -108,7 +108,7 @@ wsrep::transaction::transaction( , streaming_context_() , sr_keys_() , apply_error_buf_() - , is_recovered_xa_(false) + , xid_() { } @@ -121,6 +121,7 @@ int wsrep::transaction::start_transaction( { debug_log_state("start_transaction enter"); assert(active() == false); + assert(is_xa() == false); assert(flags() == 0); server_id_ = client_state_.server_state().id(); id_ = id; @@ -323,7 +324,10 @@ int wsrep::transaction::before_prepare( { if (is_xa()) { - ret = streaming_step(lock); + // Force fragment replication on XA prepare + flags(flags() | wsrep::provider::flag::prepare); + const bool force_streaming_step = true; + ret = streaming_step(lock, force_streaming_step); if (ret == 0) { assert(state() == s_executing); @@ -568,7 +572,7 @@ int wsrep::transaction::after_commit() assert(client_state_.mode() == wsrep::client_state::m_local || client_state_.mode() == wsrep::client_state::m_high_priority); - if (is_xa() || is_recovered_xa_) + if (is_xa()) { // XA fragment removal happens here, // see comment in before_prepare @@ -1036,7 +1040,7 @@ void wsrep::transaction::after_replay(const wsrep::transaction& other) clear_fragments(); } -int wsrep::transaction::restore_to_prepared_state() +int wsrep::transaction::restore_to_prepared_state(const std::string& xid) { wsrep::unique_lock lock(client_state_.mutex_); assert(active()); @@ -1045,7 +1049,7 @@ int wsrep::transaction::restore_to_prepared_state() flags(flags() & ~wsrep::provider::flag::start_transaction); state(lock, s_certifying); state(lock, s_prepared); - is_recovered_xa_ = true; + xid_ = xid; return 0; } @@ -1198,7 +1202,8 @@ bool wsrep::transaction::abort_or_interrupt( return false; } -int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) +int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, + bool force) { assert(lock.owns_lock()); assert(streaming_context_.fragment_size() || is_xa()); @@ -1232,18 +1237,19 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) break; } - if (streaming_context_.fragment_size_exceeded() || is_xa_prepare()) + // Some statements have no effect. Do not atttempt to + // replicate a fragment if no data has been generated since + // last fragment replication. + // We use `force = true` on XA prepare: a fragment will be + // generated even if no data is pending replication. + if (bytes_to_replicate <= 0 && !force) { - // Some statements have no effect. Do not atttempt to - // replicate a fragment if no data has been generated since - // last fragment replication. A XA PREPARE statement generates a - // fragment even if no data is currently pending replication. - if (bytes_to_replicate <= 0 && !is_xa_prepare()) - { - assert(bytes_to_replicate == 0); - return ret; - } + assert(bytes_to_replicate == 0); + return ret; + } + if (streaming_context_.fragment_size_exceeded() || force) + { streaming_context_.reset_unit_counter(); ret = certify_fragment(lock); } @@ -1307,11 +1313,6 @@ int wsrep::transaction::certify_fragment( flags(flags() | wsrep::provider::flag::implicit_deps); } - if (is_xa_prepare()) - { - flags(flags() | wsrep::provider::flag::prepare); - } - int ret(0); enum wsrep::client_error error(wsrep::e_success); enum wsrep::provider::status cert_ret(wsrep::provider::success); @@ -1746,7 +1747,7 @@ void wsrep::transaction::cleanup() streaming_context_.cleanup(); client_service_.cleanup_transaction(); apply_error_buf_.clear(); - is_recovered_xa_ = false; + xid_.clear(); debug_log_state("cleanup_leave"); } diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index d86890d..9610457 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -62,8 +62,6 @@ namespace wsrep : wsrep::client_service() , is_autocommit_() , do_2pc_() - , is_xa_() - , is_xa_prepare_() // , fail_next_applying_() // , fail_next_toi_() , bf_abort_during_wait_() @@ -131,21 +129,6 @@ namespace wsrep void cleanup_transaction() WSREP_OVERRIDE { } - bool is_xa() const WSREP_OVERRIDE - { - return is_xa_; - } - - bool is_xa_prepare() const WSREP_OVERRIDE - { - return is_xa_prepare_; - } - - std::string xid() const WSREP_OVERRIDE - { - return ""; - } - size_t bytes_generated() const WSREP_OVERRIDE { return bytes_generated_; @@ -197,8 +180,6 @@ namespace wsrep // bool is_autocommit_; bool do_2pc_; - bool is_xa_; - bool is_xa_prepare_; // bool fail_next_applying_; // bool fail_next_toi_; bool bf_abort_during_wait_; diff --git a/test/transaction_test_xa.cpp b/test/transaction_test_xa.cpp index 6165473..d313f98 100644 --- a/test/transaction_test_xa.cpp +++ b/test/transaction_test_xa.cpp @@ -8,14 +8,12 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, replicating_client_fixture_sync_rm) { BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); - cc.is_xa_ = true; + cc.assign_xid("test xid"); BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); - cc.is_xa_prepare_ = true; - BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.ordered() == false); @@ -28,8 +26,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().fragments() == 1); - cc.is_xa_prepare_ = false; - BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(cc.ordered_commit() == 0); @@ -38,7 +34,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, BOOST_REQUIRE(tc.certified()); BOOST_REQUIRE(cc.after_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed); - // XA PRERAPRE and XA COMMIT fragments + // XA PREPARE and XA COMMIT fragments BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().commit_fragments() == 1); @@ -56,8 +52,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, applying_client_fixture) { - cc.is_xa_ = true; - cc.is_xa_prepare_ = true; + cc.assign_xid("test xid"); BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); @@ -67,8 +62,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared); - cc.is_xa_prepare_ = false; - BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(cc.ordered_commit() == 0); @@ -92,7 +85,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, streaming_client_fixture_byte) { BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); - cc.is_xa_ = true; + cc.assign_xid("test xid"); cc.bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); @@ -103,8 +96,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); - cc.is_xa_prepare_ = true; - BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.ordered() == false); @@ -114,8 +105,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, // XA PREPARE fragment BOOST_REQUIRE(sc.provider().fragments() == 2); - cc.is_xa_prepare_ = false; - BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(cc.ordered_commit() == 0);