From d6f185c2785d6bc1685e82eace56897e57c65991 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Tue, 12 Jun 2018 10:52:56 +0300 Subject: [PATCH] Refactored provider specific code out of server_context.cpp --- include/wsrep/logger.hpp | 13 ++ include/wsrep/provider.hpp | 14 +- include/wsrep/server_context.hpp | 6 + src/provider.cpp | 34 ++++- src/server_context.cpp | 214 +------------------------------ src/wsrep_provider_v26.cpp | 210 +++++++++++++++++++++++++++++- src/wsrep_provider_v26.hpp | 3 +- 7 files changed, 270 insertions(+), 224 deletions(-) diff --git a/include/wsrep/logger.hpp b/include/wsrep/logger.hpp index f62d712..f968559 100644 --- a/include/wsrep/logger.hpp +++ b/include/wsrep/logger.hpp @@ -37,6 +37,12 @@ namespace wsrep static std::ostream& os_; }; + class log_error : public log + { + public: + log_error() + : log("ERROR") { } + }; class log_warning : public log { @@ -45,6 +51,13 @@ namespace wsrep : log("WARNING") { } }; + class log_info : public log + { + public: + log_info() + : log("INFO") { } + }; + class log_debug : public log { public: diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 4d4916d..7bc035a 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -270,14 +270,16 @@ namespace wsrep virtual std::vector status() const = 0; /*! - * Return a pointer to native handle. + * Create a new provider. * - * \todo This should be eventually deprecated. + * \param provider_spec Provider specification + * \param provider_options Initial options to provider */ - // virtual struct wsrep* native() = 0; - // Factory method - static provider* make_provider(const std::string& provider); - private: + static provider* make_provider( + wsrep::server_context&, + const std::string& provider_spec, + const std::string& provider_options); + protected: wsrep::server_context& server_context_; }; diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index aacd04a..1605dc1 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -173,6 +173,12 @@ namespace wsrep */ const std::string& address() const { return address_; } + /*! + * Return working directory + * + * \return String containing path to working directory. + */ + const std::string& working_dir() const { return working_dir_; } /*! * Get the rollback mode which server is operating in. * diff --git a/src/provider.cpp b/src/provider.cpp index 11c9be4..72a0b70 100644 --- a/src/provider.cpp +++ b/src/provider.cpp @@ -3,9 +3,41 @@ // #include "wsrep/provider.hpp" +#include "wsrep/logger.hpp" + +#include "mock_provider.hpp" +#include "wsrep_provider_v26.hpp" wsrep::provider* wsrep::provider::make_provider( - const std::string&) + wsrep::server_context& server_context, + const std::string& provider_spec, + const std::string& provider_options) { + try + { + if (provider_spec == "mock") + { + return new wsrep::mock_provider(server_context); + } + else + { + return new wsrep::wsrep_provider_v26( + server_context, provider_options, provider_spec); + } + } + catch (const wsrep::runtime_error& e) + { + wsrep::log_error() << "Failed to create a new provider '" + << provider_spec << "'" + << " with options '" << provider_options + << "':" << e.what(); + } + catch (...) + { + wsrep::log_error() << "Caught unknown exception when trying to " + << "create a new provider '" + << provider_spec << "'" + << " with options '" << provider_options; + } return 0; } diff --git a/src/server_context.cpp b/src/server_context.cpp index d27fafd..3b35124 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -9,44 +9,11 @@ #include "wsrep/logger.hpp" #include "wsrep/compiler.hpp" -// Todo: refactor into provider factory -#include "mock_provider.hpp" -#include "wsrep_provider_v26.hpp" - #include #include namespace { - inline uint32_t map_one(const int flags, const uint32_t from, - const int to) - { - return ((flags & from) ? to : 0); - } - - int map_flags_from_native(uint32_t flags) - { - using wsrep::provider; - return (map_one(flags, - WSREP_FLAG_TRX_START, - provider::flag::start_transaction) | - map_one(flags, - WSREP_FLAG_TRX_END, - provider::flag::commit) | - map_one(flags, - WSREP_FLAG_ROLLBACK, - provider::flag::rollback) | - map_one(flags, - WSREP_FLAG_ISOLATION, - provider::flag::isolation) | - map_one(flags, - WSREP_FLAG_PA_UNSAFE, - provider::flag::pa_unsafe) | - // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE) | - // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) | - map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot)); - } - inline bool starts_transaction(const wsrep::ws_meta& ws_meta) { return (ws_meta.flags() & wsrep::provider::flag::start_transaction); @@ -62,187 +29,14 @@ namespace return (ws_meta.flags() & wsrep::provider::flag::rollback); } - wsrep_cb_status_t connected_cb( - void* app_ctx, - const wsrep_view_info_t* view __attribute((unused))) - { - assert(app_ctx); - wsrep::server_context& server_context( - *reinterpret_cast(app_ctx)); - // - // TODO: Fetch server id and group id from view infor - // - try - { - server_context.on_connect(); - return WSREP_CB_SUCCESS; - } - catch (const wsrep::runtime_error& e) - { - std::cerr << "Exception: " << e.what(); - return WSREP_CB_FAILURE; - } - } - - wsrep_cb_status_t view_cb(void* app_ctx, - void* recv_ctx __attribute__((unused)), - const wsrep_view_info_t* view_info, - const char*, - size_t) - { - assert(app_ctx); - assert(view_info); - wsrep::server_context& server_context( - *reinterpret_cast(app_ctx)); - try - { - wsrep::view view(*view_info); - server_context.on_view(view); - return WSREP_CB_SUCCESS; - } - catch (const wsrep::runtime_error& e) - { - std::cerr << "Exception: " << e.what(); - return WSREP_CB_FAILURE; - } - } - - wsrep_cb_status_t sst_request_cb(void* app_ctx, - void **sst_req, size_t* sst_req_len) - { - assert(app_ctx); - wsrep::server_context& server_context( - *reinterpret_cast(app_ctx)); - - try - { - std::string req(server_context.on_sst_required()); - *sst_req = ::strdup(req.c_str()); - *sst_req_len = strlen(req.c_str()); - return WSREP_CB_SUCCESS; - } - catch (const wsrep::runtime_error& e) - { - return WSREP_CB_FAILURE; - } - } - - wsrep_cb_status_t apply_cb(void* ctx, - const wsrep_ws_handle_t* wsh, - uint32_t flags, - const wsrep_buf_t* buf, - const wsrep_trx_meta_t* meta, - wsrep_bool_t* exit_loop __attribute__((unused))) - { - wsrep_cb_status_t ret(WSREP_CB_SUCCESS); - - wsrep::client_context* client_context( - reinterpret_cast(ctx)); - assert(client_context); - assert(client_context->mode() == wsrep::client_context::m_applier); - - wsrep::data data(buf->ptr, buf->len); - wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); - wsrep::ws_meta ws_meta( - wsrep::gtid(wsrep::id(meta->gtid.uuid.data, - sizeof(meta->gtid.uuid.data)), - wsrep::seqno(meta->gtid.seqno)), - wsrep::stid(wsrep::id(meta->stid.node.data, - sizeof(meta->stid.node.data)), - meta->stid.trx, - meta->stid.conn), wsrep::seqno(meta->depends_on), - map_flags_from_native(flags)); - if (ret == WSREP_CB_SUCCESS && - client_context->server_context().on_apply( - *client_context, ws_handle, ws_meta, data)) - { - ret = WSREP_CB_FAILURE; - } - return ret; - } - - wsrep_cb_status_t synced_cb(void* app_ctx) - { - assert(app_ctx); - wsrep::server_context& server_context( - *reinterpret_cast(app_ctx)); - try - { - server_context.on_sync(); - return WSREP_CB_SUCCESS; - } - catch (const wsrep::runtime_error& e) - { - std::cerr << "On sync failed: " << e.what() << "\n"; - return WSREP_CB_FAILURE; - } - } - - - wsrep_cb_status_t sst_donate_cb(void* app_ctx, - void* , - const wsrep_buf_t* req_buf, - const wsrep_gtid_t* req_gtid, - const wsrep_buf_t*, - bool bypass) - { - assert(app_ctx); - wsrep::server_context& server_context( - *reinterpret_cast(app_ctx)); - try - { - std::string req(reinterpret_cast(req_buf->ptr), - req_buf->len); - wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, - sizeof(req_gtid->uuid.data)), - wsrep::seqno(req_gtid->seqno)); - server_context.on_sst_request(req, gtid, bypass); - return WSREP_CB_SUCCESS; - } - catch (const wsrep::runtime_error& e) - { - return WSREP_CB_FAILURE; - } - } } int wsrep::server_context::load_provider(const std::string& provider_spec, const std::string& provider_options) { - wsrep::log() << "Loading provider " << provider_spec; - if (provider_spec == "mock") - { - provider_ = new wsrep::mock_provider(*this); - } - else - { - std::cerr << "Provider options" << provider_options << "\n"; - struct wsrep_init_args init_args; - memset(&init_args, 0, sizeof(init_args)); - init_args.app_ctx = this; - init_args.node_name = name_.c_str(); - init_args.node_address = address_.c_str(); - init_args.node_incoming = ""; - init_args.data_dir = working_dir_.c_str(); - init_args.options = provider_options.c_str(); - init_args.proto_ver = 1; - init_args.state_id = 0; - init_args.state = 0; - init_args.logger_cb = 0; - init_args.connected_cb = &connected_cb; - init_args.view_cb = &view_cb; - init_args.sst_request_cb = &sst_request_cb; - init_args.apply_cb = &apply_cb; - init_args.unordered_cb = 0; - init_args.sst_donate_cb = &sst_donate_cb; - init_args.synced_cb = &synced_cb; - - std::cerr << init_args.options << "\n"; - provider_ = new wsrep::wsrep_provider_v26(*this, - provider_spec.c_str(), - &init_args); - } - return 0; + wsrep::log_info() << "Loading provider " << provider_spec; + provider_ = wsrep::provider::make_provider(*this, provider_spec, provider_options); + return (provider_ ? 0 : 1); } int wsrep::server_context::connect(const std::string& cluster_name, @@ -434,7 +228,7 @@ void wsrep::server_context::state( std::ostringstream os; os << "server: " << name_ << " unallowed state transition: " << wsrep::to_string(state_) << " -> " << wsrep::to_string(state); - std::cerr << os.str() << "\n"; + wsrep::log_error() << os.str() << "\n"; ::abort(); // throw wsrep::runtime_error(os.str()); } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 2bbb4c6..6b5b834 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -3,6 +3,10 @@ // #include "wsrep_provider_v26.hpp" + +#include "wsrep/server_context.hpp" +#include "wsrep/client_context.hpp" +#include "wsrep/view.hpp" #include "wsrep/exception.hpp" #include @@ -14,6 +18,10 @@ namespace { + ///////////////////////////////////////////////////////////////////// + // Helpers // + ///////////////////////////////////////////////////////////////////// + enum wsrep::provider::status map_return_value(wsrep_status_t status) { switch (status) @@ -68,8 +76,9 @@ namespace { return wsrep::seqno(seqno == WSREP_SEQNO_UNDEFINED ? 0 : seqno); } - inline uint32_t map_one(const int flags, const int from, - const uint32_t to) + template + inline uint32_t map_one(const int flags, const F from, + const T to) { return ((flags & from) ? to : 0); } @@ -88,6 +97,29 @@ namespace map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT)); } + int map_flags_from_native(uint32_t flags) + { + using wsrep::provider; + return (map_one(flags, + WSREP_FLAG_TRX_START, + provider::flag::start_transaction) | + map_one(flags, + WSREP_FLAG_TRX_END, + provider::flag::commit) | + map_one(flags, + WSREP_FLAG_ROLLBACK, + provider::flag::rollback) | + map_one(flags, + WSREP_FLAG_ISOLATION, + provider::flag::isolation) | + map_one(flags, + WSREP_FLAG_PA_UNSAFE, + provider::flag::pa_unsafe) | + // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE) | + // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) | + map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot)); + } + class mutable_ws_handle { public: @@ -199,20 +231,186 @@ namespace wsrep_trx_meta_t trx_meta_; }; + ///////////////////////////////////////////////////////////////////// + // Callbacks // + ///////////////////////////////////////////////////////////////////// + + wsrep_cb_status_t connected_cb( + void* app_ctx, + const wsrep_view_info_t* view __attribute((unused))) + { + assert(app_ctx); + wsrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + // + // TODO: Fetch server id and group id from view infor + // + try + { + server_context.on_connect(); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + std::cerr << "Exception: " << e.what(); + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t view_cb(void* app_ctx, + void* recv_ctx __attribute__((unused)), + const wsrep_view_info_t* view_info, + const char*, + size_t) + { + assert(app_ctx); + assert(view_info); + wsrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + try + { + wsrep::view view(*view_info); + server_context.on_view(view); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + std::cerr << "Exception: " << e.what(); + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t sst_request_cb(void* app_ctx, + void **sst_req, size_t* sst_req_len) + { + assert(app_ctx); + wsrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + + try + { + std::string req(server_context.on_sst_required()); + *sst_req = ::strdup(req.c_str()); + *sst_req_len = strlen(req.c_str()); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t apply_cb(void* ctx, + const wsrep_ws_handle_t* wsh, + uint32_t flags, + const wsrep_buf_t* buf, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop __attribute__((unused))) + { + wsrep_cb_status_t ret(WSREP_CB_SUCCESS); + + wsrep::client_context* client_context( + reinterpret_cast(ctx)); + assert(client_context); + assert(client_context->mode() == wsrep::client_context::m_applier); + + wsrep::data data(buf->ptr, buf->len); + wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); + wsrep::ws_meta ws_meta( + wsrep::gtid(wsrep::id(meta->gtid.uuid.data, + sizeof(meta->gtid.uuid.data)), + wsrep::seqno(meta->gtid.seqno)), + wsrep::stid(wsrep::id(meta->stid.node.data, + sizeof(meta->stid.node.data)), + meta->stid.trx, + meta->stid.conn), wsrep::seqno(meta->depends_on), + map_flags_from_native(flags)); + if (ret == WSREP_CB_SUCCESS && + client_context->server_context().on_apply( + *client_context, ws_handle, ws_meta, data)) + { + ret = WSREP_CB_FAILURE; + } + return ret; + } + + wsrep_cb_status_t synced_cb(void* app_ctx) + { + assert(app_ctx); + wsrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + try + { + server_context.on_sync(); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + std::cerr << "On sync failed: " << e.what() << "\n"; + return WSREP_CB_FAILURE; + } + } + + + wsrep_cb_status_t sst_donate_cb(void* app_ctx, + void* , + const wsrep_buf_t* req_buf, + const wsrep_gtid_t* req_gtid, + const wsrep_buf_t*, + bool bypass) + { + assert(app_ctx); + wsrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + try + { + std::string req(reinterpret_cast(req_buf->ptr), + req_buf->len); + wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, + sizeof(req_gtid->uuid.data)), + wsrep::seqno(req_gtid->seqno)); + server_context.on_sst_request(req, gtid, bypass); + return WSREP_CB_SUCCESS; + } + catch (const wsrep::runtime_error& e) + { + return WSREP_CB_FAILURE; + } + } } wsrep::wsrep_provider_v26::wsrep_provider_v26( wsrep::server_context& server_context, - const char* path, - wsrep_init_args* args) + const std::string& provider_options, + const std::string& provider_spec) : provider(server_context) , wsrep_() { - if (wsrep_load(path, &wsrep_, 0)) + struct wsrep_init_args init_args; + memset(&init_args, 0, sizeof(init_args)); + init_args.app_ctx = &server_context; + init_args.node_name = server_context_.name().c_str(); + init_args.node_address = server_context_.address().c_str(); + init_args.node_incoming = ""; + init_args.data_dir = server_context_.working_dir().c_str(); + init_args.options = provider_options.c_str(); + init_args.proto_ver = 1; + init_args.state_id = 0; + init_args.state = 0; + init_args.logger_cb = 0; + init_args.connected_cb = &connected_cb; + init_args.view_cb = &view_cb; + init_args.sst_request_cb = &sst_request_cb; + init_args.apply_cb = &apply_cb; + init_args.unordered_cb = 0; + init_args.sst_donate_cb = &sst_donate_cb; + init_args.synced_cb = &synced_cb; + + if (wsrep_load(provider_spec.c_str(), &wsrep_, 0)) { throw wsrep::runtime_error("Failed to load wsrep library"); } - if (wsrep_->init(wsrep_, args) != WSREP_OK) + if (wsrep_->init(wsrep_, &init_args) != WSREP_OK) { throw wsrep::runtime_error("Failed to initialize wsrep provider"); } diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 151f19b..b89298f 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -15,7 +15,8 @@ namespace wsrep { public: - wsrep_provider_v26(wsrep::server_context&, const char*, struct wsrep_init_args*); + wsrep_provider_v26(wsrep::server_context&, const std::string&, + const std::string&); ~wsrep_provider_v26(); int connect(const std::string&, const std::string&, const std::string&, bool);