diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 3c518d6..b502cbd 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -927,6 +927,22 @@ namespace wsrep return transaction_; } + /** + * Mark the transaction associated with the client state + * (if any), as unsafe for parallel applying + * + * @return Zero on success, non-zero on error. + */ + int mark_transaction_pa_unsafe() + { + if (transaction_.active()) + { + transaction_.pa_unsafe(true); + return 0; + } + return 1; + } + const wsrep::ws_meta& toi_meta() const { return toi_meta_; diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 7ad1090..f19eeb2 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -143,9 +143,14 @@ namespace wsrep int xa_replay(wsrep::unique_lock&); - bool pa_unsafe() const { return pa_unsafe_; } - void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } - + bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); } + void pa_unsafe(bool pa_unsafe) { + if (pa_unsafe) { + flags(flags() | wsrep::provider::flag::pa_unsafe); + } else { + flags(flags() & ~wsrep::provider::flag::pa_unsafe); + } + } bool implicit_deps() const { return implicit_deps_; } void implicit_deps(bool implicit) { implicit_deps_ = implicit; } @@ -265,7 +270,6 @@ namespace wsrep wsrep::ws_handle ws_handle_; wsrep::ws_meta ws_meta_; int flags_; - bool pa_unsafe_; bool implicit_deps_; bool certified_; size_t fragments_certified_for_statement_; diff --git a/src/transaction.cpp b/src/transaction.cpp index 63ea2e4..b93b35c 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -101,7 +101,6 @@ wsrep::transaction::transaction( , ws_handle_() , ws_meta_() , flags_() - , pa_unsafe_(false) , implicit_deps_(false) , certified_(false) , fragments_certified_for_statement_() @@ -326,7 +325,7 @@ int wsrep::transaction::before_prepare( { // Force fragment replication on XA prepare flags(flags() | wsrep::provider::flag::prepare); - flags(flags() | wsrep::provider::flag::pa_unsafe); + pa_unsafe(true); append_sr_keys_for_commit(); const bool force_streaming_step = true; ret = streaming_step(lock, force_streaming_step); @@ -1116,7 +1115,7 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, { flags = wsrep::provider::flag::rollback; } - flags = flags | wsrep::provider::flag::pa_unsafe; + pa_unsafe(true); wsrep::stid stid(sa->transaction().server_id(), sa->transaction().id(), client_state_.id()); @@ -1650,7 +1649,7 @@ int wsrep::transaction::certify_commit( { append_sr_keys_for_commit(); } - flags(flags() | wsrep::provider::flag::pa_unsafe); + pa_unsafe(true); } if (implicit_deps()) @@ -1956,7 +1955,6 @@ void wsrep::transaction::cleanup() ws_meta_ = wsrep::ws_meta(); flags_ = 0; certified_ = false; - pa_unsafe_ = false; implicit_deps_ = false; sr_keys_.clear(); streaming_context_.cleanup();