diff --git a/src/client_state.cpp b/src/client_state.cpp index 3667758..95c916b 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -67,7 +67,6 @@ int wsrep::client_state::before_command() { wsrep::unique_lock lock(mutex_); debug_log_state("before_command: enter"); - store_globals(); // Marks the control for this thread assert(state_ == s_idle); if (transaction_.active() && server_state_.rollback_mode() == wsrep::server_state::rm_sync) @@ -77,6 +76,7 @@ int wsrep::client_state::before_command() cond_.wait(lock); } } + store_globals(); // Marks the control for this thread state(lock, s_exec); assert(transaction_.active() == false || (transaction_.state() == wsrep::transaction::s_executing || diff --git a/src/transaction.cpp b/src/transaction.cpp index ffbafcd..d708fe3 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -227,6 +227,7 @@ int wsrep::transaction::after_row() { wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("after_row_enter"); + int ret(0); if (streaming_context_.fragment_size() > 0) { switch (streaming_context_.fragment_unit()) @@ -237,7 +238,7 @@ int wsrep::transaction::after_row() streaming_context_.fragment_size()) { streaming_context_.reset_unit_counter(); - return certify_fragment(lock); + ret = certify_fragment(lock); } break; case streaming_context::bytes: @@ -245,7 +246,7 @@ int wsrep::transaction::after_row() streaming_context_.bytes_certified() + streaming_context_.fragment_size()) { - return certify_fragment(lock); + ret = certify_fragment(lock); } break; case streaming_context::statement: @@ -254,7 +255,7 @@ int wsrep::transaction::after_row() } } debug_log_state("after_row_leave"); - return 0; + return ret; } int wsrep::transaction::before_prepare( @@ -856,6 +857,18 @@ bool wsrep::transaction::bf_abort( if (ret) { bf_abort_client_state_ = client_state_.state(); + // If the transaction is in executing state, we must initiate + // streaming rollback to ensure that the rollback fragment gets + // replicated before the victim starts to roll back and release locks. + // In other states the BF abort will be detected outside of + // storage engine operations and streaming rollback will be + // handled from before_rollback() call. + if (client_state_.mode() == wsrep::client_state::m_local && + is_streaming() && state() == s_executing) + { + streaming_rollback(); + } + if ((client_state_.state() == wsrep::client_state::s_idle && client_state_.server_state().rollback_mode() == wsrep::server_state::rm_sync) // locally processing idle @@ -875,11 +888,7 @@ bool wsrep::transaction::bf_abort( client_state_.server_state().stop_streaming_applier( server_id_, id_); } - else if (client_state_.mode() == wsrep::client_state::m_local && - is_streaming()) - { - streaming_rollback(); - } + lock.unlock(); server_service_.background_rollback(client_state_); } @@ -976,7 +985,8 @@ int wsrep::transaction::certify_fragment( assert(lock.owns_lock()); assert(client_state_.mode() == wsrep::client_state::m_local); - assert(streaming_context_.rolled_back() == false); + assert(streaming_context_.rolled_back() == false || + state() == s_must_abort); client_service_.wait_for_replayers(lock); if (state() == s_must_abort) @@ -1120,7 +1130,7 @@ int wsrep::transaction::certify_fragment( // rollback process. if (ret == 0) { - provider().release(ws_handle_); + ret = provider().release(ws_handle_); } lock.lock(); if (ret) @@ -1129,6 +1139,10 @@ int wsrep::transaction::certify_fragment( { client_state_.server_state_.stop_streaming_client(&client_state_); } + else + { + streaming_rollback(); + } if (state_ != s_must_abort) { state(lock, s_must_abort); @@ -1137,6 +1151,10 @@ int wsrep::transaction::certify_fragment( } else if (state_ == s_must_abort) { + if (is_streaming()) + { + streaming_rollback(); + } client_state_.override_error(wsrep::e_deadlock_error, cert_ret); } else