From b02200b1ef4b8c519479a1d745159e98924f03b7 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Tue, 17 Jul 2018 14:34:24 +0300 Subject: [PATCH] Fixes to streaming rollback * Check fragment removal error code in prepare phase. It is possible that the transaction gets BF aborted during fragment removal. * Mark fragment certified in certify_fragment() even if the provider returns cert failed error. With current wsrep-API error codes it may not be possible to distinquish certification failure and BF abort during fragment replication. This may also be a provider bug. As a result rollback fragment may sometimes be replicated when it would not be necessary. --- dbsim/db_client_service.hpp | 23 ++------- include/wsrep/client_service.hpp | 4 +- include/wsrep/client_state.hpp | 9 ++++ include/wsrep/server_service.hpp | 1 + src/client_state.cpp | 1 + src/server_state.cpp | 4 ++ src/transaction.cpp | 82 +++++++++++++++++++++++++------- test/mock_client_state.hpp | 3 +- 8 files changed, 87 insertions(+), 40 deletions(-) diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 59e6f2d..e19de51 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -19,46 +19,29 @@ namespace db client_service(db::client& client); bool do_2pc() const override; - - bool interrupted() const override - { - return false; - } - + bool interrupted() const override { return false; } void reset_globals() override { } - void store_globals() override { } - int prepare_data_for_replication() override { return 0; } - void cleanup_transaction() override { } - size_t bytes_generated() const override { return 0; } - bool statement_allowed_for_streaming() const override { return true; } - int prepare_fragment_for_replication(wsrep::mutable_buffer&) override { return 0; } - - void remove_fragments() override - { } - + int remove_fragments() override { return 0; } int bf_rollback() override; - - void will_replay() override - { } - + void will_replay() override { } void wait_for_replayers(wsrep::unique_lock&) override { } enum wsrep::provider::status replay() override; diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 0f2ec32..6d6b3d4 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -73,8 +73,10 @@ namespace wsrep * Remove fragments from the storage within current transaction. * Fragment removal will be committed once the current transaction * commits. + * + * @return Zero in case of success, non-zero on failure. */ - virtual void remove_fragments() = 0; + virtual int remove_fragments() = 0; // // Rollback diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index f33fd0e..2350ed0 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -346,6 +346,7 @@ namespace wsrep const wsrep::ws_meta& meta) { wsrep::unique_lock lock(mutex_); + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(mode_ == m_high_priority); return transaction_.start_transaction(wsh, meta); } @@ -355,6 +356,7 @@ namespace wsrep int before_prepare() { wsrep::unique_lock lock(mutex_); + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec); return transaction_.before_prepare(lock); } @@ -362,30 +364,35 @@ namespace wsrep int after_prepare() { wsrep::unique_lock lock(mutex_); + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec); return transaction_.after_prepare(lock); } int before_commit() { + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); return transaction_.before_commit(); } int ordered_commit() { + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); return transaction_.ordered_commit(); } int after_commit() { + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); return transaction_.after_commit(); } /** @} */ int before_rollback() { + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_idle || state_ == s_exec || state_ == s_result || @@ -395,6 +402,7 @@ namespace wsrep int after_rollback() { + assert(current_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_idle || state_ == s_exec || state_ == s_result || @@ -440,6 +448,7 @@ namespace wsrep assert(mode_ == m_local || transaction_.is_streaming()); return transaction_.total_order_bf_abort(lock, bf_seqno); } + /** * Adopt a streaming transaction state. This is must be * called from high_priority_service::adopt_transaction() diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 5a8404a..1cc8835 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -174,6 +174,7 @@ namespace wsrep * Provide a server level debug sync point for a caller. */ virtual void debug_sync(const char* sync_point) = 0; + }; } diff --git a/src/client_state.cpp b/src/client_state.cpp index 21987fe..3667758 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -67,6 +67,7 @@ int wsrep::client_state::before_command() { wsrep::unique_lock lock(mutex_); debug_log_state("before_command: enter"); + store_globals(); // Marks the control for this thread assert(state_ == s_idle); if (transaction_.active() && server_state_.rollback_mode() == wsrep::server_state::rm_sync) diff --git a/src/server_state.cpp b/src/server_state.cpp index 638effd..72e45f2 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -272,6 +272,10 @@ static int apply_write_set(wsrep::server_state& server_state, { assert(0); } + if (ret) + { + wsrep::log_info() << "Failed to apply write set: " << ws_meta; + } return ret; } diff --git a/src/transaction.cpp b/src/transaction.cpp index af68192..b390a6f 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -253,6 +253,7 @@ int wsrep::transaction::after_row() break; } } + debug_log_state("after_row_leave"); return 0; } @@ -291,7 +292,11 @@ int wsrep::transaction::before_prepare( } else { - client_service_.remove_fragments(); + ret = client_service_.remove_fragments(); + if (ret) + { + client_state_.override_error(wsrep::e_deadlock_error); + } } lock.lock(); client_service_.debug_crash( @@ -307,7 +312,7 @@ int wsrep::transaction::before_prepare( break; } - assert(state() == s_preparing); + assert(state() == s_preparing || (ret && state() == s_must_abort)); debug_log_state("before_prepare_leave"); return ret; } @@ -861,6 +866,18 @@ bool wsrep::transaction::bf_abort( client_state_.server_state().stop_streaming_applier( server_id_, id_); } + else if (client_state_.mode() == wsrep::client_state::m_local && + is_streaming()) + { + streaming_context_.rolled_back(id_); + enum wsrep::provider::status ret; + if ((ret = provider().rollback(id_))) + { + wsrep::log_warning() + << "Failed to replicate rollback fragment for " + << id_ << ": " << ret; + } + } lock.unlock(); server_service_.background_rollback(client_state_); } @@ -994,6 +1011,7 @@ int wsrep::transaction::certify_fragment( } int ret(0); // Storage service scope + enum wsrep::provider::status cert_ret(wsrep::provider::success); { scoped_storage_service sr_scope( @@ -1032,11 +1050,10 @@ int wsrep::transaction::certify_fragment( client_service_.debug_crash( "crash_replicate_fragment_before_certify"); wsrep::ws_meta sr_ws_meta; - enum wsrep::provider::status - cert_ret(provider().certify(client_state_.id(), - ws_handle_, - flags(), - sr_ws_meta)); + cert_ret = provider().certify(client_state_.id(), + ws_handle_, + flags(), + sr_ws_meta); client_service_.debug_crash( "crash_replicate_fragment_after_certify"); switch (cert_ret) @@ -1062,6 +1079,28 @@ int wsrep::transaction::certify_fragment( client_service_.debug_crash( "crash_replicate_fragment_success"); break; + case wsrep::provider::error_bf_abort: + case wsrep::provider::error_certification_failed: + // Streaming transcation got BF aborted, so it must roll + // back. Roll back the fragment storage operation out of + // order as the commit order will be grabbed later on + // during rollback process. Mark the fragment as certified + // though in streaming context in order to enter streaming + // rollback codepath. + // + // Note that despite we handle error_certification_failed + // here, we mark the transaction as streaming. Apparently + // the provider may return status corresponding to certification + // failure even if the fragment has passed certification. + // This may be a bug in provider implementation or a limitation + // of error codes defined in wsrep-API. In order to make + // sure that the transaction will be cleaned on other servers, + // we take a risk of sending one rollback fragment for nothing. + storage_service.rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + streaming_context_.certified(data.size()); + ret = 1; + break; default: // Storage service rollback must be done out of order, // otherwise there may be a deadlock between BF aborter @@ -1092,11 +1131,11 @@ int wsrep::transaction::certify_fragment( { state(lock, s_must_abort); } - client_state_.override_error(wsrep::e_deadlock_error); + client_state_.override_error(wsrep::e_deadlock_error, cert_ret); } else if (state_ == s_must_abort) { - client_state_.override_error(wsrep::e_deadlock_error); + client_state_.override_error(wsrep::e_deadlock_error, cert_ret); } else { @@ -1300,7 +1339,7 @@ void wsrep::transaction::streaming_rollback() { debug_log_state("streaming_rollback enter"); assert(state_ != s_must_replay); - assert(streaming_context_.rolled_back() == false); + // assert(streaming_context_.rolled_back() == false); assert(is_streaming()); if (bf_aborted_in_total_order_) @@ -1316,14 +1355,21 @@ void wsrep::transaction::streaming_rollback() // rolled back. client_state_.server_state_.convert_streaming_client_to_applier( &client_state_); + const bool was_rolled_back_for(streaming_context_.rolled_back()); streaming_context_.cleanup(); - streaming_context_.rolled_back(id_); client_service_.debug_sync("wsrep_before_SR_rollback"); - enum wsrep::provider::status ret; - if ((ret = provider().rollback(id_))) + // Send a rollback fragment only if it was not sent before + // for this transaction. + if (was_rolled_back_for == false) { - wsrep::log_warning() << "Failed to replicate rollback fragment for " - << id_ << ": " << ret; + streaming_context_.rolled_back(id_); + enum wsrep::provider::status ret; + if ((ret = provider().rollback(id_))) + { + wsrep::log_warning() + << "Failed to replicate rollback fragment for " + << id_ << ": " << ret; + } } } debug_log_state("streaming_rollback leave"); @@ -1366,8 +1412,8 @@ void wsrep::transaction::debug_log_state( { WSREP_TC_LOG_DEBUG( 1, context - << "\n server: " << client_state_.server_state().id() - << ", client: " << client_state_.id().get() + << "\n server: " << server_id_ + << ", client: " << int64_t(client_state_.id().get()) << ", state: " << wsrep::to_c_string(client_state_.state()) << ", mode: " << wsrep::to_c_string(client_state_.mode()) << "\n trx_id: " << int64_t(id_.get()) @@ -1377,9 +1423,11 @@ void wsrep::transaction::debug_log_state( << " state: " << wsrep::to_c_string(state_) << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_) << ", error: " << wsrep::to_c_string(client_state_.current_error()) + << ", status: " << client_state_.current_error_status() << "\n" << " is_sr: " << is_streaming() << ", frags: " << streaming_context_.fragments_certified() + << ", frags size: " << streaming_context_.fragments().size() << ", unit: " << streaming_context_.fragment_unit() << ", size: " << streaming_context_.fragment_size() << ", counter: " << streaming_context_.unit_counter() diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index c256ac1..29eb009 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -78,8 +78,7 @@ namespace wsrep int, const wsrep::const_buffer&) WSREP_OVERRIDE { return 0; } - void remove_fragments() - WSREP_OVERRIDE { } + int remove_fragments() WSREP_OVERRIDE { return 0; } void will_replay() WSREP_OVERRIDE { } enum wsrep::provider::status