diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 4055b8b..be6f9ad 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -60,11 +60,33 @@ namespace db int remove_fragments() override { return 0; } int bf_rollback() override; void will_replay() override { } + void signal_replayed() override { } void wait_for_replayers(wsrep::unique_lock&) override { } enum wsrep::provider::status replay() override; + enum wsrep::provider::status replay_unordered() override + { + return wsrep::provider::success; + } + void emergency_shutdown() override { ::abort(); } + + enum wsrep::provider::status commit_by_xid() override + { + return wsrep::provider::success; + } + + bool is_explicit_xa() override + { + return false; + } + + bool is_xa_rollback() override + { + return false; + } + void debug_sync(const char*) override { } void debug_crash(const char*) override { } private: diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index ac29e37..6664880 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -145,6 +145,11 @@ namespace wsrep */ virtual void will_replay() = 0; + /** + * Signal that replay is done. + */ + virtual void signal_replayed() = 0; + /** * Replay the current transaction. The implementation must put * the caller Client Context into applying mode and call @@ -155,6 +160,13 @@ namespace wsrep */ virtual enum wsrep::provider::status replay() = 0; + /** + * Replay the current transaction. This is used for replaying + * prepared XA transactions, which are BF aborted but not + * while orderding commit / rollback. + */ + virtual enum wsrep::provider::status replay_unordered() = 0; + /** * Wait until all replaying transactions have been finished * replaying. @@ -164,6 +176,32 @@ namespace wsrep */ virtual void wait_for_replayers(wsrep::unique_lock&) = 0; + // + // XA + // + /** + * Send a commit by xid + */ + virtual enum wsrep::provider::status commit_by_xid() = 0; + + /* + Returns true if the client has an ongoing XA transaction. + This method is used to determine when to cleanup the + corresponding wsrep-lib transaction object. + This method should return false when the XA transaction + is over, and the wsrep-lib transaction object can be + cleaned up. + */ + virtual bool is_explicit_xa() = 0; + + /** + * Returns true if the currently executing command is + * a rollback for XA. This is used to avoid setting a + * a deadlock error rollback as it may be unexpected + * by the DBMS. + */ + virtual bool is_xa_rollback() = 0; + // // Debug interface // diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index d16e77f..fd68eb1 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -604,6 +604,23 @@ namespace wsrep transaction_.xa_detach(); } + /** + * Replay a XA transaction + * + * Replay a XA transaction that is in s_idle state. + * This may happen if the transaction is BF aborted + * between prepare and commit. + * Since the victim is idle, this method can be called + * by the BF aborter or the backround rollbacker. + */ + void xa_replay() + { + assert(mode_ == m_local); + assert(state_ == s_idle); + wsrep::unique_lock lock(mutex_); + transaction_.xa_replay(lock); + } + // // BF aborting // diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 5761463..ff7c3f0 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -146,6 +146,8 @@ namespace wsrep void xa_detach(); + int xa_replay(wsrep::unique_lock&); + bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } diff --git a/src/client_state.cpp b/src/client_state.cpp index 40bc4a2..b3e750b 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -133,16 +133,25 @@ int wsrep::client_state::before_command() } else if (transaction_.state() == wsrep::transaction::s_aborted) { - // Transaction was rolled back either just before sending result - // to the client, or after client_state become idle. - // Clean up the transaction and return error. - override_error(wsrep::e_deadlock_error); - lock.unlock(); - (void)transaction_.after_statement(); - lock.lock(); - assert(transaction_.active() == false); - debug_log_state("before_command: error"); - return 1; + // Transaction was rolled back either just before sending + // result to the client, or after client_state become idle. + if (transaction_.is_xa()) + { + // Client will rollback explicitly, return error. + debug_log_state("before_command: error"); + return 1; + } + else + { + // Clean up the transaction and return error. + override_error(wsrep::e_deadlock_error); + lock.unlock(); + (void)transaction_.after_statement(); + lock.lock(); + assert(transaction_.active() == false); + debug_log_state("before_command: error"); + return 1; + } } } debug_log_state("before_command: success"); diff --git a/src/transaction.cpp b/src/transaction.cpp index 8e7c07b..7cad8b0 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -339,12 +339,13 @@ int wsrep::transaction::before_prepare( else { ret = certify_commit(lock); - assert((ret == 0 && state() == s_preparing) || - (state() == s_must_abort || - state() == s_must_replay || - state() == s_cert_failed)); } + assert((ret == 0 && state() == s_preparing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + if (ret) { assert(state() == s_must_replay || @@ -359,7 +360,12 @@ int wsrep::transaction::before_prepare( // context for high priority mode. if (is_xa()) { - assert(state() == s_executing); + assert(state() == s_executing || + state() == s_replaying); + if (state() == s_replaying) + { + break; + } } state(lock, s_preparing); break; @@ -369,6 +375,7 @@ int wsrep::transaction::before_prepare( } assert(state() == s_preparing || + (is_xa() && state() == s_replaying) || (ret && (state() == s_must_abort || state() == s_must_replay || state() == s_cert_failed || @@ -382,13 +389,30 @@ int wsrep::transaction::after_prepare( { assert(lock.owns_lock()); + int ret = 0; debug_log_state("after_prepare_enter"); if (is_xa()) { - assert(state() == s_preparing); - assert(client_state_.mode() == wsrep::client_state::m_local || - (certified() && ordered())); - state(lock, s_prepared); + switch (state()) + { + case s_preparing: + assert(client_state_.mode() == wsrep::client_state::m_local || + (certified() && ordered())); + state(lock, s_prepared); + break; + case s_must_abort: + // prepared locally, but client has not received + // a result yet. We can still abort. + assert(client_state_.mode() == wsrep::client_state::m_local); + client_state_.override_error(wsrep::e_deadlock_error); + ret = 1; + break; + case s_replaying: + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + break; + default: + assert(0); + } } else { @@ -398,13 +422,17 @@ int wsrep::transaction::after_prepare( if (state() == s_must_abort) { assert(client_state_.mode() == wsrep::client_state::m_local); + client_service_.will_replay(); state(lock, s_must_replay); - return 1; + ret = 1; + } + else + { + state(lock, s_committing); } - state(lock, s_committing); } debug_log_state("after_prepare_leave"); - return 0; + return ret; } int wsrep::transaction::before_commit() @@ -425,8 +453,7 @@ int wsrep::transaction::before_commit() switch (client_state_.mode()) { case wsrep::client_state::m_local: - if (is_xa() && - (state() == s_prepared || state() == s_executing)) + if (is_xa()) { ret = 0; if (state() == s_executing) @@ -436,6 +463,13 @@ int wsrep::transaction::before_commit() // fall back to prepare + commit ret = before_prepare(lock) || after_prepare(lock); } + else if (state() == s_must_abort) + { + assert(is_streaming()); + client_service_.will_replay(); + state(lock, s_must_replay); + ret = 1; + } if (!ret) { assert(state() == s_prepared); @@ -462,6 +496,7 @@ int wsrep::transaction::before_commit() assert(state() == s_must_abort); if (certified()) { + client_service_.will_replay(); state(lock, s_must_replay); } else @@ -488,6 +523,7 @@ int wsrep::transaction::before_commit() case wsrep::provider::success: break; case wsrep::provider::error_bf_abort: + client_service_.will_replay(); if (state() != s_must_abort) { state(lock, s_must_abort); @@ -505,12 +541,14 @@ int wsrep::transaction::before_commit() case wsrep::client_state::m_high_priority: assert(certified()); assert(ordered()); - if (state() == s_prepared) + if (is_xa()) { + assert(state() == s_prepared || + state() == s_replaying); state(lock, s_committing); + ret = 0; } - - if (state() == s_executing || state() == s_replaying) + else if (state() == s_executing || state() == s_replaying) { ret = before_prepare(lock) || after_prepare(lock); } @@ -671,6 +709,7 @@ int wsrep::transaction::before_rollback() case s_must_abort: if (certified()) { + client_service_.will_replay(); state(lock, s_must_replay); } else @@ -784,6 +823,7 @@ int wsrep::transaction::release_commit_order( int wsrep::transaction::after_statement() { int ret(0); + client_service_.debug_sync("wsrep_after_statement_enter"); wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_statement_enter"); assert(client_state_.mode() == wsrep::client_state::m_local); @@ -827,13 +867,21 @@ int wsrep::transaction::after_statement() // Fall through case s_must_replay: { - ret = replay(lock); + if (is_xa() && !ordered()) + { + ret = xa_replay(lock); + } + else + { + ret = replay(lock); + } break; } case s_aborted: // Raise a deadlock error if the transaction was BF aborted and // rolled back by client outside of transaction hooks. - if (bf_aborted() && client_state_.current_error() == wsrep::e_success) + if (bf_aborted() && client_state_.current_error() == wsrep::e_success && + !client_service_.is_xa_rollback()) { client_state_.override_error(wsrep::e_deadlock_error); } @@ -860,7 +908,9 @@ int wsrep::transaction::after_statement() lock.lock(); } - if (state() != s_executing && state() != s_prepared) + if (state() != s_executing && + (!client_service_.is_explicit_xa() || + client_state_.state() == wsrep::client_state::s_quitting)) { cleanup(); } @@ -918,6 +968,7 @@ bool wsrep::transaction::bf_abort( { case s_executing: case s_preparing: + case s_prepared: case s_certifying: case s_committing: { @@ -982,18 +1033,26 @@ bool wsrep::transaction::bf_abort( { // We need to change the state to aborting under the // lock protection to avoid a race between client thread, - // otherwise it could happend that the client gains control + // otherwise it could happen that the client gains control // between releasing the lock and before background // rollbacker gets control. - state(lock, wsrep::transaction::s_aborting); - client_state_.set_rollbacker_active(true); - - if (client_state_.mode() == wsrep::client_state::m_high_priority) + if (is_xa() && state_at_enter == s_prepared) { - lock.unlock(); - client_state_.server_state().stop_streaming_applier( - server_id_, id_); - lock.lock(); + client_service_.will_replay(); + state(lock, s_must_replay); + client_state_.set_rollbacker_active(true); + } + else + { + state(lock, s_aborting); + client_state_.set_rollbacker_active(true); + if (client_state_.mode() == wsrep::client_state::m_high_priority) + { + lock.unlock(); + client_state_.server_state().stop_streaming_applier( + server_id_, id_); + lock.lock(); + } } lock.unlock(); @@ -1019,6 +1078,8 @@ void wsrep::transaction::clone_for_replay(const wsrep::transaction& other) { assert(other.state() == s_replaying); id_ = other.id_; + xid_ = other.xid_; + server_id_ = other.server_id_; ws_handle_ = other.ws_handle_; ws_meta_ = other.ws_meta_; streaming_context_ = other.streaming_context_; @@ -1030,9 +1091,16 @@ int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) wsrep::unique_lock lock(client_state_.mutex_); assert(active()); assert(is_empty()); - assert(state() == s_executing); + assert(state() == s_executing || state() == s_replaying); flags(flags() & ~wsrep::provider::flag::start_transaction); - state(lock, s_certifying); + if (state() == s_executing) + { + state(lock, s_certifying); + } + else + { + state(lock, s_preparing); + } state(lock, s_prepared); xid_ = xid; return 0; @@ -1090,7 +1158,8 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, } else { - client_state_.override_error(wsrep::e_error_during_commit); + client_state_.override_error(wsrep::e_error_during_commit, + cert_ret); wsrep::log_error() << "Failed to commit_or_rollback_by_xid," << " xid: " << xid << " error: " << cert_ret; @@ -1100,16 +1169,82 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid, void wsrep::transaction::xa_detach() { - assert(state() == s_prepared); - wsrep::server_state& server_state(client_state_.server_state()); - server_state.convert_streaming_client_to_applier(&client_state_); - client_service_.cleanup_transaction(); - wsrep::unique_lock lock(client_state_.mutex_); - streaming_context_.cleanup(); - state(lock, s_aborting); - state(lock, s_aborted); - provider().release(ws_handle_); - cleanup(); + debug_log_state("xa_detach enter"); + assert(state() == s_prepared); + wsrep::server_state& server_state(client_state_.server_state()); + server_state.convert_streaming_client_to_applier(&client_state_); + client_service_.store_globals(); + client_service_.cleanup_transaction(); + wsrep::unique_lock lock(client_state_.mutex_); + streaming_context_.cleanup(); + state(lock, s_aborting); + state(lock, s_aborted); + provider().release(ws_handle_); + cleanup(); + debug_log_state("xa_detach leave"); +} + +int wsrep::transaction::xa_replay(wsrep::unique_lock& lock) +{ + debug_log_state("xa_replay enter"); + assert(lock.owns_lock()); + assert(is_xa()); + assert(is_streaming()); + assert(state() == s_must_replay); + assert(bf_aborted()); + + state(lock, s_replaying); + + enum wsrep::provider::status status; + wsrep::server_state& server_state(client_state_.server_state()); + + lock.unlock(); + server_state.convert_streaming_client_to_applier(&client_state_); + status = client_service_.replay_unordered(); + client_service_.store_globals(); + lock.lock(); + + if (status != wsrep::provider::success) + { + client_service_.emergency_shutdown(); + } + + int ret(1); + if (bf_abort_client_state_ == wsrep::client_state::s_idle) + { + state(lock, s_aborted); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + ret = 0; + } + else + { + lock.unlock(); + enum wsrep::provider::status status(client_service_.commit_by_xid()); + lock.lock(); + switch (status) + { + case wsrep::provider::success: + state(lock, s_committed); + streaming_context_.cleanup(); + provider().release(ws_handle_); + cleanup(); + ret = 0; + break; + default: + log_warning() << "Failed to commit by xid during replay"; + // Commit by xid failed, return a commit + // error and let the client retry + state(lock, s_preparing); + state(lock, s_prepared); + client_state_.override_error(wsrep::e_error_during_commit, status); + } + } + + client_service_.signal_replayed(); + debug_log_state("xa_replay leave"); + return ret; } //////////////////////////////////////////////////////////////////////////////// @@ -1134,16 +1269,18 @@ void wsrep::transaction::state( assert(lock.owns_lock()); // BF aborter is allowed to change the state without gaining control - // to the state if the next state is s_must_abort or s_aborting. + // to the state if the next state is s_must_abort, s_aborting or + // s_must_replay (for xa idle replay). assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || next_state == s_must_abort || + next_state == s_must_replay || next_state == s_aborting); static const char allowed[n_states][n_states] = { /* ex pg pd ce co oc ct cf ma ab ad mr re */ { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */ - { 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0}, /* pd */ + { 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0}, /* pd */ { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ @@ -1284,6 +1421,7 @@ int wsrep::transaction::certify_fragment( state(lock, s_certifying); lock.unlock(); + client_service_.debug_sync("wsrep_before_fragment_certification"); wsrep::mutable_buffer data; size_t log_position(0); @@ -1313,6 +1451,20 @@ int wsrep::transaction::certify_fragment( return 1; } + if (is_xa()) + { + // One more check to see if the transaction + // has been aborted. This is necessary because + // until the append_data above will make sure + // that the transaction exists in provider. + lock.lock(); + if (abort_or_interrupt(lock)) + { + return 1; + } + lock.unlock(); + } + if (is_streaming() == false) { client_state_.server_state_.start_streaming_client(&client_state_); @@ -1492,15 +1644,23 @@ int wsrep::transaction::certify_commit( if (abort_or_interrupt(lock)) { + if (is_xa() && state() == s_must_abort) + { + client_service_.will_replay(); + state(lock, s_must_replay); + } return 1; } state(lock, s_certifying); lock.unlock(); - if (is_streaming() && !is_xa()) + if (is_streaming()) { - append_sr_keys_for_commit(); + if (!is_xa()) + { + append_sr_keys_for_commit(); + } flags(flags() | wsrep::provider::flag::pa_unsafe); } @@ -1610,6 +1770,11 @@ int wsrep::transaction::certify_commit( // to reduce number of error state combinations elsewhere. if (state() == s_must_abort) { + if (is_xa()) + { + client_service_.will_replay(); + state(lock, s_must_replay); + } client_state_.override_error(wsrep::e_deadlock_error); } else @@ -1734,6 +1899,7 @@ int wsrep::transaction::replay(wsrep::unique_lock& lock) lock.unlock(); client_service_.debug_sync("wsrep_before_replay"); enum wsrep::provider::status replay_ret(client_service_.replay()); + client_service_.signal_replayed(); if (was_streaming) { client_state_.server_state_.stop_streaming_client(&client_state_); @@ -1783,9 +1949,9 @@ void wsrep::transaction::clear_fragments() void wsrep::transaction::cleanup() { + debug_log_state("cleanup_enter"); assert(is_streaming() == false); assert(state() == s_committed || state() == s_aborted); - debug_log_state("cleanup_enter"); id_ = wsrep::transaction_id::undefined(); ws_handle_ = wsrep::ws_handle(); // Keep the state history for troubleshooting. Reset diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index c7d5dc7..7e718cf 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -102,8 +102,15 @@ namespace wsrep void will_replay() WSREP_OVERRIDE { will_replay_called_ = true; } + void signal_replayed() WSREP_OVERRIDE { } + enum wsrep::provider::status replay() WSREP_OVERRIDE; + enum wsrep::provider::status replay_unordered() WSREP_OVERRIDE + { + return wsrep::provider::success; + } + void wait_for_replayers( wsrep::unique_lock& lock) WSREP_OVERRIDE @@ -153,6 +160,21 @@ namespace wsrep void store_globals() WSREP_OVERRIDE { } void reset_globals() WSREP_OVERRIDE { } + enum wsrep::provider::status commit_by_xid() WSREP_OVERRIDE + { + return wsrep::provider::success; + } + + bool is_explicit_xa() WSREP_OVERRIDE + { + return false; + } + + bool is_xa_rollback() WSREP_OVERRIDE + { + return false; + } + void debug_sync(const char* sync_point) WSREP_OVERRIDE { if (sync_point_enabled_ == sync_point)