diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index ea800a8..a5e2070 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -48,6 +48,11 @@ void db::server_service::background_rollback(wsrep::client_state&) { } +void db::server_service::log_message(enum wsrep::log::level level, + const char* message) +{ + wsrep::log(level, server_.server_state().name().c_str()) << message; +} void db::server_service::log_dummy_write_set( wsrep::client_state&, const wsrep::ws_meta& meta) { diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index 3deee8b..4764e30 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -22,6 +22,7 @@ namespace db int start_sst(const std::string&, const wsrep::gtid&, bool) override; std::string sst_request() override; void background_rollback(wsrep::client_state&) override; + void log_message(enum wsrep::log::level, const char* message); void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&) override; void log_view(wsrep::client_state&, const wsrep::view&) override; diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index 31bb094..f98ad2c 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -74,10 +74,10 @@ std::string db::simulator::stats() const void db::simulator::start() { - wsrep::log() << "Provider: " << params_.wsrep_provider; + wsrep::log_info() << "Provider: " << params_.wsrep_provider; std::string cluster_address(build_cluster_address()); - wsrep::log() << "Cluster address: " << cluster_address; + wsrep::log_info() << "Cluster address: " << cluster_address; for (size_t i(0); i < params_.n_servers; ++i) { std::ostringstream name_os; @@ -121,7 +121,7 @@ void db::simulator::start() } // Start client threads - wsrep::log() << "####################### Starting client load"; + wsrep::log_info() << "####################### Starting client load"; clients_start_ = std::chrono::steady_clock::now(); size_t index(0); for (auto& i : servers_) @@ -142,9 +142,9 @@ void db::simulator::stop() server.stop_clients(); } clients_stop_ = std::chrono::steady_clock::now(); - wsrep::log() << "######## Stats ############"; - wsrep::log() << stats(); - wsrep::log() << "######## Stats ############"; + wsrep::log_info() << "######## Stats ############"; + wsrep::log_info() << stats(); + wsrep::log_info() << "######## Stats ############"; if (params_.fast_exit) { exit(0); @@ -158,7 +158,7 @@ void db::simulator::stop() for_each(status.begin(), status.end(), [](const wsrep::provider::status_variable& sv) { - wsrep::log() << sv.name() << " = " << sv.value(); + wsrep::log_info() << sv.name() << " = " << sv.value(); }); server.server_state().disconnect(); server.server_state().wait_until_state( diff --git a/include/wsrep/exception.hpp b/include/wsrep/exception.hpp index 9fc4fdf..6d43505 100644 --- a/include/wsrep/exception.hpp +++ b/include/wsrep/exception.hpp @@ -10,13 +10,18 @@ namespace wsrep { + extern bool abort_on_exception; + class runtime_error : public std::runtime_error { public: runtime_error(const std::string& msg) : std::runtime_error(msg) { - // ::abort(); + if (abort_on_exception) + { + ::abort(); + } } }; diff --git a/include/wsrep/logger.hpp b/include/wsrep/logger.hpp index f968559..35288de 100644 --- a/include/wsrep/logger.hpp +++ b/include/wsrep/logger.hpp @@ -16,14 +16,33 @@ namespace wsrep class log { public: - log(const std::string& prefix = "INFO") - : prefix_(prefix) + enum level + { + debug, + info, + warning, + error + }; + static const char* to_c_string(enum level level) + { + switch (level) + { + case debug: return "debug"; + case info: return "info"; + case warning: return "warning"; + case error: return "error"; + }; + return "unknown"; + } + log(enum wsrep::log::level level, const char* prefix = "") + : level_(level) + , prefix_(prefix) , oss_() { } ~log() { wsrep::unique_lock lock(mutex_); - os_ << prefix_ << ": " << oss_.str() << "\n"; + os_ << prefix_ << ": " << oss_.str() << std::endl; } template std::ostream& operator<<(const T& val) @@ -31,7 +50,10 @@ namespace wsrep return (oss_ << val); } private: - const std::string prefix_; + log(const log&); + log& operator=(const log&); + enum level level_; + const char* prefix_; std::ostringstream oss_; static wsrep::mutex& mutex_; static std::ostream& os_; @@ -41,28 +63,28 @@ namespace wsrep { public: log_error() - : log("ERROR") { } + : log(error) { } }; class log_warning : public log { public: log_warning() - : log("WARNING") { } + : log(warning) { } }; class log_info : public log { public: log_info() - : log("INFO") { } + : log(info) { } }; class log_debug : public log { public: log_debug() - : log("DEBUG") { } + : log(debug) { } }; } diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 2dd5a61..5f2119e 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -13,6 +13,8 @@ #ifndef WSREP_SERVER_SERVICE_HPP #define WSREP_SERVER_SERVICE_HPP +#include "logger.hpp" + #include namespace wsrep @@ -53,6 +55,14 @@ namespace wsrep */ virtual void background_rollback(wsrep::client_state&) = 0; + /** + * Log message + * + * @param level Requested logging level + * @param message Message + */ + virtual void log_message(enum wsrep::log::level level, + const char* message) = 0; /** * Log a dummy write set. A dummy write set is usually either * a remotely generated write set which failed certification in diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index ab41334..da646f5 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -73,7 +73,6 @@ #include "id.hpp" #include "transaction_id.hpp" #include "provider.hpp" -// #include "gtid.hpp" #include #include @@ -312,6 +311,26 @@ namespace wsrep */ enum wsrep::provider::status causal_read(int timeout) const; + /** + * Prepares server state for SST. + * + * @return String containing a SST request + */ + std::string prepare_for_sst(); + + /** + * Start a state snapshot transfer. + * + * @param sst_requets SST request string + * @param gtid Current GTID + * @param bypass Bypass flag + * + * @return Zero in case of success, non-zero otherwise + */ + int start_sst(const std::string& sst_request, + const wsrep::gtid& gtid, + bool bypass); + /** * */ @@ -353,7 +372,11 @@ namespace wsrep const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data); - enum state state() const { return state_; } + enum state state() const + { + wsrep::unique_lock lock(mutex_); + return state_; + } /** * Set server wide wsrep debug logging level. * diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 531932c..5357a25 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,7 @@ add_library(wsrep-lib client_state.cpp + exception.cpp gtid.cpp id.cpp logger.cpp diff --git a/src/exception.cpp b/src/exception.cpp new file mode 100644 index 0000000..e7ae9f8 --- /dev/null +++ b/src/exception.cpp @@ -0,0 +1,7 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "wsrep/exception.hpp" + +bool wsrep::abort_on_exception(false); diff --git a/src/server_state.cpp b/src/server_state.cpp index d1c2726..a9c6dbf 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -55,13 +55,50 @@ wsrep::server_state::~server_state() delete provider_; } +std::string wsrep::server_state::prepare_for_sst() +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_joiner); + return server_service_.sst_request(); +} + +int wsrep::server_state::start_sst(const std::string& sst_request, + const wsrep::gtid& gtid, + bool bypass) +{ + wsrep::unique_lock lock(mutex_); + state(lock, s_donor); + int ret(0); + lock.unlock(); + if (server_service_.start_sst(sst_request, gtid, bypass)) + { + lock.lock(); + wsrep::log_warning() << "SST start failed"; + state(lock, s_synced); + ret = 1; + } + return ret; +} + void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error) { - provider_->sst_sent(gtid, error); + if (provider_->sst_sent(gtid, error)) + { + server_service_.log_message(wsrep::log::warning, + "SST sent returned an error"); + } + wsrep::unique_lock lock(mutex_); + state(lock, s_joined); } + void wsrep::server_state::sst_received(const wsrep::gtid& gtid, int error) { - provider_->sst_received(gtid, error); + if (provider_->sst_received(gtid, error)) + { + throw wsrep::runtime_error("SST received failed"); + } + wsrep::unique_lock lock(mutex_); + state(lock, s_joined); } void wsrep::server_state::wait_until_state( @@ -114,14 +151,14 @@ wsrep::server_state::causal_read(int timeout) const void wsrep::server_state::on_connect() { - wsrep::log() << "Server " << name_ << " connected to cluster"; + wsrep::log_info() << "Server " << name_ << " connected to cluster"; wsrep::unique_lock lock(mutex_); state(lock, s_connected); } void wsrep::server_state::on_view(const wsrep::view& view) { - wsrep::log() << "================================================\nView:\n" + wsrep::log_info() << "================================================\nView:\n" << "id: " << view.state_id() << "\n" << "status: " << view.status() << "\n" << "own_index: " << view.own_index() << "\n" @@ -131,10 +168,10 @@ void wsrep::server_state::on_view(const wsrep::view& view) for (std::vector::const_iterator i(members.begin()); i != members.end(); ++i) { - wsrep::log() << "id: " << i->id() << " " - << "name: " << i->name(); + wsrep::log_info() << "id: " << i->id() << " " + << "name: " << i->name(); } - wsrep::log() << "================================================="; + wsrep::log_info() << "================================================="; wsrep::unique_lock lock(mutex_); if (view.final()) { @@ -144,12 +181,23 @@ void wsrep::server_state::on_view(const wsrep::view& view) void wsrep::server_state::on_sync() { - wsrep::log() << "Server " << name_ << " synced with group"; + wsrep::log_info() << "Server " << name_ << " synced with group"; wsrep::unique_lock lock(mutex_); - if (state_ != s_synced) + /* Follow the path joiner -> joined -> synced to notify possible + waiters. */ + switch (state_) { + case s_connected: + state(lock, s_joiner); + case s_joiner: + state(lock, s_joined); + case s_joined: state(lock, s_synced); - } + break; + default: + /* State */ + state(lock, s_synced); + }; } int wsrep::server_state::on_apply( @@ -363,8 +411,8 @@ void wsrep::server_state::state( { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */ { 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* ing */ { 0, 0, 0, 1, 0, 1, 0, 0, 0}, /* ized */ - { 0, 0, 0, 0, 1, 0, 0, 1, 0}, /* cted */ - { 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* jer */ + { 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* cted */ + { 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* jer */ { 0, 0, 0, 0, 0, 0, 0, 1, 1}, /* jed */ { 0, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */ { 0, 0, 0, 0, 0, 1, 1, 0, 1}, /* sed */ @@ -373,8 +421,8 @@ void wsrep::server_state::state( if (allowed[state_][state]) { - wsrep::log() << "server " << name_ << " state change: " - << state_ << " -> " << state; + wsrep::log_info() << "server " << name_ << " state change: " + << state_ << " -> " << state; state_ = state; cond_.notify_all(); while (state_waiters_[state_]) diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 808b582..3eb5342 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -8,6 +8,7 @@ #include "wsrep/client_state.hpp" #include "wsrep/view.hpp" #include "wsrep/exception.hpp" +#include "wsrep/logger.hpp" #include @@ -306,10 +307,10 @@ namespace } wsrep_cb_status_t view_cb(void* app_ctx, - void* recv_ctx __attribute__((unused)), - const wsrep_view_info_t* view_info, - const char*, - size_t) + void* recv_ctx __attribute__((unused)), + const wsrep_view_info_t* view_info, + const char*, + size_t) { assert(app_ctx); assert(view_info); @@ -337,7 +338,7 @@ namespace try { - std::string req(server_state.server_service().sst_request()); + std::string req(server_state.prepare_for_sst()); *sst_req = ::strdup(req.c_str()); *sst_req_len = strlen(req.c_str()); return WSREP_CB_SUCCESS; @@ -417,7 +418,7 @@ namespace wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, sizeof(req_gtid->uuid.data)), wsrep::seqno(req_gtid->seqno)); - if (server_state.server_service().start_sst(req, gtid, bypass)) + if (server_state.start_sst(req, gtid, bypass)) { return WSREP_CB_FAILURE; } @@ -428,6 +429,27 @@ namespace return WSREP_CB_FAILURE; } } + + void logger_cb(wsrep_log_level_t level, const char* msg) + { + switch (level) + { + case WSREP_LOG_FATAL: + case WSREP_LOG_ERROR: + wsrep::log_error() << "wsrep-lib: " << msg; + break; + case WSREP_LOG_WARN: + wsrep::log_warning() << "wsrep-lib: " <