diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1b3a248..37ca540 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,7 +10,7 @@ add_library(trrep server_context.cpp transaction_context.cpp wsrep_provider_v26.cpp) -target_link_libraries(trrep wsrep dl) +target_link_libraries(trrep wsrep pthread dl) add_executable(trrep_test mock_client_context.cpp diff --git a/src/condition_variable.hpp b/src/condition_variable.hpp new file mode 100644 index 0000000..f5579d0 --- /dev/null +++ b/src/condition_variable.hpp @@ -0,0 +1,71 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_CONDITION_VARIABLE_HPP +#define TRREP_CONDITION_VARIABLE_HPP + +#include "lock.hpp" + +namespace trrep +{ + class condition_variable + { + public: + condition_variable() { } + virtual ~condition_variable() { } + virtual void notify_one() = 0; + virtual void notify_all() = 0; + virtual void wait(trrep::unique_lock& lock) = 0; + private: + condition_variable(const condition_variable&); + condition_variable& operator=(const condition_variable&); + }; + + // Default pthreads based condition variable implementation + class default_condition_variable : condition_variable + { + public: + default_condition_variable() + : cond_() + { + if (pthread_cond_init(&cond_, 0)) + { + throw trrep::runtime_error("Failed to initialized condvar"); + } + } + + ~default_condition_variable() + { + if (pthread_cond_destroy(&cond_)) + { + ::abort(); + } + } + void notify_one() + { + (void)pthread_cond_signal(&cond_); + } + + void notify_all() + { + (void)pthread_cond_broadcast(&cond_); + } + + void wait(trrep::unique_lock& lock) + { + if (pthread_cond_wait( + &cond_, + reinterpret_cast(lock.mutex().native()))) + { + throw trrep::runtime_error("Cond wait failed"); + } + } + + private: + pthread_cond_t cond_; + }; + +} + +#endif // TRREP_CONDITION_VARIABLE_HPP diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index d581a1c..2134522 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -10,6 +10,9 @@ #include "server_context.hpp" #include "client_context.hpp" +#include "provider.hpp" +#include "condition_variable.hpp" +#include "view.hpp" #include @@ -17,6 +20,7 @@ #include #include #include +#include class dbms_server; @@ -36,6 +40,7 @@ public: void start(); void stop(); private: + std::string build_cluster_address() const; std::map> servers_; size_t n_servers_; size_t n_clients_; @@ -48,20 +53,101 @@ class dbms_client; class dbms_server : public trrep::server_context { public: + enum state + { + s_disconnected, + s_connected, + s_synced + }; dbms_server(const std::string& name, const std::string& id) : trrep::server_context(name, id, trrep::server_context::rm_async) , mutex_() + , cond_() + , state_(s_disconnected) , last_client_id_(0) + , appliers_() , clients_() { } - void on_connect() { } - void on_view() { } - void on_sync() { } + // Provider management + + void applier_thread(); + + void start_applier() + { + trrep::unique_lock lock(mutex_); + appliers_.push_back(std::thread(&dbms_server::applier_thread, this)); + } + + void stop_applier() + { + trrep::unique_lock lock(mutex_); + appliers_.front().join(); + appliers_.erase(appliers_.begin()); + } + + void on_connect() + { + std::cerr << "dbms_server: connected" << "\n"; + trrep::unique_lock lock(mutex_); + state_ = s_connected; + cond_.notify_all(); + } + + void on_view(const trrep::view& view) + { + std::cerr << "================================================\nView:\n" + << "id: " << view.id() << "\n" + << "status: " << view.status() << "\n" + << "own_index: " << view.own_index() << "\n" + << "final: " << view.final() << "\n" + << "members: \n"; + auto members(view.members()); + for (const auto& m : members) + { + std::cerr << "id: " << m.id() << " " + << "name: " << m.name() << "\n"; + + } + std::cerr << "=================================================\n"; + trrep::unique_lock lock(mutex_); + if (view.final()) + { + state_ = s_disconnected; + cond_.notify_all(); + } + } + + void on_sync() + { + std::cerr << "Synced with group" << "\n"; + } + + void wait_until_connected() + { + trrep::unique_lock lock(mutex_); + while (state_ != s_connected) + { + cond_.wait(lock); + } + } + void wait_until_disconnected() + { + trrep::unique_lock lock(mutex_); + while (state_ != s_disconnected) + { + cond_.wait(lock); + } + } + + // Client context management trrep::client_context* local_client_context(); private: trrep::default_mutex mutex_; + trrep::default_condition_variable cond_; + enum state state_; std::atomic last_client_id_; + std::vector appliers_; std::map> clients_; }; @@ -90,6 +176,16 @@ private: trrep::default_mutex mutex_; }; + +// Server methods +void dbms_server::applier_thread() +{ + dbms_client applier(*this, ++last_client_id_, + trrep::client_context::m_applier); + wsrep_status_t ret(provider().run_applier(&applier)); + std::cerr << "Applier thread exited with error code " << ret << "\n"; +} + trrep::client_context* dbms_server::local_client_context() { std::ostringstream id_os; @@ -105,6 +201,9 @@ trrep::client_context* dbms_server::local_client_context() void dbms_simulator::start() { std::cout << "Provider: " << wsrep_provider_ << "\n"; + + std::string cluster_address(build_cluster_address()); + std::cout << "Cluster address: " << cluster_address << "\n"; for (size_t i(0); i < n_servers_; ++i) { std::ostringstream name_os; @@ -114,10 +213,44 @@ void dbms_simulator::start() auto it(servers_.insert(std::make_pair((i + 1), std::make_unique( name_os.str(), id_os.str())))); - it.first->second->load_provider(wsrep_provider_, wsrep_provider_options_); + if (it.second == false) + { + throw trrep::runtime_error("Failed to add server"); + } + dbms_server& server(*it.first->second); + server.load_provider(wsrep_provider_, wsrep_provider_options_); + server.provider().connect("sim_cluster", cluster_address, "", + i == 0); + server.start_applier(); + server.wait_until_connected(); } } +void dbms_simulator::stop() +{ + for (auto& i : servers_) + { + dbms_server& server(*i.second); + server.provider().disconnect(); + server.wait_until_disconnected(); + server.stop_applier(); + } +} + +std::string dbms_simulator::build_cluster_address() const +{ + std::string ret("gcomm://"); + for (size_t i(0); i < n_servers_; ++i) + { + std::ostringstream sa_os; + sa_os << "127.0.0.1:"; + sa_os << (10000 + (i + 1)*10); + ret += sa_os.str(); + if (i < n_servers_ - 1) ret += ","; + } + return ret; +} + namespace po = boost::program_options; int main(int argc, char** argv) @@ -153,6 +286,8 @@ int main(int argc, char** argv) dbms_simulator sim(n_servers, n_clients, wsrep_provider, wsrep_provider_options); sim.start(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + sim.stop(); } catch (const std::exception& e) { diff --git a/src/lock.hpp b/src/lock.hpp index 9b3cadf..2084cb7 100644 --- a/src/lock.hpp +++ b/src/lock.hpp @@ -46,6 +46,8 @@ namespace trrep { return locked_; } + + M& mutex() { return mutex_; } private: unique_lock(const unique_lock&); unique_lock& operator=(const unique_lock&); diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index c944ed4..e2aac13 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -30,6 +30,12 @@ namespace trrep memset(&node_id_, 0, sizeof(node_id_)); node_id_.data[0] = 1; } + + int connect(const std::string&, const std::string&, const std::string&, + bool) + { return 0; } + int disconnect() { return 0; } + wsrep_status_t run_applier(void*) { return WSREP_OK; } // Provider implemenatation interface int start_transaction(wsrep_ws_handle_t*) { return 0; } wsrep_status diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp index d53d337..ebe12b4 100644 --- a/src/mock_server_context.hpp +++ b/src/mock_server_context.hpp @@ -31,7 +31,8 @@ namespace trrep } void on_connect() { } - void on_view() { } + void wait_until_connected() { } + void on_view(const trrep::view&) { } void on_sync() { } // void on_apply(trrep::transaction_context&) { } // void on_commit(trrep::transaction_context&) { } diff --git a/src/mutex.hpp b/src/mutex.hpp index 5968101..38e4a1c 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -21,6 +21,8 @@ namespace trrep virtual ~mutex() { } virtual void lock() = 0; virtual void unlock() = 0; + /* Return native handle */ + virtual void* native() = 0; private: mutex(const mutex& other); mutex& operator=(const mutex& other); @@ -62,6 +64,11 @@ namespace trrep throw trrep::runtime_error("mutex unlock failed"); } } + + void* native() + { + return &mutex_; + } private: pthread_mutex_t mutex_; }; diff --git a/src/provider.hpp b/src/provider.hpp index 88dfb03..7ff1f27 100644 --- a/src/provider.hpp +++ b/src/provider.hpp @@ -17,6 +17,20 @@ namespace trrep class provider { public: + + virtual ~provider() { } + // Provider state management + virtual int connect(const std::string& cluster_name, + const std::string& cluster_url, + const std::string& state_donor, + bool bootstrap) = 0; + virtual int disconnect() = 0; + + + // Applier interface + virtual wsrep_status_t run_applier(void* applier_ctx) = 0; + // Write set replication + // TODO: Rename to assing_read_view() virtual int start_transaction(wsrep_ws_handle_t*) = 0; virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0; virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0; @@ -40,8 +54,9 @@ namespace trrep virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0; virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; virtual int release(wsrep_ws_handle_t*) = 0; - static provider* make_provider(const std::string& provider); + // Factory method + static provider* make_provider(const std::string& provider); }; } diff --git a/src/server_context.cpp b/src/server_context.cpp index 5f517ed..9b67187 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -5,6 +5,7 @@ #include "server_context.hpp" #include "client_context.hpp" #include "transaction_context.hpp" +#include "view.hpp" // Todo: refactor into provider factory #include "mock_provider.hpp" @@ -32,6 +33,51 @@ namespace return (flags & WSREP_FLAG_ROLLBACK); } + static wsrep_cb_status_t connected_cb( + void* app_ctx, + const wsrep_view_info_t* view __attribute((unused))) + { + assert(app_ctx != 0); + trrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + // + // TODO: Fetch server id and group id from view infor + // + try + { + server_context.on_connect(); + return WSREP_CB_SUCCESS; + } + catch (const trrep::runtime_error& e) + { + std::cerr << "Exception: " << e.what(); + return WSREP_CB_FAILURE; + } + } + + static wsrep_cb_status_t view_cb(void* app_ctx, + void* recv_ctx __attribute__((unused)), + const wsrep_view_info_t* view_info, + const char*, + size_t) + { + assert(app_ctx); + assert(view_info); + trrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + try + { + trrep::view view(*view_info); + server_context.on_view(view); + return WSREP_CB_SUCCESS; + } + catch (const trrep::runtime_error& e) + { + std::cerr << "Exception: " << e.what(); + return WSREP_CB_FAILURE; + } + } + static wsrep_cb_status_t apply_cb(void* ctx, const wsrep_ws_handle_t* wsh, uint32_t flags, @@ -58,6 +104,23 @@ namespace } return ret; } + + wsrep_cb_status_t synced_cb(void* app_ctx) + { + assert(app_ctx); + trrep::server_context& server_context( + *reinterpret_cast(app_ctx)); + try + { + server_context.on_sync(); + return WSREP_CB_SUCCESS; + } + catch (const trrep::runtime_error& e) + { + std::cerr << "On sync failed: " << e.what() << "\n"; + return WSREP_CB_FAILURE; + } + } } int trrep::server_context::load_provider(const std::string& provider_spec, @@ -83,13 +146,13 @@ int trrep::server_context::load_provider(const std::string& provider_spec, init_args.state_id = 0; init_args.state = 0; init_args.logger_cb = 0; - init_args.connected_cb = 0; - init_args.view_cb = 0; + init_args.connected_cb = &connected_cb; + init_args.view_cb = &view_cb; init_args.sst_request_cb = 0; init_args.apply_cb = &apply_cb; init_args.unordered_cb = 0; init_args.sst_donate_cb = 0; - init_args.synced_cb = 0; + init_args.synced_cb = &synced_cb; std::cerr << init_args.options << "\n"; provider_ = new trrep::wsrep_provider_v26(provider_spec.c_str(), @@ -98,6 +161,12 @@ int trrep::server_context::load_provider(const std::string& provider_spec, return 0; } +trrep::server_context::~server_context() +{ + delete provider_; +} + + int trrep::server_context::on_apply( trrep::client_context& client_context, trrep::transaction_context& transaction_context, diff --git a/src/server_context.hpp b/src/server_context.hpp index b4491db..fdb4209 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -17,6 +17,7 @@ namespace trrep class provider; class client_context; class transaction_context; + class view; class data; class server_context @@ -38,6 +39,7 @@ namespace trrep , rollback_mode_(rollback_mode) { } + virtual ~server_context(); // // Return server name // @@ -83,7 +85,8 @@ namespace trrep // // virtual void on_connect() = 0; - virtual void on_view() = 0; + virtual void wait_until_connected() = 0; + virtual void on_view(const trrep::view&) = 0; virtual void on_sync() = 0; // diff --git a/src/view.hpp b/src/view.hpp new file mode 100644 index 0000000..3c7c8cc --- /dev/null +++ b/src/view.hpp @@ -0,0 +1,90 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_VIEW_HPP +#define TRREP_VIEW_HPP + +#include + +#include + +namespace trrep +{ + class view + { + public: + class member + { + public: + member(const wsrep_member_info_t& member_info) + : id_() + , name_(member_info.name, WSREP_MEMBER_NAME_LEN) + , incoming_(member_info.incoming, WSREP_INCOMING_LEN) + { + char uuid_str[WSREP_UUID_STR_LEN + 1]; + wsrep_uuid_print(&member_info.id, uuid_str, sizeof(uuid_str)); + id_ = uuid_str; + } + const std::string& id() const { return id_; } + const std::string& name() const { return name_; } + const std::string& incoming() const { return incoming_; } + private: + std::string id_; + std::string name_; + std::string incoming_; + }; + + view(const wsrep_view_info_t& view_info) + : state_id_(view_info.state_id) + , view_(view_info.view) + , status_(view_info.status) + , capabilities_(view_info.capabilities) + , my_idx_(view_info.my_idx) + , proto_ver_(view_info.proto_ver) + , members_() + { + for (int i(0); i < view_info.memb_num; ++i) + { + members_.push_back(view_info.members[i]); + } + } + + + wsrep_seqno_t id() const + { return view_; } + wsrep_view_status_t status() const + { return status_; } + int own_index() const + { return my_idx_; } + + std::vector members() const + { + std::vector ret; + for (std::vector::const_iterator i(members_.begin()); + i != members_.end(); ++i) + { + ret.push_back(member(*i)); + } + return ret; + } + // + // Return true if the view is final + // + bool final() const + { + return (my_idx_ == -1); + } + + private: + wsrep_gtid_t state_id_; + wsrep_seqno_t view_; + wsrep_view_status_t status_; + wsrep_cap_t capabilities_; + int my_idx_; + int proto_ver_; + std::vector members_; + }; +} + +#endif // TRREP_VIEW diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 6d05421..5b1dd77 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -7,6 +7,8 @@ #include +#include + trrep::wsrep_provider_v26::wsrep_provider_v26( const char* path, struct wsrep_init_args* args) @@ -21,3 +23,47 @@ trrep::wsrep_provider_v26::wsrep_provider_v26( throw trrep::runtime_error("Failed to initialize wsrep provider"); } } + +trrep::wsrep_provider_v26::~wsrep_provider_v26() +{ + wsrep_unload(wsrep_); +} + +int trrep::wsrep_provider_v26::connect( + const std::string& cluster_name, + const std::string& cluster_url, + const std::string& state_donor, + bool bootstrap) +{ + int ret(0); + wsrep_status_t wret; + if ((wret = wsrep_->connect(wsrep_, + cluster_name.c_str(), + cluster_url.c_str(), + state_donor.c_str(), + bootstrap)) != WSREP_OK) + { + std::cerr << "Failed to connect cluster: " + << wret << "\n"; + ret = 1; + } + return ret; +} + +int trrep::wsrep_provider_v26::disconnect() +{ + int ret(0); + wsrep_status_t wret; + if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK) + { + std::cerr << "Failed to disconnect from cluster: " + << wret << "\n"; + ret = 1; + } + return ret; +} + +wsrep_status_t trrep::wsrep_provider_v26::run_applier(void *applier_ctx) +{ + return wsrep_->recv(wsrep_, applier_ctx); +} diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 811e71c..822f036 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -16,7 +16,12 @@ namespace trrep public: wsrep_provider_v26(const char*, struct wsrep_init_args*); + ~wsrep_provider_v26(); + int connect(const std::string&, const std::string&, const std::string&, + bool); + int disconnect(); + wsrep_status_t run_applier(void*); int start_transaction(wsrep_ws_handle_t*) { return 0; } int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } @@ -32,6 +37,8 @@ namespace trrep int commit_order_leave(wsrep_ws_handle_t*) { return 0; } int release(wsrep_ws_handle_t*) { return 0; } private: + wsrep_provider_v26(const wsrep_provider_v26&); + wsrep_provider_v26& operator=(const wsrep_provider_v26); struct wsrep* wsrep_; }; }