diff --git a/include/wsrep/buffer.hpp b/include/wsrep/buffer.hpp index 89dc822..1c1ac59 100644 --- a/include/wsrep/buffer.hpp +++ b/include/wsrep/buffer.hpp @@ -66,6 +66,10 @@ namespace wsrep : buffer_() { } + mutable_buffer(const mutable_buffer& b) + : buffer_(b.buffer_) + { } + void resize(size_t s) { buffer_.resize(s); } void clear() @@ -111,6 +115,11 @@ namespace wsrep buffer_ = other.buffer_; return *this; } + + bool operator==(const mutable_buffer& other) const + { + return buffer_ == other.buffer_; + } private: std::vector buffer_; }; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index f675c9d..d16e77f 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -589,6 +589,21 @@ namespace wsrep return transaction_.commit_or_rollback_by_xid(xid, false); } + /** + * Detach a prepared XA transaction + * + * This method cleans up a local XA transaction in prepared state + * and converts it to high priority mode. + * This can be used to handle the case where the client of a XA + * transaction disconnects, and the transaction must not rollback. + * After this call, a different client may later attempt to terminate + * the transaction by calling method commit_by_xid() or rollback_by_xid(). + */ + void xa_detach() + { + transaction_.xa_detach(); + } + // // BF aborting // diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 4e072b8..5761463 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -125,17 +125,17 @@ namespace wsrep bool is_xa() const { - return !xid_.empty(); + return !xid_.is_null(); } void assign_xid(const wsrep::xid& xid) { assert(active()); - assert(xid_.empty()); + assert(!is_xa()); xid_ = xid; } - const wsrep::xid xid() const + const wsrep::xid& xid() const { return xid_; } @@ -144,6 +144,8 @@ namespace wsrep int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit); + void xa_detach(); + bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } diff --git a/include/wsrep/xid.hpp b/include/wsrep/xid.hpp index 4a32d5a..005202d 100644 --- a/include/wsrep/xid.hpp +++ b/include/wsrep/xid.hpp @@ -21,23 +21,80 @@ #define WSREP_XID_HPP #include -#include +#include "buffer.hpp" namespace wsrep { class xid { public: - xid() : xid_() { } - xid(const std::string& str) : xid_(str) { } - xid(const char* s) : xid_(s) { } - bool empty() const { return xid_.empty(); } - void clear() { xid_.clear(); } - bool operator==(const xid& other) const { return xid_ == other.xid_; } + xid() + : format_id_(-1) + , gtrid_len_(0) + , bqual_len_(0) + , data_() + { } + + xid(long format_id, long gtrid_len, + long bqual_len, const char* data) + : format_id_(format_id) + , gtrid_len_(gtrid_len) + , bqual_len_(bqual_len) + , data_() + { + const long len = gtrid_len_ + bqual_len_; + if (len > 0) + { + data_.push_back(data, data + len); + } + } + + xid(const xid& xid) + : format_id_(xid.format_id_) + , gtrid_len_(xid.gtrid_len_) + , bqual_len_(xid.bqual_len_) + , data_(xid.data_) + { } + + bool is_null() const + { + return format_id_ == -1; + } + + void clear() + { + format_id_ = -1; + data_.clear(); + } + + xid& operator= (const xid& other) + { + format_id_ = other.format_id_; + gtrid_len_ = other.gtrid_len_; + bqual_len_ = other.bqual_len_; + data_ = other.data_; + return *this; + } + + bool operator==(const xid& other) const + { + if (format_id_ != other.format_id_ || + gtrid_len_ != other.gtrid_len_ || + bqual_len_ != other.bqual_len_ || + data_.size() != other.data_.size()) + { + return false; + } + return data_ == other.data_; + } + friend std::string to_string(const wsrep::xid& xid); friend std::ostream& operator<<(std::ostream& os, const wsrep::xid& xid); - private: - std::string xid_; + protected: + long format_id_; + long gtrid_len_; + long bqual_len_; + mutable_buffer data_; }; std::string to_string(const wsrep::xid& xid); diff --git a/src/client_state.cpp b/src/client_state.cpp index 2f23f29..40bc4a2 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -50,7 +50,9 @@ void wsrep::client_state::close() debug_log_state("close: enter"); state(lock, s_quitting); lock.unlock(); - if (transaction_.active()) + if (transaction_.active() && + (mode_ != m_local || + transaction_.state() != wsrep::transaction::s_prepared)) { client_service_.bf_rollback(); transaction_.after_statement(); diff --git a/src/server_state.cpp b/src/server_state.cpp index 7b9b7d3..2d6b39c 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -1297,6 +1297,7 @@ wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( { return sa; } + i++; } return NULL; } @@ -1425,6 +1426,19 @@ void wsrep::server_state::recover_streaming_appliers_if_not_recovered( streaming_appliers_recovered_ = true; } +class transaction_state_cmp +{ +public: + transaction_state_cmp(const enum wsrep::transaction::state s) : state_(s) + { } + bool operator()(std::pair& vt) const + { + return vt.second->transaction().state() == state_; + } +private: + enum wsrep::transaction::state state_; +}; + void wsrep::server_state::close_orphaned_sr_transactions( wsrep::unique_lock& lock, wsrep::high_priority_service& high_priority_service) @@ -1445,9 +1459,13 @@ void wsrep::server_state::close_orphaned_sr_transactions( if (current_view_.own_index() == -1 || equal_consecutive_views) { - while (streaming_clients_.empty() == false) + streaming_clients_map::iterator i; + transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared); + while ((i = std::find_if_not(streaming_clients_.begin(), + streaming_clients_.end(), + prepared_state_cmp)) + != streaming_clients_.end()) { - streaming_clients_map::iterator i(streaming_clients_.begin()); wsrep::client_id client_id(i->first); wsrep::transaction_id transaction_id(i->second->transaction().id()); // It is safe to unlock the server state temporarily here. diff --git a/src/transaction.cpp b/src/transaction.cpp index 17085ba..8e7c07b 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -326,6 +326,8 @@ int wsrep::transaction::before_prepare( { // Force fragment replication on XA prepare flags(flags() | wsrep::provider::flag::prepare); + flags(flags() | wsrep::provider::flag::pa_unsafe); + append_sr_keys_for_commit(); const bool force_streaming_step = true; ret = streaming_step(lock, force_streaming_step); if (ret == 0) @@ -423,15 +425,27 @@ int wsrep::transaction::before_commit() switch (client_state_.mode()) { case wsrep::client_state::m_local: - if (state() == s_prepared) + if (is_xa() && + (state() == s_prepared || state() == s_executing)) { - assert(is_xa()); - ret = certify_commit(lock); - assert((ret == 0 && state() == s_committing) || - (state() == s_must_abort || - state() == s_must_replay || - state() == s_cert_failed || - state() == s_prepared)); + ret = 0; + if (state() == s_executing) + { + // XA COMMIT in one phase + // This optimization is not supported... + // fall back to prepare + commit + ret = before_prepare(lock) || after_prepare(lock); + } + if (!ret) + { + assert(state() == s_prepared); + ret = certify_commit(lock); + assert((ret == 0 && state() == s_committing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed || + state() == s_prepared)); + } } else if (state() == s_executing) { @@ -1084,6 +1098,20 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, } } +void wsrep::transaction::xa_detach() +{ + assert(state() == s_prepared); + wsrep::server_state& server_state(client_state_.server_state()); + server_state.convert_streaming_client_to_applier(&client_state_); + client_service_.cleanup_transaction(); + wsrep::unique_lock lock(client_state_.mutex_); + streaming_context_.cleanup(); + state(lock, s_aborting); + state(lock, s_aborted); + provider().release(ws_handle_); + cleanup(); +} + //////////////////////////////////////////////////////////////////////////////// // Private // //////////////////////////////////////////////////////////////////////////////// @@ -1470,7 +1498,7 @@ int wsrep::transaction::certify_commit( state(lock, s_certifying); lock.unlock(); - if (is_streaming()) + if (is_streaming() && !is_xa()) { append_sr_keys_for_commit(); flags(flags() | wsrep::provider::flag::pa_unsafe); diff --git a/src/xid.cpp b/src/xid.cpp index f1f89a8..7757e44 100644 --- a/src/xid.cpp +++ b/src/xid.cpp @@ -22,10 +22,10 @@ std::string wsrep::to_string(const wsrep::xid& xid) { - return xid.xid_; + return std::string(xid.data_.data(), xid.data_.size()); } std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::xid& xid) { - return os << xid.xid_; + return os << to_string(xid); } diff --git a/test/transaction_test_xa.cpp b/test/transaction_test_xa.cpp index 65a1a3f..a64c875 100644 --- a/test/transaction_test_xa.cpp +++ b/test/transaction_test_xa.cpp @@ -7,8 +7,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, replicating_client_fixture_sync_rm) { + wsrep::xid xid(1, 9, 0, "test xid"); + BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); - cc.assign_xid("test xid"); + cc.assign_xid(xid); BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); @@ -52,8 +54,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa, BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, applying_client_fixture) { + wsrep::xid xid(1, 9, 0, "test xid"); + start_transaction(wsrep::transaction_id(1), wsrep::seqno(1)); - cc.assign_xid("test xid"); + cc.assign_xid(xid); BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); @@ -85,8 +89,11 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, BOOST_FIXTURE_TEST_CASE(transaction_xa_sr, streaming_client_fixture_byte) { + wsrep::xid xid(1, 9, 0, "test xid"); + BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); - cc.assign_xid("test xid"); + cc.assign_xid(xid); + cc.bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);