From 3f4e5dea3b2d00ca92641de329b96987b3a0303e Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 12 Jul 2018 13:36:45 +0300 Subject: [PATCH] Revised logic to handle SR replaying * Added server_id into transaction in order to be able to stop streaming applier during high priority BF abort * Added missing commit fragment applying * Don't clear fragments for replaying SR transaction --- include/wsrep/client_state.hpp | 9 - include/wsrep/streaming_context.hpp | 1 + include/wsrep/transaction.hpp | 6 +- src/server_state.cpp | 446 ++++++++++++++-------------- src/transaction.cpp | 41 ++- test/transaction_test.cpp | 2 +- 6 files changed, 255 insertions(+), 250 deletions(-) diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 7b1e29c..d215653 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -425,15 +425,6 @@ namespace wsrep return transaction_.bf_abort(lock, bf_seqno); } - // - // Replaying - // - int start_replaying(const wsrep::ws_meta& ws_meta) - { - assert(mode_ == m_high_priority); - return transaction_.start_replaying(ws_meta); - } - /** * Adopt a streaming transaction state. This is must be * called from high_priority_service::adopt_transaction() diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index cce2104..039f0fa 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -47,6 +47,7 @@ namespace wsrep void disable() { + wsrep::log_info() << "Disabling streaming"; fragment_size_ = 0; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 1732ed2..0279e22 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -52,6 +52,9 @@ namespace wsrep wsrep::transaction_id id() const { return id_; } + const wsrep::id& server_id() const + { return server_id_; } + bool active() const { return (id_ != wsrep::transaction_id::undefined()); } @@ -95,8 +98,6 @@ namespace wsrep const wsrep::ws_meta& ws_meta, bool is_commit); - int start_replaying(const wsrep::ws_meta&); - int append_key(const wsrep::key&); int append_data(const wsrep::const_buffer&); @@ -161,6 +162,7 @@ namespace wsrep wsrep::server_service& server_service_; wsrep::client_service& client_service_; wsrep::client_state& client_state_; + wsrep::id server_id_; wsrep::transaction_id id_; enum state state_; std::vector state_hist_; diff --git a/src/server_state.cpp b/src/server_state.cpp index 53899ab..70a7804 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -14,140 +14,163 @@ #include #include -namespace +////////////////////////////////////////////////////////////////////////////// +// Helpers // +////////////////////////////////////////////////////////////////////////////// + + +// +// This method is used to deal with historical burden of several +// ways to bootstrap the cluster. Bootstrap happens if +// +// * bootstrap option is given +// * cluster_address is "gcomm://" (Galera provider) +// +static bool is_bootstrap(const std::string& cluster_address, bool bootstrap) { - // - // This method is used to deal with historical burden of several - // ways to bootstrap the cluster. Bootstrap happens if - // - // * bootstrap option is given - // * cluster_address is "gcomm://" (Galera provider) - // - bool is_bootstrap(const std::string& cluster_address, bool bootstrap) - { - return (bootstrap || cluster_address == "gcomm://"); - } + return (bootstrap || cluster_address == "gcomm://"); +} - int apply_fragment(wsrep::server_state& server_state, - wsrep::high_priority_service& high_priority_service, - wsrep::high_priority_service& streaming_applier, - const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) - { - int ret; - { - wsrep::high_priority_switch sw(high_priority_service, - streaming_applier); - ret = streaming_applier.apply_write_set(ws_meta, data); - streaming_applier.after_apply(); - } - ret = ret || high_priority_service.append_fragment_and_commit( - ws_handle, ws_meta, data); - high_priority_service.after_apply(); - return ret; - } - - - int commit_fragment(wsrep::server_state& server_state, - wsrep::high_priority_service& high_priority_service, - wsrep::high_priority_service* streaming_applier, - const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) - { - int ret; - // Make high priority switch to go out of scope - // before the streaming applier is released. - { - wsrep::high_priority_switch sw( - high_priority_service, *streaming_applier); - streaming_applier->remove_fragments(ws_meta); - ret = streaming_applier->commit(ws_handle, ws_meta); - streaming_applier->after_apply(); - } - server_state.stop_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()); - server_state.server_service().release_high_priority_service( - streaming_applier); - return ret; - } - - int rollback_fragment(wsrep::server_state& server_state, +static int apply_fragment(wsrep::server_state& server_state, wsrep::high_priority_service& high_priority_service, - wsrep::high_priority_service* streaming_applier, + wsrep::high_priority_service& streaming_applier, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) +{ + int ret; + { + wsrep::high_priority_switch sw(high_priority_service, + streaming_applier); + ret = streaming_applier.apply_write_set(ws_meta, data); + streaming_applier.after_apply(); + } + ret = ret || high_priority_service.append_fragment_and_commit( + ws_handle, ws_meta, data); + high_priority_service.after_apply(); + return ret; +} + + +static int commit_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret; + // Make high priority switch to go out of scope + // before the streaming applier is released. + { + wsrep::high_priority_switch sw( + high_priority_service, *streaming_applier); + ret = streaming_applier->apply_write_set(ws_meta, data) || + streaming_applier->remove_fragments(ws_meta) || + streaming_applier->commit(ws_handle, ws_meta); + streaming_applier->after_apply(); + } + if (ret == 0) { - int ret= 0; - // Adopts transaction state and starts a transaction for - // high priority service - high_priority_service.adopt_transaction( - streaming_applier->transaction()); - { - wsrep::high_priority_switch ws( - high_priority_service, *streaming_applier); - streaming_applier->rollback(); - streaming_applier->after_apply(); - } server_state.stop_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()); server_state.server_service().release_high_priority_service( streaming_applier); - - high_priority_service.remove_fragments(ws_meta); - high_priority_service.commit(ws_handle, ws_meta); - high_priority_service.after_apply(); - return ret; } + return ret; +} - int apply_write_set(wsrep::server_state& server_state, - wsrep::high_priority_service& high_priority_service, - const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) +static int rollback_fragment(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + wsrep::high_priority_service* streaming_applier, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret= 0; + // Adopts transaction state and starts a transaction for + // high priority service + high_priority_service.adopt_transaction( + streaming_applier->transaction()); { - int ret(0); - if (wsrep::starts_transaction(ws_meta.flags()) && - wsrep::commits_transaction(ws_meta.flags()) && - wsrep::rolls_back_transaction(ws_meta.flags())) + wsrep::high_priority_switch ws( + high_priority_service, *streaming_applier); + streaming_applier->rollback(); + streaming_applier->after_apply(); + } + server_state.stop_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()); + server_state.server_service().release_high_priority_service( + streaming_applier); + + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(ws_handle, ws_meta); + high_priority_service.after_apply(); + return ret; +} + +static int apply_write_set(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + int ret(0); + if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags()) && + wsrep::rolls_back_transaction(ws_meta.flags())) + { + // Non streaming rollback (certification failed) + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); + } + else if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + ret = high_priority_service.start_transaction(ws_handle, ws_meta) || + high_priority_service.apply_write_set(ws_meta, data) || + high_priority_service.commit(ws_handle, ws_meta); + if (ret) { - // Non streaming rollback (certification failed) - ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); + high_priority_service.rollback(); } - else if (wsrep::starts_transaction(ws_meta.flags()) && - wsrep::commits_transaction(ws_meta.flags())) + high_priority_service.after_apply(); + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + assert(server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id()) == 0); + wsrep::high_priority_service* sa( + server_state.server_service().streaming_applier_service( + high_priority_service)); + server_state.start_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id(), sa); + sa->start_transaction(ws_handle, ws_meta); + ret = apply_fragment(server_state, + high_priority_service, + *sa, + ws_handle, + ws_meta, + data); + } + else if (ws_meta.flags() == 0) + { + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) { - if (high_priority_service.start_transaction(ws_handle, ws_meta)) - { - ret = 1; - } - else if (high_priority_service.apply_write_set(ws_meta, data)) - { - ret = 1; - } - else if (high_priority_service.commit(ws_handle, ws_meta)) - { - ret = 1; - } - if (ret) - { - high_priority_service.rollback(); - } - high_priority_service.after_apply(); + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); } - else if (wsrep::starts_transaction(ws_meta.flags())) + else { - assert(server_state.find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id()) == 0); - wsrep::high_priority_service* sa( - server_state.server_service().streaming_applier_service( - high_priority_service)); - server_state.start_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id(), sa); - sa->start_transaction(ws_handle, ws_meta); ret = apply_fragment(server_state, high_priority_service, *sa, @@ -155,69 +178,17 @@ namespace ws_meta, data); } - else if (ws_meta.flags() == 0) + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + if (high_priority_service.is_replaying()) { - wsrep::high_priority_service* sa( - server_state.find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id())); - if (sa == 0) - { - // It is possible that rapid group membership changes - // may cause streaming transaction be rolled back before - // commit fragment comes in. Although this is a valid - // situation, log a warning if a sac cannot be found as - // it may be an indication of a bug too. - wsrep::log_warning() << "Could not find applier context for " - << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); - } - else - { - ret = apply_fragment(server_state, - high_priority_service, - *sa, - ws_handle, - ws_meta, - data); - } + ret = high_priority_service.start_transaction( + ws_handle, ws_meta) || + high_priority_service.apply_write_set(ws_meta, data) || + high_priority_service.commit(ws_handle, ws_meta); } - else if (wsrep::commits_transaction(ws_meta.flags())) - { - if (high_priority_service.is_replaying()) - { - ret = high_priority_service.apply_write_set(ws_meta, data) || - high_priority_service.commit(ws_handle, ws_meta); - } - else - { - wsrep::high_priority_service* sa( - server_state.find_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id())); - if (sa == 0) - { - // It is possible that rapid group membership changes - // may cause streaming transaction be rolled back before - // commit fragment comes in. Although this is a valid - // situation, log a warning if a sac cannot be found as - // it may be an indication of a bug too. - wsrep::log_warning() - << "Could not find applier context for " - << ws_meta.server_id() - << ": " << ws_meta.transaction_id(); - } - else - { - // Commit fragment consumes sa - ret = commit_fragment(server_state, - high_priority_service, - sa, - ws_handle, - ws_meta, - data); - } - } - } - else if (wsrep::rolls_back_transaction(ws_meta.flags())) + else { wsrep::high_priority_service* sa( server_state.find_streaming_applier( @@ -233,67 +204,98 @@ namespace << "Could not find applier context for " << ws_meta.server_id() << ": " << ws_meta.transaction_id(); - ret = high_priority_service.log_dummy_write_set( - ws_handle, ws_meta); } else { - // Rollback fragment consumes sa - ret = rollback_fragment(server_state, - high_priority_service, - sa, - ws_handle, - ws_meta, - data); + // Commit fragment consumes sa + ret = commit_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); } } - else - { - assert(0); - } - return ret; } - - int apply_toi(wsrep::provider& provider, - wsrep::high_priority_service& high_priority_service, - const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) + else if (wsrep::rolls_back_transaction(ws_meta.flags())) { - if (wsrep::starts_transaction(ws_meta.flags()) && - wsrep::commits_transaction(ws_meta.flags())) + wsrep::high_priority_service* sa( + server_state.find_streaming_applier( + ws_meta.server_id(), ws_meta.transaction_id())); + if (sa == 0) { - // - // Regular TOI. - // - // Note that we ignore error returned by apply_toi - // call here. This must be revised after the error - // voting is added. - // - provider.commit_order_enter(ws_handle, ws_meta); - (void)high_priority_service.apply_toi(ws_meta, data); - provider.commit_order_leave(ws_handle, ws_meta); - return 0; - } - else if (wsrep::starts_transaction(ws_meta.flags())) - { - // NBO begin - throw wsrep::not_implemented_error(); - } - else if (wsrep::commits_transaction(ws_meta.flags())) - { - // NBO end - throw wsrep::not_implemented_error(); + // It is possible that rapid group membership changes + // may cause streaming transaction be rolled back before + // commit fragment comes in. Although this is a valid + // situation, log a warning if a sac cannot be found as + // it may be an indication of a bug too. + wsrep::log_warning() + << "Could not find applier context for " + << ws_meta.server_id() + << ": " << ws_meta.transaction_id(); + ret = high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); } else { - assert(0); - return 0; + // Rollback fragment consumes sa + ret = rollback_fragment(server_state, + high_priority_service, + sa, + ws_handle, + ws_meta, + data); } } - + else + { + assert(0); + } + return ret; } +static int apply_toi(wsrep::provider& provider, + wsrep::high_priority_service& high_priority_service, + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + if (wsrep::starts_transaction(ws_meta.flags()) && + wsrep::commits_transaction(ws_meta.flags())) + { + // + // Regular TOI. + // + // Note that we ignore error returned by apply_toi + // call here. This must be revised after the error + // voting is added. + // + provider.commit_order_enter(ws_handle, ws_meta); + (void)high_priority_service.apply_toi(ws_meta, data); + provider.commit_order_leave(ws_handle, ws_meta); + return 0; + } + else if (wsrep::starts_transaction(ws_meta.flags())) + { + // NBO begin + throw wsrep::not_implemented_error(); + } + else if (wsrep::commits_transaction(ws_meta.flags())) + { + // NBO end + throw wsrep::not_implemented_error(); + } + else + { + assert(0); + return 0; + } +} + +////////////////////////////////////////////////////////////////////////////// +// Server State // +////////////////////////////////////////////////////////////////////////////// + int wsrep::server_state::load_provider(const std::string& provider_spec, const std::string& provider_options) { diff --git a/src/transaction.cpp b/src/transaction.cpp index 5c2af81..faf7389 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -86,6 +86,7 @@ wsrep::transaction::transaction( : server_service_(client_state.server_state().server_service()) , client_service_(client_state.client_service()) , client_state_(client_state) + , server_id_() , id_(transaction_id::undefined()) , state_(s_executing) , state_hist_() @@ -112,6 +113,7 @@ int wsrep::transaction::start_transaction( debug_log_state("start_transaction enter"); assert(active() == false); assert(flags() == 0); + server_id_ = client_state_.server_state().id(); id_ = id; state_ = s_executing; state_hist_.clear(); @@ -142,6 +144,7 @@ int wsrep::transaction::start_transaction( // assert(ws_meta.flags()); assert(active() == false); assert(flags() == 0); + server_id_ = ws_meta.server_id(); id_ = ws_meta.transaction_id(); assert(client_state_.mode() == wsrep::client_state::m_high_priority); state_ = s_executing; @@ -153,7 +156,13 @@ int wsrep::transaction::start_transaction( } else { - start_replaying(ws_meta); + ws_meta_ = ws_meta; + assert(ws_meta_.flags() & wsrep::provider::flag::commit); + assert(active()); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + assert(state() == s_replaying); + assert(ws_meta_.seqno().is_undefined() == false); + certified_ = true; } debug_log_state("start_transaction leave"); return 0; @@ -191,21 +200,8 @@ int wsrep::transaction::prepare_for_ordering( return 0; } -int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta) -{ - ws_meta_ = ws_meta; - assert(ws_meta_.flags() & wsrep::provider::flag::commit); - assert(active()); - assert(client_state_.mode() == wsrep::client_state::m_high_priority); - assert(state() == s_replaying); - assert(ws_meta_.seqno().is_undefined() == false); - certified_ = true; - return 0; -} - int wsrep::transaction::append_key(const wsrep::key& key) { - /** @todo Collect table level keys for SR commit */ try { debug_log_key_append(key); @@ -583,7 +579,7 @@ int wsrep::transaction::after_rollback() assert(state() == s_aborting || state() == s_must_replay); - if (is_streaming()) + if (is_streaming() && state() != s_must_replay) { clear_fragments(); } @@ -823,6 +819,12 @@ bool wsrep::transaction::bf_abort( // between releasing the lock and before background // rollbacker gets control. state(lock, wsrep::transaction::s_aborting); + if (client_state_.mode() == wsrep::client_state::m_high_priority) + { + assert(is_streaming()); + client_state_.server_state().stop_streaming_applier( + server_id_, id_); + } lock.unlock(); server_service_.background_rollback(client_state_); } @@ -913,6 +915,7 @@ int wsrep::transaction::certify_fragment( { lock.lock(); state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit); return 1; } @@ -921,6 +924,7 @@ int wsrep::transaction::certify_fragment( { lock.lock(); state(lock, s_must_abort); + client_state_.override_error(wsrep::e_error_during_commit); return 1; } @@ -996,7 +1000,11 @@ int wsrep::transaction::certify_fragment( lock.lock(); if (ret) { - state(lock, s_must_abort); + if (state_ != s_must_abort) + { + state(lock, s_must_abort); + } + client_state_.override_error(wsrep::e_deadlock_error); } else { @@ -1198,6 +1206,7 @@ int wsrep::transaction::append_sr_keys_for_commit() void wsrep::transaction::streaming_rollback() { debug_log_state("streaming_rollback enter"); + assert(state_ != s_must_replay); assert(streaming_context_.rolled_back() == false); // Create a high priority applier which will handle the // rollback fragment or clean up on configuration change. diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 9f939c8..6b6bcd6 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -1068,7 +1068,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_non_commit, sc.provider().certify_result_ = wsrep::provider::success; BOOST_REQUIRE(cc.before_rollback() == 0); BOOST_REQUIRE(cc.after_rollback() == 0); - BOOST_REQUIRE(cc.after_statement() == 0); + BOOST_REQUIRE(cc.after_statement() == 1); BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);