diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 7537390..38d4e6a 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -52,6 +52,10 @@ namespace db { return false; } + std::string xid() const override + { + return ""; + } size_t bytes_generated() const override { return 0; diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 20999b6..cad003d 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -40,8 +40,9 @@ namespace db wsrep::mutable_buffer&) override; int append_fragment_and_commit( const wsrep::ws_handle&, - const wsrep::ws_meta&, const wsrep::const_buffer&) - override + const wsrep::ws_meta&, + const wsrep::const_buffer&, + const std::string&) override { return 0; } int remove_fragments(const wsrep::ws_meta&) override { return 0; } diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index af63ea8..3fe5485 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -34,7 +34,8 @@ namespace db int append_fragment(const wsrep::id&, wsrep::transaction_id, int, - const wsrep::const_buffer&) override + const wsrep::const_buffer&, + const std::string&) override { throw wsrep::not_implemented_error(); } int update_fragment_meta(const wsrep::ws_meta&) override { throw wsrep::not_implemented_error(); } diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 795e502..ac0f845 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -73,6 +73,9 @@ namespace wsrep */ virtual void cleanup_transaction() = 0; + // + // XA + // /** * Return true if the current transactions is XA */ @@ -83,6 +86,11 @@ namespace wsrep */ virtual bool is_xa_prepare() const = 0; + /** + * Return a string representing the xid of the transaction + */ + virtual std::string xid() const = 0; + // // Streaming // diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 726fd22..68870b3 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -305,6 +305,36 @@ namespace wsrep return transaction_.assign_read_view(gtid); } + /** + * Restores the client's transaction to prepared state + * + * The purpose of this method is to restore transaction state + * during recovery of a prepared XA transaction. + */ + int restore_prepared_transaction() + { + return transaction_.restore_to_prepared_state(); + } + + /** + * Terminate transaction with the given xid + * + * Sends a commit or rollback fragment to terminate the a + * transaction with the given xid. For the fragment to be + * sent, a streaming applier for the transaction must exist + * and the transaction must be in prepared state. + * + * @param xid the xid of the the transaction to terminate + * @param commit whether to send a commmit or rollback fragment + * + * @return Zero on success, non-zero on error. In case of error + * the client_state's current_error is set + */ + int commit_or_rollback_by_xid(const std::string& xid, bool commit) + { + return transaction_.commit_or_rollback_by_xid(xid, commit); + } + /** * Append a key into transaction write set. * diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 6ef306b..63514dd 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -96,7 +96,8 @@ namespace wsrep virtual int append_fragment_and_commit( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) = 0; + const wsrep::const_buffer& data, + const std::string& xid) = 0; /** * Remove fragments belonging to streaming transaction. diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index dee6224..970cdf3 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -130,7 +130,12 @@ namespace wsrep , depends_on_(depends_on) , flags_(flags) { } - + ws_meta(const wsrep::stid& stid) + : gtid_() + , stid_(stid) + , depends_on_() + , flags_() + { } const wsrep::gtid& gtid() const { return gtid_; } const wsrep::id& group_id() const { diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 75f279f..f1c6728 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -256,12 +256,20 @@ namespace wsrep void stop_streaming_applier( const wsrep::id&, const wsrep::transaction_id&); + /** - * Return reference to streaming applier. + * Find a streaming applier matching server and transaction ids */ wsrep::high_priority_service* find_streaming_applier( const wsrep::id&, const wsrep::transaction_id&) const; + + /** + * Find a streaming applier matching xid + */ + wsrep::high_priority_service* find_streaming_applier( + const std::string& xid) const; + /** * Load WSRep provider. * diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index 2cee48a..e71431d 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -63,7 +63,8 @@ namespace wsrep virtual int append_fragment(const wsrep::id& server_id, wsrep::transaction_id client_id, int flags, - const wsrep::const_buffer& data) = 0; + const wsrep::const_buffer& data, + const std::string& xid) = 0; /** * Update fragment meta data after certification process. diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index f0701e8..45d30a4 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -132,6 +132,15 @@ namespace wsrep return client_service_.is_xa_prepare(); } + int restore_to_prepared_state(); + + int commit_or_rollback_by_xid(const std::string& xid, bool commit); + + const std::string xid() const + { + return client_service_.xid(); + } + bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } @@ -262,6 +271,7 @@ namespace wsrep wsrep::streaming_context streaming_context_; wsrep::sr_key_set sr_keys_; wsrep::mutable_buffer apply_error_buf_; + bool is_recovered_xa_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/server_state.cpp b/src/server_state.cpp index 9f4b29e..d527768 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -130,8 +130,9 @@ static int apply_fragment(wsrep::server_state& server_state, if (!apply_err) { high_priority_service.debug_crash("crash_apply_cb_before_append_frag"); + const std::string xid(streaming_applier->transaction().xid()); ret = high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data); + ws_handle, ws_meta, data, xid); high_priority_service.debug_crash("crash_apply_cb_after_append_frag"); ret = ret || (high_priority_service.after_apply(), 0); } @@ -148,7 +149,6 @@ static int apply_fragment(wsrep::server_state& server_state, return ret; } - static int commit_fragment(wsrep::server_state& server_state, wsrep::high_priority_service& high_priority_service, wsrep::high_priority_service* streaming_applier, @@ -180,7 +180,7 @@ static int commit_fragment(wsrep::server_state& server_state, const wsrep::transaction& trx(streaming_applier->transaction()); // Fragment removal for XA is going to happen in after_commit - if (!trx.is_xa()) + if (trx.state() != wsrep::transaction::s_prepared) { streaming_applier->debug_crash( "crash_apply_cb_before_fragment_removal"); @@ -381,7 +381,8 @@ static int apply_write_set(wsrep::server_state& server_state, else { sa->next_fragment(ws_meta); - ret = apply_fragment(high_priority_service, + ret = apply_fragment(server_state, + high_priority_service, sa, ws_handle, ws_meta, @@ -1256,6 +1257,22 @@ wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( return (i == streaming_appliers_.end() ? 0 : i->second); } +wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( + const std::string& xid) const +{ + wsrep::unique_lock lock(mutex_); + streaming_appliers_map::const_iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + wsrep::high_priority_service* sa(i->second); + if (sa->transaction().xid() == xid) + { + return sa; + } + } + return NULL; +} + ////////////////////////////////////////////////////////////////////////////// // Private // ////////////////////////////////////////////////////////////////////////////// @@ -1430,13 +1447,19 @@ void wsrep::server_state::close_orphaned_sr_transactions( streaming_appliers_map::iterator i(streaming_appliers_.begin()); while (i != streaming_appliers_.end()) { - // rollback SR on equal consecutive primary views or if its - // originator is not in the current view - if (equal_consecutive_views || - (std::find_if(current_view_.members().begin(), - current_view_.members().end(), - server_id_cmp(i->first.first)) == - current_view_.members().end())) + wsrep::high_priority_service* streaming_applier(i->second); + + // Rollback SR on equal consecutive primary views or if its + // originator is not in the current view. + // Transactions in prepared state must be committed or + // rolled back explicitly, those are never rolled back here. + if ((streaming_applier->transaction().state() != + wsrep::transaction::s_prepared) && + (equal_consecutive_views || + (std::find_if(current_view_.members().begin(), + current_view_.members().end(), + server_id_cmp(i->first.first)) == + current_view_.members().end()))) { WSREP_LOG_DEBUG(wsrep::log::debug_log_level(), wsrep::log::debug_level_server_state, @@ -1445,7 +1468,6 @@ void wsrep::server_state::close_orphaned_sr_transactions( << ", " << i->first.second); wsrep::id server_id(i->first.first); wsrep::transaction_id transaction_id(i->first.second); - wsrep::high_priority_service* streaming_applier(i->second); int adopt_error; if ((adopt_error = high_priority_service.adopt_transaction( streaming_applier->transaction()))) diff --git a/src/transaction.cpp b/src/transaction.cpp index 99c19c7..1c7a0e2 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -108,6 +108,7 @@ wsrep::transaction::transaction( , streaming_context_() , sr_keys_() , apply_error_buf_() + , is_recovered_xa_(false) { } @@ -425,7 +426,8 @@ int wsrep::transaction::before_commit() assert((ret == 0 && state() == s_committing) || (state() == s_must_abort || state() == s_must_replay || - state() == s_cert_failed)); + state() == s_cert_failed || + state() == s_prepared)); } else if (state() == s_executing) { @@ -485,9 +487,8 @@ int wsrep::transaction::before_commit() case wsrep::client_state::m_high_priority: assert(certified()); assert(ordered()); - if (is_xa()) + if (state() == s_prepared) { - assert(state() == s_prepared); state(lock, s_committing); } @@ -567,7 +568,7 @@ int wsrep::transaction::after_commit() assert(client_state_.mode() == wsrep::client_state::m_local || client_state_.mode() == wsrep::client_state::m_high_priority); - if (is_xa()) + if (is_xa() || is_recovered_xa_) { // XA fragment removal happens here, // see comment in before_prepare @@ -1035,6 +1036,73 @@ void wsrep::transaction::after_replay(const wsrep::transaction& other) clear_fragments(); } +int wsrep::transaction::restore_to_prepared_state() +{ + wsrep::unique_lock lock(client_state_.mutex_); + assert(active()); + assert(is_empty()); + assert(state() == s_executing); + flags(flags() & ~wsrep::provider::flag::start_transaction); + state(lock, s_certifying); + state(lock, s_prepared); + is_recovered_xa_ = true; + return 0; +} + +int wsrep::transaction::commit_or_rollback_by_xid(const std::string& xid, + bool commit) +{ + wsrep::unique_lock lock(client_state_.mutex_); + wsrep::server_state& server_state(client_state_.server_state()); + wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid)); + + if (!sa) + { + assert(sa); + client_state_.override_error(wsrep::e_error_during_commit); + return 1; + } + + int flags(commit ? + wsrep::provider::flag::commit : + wsrep::provider::flag::rollback); + flags = flags | wsrep::provider::flag::pa_unsafe; + wsrep::stid stid(sa->transaction().server_id(), + sa->transaction().id(), + client_state_.id()); + wsrep::ws_meta meta(stid); + + const enum wsrep::provider::status cert_ret( + provider().certify(client_state_.id(), + ws_handle_, + flags, + meta)); + + if (cert_ret == wsrep::provider::success) + { + if (commit) + { + state(lock, s_certifying); + state(lock, s_committing); + state(lock, s_committed); + } + else + { + state(lock, s_aborting); + state(lock, s_aborted); + } + return 0; + } + else + { + client_state_.override_error(wsrep::e_error_during_commit); + wsrep::log_error() << "Failed to commit_or_rollback_by_xid," + << " xid: " << xid + << " error: " << cert_ret; + return 1; + } +} + //////////////////////////////////////////////////////////////////////////////// // Private // //////////////////////////////////////////////////////////////////////////////// @@ -1164,13 +1232,13 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) break; } - if (streaming_context_.fragment_size_exceeded() || client_service_.is_xa_prepare()) + if (streaming_context_.fragment_size_exceeded() || is_xa_prepare()) { // Some statements have no effect. Do not atttempt to // replicate a fragment if no data has been generated since // last fragment replication. A XA PREPARE statement generates a // fragment even if no data is currently pending replication. - if (bytes_to_replicate <= 0 && !client_service_.is_xa_prepare()) + if (bytes_to_replicate <= 0 && !is_xa_prepare()) { assert(bytes_to_replicate == 0); return ret; @@ -1268,7 +1336,8 @@ int wsrep::transaction::certify_fragment( server_id, id(), flags(), - wsrep::const_buffer(data.data(), data.size()))) + wsrep::const_buffer(data.data(), data.size()), + xid())) { ret = 1; error = wsrep::e_append_fragment_error; @@ -1538,7 +1607,14 @@ int wsrep::transaction::certify_commit( { client_state_.override_error(wsrep::e_error_during_commit, cert_ret); - state(lock, s_must_abort); + if (is_xa()) + { + state(lock, s_prepared); + } + else + { + state(lock, s_must_abort); + } } break; case wsrep::provider::error_provider_failed: @@ -1670,6 +1746,7 @@ void wsrep::transaction::cleanup() streaming_context_.cleanup(); client_service_.cleanup_transaction(); apply_error_buf_.clear(); + is_recovered_xa_ = false; debug_log_state("cleanup_leave"); } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 207fd53..75e93ee 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -209,7 +209,12 @@ namespace : ws_meta_(ws_meta) , trx_meta_() , flags_(flags) - { } + { + std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(), + sizeof(trx_meta_.stid.node.data)); + trx_meta_.stid.conn = ws_meta.client_id().get(); + trx_meta_.stid.trx = ws_meta.transaction_id().get(); + } ~mutable_ws_meta() { diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index db16094..d86890d 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -141,6 +141,11 @@ namespace wsrep return is_xa_prepare_; } + std::string xid() const WSREP_OVERRIDE + { + return ""; + } + size_t bytes_generated() const WSREP_OVERRIDE { return bytes_generated_; diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index 3fd9c34..5f4ac51 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -54,7 +54,8 @@ namespace wsrep int append_fragment_and_commit( const wsrep::ws_handle&, const wsrep::ws_meta&, - const wsrep::const_buffer&) WSREP_OVERRIDE + const wsrep::const_buffer&, + const std::string&) WSREP_OVERRIDE { return 0; } int remove_fragments(const wsrep::ws_meta&) WSREP_OVERRIDE { return 0; } diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index 7fbcc8f..4e99329 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -39,8 +39,8 @@ class mock_server_state; int append_fragment(const wsrep::id&, wsrep::transaction_id, int, - const wsrep::const_buffer&) - WSREP_OVERRIDE + const wsrep::const_buffer&, + const std::string&) WSREP_OVERRIDE { return 0; } int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE