From 0851970c53218bfc75e58734fcde91fcbce09b73 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Fri, 29 Jun 2018 11:54:33 +0300 Subject: [PATCH] Bootstrap server service, fixes to server state management * Added bootstrap service call to do DBMS side bootstrap operations during the cluster bootstrap. * Added last_committed_gtid() to provider interface * Implemented wait_for_gtid() provider call * Pass initial position to the server state --- dbsim/db_server_service.cpp | 4 +++ dbsim/db_server_service.hpp | 1 + dbsim/db_server_state.hpp | 1 + include/wsrep/gtid.hpp | 5 +-- include/wsrep/provider.hpp | 5 +++ include/wsrep/server_service.hpp | 12 +++++++ include/wsrep/server_state.hpp | 13 ++++++- include/wsrep/view.hpp | 1 - src/gtid.cpp | 11 ++++++ src/server_state.cpp | 60 +++++++++++++++++++++++--------- src/transaction.cpp | 9 ++++- src/wsrep_provider_v26.cpp | 30 +++++++++++++++- src/wsrep_provider_v26.hpp | 3 +- test/mock_provider.hpp | 4 +++ test/mock_server_state.hpp | 3 +- 15 files changed, 137 insertions(+), 25 deletions(-) diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index f406c83..23bbba8 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -48,6 +48,10 @@ void db::server_service::background_rollback(wsrep::client_state&) { } +void db::server_service::bootstrap() +{ +} + void db::server_service::log_message(enum wsrep::log::level level, const char* message) { diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index 3e95556..096173a 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -22,6 +22,7 @@ namespace db int start_sst(const std::string&, const wsrep::gtid&, bool) override; std::string sst_request() override; void background_rollback(wsrep::client_state&) override; + void bootstrap() override; void log_message(enum wsrep::log::level, const char* message); void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&) override; diff --git a/dbsim/db_server_state.hpp b/dbsim/db_server_state.hpp index e4f56e7..ecffb56 100644 --- a/dbsim/db_server_state.hpp +++ b/dbsim/db_server_state.hpp @@ -31,6 +31,7 @@ namespace db server_id, address, working_dir, + wsrep::gtid::undefined(), 1, wsrep::server_state::rm_async) , mutex_() diff --git a/include/wsrep/gtid.hpp b/include/wsrep/gtid.hpp index 30a94fb..2cd6ed3 100644 --- a/include/wsrep/gtid.hpp +++ b/include/wsrep/gtid.hpp @@ -16,8 +16,8 @@ namespace wsrep { public: gtid() - : id_() - , seqno_() + : id_(wsrep::id::undefined()) + , seqno_(wsrep::seqno::undefined()) { } gtid(const wsrep::id& id, wsrep::seqno seqno) : id_(id) @@ -41,6 +41,7 @@ namespace wsrep }; std::ostream& operator<<(std::ostream&, const wsrep::gtid&); + std::istream& operator>>(std::istream&, wsrep::gtid&); } #endif // WSREP_GTID_HPP diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 56b2fd0..c8846da 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -299,6 +299,11 @@ namespace wsrep * @return Provider status indicating the result of the call. */ virtual enum status causal_read(int timeout) const = 0; + virtual enum status wait_for_gtid(const wsrep::gtid&, int timeout) const = 0; + /** + * Return last committed GTID. + */ + virtual wsrep::gtid last_committed_gtid() const = 0; virtual int sst_sent(const wsrep::gtid&, int) = 0; virtual int sst_received(const wsrep::gtid&, int) = 0; diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 1a3d830..2cf5325 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -55,6 +55,18 @@ namespace wsrep */ virtual void background_rollback(wsrep::client_state&) = 0; + /** + * Bootstrap a DBMS state for a new cluster. + * + * This method is called by the wsrep lib after the + * new cluster is bootstrapped and the server has reached + * initialized state. From this call the DBMS should initialize + * environment for the new cluster. + * + * @param gtid Gtid of the bootstrap position. + */ + virtual void bootstrap() = 0; + /** * Log message * diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 2bbc628..c0c24a0 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -193,6 +193,11 @@ namespace wsrep */ const std::string& working_dir() const { return working_dir_; } + /** + * Return initial position for server. + */ + const wsrep::gtid& initial_position() const + { return initial_position_; } /** * Return maximum protocol version. */ @@ -316,7 +321,8 @@ namespace wsrep * * @return Zero on success, non-zero on failure. */ - int wait_for_gtid(const wsrep::gtid&) const; + enum wsrep::provider::status + wait_for_gtid(const wsrep::gtid&, int timeout) const; /** * Perform a causal read in the cluster. After the call returns, @@ -484,6 +490,7 @@ namespace wsrep const std::string& id, const std::string& address, const std::string& working_dir, + const wsrep::gtid& initial_position, int max_protocol_version, enum rollback_mode rollback_mode) : mutex_(mutex) @@ -492,6 +499,8 @@ namespace wsrep , state_(s_disconnected) , state_hist_() , state_waiters_(n_states_) + , bootstrap_() + , initial_position_(initial_position) , init_initialized_() , init_synced_() , sst_gtid_() @@ -529,6 +538,8 @@ namespace wsrep enum state state_; std::vector state_hist_; mutable std::vector state_waiters_; + bool bootstrap_; + const wsrep::gtid initial_position_; bool init_initialized_; bool init_synced_; wsrep::gtid sst_gtid_; diff --git a/include/wsrep/view.hpp b/include/wsrep/view.hpp index 9e8414b..203f034 100644 --- a/include/wsrep/view.hpp +++ b/include/wsrep/view.hpp @@ -86,7 +86,6 @@ namespace wsrep int protocol_version() const { return protocol_version_; } - const std::vector& members() const { return members_; } /** diff --git a/src/gtid.cpp b/src/gtid.cpp index 5f9f9f3..656f5a3 100644 --- a/src/gtid.cpp +++ b/src/gtid.cpp @@ -10,3 +10,14 @@ std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::gtid& gtid) { return (os << gtid.id() << ":" << gtid.seqno()); } + +std::istream& wsrep::operator>>(std::istream& is, wsrep::gtid& gtid) +{ + std::string id_str; + std::getline(is, id_str, ':'); + long long seq; + is >> seq; + gtid = wsrep::gtid(wsrep::id(id_str), wsrep::seqno(seq)); + std::cout << "GTID: " << gtid << "\n"; + return is; +} diff --git a/src/server_state.cpp b/src/server_state.cpp index 7342230..82740aa 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -43,6 +43,18 @@ namespace return oss.str(); } + // + // This method is used to deal with historical burden of several + // ways to bootstrap the cluster. Bootstrap happens if + // + // * bootstrap option is given + // * cluster_address is "gcomm://" (Galera provider) + // + bool is_bootstrap(const std::string& cluster_address, bool bootstrap) + { + return (bootstrap || cluster_address == "gcomm://"); + } + int apply_write_set(wsrep::server_state& server_state, wsrep::client_state& client_state, const wsrep::ws_handle& ws_handle, @@ -242,7 +254,10 @@ namespace int wsrep::server_state::load_provider(const std::string& provider_spec, const std::string& provider_options) { - wsrep::log_info() << "Loading provider " << provider_spec; + wsrep::log_info() << "Loading provider " + << provider_spec + << "initial position: " + << initial_position_; provider_ = wsrep::provider::make_provider( *this, provider_spec, provider_options); return (provider_ ? 0 : 1); @@ -259,8 +274,10 @@ int wsrep::server_state::connect(const std::string& cluster_name, const std::string& state_donor, bool bootstrap) { + bootstrap_ = is_bootstrap(cluster_address, bootstrap); + wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_; return provider().connect(cluster_name, cluster_address, state_donor, - bootstrap); + bootstrap_); } int wsrep::server_state::disconnect() @@ -468,18 +485,11 @@ wsrep::gtid wsrep::server_state::last_committed_gtid() const return last_committed_gtid_; } -int wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid) const +enum wsrep::provider::status +wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout) + const { - wsrep::unique_lock lock(mutex_); - if (gtid.id() != last_committed_gtid_.id()) - { - return 1; - } - while (last_committed_gtid_.seqno() < gtid.seqno()) - { - cond_.wait(lock); - } - return 0; + return provider_->wait_for_gtid(gtid, timeout); } enum wsrep::provider::status @@ -522,8 +532,20 @@ void wsrep::server_state::on_view(const wsrep::view& view) { wsrep::unique_lock lock(mutex_); assert(view.final() == false); - // Cluster was bootstrapped - if (state_ == s_connected && view.members().size() == 1) + + // + // Reached primary from connected state. This may mean the following + // + // 1) Server was joined to the cluster and got SST transfer + // 2) Server was partitioned from the cluster and got back + // 3) A new cluster was bootstrapped from non-prim cluster + // + // There is no enough information here what was the cause + // of the primary component, so we need to walk through + // all states leading to joined to notify possible state + // waiters in other threads. + // + if (state_ == s_connected) { state(lock, s_joiner); state(lock, s_initializing); @@ -531,12 +553,16 @@ void wsrep::server_state::on_view(const wsrep::view& view) if (init_initialized_ == false) { - // DBMS has not been initialized yet wait_until_state(lock, s_initialized); } - assert(init_initialized_); + if (bootstrap_) + { + server_service_.bootstrap(); + bootstrap_ = false; + } + if (state_ == s_initialized) { state(lock, s_joined); diff --git a/src/transaction.cpp b/src/transaction.cpp index 1b3dd42..518221f 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -379,6 +379,8 @@ int wsrep::transaction::after_commit() } assert(ret == 0); state(lock, s_committed); + + // client_state_.server_state().last_committed_gtid(ws_meta.gitd()); debug_log_state("after_commit_leave"); return ret; } @@ -550,7 +552,12 @@ int wsrep::transaction::after_statement() if (ordered()) { ret = provider().commit_order_enter(ws_handle_, ws_meta_); - if (ret == 0) provider().commit_order_leave(ws_handle_, ws_meta_); + if (ret == 0) + { + // client_state_.server_state().last_committed_gtid( + // ws_meta.gtid()); + provider().commit_order_leave(ws_handle_, ws_meta_); + } } provider().release(ws_handle_); } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index ee900c5..08cab3c 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -465,6 +465,11 @@ wsrep::wsrep_provider_v26::wsrep_provider_v26( : provider(server_state) , wsrep_() { + wsrep_gtid_t state_id; + std::memcpy(state_id.uuid.data, + server_state.initial_position().id().data(), + sizeof(state_id.uuid.data)); + state_id.seqno = server_state.initial_position().seqno().get(); struct wsrep_init_args init_args; memset(&init_args, 0, sizeof(init_args)); init_args.app_ctx = &server_state; @@ -474,7 +479,7 @@ wsrep::wsrep_provider_v26::wsrep_provider_v26( init_args.data_dir = server_state_.working_dir().c_str(); init_args.options = provider_options.c_str(); init_args.proto_ver = server_state.max_protocol_version(); - init_args.state_id = 0; + init_args.state_id = &state_id; init_args.state = 0; init_args.logger_cb = &logger_cb; init_args.connected_cb = &connected_cb; @@ -704,6 +709,29 @@ wsrep::wsrep_provider_v26::causal_read(int timeout) const return map_return_value(wsrep_->sync_wait(wsrep_, 0, timeout, 0)); } +enum wsrep::provider::status +wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout) + const +{ + wsrep_gtid_t wsrep_gtid; + std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(), + sizeof(wsrep_gtid.uuid.data)); + wsrep_gtid.seqno = gtid.seqno().get(); + return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0)); +} + +wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const +{ + wsrep_gtid_t wsrep_gtid; + if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK) + { + throw wsrep::runtime_error("Failed to read last committed id"); + } + return wsrep::gtid( + wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)), + wsrep::seqno(wsrep_gtid.seqno)); +} + int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err) { wsrep_gtid_t wsrep_gtid; diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index f655d58..b5b1094 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -57,7 +57,8 @@ namespace wsrep int); enum wsrep::provider::status leave_toi(wsrep::client_id); enum wsrep::provider::status causal_read(int) const; - + enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&, int) const; + wsrep::gtid last_committed_gtid() const; int sst_sent(const wsrep::gtid&,int); int sst_received(const wsrep::gtid& gtid, int); diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 6433db2..195e42d 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -212,6 +212,10 @@ namespace wsrep { return wsrep::provider::success; } + enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&, + int) const WSREP_OVERRIDE + { return wsrep::provider::success; } + wsrep::gtid last_committed_gtid() const { return wsrep::gtid(); } int sst_sent(const wsrep::gtid&, int) { return 0; } int sst_received(const wsrep::gtid&, int) { return 0; } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index c5185a1..a2fe555 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -22,7 +22,7 @@ namespace wsrep const std::string& id, enum wsrep::server_state::rollback_mode rollback_mode) : wsrep::server_state(mutex_, cond_, *this, - name, id, "", "./", 1, rollback_mode) + name, id, "", "./", wsrep::gtid::undefined(), 1, rollback_mode) , sst_before_init_() , mutex_() , cond_() @@ -53,6 +53,7 @@ namespace wsrep { delete client_state; } + void bootstrap() WSREP_OVERRIDE { } void log_message(enum wsrep::log::level level, const char* message) { wsrep::log(level, name().c_str()) << message;