diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 56ce1e8..d2079e7 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -238,6 +238,8 @@ namespace wsrep wsrep::provider& provider(); void flags(int flags) { flags_ = flags; } + int before_prepare_local(wsrep::unique_lock&); + int before_prepare_high_priority(wsrep::unique_lock&); int before_commit_local(wsrep::unique_lock&); int before_commit_high_priority(wsrep::unique_lock&); // Return true if the transaction must abort, is aborting, diff --git a/src/transaction.cpp b/src/transaction.cpp index 2c82990..414fcad 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -225,6 +225,8 @@ int wsrep::transaction::prepare_for_ordering( int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid) { + assert(active()); + assert(client_state_.mode() == wsrep::client_state::m_local); try { return provider().assign_read_view(ws_handle_, gtid); @@ -239,6 +241,7 @@ int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid) int wsrep::transaction::append_key(const wsrep::key& key) { assert(active()); + assert(client_state_.mode() == wsrep::client_state::m_local); try { debug_log_key_append(key); @@ -255,11 +258,13 @@ int wsrep::transaction::append_key(const wsrep::key& key) int wsrep::transaction::append_data(const wsrep::const_buffer& data) { assert(active()); + assert(client_state_.mode() == wsrep::client_state::m_local); return provider().append_data(ws_handle_, data); } int wsrep::transaction::after_row() { + assert(client_state_.mode() == wsrep::client_state::m_local); wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_row_enter"); int ret(0); @@ -272,106 +277,121 @@ int wsrep::transaction::after_row() return ret; } +int wsrep::transaction::before_prepare_local( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(client_state_.mode() == wsrep::client_state::m_local); + assert(state() == s_executing || state() == s_must_abort || + state() == s_replaying); + + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + return 1; + } + + int ret = 0; + if (is_streaming()) + { + client_service_.debug_crash( + "crash_last_fragment_commit_before_fragment_removal"); + lock.unlock(); + if (client_service_.statement_allowed_for_streaming() == false) + { + client_state_.override_error(wsrep::e_error_during_commit, + wsrep::provider::error_not_allowed); + ret = 1; + } + else if (!is_xa()) + { + // Note: we can't remove fragments here for XA, + // the transaction has already issued XA END and + // is in IDLE state, no more changes allowed! + ret = client_service_.remove_fragments(); + if (ret) + { + client_state_.override_error(wsrep::e_deadlock_error); + } + } + lock.lock(); + client_service_.debug_crash( + "crash_last_fragment_commit_after_fragment_removal"); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + ret = 1; + } + } + + if (ret == 0) + { + if (is_xa()) + { + // Force fragment replication on XA prepare + flags(flags() | wsrep::provider::flag::prepare); + pa_unsafe(true); + append_sr_keys_for_commit(); + const bool force_streaming_step = true; + ret = streaming_step(lock, force_streaming_step); + if (ret == 0) + { + assert(state() == s_executing); + state(lock, s_preparing); + } + } + else + { + ret = certify_commit(lock); + } + + assert((ret == 0 && state() == s_preparing) + || (state() == s_must_abort || state() == s_must_replay + || state() == s_cert_failed)); + + if (ret) + { + assert(state() == s_must_replay || client_state_.current_error()); + ret = 1; + } + } + return ret; +} + +int wsrep::transaction::before_prepare_high_priority( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + assert(state() == s_executing || state() == s_replaying); + // Note: fragment removal is done from applying + // context for high priority mode. + if (is_xa()) + { + assert(state() == s_executing || state() == s_replaying); + if (state() == s_replaying) + { + return 0; + } + } + state(lock, s_preparing); + return 0; +} + int wsrep::transaction::before_prepare( wsrep::unique_lock& lock) { assert(lock.owns_lock()); int ret(0); debug_log_state("before_prepare_enter"); - assert(state() == s_executing || state() == s_must_abort || - state() == s_replaying); - - if (state() == s_must_abort) - { - assert(client_state_.mode() == wsrep::client_state::m_local); - client_state_.override_error(wsrep::e_deadlock_error); - return 1; - } switch (client_state_.mode()) { case wsrep::client_state::m_local: - if (is_streaming()) - { - client_service_.debug_crash( - "crash_last_fragment_commit_before_fragment_removal"); - lock.unlock(); - if (client_service_.statement_allowed_for_streaming() == false) - { - client_state_.override_error( - wsrep::e_error_during_commit, - wsrep::provider::error_not_allowed); - ret = 1; - } - else if (!is_xa()) - { - // Note: we can't remove fragments here for XA, - // the transaction has already issued XA END and - // is in IDLE state, no more changes allowed! - ret = client_service_.remove_fragments(); - if (ret) - { - client_state_.override_error(wsrep::e_deadlock_error); - } - } - lock.lock(); - client_service_.debug_crash( - "crash_last_fragment_commit_after_fragment_removal"); - if (state() == s_must_abort) - { - client_state_.override_error(wsrep::e_deadlock_error); - ret = 1; - } - } - - if (ret == 0) - { - if (is_xa()) - { - // Force fragment replication on XA prepare - flags(flags() | wsrep::provider::flag::prepare); - pa_unsafe(true); - append_sr_keys_for_commit(); - const bool force_streaming_step = true; - ret = streaming_step(lock, force_streaming_step); - if (ret == 0) - { - assert(state() == s_executing); - state(lock, s_preparing); - } - } - else - { - ret = certify_commit(lock); - } - - assert((ret == 0 && state() == s_preparing) || - (state() == s_must_abort || - state() == s_must_replay || - state() == s_cert_failed)); - - if (ret) - { - assert(state() == s_must_replay || - client_state_.current_error()); - ret = 1; - } - } - + ret = before_prepare_local(lock); break; case wsrep::client_state::m_high_priority: - // Note: fragment removal is done from applying - // context for high priority mode. - if (is_xa()) - { - assert(state() == s_executing || - state() == s_replaying); - if (state() == s_replaying) - { - break; - } - } - state(lock, s_preparing); + ret = before_prepare_high_priority(lock); break; default: assert(0); @@ -441,6 +461,8 @@ int wsrep::transaction::after_prepare( int wsrep::transaction::before_commit_local( wsrep::unique_lock& lock) { + assert(lock.owns_lock()); + int ret = 1; assert(state() == s_executing || state() == s_prepared || state() == s_committing || state() == s_must_abort @@ -512,6 +534,8 @@ int wsrep::transaction::before_commit_local( int wsrep::transaction::before_commit_high_priority( wsrep::unique_lock& lock) { + assert(lock.owns_lock()); + int ret = 1; assert(certified()); assert(ordered());