From 4e8dfca3f1ce4a78e9b36e4758e6ff92832bcc96 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Tue, 3 Jul 2018 12:37:22 +0300 Subject: [PATCH] Initial RSU implementation. --- dbsim/db_server_service.cpp | 5 +++ dbsim/db_server_service.hpp | 1 + include/wsrep/client_state.hpp | 20 +++++++++-- include/wsrep/server_service.hpp | 9 +++++ src/client_state.cpp | 58 ++++++++++++++++++++++++++++---- test/mock_server_state.hpp | 6 ++-- 6 files changed, 87 insertions(+), 12 deletions(-) diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index 18116f3..16472c4 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -75,6 +75,11 @@ void db::server_service::log_view(const wsrep::view&) wsrep::log_info() << "View"; } +int db::server_service::wait_committing_transactions(int) +{ + throw wsrep::not_implemented_error(); +} + void db::server_service::debug_sync(const char*) { diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index 4274b73..4bc9fc3 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -29,6 +29,7 @@ namespace db void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&) override; void log_view(const wsrep::view&) override; + int wait_committing_transactions(int) override; void debug_sync(const char*) override; private: db::server& server_; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 2773c5e..393a09a 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -81,10 +81,12 @@ namespace wsrep /** High priority mode */ m_high_priority, /** Client is in total order isolation mode */ - m_toi + m_toi, + /** Client is executing rolling schema upgrade */ + m_rsu }; - static const int mode_max_ = m_toi + 1; + static const int n_modes_ = m_rsu + 1; /** * Client state enumeration. * @@ -443,6 +445,19 @@ namespace wsrep */ int leave_toi(); + /** + * Begin rolling schema upgrade operation. + * + * @param timeout Timeout in seconds to wait for committing + * connections to finish. + */ + int begin_rsu(int timeout); + + /** + * End rolling schema upgrade operation. + */ + int end_rsu(); + /** * Begin non-blocking operation. */ @@ -691,6 +706,7 @@ namespace wsrep case wsrep::client_state::m_replicating: return "replicating"; case wsrep::client_state::m_high_priority: return "high priority"; case wsrep::client_state::m_toi: return "toi"; + case wsrep::client_state::m_rsu: return "rsu"; } return "unknown"; } diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index ec589f5..83966b9 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -125,6 +125,15 @@ namespace wsrep const wsrep::gtid& gtid, bool bypass) = 0; + + /** + * Wait until committing transactions have completed. + * Prior calling this method the server should have been + * desynced from the group to disallow further transactions + * to start committing. + */ + virtual int wait_committing_transactions(int timeout) = 0; + /** * Provide a server level debug sync point for a caller. */ diff --git a/src/client_state.cpp b/src/client_state.cpp index 1195f39..b185d59 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -300,6 +300,51 @@ int wsrep::client_state::leave_toi() return ret; } +int wsrep::client_state::begin_rsu(int timeout) +{ + if (server_state_.desync()) + { + wsrep::log_warning() << "Failed to desync server"; + return 1; + } + if (server_state_.server_service().wait_committing_transactions(timeout)) + { + wsrep::log_warning() << "RSU failed due to pending transactions"; + server_state_.resync(); + return 1; + } + wsrep::seqno pause_seqno(server_state_.pause()); + if (pause_seqno.is_undefined()) + { + wsrep::log_warning() << "Failed to pause provider"; + server_state_.resync(); + return 1; + } + wsrep::log_info() << "Provider paused at: " << pause_seqno; + wsrep::unique_lock lock(mutex_); + toi_mode_ = mode_; + mode(lock, m_rsu); + return 0; +} + +int wsrep::client_state::end_rsu() +{ + int ret(0); + try + { + server_state_.resume(); + server_state_.resync(); + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_warning() << "End RSU failed: " << e.what(); + ret = 1; + } + wsrep::unique_lock lock(mutex_); + mode(lock, toi_mode_); + return ret; +} + int wsrep::client_state::sync_wait(int timeout) { std::pair result( @@ -382,12 +427,13 @@ void wsrep::client_state::mode( enum mode mode) { assert(lock.owns_lock()); - static const char allowed[mode_max_][mode_max_] = - { /* l r h t */ - { 0, 0, 0, 0 }, /* local */ - { 0, 0, 1, 1 }, /* repl */ - { 0, 1, 0, 1 }, /* high prio */ - { 0, 1, 1, 0 } /* toi */ + static const char allowed[n_modes_][n_modes_] = + { /* l r h t r */ + { 0, 0, 0, 0, 0 }, /* local */ + { 0, 0, 1, 1, 1 }, /* repl */ + { 0, 1, 0, 1, 0 }, /* high prio */ + { 0, 1, 1, 0, 0 }, /* toi */ + { 0, 1, 0, 0, 0 } /* rsu */ }; if (allowed[mode_][mode]) { diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 861d697..2bbeb2b 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -85,10 +85,6 @@ namespace wsrep } void log_view(const wsrep::view&) { } - // void on_connect(const wsrep::gtid& ) WSREP_OVERRIDE { } - // void wait_until_connected() WSREP_OVERRIDE { } - // void on_view(const wsrep::view&) WSREP_OVERRIDE { } - // void on_sync() WSREP_OVERRIDE { } bool sst_before_init() const WSREP_OVERRIDE { return sst_before_init_; } std::string sst_request() WSREP_OVERRIDE { return ""; } @@ -102,6 +98,8 @@ namespace wsrep client_state.after_rollback(); } + int wait_committing_transactions(int) WSREP_OVERRIDE { return 0; } + void debug_sync(const char* sync_point) WSREP_OVERRIDE { if (sync_point_enabled_ == sync_point)