From 86472ee4201c3669e0001f7fc28500b1e777bcf9 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Sat, 14 Jul 2018 16:11:13 +0300 Subject: [PATCH] Implemented SR transaction rollback during configuration changes. SR transactions are BF aborted or rolled back on primary view changes according to the following rules: * Ongoing local SR transactions are BF aborted if the processing server is not found in the current view. * All remote SR transactions whose origin server is not included in the current view are rolled back. --- include/wsrep/client_id.hpp | 12 +++ include/wsrep/server_state.hpp | 38 ++++++- src/server_state.cpp | 181 ++++++++++++++++++++++++++++++--- src/transaction.cpp | 68 +++++++++---- src/wsrep_provider_v26.cpp | 6 +- test/server_context_test.cpp | 4 +- 6 files changed, 265 insertions(+), 44 deletions(-) diff --git a/include/wsrep/client_id.hpp b/include/wsrep/client_id.hpp index df43791..21c9316 100644 --- a/include/wsrep/client_id.hpp +++ b/include/wsrep/client_id.hpp @@ -5,6 +5,8 @@ #ifndef WSREP_CLIENT_ID_HPP #define WSREP_CLIENT_ID_HPP +#include + namespace wsrep { class client_id @@ -20,9 +22,19 @@ namespace wsrep { } type get() const { return id_; } static type undefined() { return -1; } + bool operator<(const client_id& other) const + { + return (id_ < other.id_); + } private: type id_; }; + static inline std::ostream& operator<<( + std::ostream& os, const wsrep::client_id& client_id) + { + return (os << client_id.get()); + } } + #endif // WSREP_CLIENT_ID_HPP diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 6d7277c..7fb30c9 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -212,6 +212,15 @@ namespace wsrep */ enum rollback_mode rollback_mode() const { return rollback_mode_; } + /** + * Registers a streaming client. 
+ */ + void start_streaming_client(wsrep::client_state* client_state); + + void convert_streaming_client_to_applier( + wsrep::client_state* client_state); + void stop_streaming_client(wsrep::client_state* client_state); + void start_streaming_applier( const wsrep::id&, const wsrep::transaction_id&, @@ -278,7 +287,8 @@ namespace wsrep * @params view wsrep::view object which holds the new view * information. */ - void on_view(const wsrep::view& view); + void on_view(const wsrep::view& view, + wsrep::high_priority_service*); /** * A method which will be called when the server @@ -533,6 +543,7 @@ namespace wsrep , desync_count_() , pause_count_() , pause_seqno_() + , streaming_clients_() , streaming_appliers_() , provider_() , name_(name) @@ -557,6 +568,11 @@ namespace wsrep void resync(wsrep::unique_lock&); void state(wsrep::unique_lock&, enum state); void wait_until_state(wsrep::unique_lock&, enum state) const; + // Close SR transactions whose origin is outside of current + // cluster view. 
+ void close_foreign_sr_transactions( + wsrep::unique_lock&, + wsrep::high_priority_service&); wsrep::mutex& mutex_; wsrep::condition_variable& cond_; @@ -572,7 +588,25 @@ namespace wsrep size_t desync_count_; size_t pause_count_; wsrep::seqno pause_seqno_; - typedef std::map, wsrep::high_priority_service*> streaming_appliers_map; + typedef std::map + streaming_clients_map; + streaming_clients_map streaming_clients_; + typedef std::map, + wsrep::high_priority_service*> streaming_appliers_map; + class server_id_cmp + { + public: + server_id_cmp(const wsrep::id& server_id) + : server_id_(server_id) + { } + bool operator()(const std::vector::value_type& vt) const + { + return (vt.id() == server_id_); + } + private: + wsrep::id server_id_; + }; + streaming_appliers_map streaming_appliers_; wsrep::provider* provider_; std::string name_; diff --git a/src/server_state.cpp b/src/server_state.cpp index b297c4c..161259a 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -3,6 +3,7 @@ // #include "wsrep/server_state.hpp" +#include "wsrep/client_state.hpp" #include "wsrep/server_service.hpp" #include "wsrep/high_priority_service.hpp" #include "wsrep/transaction.hpp" @@ -13,6 +14,7 @@ #include #include +#include ////////////////////////////////////////////////////////////////////////////// // Helpers // @@ -504,19 +506,6 @@ void wsrep::server_state::initialized() } } -void wsrep::server_state::wait_until_state( - wsrep::unique_lock& lock, - enum wsrep::server_state::state state) const -{ - ++state_waiters_[state]; - while (state_ != state) - { - cond_.wait(lock); - } - --state_waiters_[state]; - cond_.notify_all(); -} - void wsrep::server_state::last_committed_gtid(const wsrep::gtid& gtid) { wsrep::unique_lock lock(mutex_); @@ -556,7 +545,8 @@ void wsrep::server_state::on_connect(const wsrep::gtid& gtid) state(lock, s_connected); } -void wsrep::server_state::on_view(const wsrep::view& view) +void wsrep::server_state::on_view(const wsrep::view& view, + 
wsrep::high_priority_service* high_priority_service) { wsrep::log_info() << "================================================\nView:\n" @@ -644,6 +634,11 @@ void wsrep::server_state::on_view(const wsrep::view& view) bootstrap_ = false; } + assert(high_priority_service); + if (high_priority_service) + { + close_foreign_sr_transactions(lock, *high_priority_service); + } if (server_service_.sst_before_init()) { if (state_ == s_initialized) @@ -756,11 +751,85 @@ int wsrep::server_state::on_apply( } } +void wsrep::server_state::start_streaming_client( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_debug() << "Start streaming client: " << client_state->id(); + if (streaming_clients_.insert( + std::make_pair(client_state->id(), client_state)).second == false) + { + wsrep::log_warning() << "Failed to insert streaming client " + << client_state->id(); + assert(0); + } +} + +void wsrep::server_state::convert_streaming_client_to_applier( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_debug() << "Convert streaming client to applier " + << client_state->id(); + streaming_clients_map::iterator i( + streaming_clients_.find(client_state->id())); + assert(i != streaming_clients_.end()); + if (i == streaming_clients_.end()) + { + wsrep::log_warning() << "Unable to find streaming client " + << client_state->id(); + assert(0); + } + else + { + streaming_clients_.erase(i); + } + wsrep::high_priority_service* streaming_applier( + server_service_.streaming_applier_service( + client_state->client_service())); + streaming_applier->adopt_transaction(client_state->transaction()); + if (streaming_appliers_.insert( + std::make_pair( + std::make_pair(id_, client_state->transaction().id()), + streaming_applier)).second == false) + { + wsrep::log_warning() << "Could not insert streaming applier " + << id_ + << ", " + << client_state->transaction().id(); + assert(0); + } +} + + +void 
wsrep::server_state::stop_streaming_client( + wsrep::client_state* client_state) +{ + wsrep::unique_lock lock(mutex_); + wsrep::log_debug() << "Stop streaming client: " << client_state->id(); + streaming_clients_map::iterator i( + streaming_clients_.find(client_state->id())); + assert(i != streaming_clients_.end()); + if (i == streaming_clients_.end()) + { + wsrep::log_warning() << "Unable to find streaming client " + << client_state->id(); + assert(0); + return; + } + else + { + streaming_clients_.erase(i); + cond_.notify_all(); + } +} + void wsrep::server_state::start_streaming_applier( const wsrep::id& server_id, const wsrep::transaction_id& transaction_id, wsrep::high_priority_service* sa) { + wsrep::unique_lock lock(mutex_); if (streaming_appliers_.insert( std::make_pair(std::make_pair(server_id, transaction_id), sa)).second == false) @@ -774,6 +843,7 @@ void wsrep::server_state::stop_streaming_applier( const wsrep::id& server_id, const wsrep::transaction_id& transaction_id) { + wsrep::unique_lock lock(mutex_); streaming_appliers_map::iterator i( streaming_appliers_.find(std::make_pair(server_id, transaction_id))); assert(i != streaming_appliers_.end()); @@ -785,6 +855,7 @@ void wsrep::server_state::stop_streaming_applier( else { streaming_appliers_.erase(i); + cond_.notify_all(); } } @@ -792,13 +863,15 @@ wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( const wsrep::id& server_id, const wsrep::transaction_id& transaction_id) const { + wsrep::unique_lock lock(mutex_); streaming_appliers_map::const_iterator i( streaming_appliers_.find(std::make_pair(server_id, transaction_id))); return (i == streaming_appliers_.end() ? 
0 : i->second); } -// Private - +////////////////////////////////////////////////////////////////////////////// +// Private // +////////////////////////////////////////////////////////////////////////////// int wsrep::server_state::desync(wsrep::unique_lock& lock) { @@ -869,3 +942,79 @@ void wsrep::server_state::state( // throw wsrep::runtime_error(os.str()); } } + +void wsrep::server_state::wait_until_state( + wsrep::unique_lock& lock, + enum wsrep::server_state::state state) const +{ + ++state_waiters_[state]; + while (state_ != state) + { + cond_.wait(lock); + } + --state_waiters_[state]; + cond_.notify_all(); +} + +void wsrep::server_state::close_foreign_sr_transactions( + wsrep::unique_lock& lock, + wsrep::high_priority_service& high_priority_service) +{ + assert(lock.owns_lock()); + if (current_view_.own_index() == -1) + { + while (streaming_clients_.empty() == false) + { + streaming_clients_map::iterator i(streaming_clients_.begin()); + wsrep::client_id client_id(i->first); + wsrep::transaction_id transaction_id(i->second->transaction().id()); + i->second->total_order_bf_abort(current_view_.view_seqno()); + streaming_clients_map::const_iterator found_i; + while ((found_i = streaming_clients_.find(client_id)) != + streaming_clients_.end() && + found_i->second->transaction().id() == transaction_id) + { + cond_.wait(lock); + } + } + } + + + streaming_appliers_map::iterator i(streaming_appliers_.begin()); + while (i != streaming_appliers_.end()) + { + if (std::find_if(current_view_.members().begin(), + current_view_.members().end(), + server_id_cmp(i->first.first)) != + current_view_.members().end()) + { + wsrep::id server_id(i->first.first); + wsrep::transaction_id transaction_id(i->first.second); + wsrep::high_priority_service* streaming_applier(i->second); + high_priority_service.adopt_transaction( + streaming_applier->transaction()); + { + wsrep::high_priority_switch sw(high_priority_service, + *streaming_applier); + streaming_applier->rollback( + 
wsrep::ws_handle(), wsrep::ws_meta()); + streaming_applier->after_apply(); + } + + streaming_appliers_.erase(i++); + server_service_.release_high_priority_service(streaming_applier); + wsrep::ws_meta ws_meta( + wsrep::gtid(), + wsrep::stid(server_id, transaction_id, wsrep::client_id()), + wsrep::seqno::undefined(), 0); + high_priority_service.remove_fragments(ws_meta); + high_priority_service.commit(wsrep::ws_handle(transaction_id, 0), + ws_meta); + high_priority_service.after_apply(); + } + else + { + ++i; + } + } +} diff --git a/src/transaction.cpp b/src/transaction.cpp index 95ab7d8..83cfce4 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -477,6 +477,10 @@ int wsrep::transaction::after_commit() { assert(client_state_.mode() == wsrep::client_state::m_local || client_state_.mode() == wsrep::client_state::m_high_priority); + if (client_state_.mode() == wsrep::client_state::m_local) + { + client_state_.server_state_.stop_streaming_client(&client_state_); + } clear_fragments(); } @@ -522,7 +526,7 @@ int wsrep::transaction::before_rollback() // fall through case s_executing: // Voluntary rollback - if (is_streaming() && bf_aborted_in_total_order_ == false) + if (is_streaming()) { streaming_rollback(); } @@ -535,7 +539,7 @@ int wsrep::transaction::before_rollback() } else { - if (is_streaming() && bf_aborted_in_total_order_ == false) + if (is_streaming()) { streaming_rollback(); } @@ -543,14 +547,14 @@ int wsrep::transaction::before_rollback() } break; case s_cert_failed: - if (is_streaming() && bf_aborted_in_total_order_ == false) + if (is_streaming()) { streaming_rollback(); } state(lock, s_aborting); break; case s_aborting: - if (is_streaming() && bf_aborted_in_total_order_ == false) + if (is_streaming()) { streaming_rollback(); } @@ -653,8 +657,16 @@ int wsrep::transaction::after_statement() case s_must_replay: { state(lock, s_replaying); + // Need to remember streaming state before replay, entering + // after_commit() after succesful replay will 
clear + // fragments. + const bool was_streaming(is_streaming()); lock.unlock(); enum wsrep::provider::status replay_ret(client_service_.replay()); + if (was_streaming) + { + client_state_.server_state_.stop_streaming_client(&client_state_); + } switch (replay_ret) { case wsrep::provider::success: @@ -961,6 +973,10 @@ int wsrep::transaction::certify_fragment( return 1; } + if (is_streaming() == false) + { + client_state_.server_state_.start_streaming_client(&client_state_); + } int ret(0); // Storage service scope { @@ -1033,6 +1049,10 @@ int wsrep::transaction::certify_fragment( lock.lock(); if (ret) { + if (is_streaming() == false) + { + client_state_.server_state_.stop_streaming_client(&client_state_); + } if (state_ != s_must_abort) { state(lock, s_must_abort); @@ -1246,25 +1266,29 @@ void wsrep::transaction::streaming_rollback() debug_log_state("streaming_rollback enter"); assert(state_ != s_must_replay); assert(streaming_context_.rolled_back() == false); - // Create a high priority applier which will handle the - // rollback fragment or clean up on configuration change. - // Adopt transaction will copy fragment set and appropriate - // meta data. Mark current transaction streaming context - // rolled back. 
- wsrep::high_priority_service* sa( - server_service_.streaming_applier_service( - client_state_.client_service())); - client_state_.server_state().start_streaming_applier( - client_state_.server_state().id(), id(), sa); - sa->adopt_transaction(*this); - streaming_context_.cleanup(); - streaming_context_.rolled_back(id_); - client_service_.debug_sync("wsrep_before_SR_rollback"); - enum wsrep::provider::status ret; - if ((ret = provider().rollback(id_))) + + if (bf_aborted_in_total_order_) { - wsrep::log_warning() << "Failed to replicate rollback fragment for " - << id_ << ": " << ret; + client_state_.server_state_.stop_streaming_client(&client_state_); + } + else + { + // Create a high priority applier which will handle the + // rollback fragment or clean up on configuration change. + // Adopt transaction will copy fragment set and appropriate + // meta data. Mark current transaction streaming context + // rolled back. + client_state_.server_state_.convert_streaming_client_to_applier( + &client_state_); + streaming_context_.cleanup(); + streaming_context_.rolled_back(id_); + client_service_.debug_sync("wsrep_before_SR_rollback"); + enum wsrep::provider::status ret; + if ((ret = provider().rollback(id_))) + { + wsrep::log_warning() << "Failed to replicate rollback fragment for " + << id_ << ": " << ret; + } } debug_log_state("streaming_rollback leave"); } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 4b21859..7390292 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -306,7 +306,7 @@ namespace } wsrep_cb_status_t view_cb(void* app_ctx, - void* recv_ctx __attribute__((unused)), + void* recv_ctx, const wsrep_view_info_t* view_info, const char*, size_t) @@ -315,10 +315,12 @@ namespace assert(view_info); wsrep::server_state& server_state( *reinterpret_cast(app_ctx)); + wsrep::high_priority_service* high_priority_service( + reinterpret_cast(recv_ctx)); try { wsrep::view view(view_from_native(*view_info)); - 
server_state.on_view(view); + server_state.on_view(view, high_priority_service); return WSREP_CB_SUCCESS; } catch (const wsrep::runtime_error& e) diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 3dcd878..db16a60 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -192,7 +192,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap, BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); ss.sync_point_enabled_ = "on_view_wait_initialized"; ss.sync_point_action_ = ss.spa_initialize; - ss.on_view(bootstrap_view); + ss.on_view(bootstrap_view, &hps); ss.sync_point_enabled_ = ""; BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined); ss.on_sync(); @@ -218,7 +218,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_init_first_boostrap, BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0); ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0))); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); - ss.on_view(bootstrap_view); + ss.on_view(bootstrap_view, &hps); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined); ss.on_sync(); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_synced);