From 3510d0739c45dc0f5b523eb589c47be45bcf8410 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 19 Apr 2018 15:03:38 +0300 Subject: [PATCH] Added SST methods, able to start several nodes in simulator --- CMakeLists.txt | 1 + src/CMakeLists.txt | 2 +- src/dbms_simulator.cpp | 75 ++++++++++++++++++++++++++++++++++--- src/mock_provider.hpp | 3 ++ src/mock_server_context.hpp | 7 +++- src/provider.hpp | 2 + src/server_context.cpp | 66 ++++++++++++++++++++++++++------ src/server_context.hpp | 9 ++++- src/wsrep_provider_v26.cpp | 18 +++++++++ src/wsrep_provider_v26.hpp | 2 + 10 files changed, 165 insertions(+), 20 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e71629..c1528d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,6 +20,7 @@ endif() find_package(Boost 1.54.0 REQUIRED unit_test_framework program_options + filesystem ) # Coverage diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 37ca540..3af900b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -27,5 +27,5 @@ add_test(NAME trrep_test add_executable(dbms_simulator dbms_simulator.cpp) -target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY}) +target_link_libraries(dbms_simulator trrep ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY}) set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14) diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index 2134522..8c61dd2 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -15,6 +15,7 @@ #include "view.hpp" #include +#include #include #include @@ -31,7 +32,8 @@ public: size_t n_clients, const std::string& wsrep_provider, const std::string& wsrep_provider_options) - : servers_() + : mutex_() + , servers_() , n_servers_(n_servers) , n_clients_(n_clients) , wsrep_provider_(wsrep_provider) @@ -39,8 +41,18 @@ public: { } void start(); void stop(); + void donate_sst(dbms_server&, + const std::string& req, const wsrep_gtid_t& gtid, bool); private: + std::string server_port(size_t i) const + { + std::ostringstream os; + os << (10000 + (i + 1)*10); + return os.str(); + } std::string build_cluster_address() const; + + trrep::default_mutex mutex_; std::map> servers_; size_t n_servers_; size_t n_clients_; @@ -59,8 +71,11 @@ public: s_connected, s_synced }; - dbms_server(const std::string& name, const std::string& id) - : trrep::server_context(name, id, trrep::server_context::rm_async) + dbms_server(dbms_simulator& simulator, + const std::string& name, const std::string& id) + : trrep::server_context(name, id, name + "_data", + trrep::server_context::rm_async) + , simulator_(simulator) , mutex_() , cond_() , state_(s_disconnected) @@ -123,6 +138,27 @@ public: std::cerr << "Synced with group" << "\n"; } + std::string on_sst_request() + { + return id(); + } + + void on_sst_donate_request(const std::string& req, + const wsrep_gtid_t& gtid, + bool bypass) + { + simulator_.donate_sst(*this, req, gtid, bypass); + } + + void sst_sent(const wsrep_gtid_t& gtid) + { + provider().sst_sent(gtid, 0); + } + void sst_received(const wsrep_gtid_t& gtid) + { + provider().sst_received(gtid, 0); + } + void wait_until_connected() { trrep::unique_lock lock(mutex_); @@ -143,6 +179,7 @@ public: // Client context management trrep::client_context* local_client_context(); private: + dbms_simulator& simulator_; trrep::default_mutex mutex_; trrep::default_condition_variable cond_; enum state state_; @@ -212,13 +249,18 @@ void dbms_simulator::start() id_os << (i + 1); auto it(servers_.insert(std::make_pair((i + 1), std::make_unique( - name_os.str(), id_os.str())))); + *this, name_os.str(), id_os.str())))); if (it.second == false) { throw trrep::runtime_error("Failed to add server"); } + boost::filesystem::path dir(std::string("./") + id_os.str() + "_data"); + boost::filesystem::create_directory(dir); + dbms_server& server(*it.first->second); - server.load_provider(wsrep_provider_, wsrep_provider_options_); + std::string server_options(wsrep_provider_options_); + server_options += "; base_port=" + server_port(i); + server.load_provider(wsrep_provider_, server_options); server.provider().connect("sim_cluster", cluster_address, "", i == 0); server.start_applier(); @@ -237,6 +279,27 @@ void dbms_simulator::stop() } } +void dbms_simulator::donate_sst(dbms_server& server, + const std::string& req, + const wsrep_gtid_t& gtid, + bool bypass) +{ + size_t id; + std::istringstream is(req); + is >> id; + trrep::unique_lock lock(mutex_); + auto i(servers_.find(id)); + if (i == servers_.end()) + { + throw trrep::runtime_error("Server " + req + " not found"); + } + if (bypass == false) + { + std::cout << "SST " << server.id() << " -> " << id << "\n"; + } + i->second->sst_received(gtid); + server.sst_sent(gtid); +} std::string dbms_simulator::build_cluster_address() const { std::string ret("gcomm://"); @@ -244,7 +307,7 @@ std::string dbms_simulator::build_cluster_address() const { std::ostringstream sa_os; sa_os << "127.0.0.1:"; - sa_os << (10000 + (i + 1)*10); + sa_os << server_port(i); ret += sa_os.str(); if (i < n_servers_ - 1) ret += ","; } diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index e2aac13..cc52067 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -122,6 +122,9 @@ namespace trrep *victim_seqno = WSREP_SEQNO_UNDEFINED; return WSREP_OK; } + + int sst_sent(const wsrep_gtid_t&, int) { return 0; } + int sst_received(const wsrep_gtid_t&, int) { return 0; } private: wsrep_uuid_t group_id_; wsrep_uuid_t node_id_; diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp index ebe12b4..d593cd3 100644 --- a/src/mock_server_context.hpp +++ b/src/mock_server_context.hpp @@ -18,7 +18,7 @@ namespace trrep mock_server_context(const std::string& name, const std::string& id, enum trrep::server_context::rollback_mode rollback_mode) - : trrep::server_context(name, id, rollback_mode) + : trrep::server_context(name, id, "./", rollback_mode) , provider_() , last_client_id_(0) { } @@ -34,6 +34,11 @@ namespace trrep void wait_until_connected() { } void on_view(const trrep::view&) { } void on_sync() { } + std::string on_sst_request() { return ""; } + void on_sst_donate_request(const std::string&, + const wsrep_gtid_t&, + bool) { } + void sst_received(const wsrep_gtid_t&) { } // void on_apply(trrep::transaction_context&) { } // void on_commit(trrep::transaction_context&) { } diff --git a/src/provider.hpp b/src/provider.hpp index 7ff1f27..16c3cac 100644 --- a/src/provider.hpp +++ b/src/provider.hpp @@ -55,6 +55,8 @@ namespace trrep virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; virtual int release(wsrep_ws_handle_t*) = 0; + virtual int sst_sent(const wsrep_gtid_t&, int) = 0; + virtual int sst_received(const wsrep_gtid_t&, int) = 0; // Factory method static provider* make_provider(const std::string& provider); }; diff --git a/src/server_context.cpp b/src/server_context.cpp index 9b67187..504c08e 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -33,7 +33,7 @@ namespace return (flags & WSREP_FLAG_ROLLBACK); } - static wsrep_cb_status_t connected_cb( + wsrep_cb_status_t connected_cb( void* app_ctx, const wsrep_view_info_t* view __attribute((unused))) { @@ -55,7 +55,7 @@ namespace } } - static wsrep_cb_status_t view_cb(void* app_ctx, + wsrep_cb_status_t view_cb(void* app_ctx, void* recv_ctx __attribute__((unused)), const wsrep_view_info_t* view_info, const char*, @@ -78,12 +78,32 @@ namespace } } - static wsrep_cb_status_t apply_cb(void* ctx, - const wsrep_ws_handle_t* wsh, - uint32_t flags, - const wsrep_buf_t* buf, - const wsrep_trx_meta_t* meta, - wsrep_bool_t* exit_loop __attribute__((unused))) + wsrep_cb_status_t sst_request_cb(void* app_ctx, + void **sst_req, size_t* sst_req_len) + { + assert(app_ctx); + trrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + + try + { + std::string req(server_context.on_sst_request()); + *sst_req = ::strdup(req.c_str()); + *sst_req_len = strlen(req.c_str()); + return WSREP_CB_SUCCESS; + } + catch (const trrep::runtime_error& e) + { + return WSREP_CB_FAILURE; + } + } + + wsrep_cb_status_t apply_cb(void* ctx, + const wsrep_ws_handle_t* wsh, + uint32_t flags, + const wsrep_buf_t* buf, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop __attribute__((unused))) { wsrep_cb_status_t ret(WSREP_CB_SUCCESS); @@ -121,6 +141,30 @@ namespace return WSREP_CB_FAILURE; } } + + + wsrep_cb_status_t sst_donate_cb(void* app_ctx, + void* , + const wsrep_buf_t* req_buf, + const wsrep_gtid_t* gtid, + const wsrep_buf_t*, + bool bypass) + { + assert(app_ctx); + trrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + try + { + std::string req(reinterpret_cast(req_buf->ptr), + req_buf->len); + server_context.on_sst_donate_request(req, *gtid, bypass); + return WSREP_CB_SUCCESS; + } + catch (const trrep::runtime_error& e) + { + return WSREP_CB_FAILURE; + } + } } int trrep::server_context::load_provider(const std::string& provider_spec, @@ -140,7 +184,7 @@ int trrep::server_context::load_provider(const std::string& provider_spec, init_args.node_name = name_.c_str(); init_args.node_address = ""; init_args.node_incoming = ""; - init_args.data_dir = "./"; + init_args.data_dir = working_dir_.c_str(); init_args.options = provider_options.c_str(); init_args.proto_ver = 1; init_args.state_id = 0; @@ -148,10 +192,10 @@ int trrep::server_context::load_provider(const std::string& provider_spec, init_args.logger_cb = 0; init_args.connected_cb = &connected_cb; init_args.view_cb = &view_cb; - init_args.sst_request_cb = 0; + init_args.sst_request_cb = &sst_request_cb; init_args.apply_cb = &apply_cb; init_args.unordered_cb = 0; - init_args.sst_donate_cb = 0; + init_args.sst_donate_cb = &sst_donate_cb; init_args.synced_cb = &synced_cb; std::cerr << init_args.options << "\n"; diff --git a/src/server_context.hpp b/src/server_context.hpp index fdb4209..0e0f1ef 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -32,10 +32,12 @@ namespace trrep server_context(const std::string& name, const std::string& id, + const std::string& working_dir, enum rollback_mode rollback_mode) : provider_() , name_(name) , id_(id) + , working_dir_(working_dir) , rollback_mode_(rollback_mode) { } @@ -88,7 +90,11 @@ namespace trrep virtual void wait_until_connected() = 0; virtual void on_view(const trrep::view&) = 0; virtual void on_sync() = 0; - + virtual std::string on_sst_request() = 0; + virtual void on_sst_donate_request(const std::string&, + const wsrep_gtid_t&, + bool) = 0; + virtual void sst_received(const wsrep_gtid_t&) = 0; // // This method will be called by the applier thread when // a remote write set is being applied. It is the responsibility @@ -113,6 +119,7 @@ namespace trrep trrep::provider* provider_; std::string name_; std::string id_; + std::string working_dir_; enum rollback_mode rollback_mode_; }; } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 5b1dd77..bfee5be 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -67,3 +67,21 @@ wsrep_status_t trrep::wsrep_provider_v26::run_applier(void *applier_ctx) { return wsrep_->recv(wsrep_, applier_ctx); } + +int trrep::wsrep_provider_v26::sst_sent(const wsrep_gtid_t& gtid, int err) +{ + if (wsrep_->sst_sent(wsrep_, >id, err) != WSREP_OK) + { + return 1; + } + return 0; +} + +int trrep::wsrep_provider_v26::sst_received(const wsrep_gtid_t& gtid, int err) +{ + if (wsrep_->sst_received(wsrep_, >id, 0, err) != WSREP_OK) + { + return 1; + } + return 0; +} diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 822f036..c090d3f 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -36,6 +36,8 @@ namespace trrep wsrep_status commit_order_enter(wsrep_ws_handle_t*) { return WSREP_OK; } int commit_order_leave(wsrep_ws_handle_t*) { return 0; } int release(wsrep_ws_handle_t*) { return 0; } + int sst_sent(const wsrep_gtid_t&,int); + int sst_received(const wsrep_gtid_t& gtid, int); private: wsrep_provider_v26(const wsrep_provider_v26&); wsrep_provider_v26& operator=(const wsrep_provider_v26);