From 9c9323e2a52730fda31b3578c225a7f31fe5847a Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Mon, 4 Feb 2019 08:45:00 +0100 Subject: [PATCH] Initial support for XA Force fragment replication when XA transaction is prepared, with prepare fragment. Commit fragment happens in before_commit(). Adjusted fragment removal, which cannot happen in atomically with the executing transaction. --- include/wsrep/client_service.hpp | 8 ++- include/wsrep/transaction.hpp | 7 +- src/server_state.cpp | 21 ++++-- src/transaction.cpp | 112 ++++++++++++++++++++++++++----- 4 files changed, 122 insertions(+), 26 deletions(-) diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 5ca04fe..7067b99 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -34,13 +34,14 @@ namespace wsrep { - class transaction; class client_service { public: client_service() { } virtual ~client_service() { } + virtual const char* query() const = 0; + /** * Return true if the current transaction has been interrupted * by the DBMS. The lock which is passed to interrupted call @@ -72,6 +73,11 @@ namespace wsrep */ virtual void cleanup_transaction() = 0; + /** + * Return true if the current transactions is XA + */ + virtual bool is_xa() const = 0; + // // Streaming // diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 6c78cd8..0a7aff8 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -28,6 +28,7 @@ #include "lock.hpp" #include "sr_key_set.hpp" #include "buffer.hpp" +#include "client_service.hpp" #include #include @@ -39,7 +40,6 @@ namespace wsrep class key; class const_buffer; - class transaction { public: @@ -121,6 +121,11 @@ namespace wsrep return sr_keys_.empty(); } + bool is_xa() const + { + return client_service_.is_xa(); + } + bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } diff --git a/src/server_state.cpp b/src/server_state.cpp index 0a79929..4b94cfa 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -177,11 +177,20 @@ static int commit_fragment(wsrep::server_state& server_state, { assert(err.size() == 0); } - streaming_applier->debug_crash( - "crash_apply_cb_before_fragment_removal"); - ret = ret || streaming_applier->remove_fragments(ws_meta); - streaming_applier->debug_crash( - "crash_apply_cb_after_fragment_removal"); + + const wsrep::transaction& trx(streaming_applier->transaction()); + // Fragment removal for XA is going to happen in after_commit + if (!trx.is_xa()) + { + streaming_applier->debug_crash( + "crash_apply_cb_before_fragment_removal"); + + ret = ret || streaming_applier->remove_fragments(ws_meta); + + streaming_applier->debug_crash( + "crash_apply_cb_after_fragment_removal"); + } + streaming_applier->debug_crash( "crash_commit_cb_before_last_fragment_commit"); ret = ret || streaming_applier->commit(ws_handle, ws_meta); @@ -427,7 +436,7 @@ static int apply_write_set(wsrep::server_state& server_state, } if (ret) { - wsrep::log_info() << "Failed to apply write set: " << ws_meta; + wsrep::log_error() << "Failed to apply write set: " << ws_meta; } return ret; } diff --git a/src/transaction.cpp b/src/transaction.cpp index 57dc8c9..520274e 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -288,8 +288,11 @@ int wsrep::transaction::before_prepare( wsrep::provider::error_not_allowed); ret = 1; } - else + else if (!is_xa()) { + // Note: we can't remove fragments here for XA, + // the transaction has already issued XA END and + // is in IDLE state, no more changes allowed! ret = client_service_.remove_fragments(); if (ret) { @@ -305,13 +308,25 @@ int wsrep::transaction::before_prepare( ret = 1; } } + if (ret == 0) { - ret = certify_commit(lock); - assert((ret == 0 && state() == s_preparing) || - (state() == s_must_abort || - state() == s_must_replay || - state() == s_cert_failed)); + if (is_xa()) + { + ret = streaming_step(lock); + if (ret == 0) + { + assert(state() == s_executing); + } + } + else + { + ret = certify_commit(lock); + assert((ret == 0 && state() == s_preparing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + } if (ret) { @@ -325,7 +340,14 @@ int wsrep::transaction::before_prepare( case wsrep::client_state::m_high_priority: // Note: fragment removal is done from applying // context for high priority mode. - state(lock, s_preparing); + if (is_xa()) + { + assert(state() == s_executing); + } + else + { + state(lock, s_preparing); + } break; default: assert(0); @@ -333,6 +355,7 @@ int wsrep::transaction::before_prepare( } assert(state() == s_preparing || + (state() == s_executing && is_xa()) || (ret && (state() == s_must_abort || state() == s_must_replay || state() == s_cert_failed || @@ -347,18 +370,26 @@ int wsrep::transaction::after_prepare( assert(lock.owns_lock()); debug_log_state("after_prepare_enter"); - assert(certified() && ordered()); - assert(state() == s_preparing || state() == s_must_abort); - - if (state() == s_must_abort) + if (is_xa()) { - assert(client_state_.mode() == wsrep::client_state::m_local); - state(lock, s_must_replay); - return 1; + // TODO XA consider adding state s_prepared + assert(state() == s_executing); + assert(client_state_.mode() == wsrep::client_state::m_local || + (certified() && ordered())); } + else + { + assert(certified() && ordered()); + assert(state() == s_preparing || state() == s_must_abort); - state(lock, s_committing); - + if (state() == s_must_abort) + { + assert(client_state_.mode() == wsrep::client_state::m_local); + state(lock, s_must_replay); + return 1; + } + state(lock, s_committing); + } debug_log_state("after_prepare_leave"); return 0; } @@ -380,7 +411,19 @@ int wsrep::transaction::before_commit() switch (client_state_.mode()) { case wsrep::client_state::m_local: - if (state() == s_executing) + if (state() == s_executing && is_xa()) + { + ret = certify_commit(lock); + assert((ret == 0 && state() == s_preparing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + if (ret == 0) + { + state(lock, s_committing); + } + } + else if (state() == s_executing) { ret = before_prepare(lock) || after_prepare(lock); assert((ret == 0 && state() == s_committing) @@ -438,6 +481,14 @@ int wsrep::transaction::before_commit() case wsrep::client_state::m_high_priority: assert(certified()); assert(ordered()); + if (is_xa()) + { + assert(state() == s_executing); + // one more reason to add prepared state? + state(lock, s_preparing); + state(lock, s_committing); + } + if (state() == s_executing || state() == s_replaying) { ret = before_prepare(lock) || after_prepare(lock); @@ -513,6 +564,25 @@ int wsrep::transaction::after_commit() { assert(client_state_.mode() == wsrep::client_state::m_local || client_state_.mode() == wsrep::client_state::m_high_priority); + + if (is_xa()) + { + // XA fragment removal happens here, + // see comment in before_prepare + lock.unlock(); + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); + storage_service.adopt_transaction(*this); + storage_service.remove_fragments(); + storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta()); + lock.lock(); + } + if (client_state_.mode() == wsrep::client_state::m_local) { lock.unlock(); @@ -1050,7 +1120,7 @@ bool wsrep::transaction::abort_or_interrupt( int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) { assert(lock.owns_lock()); - assert(streaming_context_.fragment_size()); + assert(streaming_context_.fragment_size() || is_xa()); if (client_service_.bytes_generated() < streaming_context_.bytes_certified()) @@ -1153,6 +1223,11 @@ int wsrep::transaction::certify_fragment( flags(flags() | wsrep::provider::flag::implicit_deps); } + if (is_xa()) + { + flags(flags() | wsrep::provider::flag::prepare); + } + int ret(0); enum wsrep::client_error error(wsrep::e_success); enum wsrep::provider::status cert_ret(wsrep::provider::success); @@ -1604,6 +1679,7 @@ void wsrep::transaction::debug_log_state( << ", sr_rb: " << streaming_context_.rolled_back() << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) << " thread_id: " << client_state_.owning_thread_id_ + << "\n query: " << client_service_.query() << ""); }