From b3f60b7be11e01c968074113502926570b79dd8e Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Fri, 15 Jun 2018 15:13:22 +0300 Subject: [PATCH] Refactoring continued. --- dbsim/CMakeLists.txt | 6 ++- dbsim/db_client.cpp | 13 +++++ dbsim/db_client.hpp | 18 +++++-- dbsim/db_client_service.hpp | 3 ++ dbsim/db_server.cpp | 54 ++++++++++++++++--- dbsim/db_server.hpp | 90 +++++++++----------------------- dbsim/db_server_context.hpp | 30 ++++++++++- dbsim/db_simulator.hpp | 4 +- include/wsrep/client_context.hpp | 24 +++++++++ include/wsrep/server_context.hpp | 16 ++++++ src/server_context.cpp | 2 +- src/transaction_context.cpp | 21 +++++--- test/mock_server_context.hpp | 5 ++ 13 files changed, 197 insertions(+), 89 deletions(-) diff --git a/dbsim/CMakeLists.txt b/dbsim/CMakeLists.txt index bff6d15..bbe1218 100644 --- a/dbsim/CMakeLists.txt +++ b/dbsim/CMakeLists.txt @@ -3,9 +3,11 @@ # add_executable(dbms_simulator - db_storage_engine.cpp db_client.cpp -# db_client_service.cpp + db_server.cpp + db_storage_engine.cpp + + # db_client_service.cpp # dbms_simulator.cpp ) target_link_libraries(dbms_simulator wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY}) diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index 4d96a83..86ec97d 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -3,7 +3,20 @@ // #include "db_client.hpp" +#include "db_server.hpp" +db::client::client(db::server& server, + wsrep::client_id client_id, + enum wsrep::client_context::mode mode, + const db::params& params) + : mutex_() + , params_(params) + , server_context_(server.server_context()) + , client_context_(mutex_, server_context_, client_service_, client_id, mode) + , client_service_(server_context_.provider(), client_context_) + , se_trx_(server.storage_engine()) + , stats_() +{ } template int db::client::client_command(F f) diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index cf71f8b..03f45d2 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -7,10 +7,12 @@ #include "db_client_context.hpp" #include "db_server_context.hpp" +#include "db_client_service.hpp" #include "db_storage_engine.hpp" namespace db { + class server; class client { public: @@ -26,11 +28,20 @@ namespace db { } }; - client(); + client(db::server&, + wsrep::client_id, + enum wsrep::client_context::mode, + const db::params&); bool bf_abort(wsrep::seqno); - private: + const struct stats stats() const { return stats_; } + void store_globals() { } + void reset_globals() { } + void start(); + + private: + friend class db::server_context; template int client_command(F f); void run_one_transaction(); void reset_error(); @@ -38,8 +49,9 @@ namespace db const db::params& params_; db::server_context& server_context_; db::client_context client_context_; + db::client_service client_service_; db::storage_engine::transaction se_trx_; - stats stats_; + struct stats stats_; }; } diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 71c6db1..01ffabe 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -86,6 +86,8 @@ namespace db void will_replay(const wsrep::transaction_context&) override { } + void wait_for_replayers(wsrep::client_context&, + wsrep::unique_lock&) override { } enum wsrep::provider::status replay(wsrep::client_context&, wsrep::transaction_context&) override @@ -97,6 +99,7 @@ namespace db return 0; } + void emergency_shutdown() { ::abort(); } void debug_sync(wsrep::client_context&, const char*) override { } void debug_crash(const char*) override { } private: diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index 74b9213..1698e42 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -3,25 +3,67 @@ // #include "db_server.hpp" +#include "db_client.hpp" +#include "db_simulator.hpp" + +db::server::server(simulator& simulator, + const std::string& name, + const std::string& server_id, + const std::string& address) + : simulator_(simulator) + , storage_engine_(simulator_.params()) + , mutex_() + , cond_() + , server_context_(name, server_id, address, "dbsim_" + name + "_data") + , last_client_id_(0) + , last_transaction_id_(0) + , appliers_() + , clients_() + , client_threads_() +{ } void db::server::applier_thread() { wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1); db::client applier(*this, client_id, wsrep::client_context::m_applier, simulator_.params()); - enum wsrep::provider::status ret(provider().run_applier(&applier)); + enum wsrep::provider::status ret(server_context_.provider().run_applier(&applier)); wsrep::log() << "Applier thread exited with error code " << ret; } +void db::server::start_applier() +{ + wsrep::unique_lock lock(mutex_); + appliers_.push_back(boost::thread(&server::applier_thread, this)); +} + +void db::server::stop_applier() +{ + wsrep::unique_lock lock(mutex_); + appliers_.front().join(); + appliers_.erase(appliers_.begin()); +} + +#if 0 +void db::server::on_sst_request(const std::string& req, + const wsrep::gtid& gtid, + bool bypass) +{ + simulator_.donate_sst(*this, req, gtid, bypass); +} + + wsrep::client_context* db::server::local_client_context() { std::ostringstream id_os; size_t client_id(++last_client_id_); - return new db::client(*this, client_id, - wsrep::client_context::m_replicating, - simulator_.params()); + db::client* client(new db::client(*this, client_id, + wsrep::client_context::m_replicating, + simulator_.params())); + return &client->client_context_; } +#endif void db::server::start_clients() { size_t n_clients(simulator_.params().n_clients); @@ -39,9 +81,9 @@ void db::server::stop_clients() } for (const auto& i : clients_) { - struct db::client::stats stats(i->stats()); + const struct db::client::stats& stats(i->stats()); simulator_.stats_.commits += stats.commits; - simulator_.stats_.aborts += stats.aborts; + simulator_.stats_.aborts += stats.rollbacks; simulator_.stats_.replays += stats.replays; } } diff --git a/dbsim/db_server.hpp b/dbsim/db_server.hpp index 2f0bf31..63b6361 100644 --- a/dbsim/db_server.hpp +++ b/dbsim/db_server.hpp @@ -5,84 +5,40 @@ #ifndef WSREP_DB_SERVER_HPP #define WSREP_DB_SERVER_HPP +#include "wsrep/gtid.hpp" +#include "wsrep/client_context.hpp" + +#include "db_storage_engine.hpp" +#include "db_server_context.hpp" + +#include + +#include +#include + namespace db { + class simulator; + class client; class server { -public: + public: server(simulator& simulator, const std::string& name, const std::string& id, - const std::string& address) - : wsrep::server_context(mutex_, - cond_, - name, id, address, name + "_data", - wsrep::server_context::rm_async) - , simulator_(simulator) - , storage_engine_(simulator_.params()) - , mutex_() - , cond_() - , last_client_id_(0) - , last_transaction_id_(0) - , appliers_() - , clients_() - , client_threads_() - { } + const std::string& address); // Provider management void applier_thread(); - void start_applier() - { - wsrep::unique_lock lock(mutex_); - appliers_.push_back(boost::thread(&dbms_server::applier_thread, this)); - } + void start_applier(); - void stop_applier() - { - wsrep::unique_lock lock(mutex_); - appliers_.front().join(); - appliers_.erase(appliers_.begin()); - } + void stop_applier(); - bool sst_before_init() const override { return false; } - std::string on_sst_required() - { - return id(); - } - void on_sst_request(const std::string& req, - const wsrep::gtid& gtid, - bool bypass) - { - simulator_.donate_sst(*this, req, gtid, bypass); - } - void background_rollback(wsrep::client_context& cs) override - { - assert(0); - cs.before_rollback(); - cs.after_rollback(); - } - - // Client context management - wsrep::client_context* local_client_context(); - - wsrep::client_context* streaming_applier_client_context() override - { - throw wsrep::not_implemented_error(); - } - - void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) - override - { } - size_t next_transaction_id() - { - return (last_transaction_id_.fetch_add(1) + 1); - } - - storage_engine& storage_engine() { return storage_engine_; } + db::storage_engine& storage_engine() { return storage_engine_; } int apply_to_storage_engine(const wsrep::transaction_context& txc, const wsrep::const_buffer&) @@ -93,19 +49,21 @@ public: void start_clients(); void stop_clients(); - void client_thread(const std::shared_ptr& client); + void client_thread(const std::shared_ptr& client); + db::server_context& server_context() { return server_context_; } private: void start_client(size_t id); - simulator& simulator_; - storage_engine storage_engine_; + db::simulator& simulator_; + db::storage_engine storage_engine_; wsrep::default_mutex mutex_; wsrep::default_condition_variable cond_; + db::server_context server_context_; std::atomic last_client_id_; std::atomic last_transaction_id_; std::vector appliers_; - std::vector> clients_; + std::vector> clients_; std::vector client_threads_; }; }; diff --git a/dbsim/db_server_context.hpp b/dbsim/db_server_context.hpp index d474e20..72e1203 100644 --- a/dbsim/db_server_context.hpp +++ b/dbsim/db_server_context.hpp @@ -6,6 +6,7 @@ #define WSREP_DB_SERVER_CONTEXT_HPP #include "wsrep/server_context.hpp" +#include "wsrep/client_context.hpp" #include @@ -14,11 +15,38 @@ namespace db class server_context : public wsrep::server_context { public: - size_t next_transaction_id() + server_context(const std::string& name, + const std::string& server_id, + const std::string& address, + const std::string& working_dir) + : wsrep::server_context( + mutex_, + cond_, + name, + server_id, + address, + working_dir, + wsrep::server_context::rm_async) + , mutex_() + , cond_() + , last_transaction_id_() + { } + wsrep::client_context* local_client_context() override; + wsrep::client_context* streaming_applier_client_context() override; + void release_client_context(wsrep::client_context*) override; + bool sst_before_init() const override; + void on_sst_request(const std::string&, const wsrep::gtid&, bool) override; + std::string on_sst_required() override; + void background_rollback(wsrep::client_context&) override; + void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) + override; + size_t next_transaction_id() { return (last_transaction_id_.fetch_add(1) + 1); } private: + wsrep::default_mutex mutex_; + wsrep::default_condition_variable cond_; std::atomic last_transaction_id_; }; } diff --git a/dbsim/db_simulator.hpp b/dbsim/db_simulator.hpp index 056a1f8..277648f 100644 --- a/dbsim/db_simulator.hpp +++ b/dbsim/db_simulator.hpp @@ -31,7 +31,7 @@ namespace db void stop(); void donate_sst(server&, const std::string& req, const wsrep::gtid& gtid, bool); - const simulator_params& params() const + const db::params& params() const { return params_; } std::string stats() const; private: @@ -44,7 +44,7 @@ namespace db std::string build_cluster_address() const; wsrep::default_mutex mutex_; - const simulator_params& params_; + const db::params& params_; std::map> servers_; std::chrono::time_point clients_start_; std::chrono::time_point clients_stop_; diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 59d14e6..a9afff6 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -649,6 +649,30 @@ namespace wsrep enum wsrep::client_context::mode orig_mode_; }; + template + class scoped_client_context + { + public: + scoped_client_context(wsrep::client_context* client_context, D deleter) + : client_context_(client_context) + , deleter_(deleter) + { + if (client_context_ == 0) + { + throw wsrep::runtime_error("Null client_context provided"); + } + } + wsrep::client_context& client_context() { return *client_context_; } + ~scoped_client_context() + { + deleter_(client_context_); + } + private: + scoped_client_context(const scoped_client_context&); + scoped_client_context& operator=(const scoped_client_context&); + wsrep::client_context* client_context_; + D deleter_; + }; } diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index 18d0061..63cb3dc 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -210,6 +210,8 @@ namespace wsrep */ virtual client_context* streaming_applier_client_context() = 0; + virtual void release_client_context(wsrep::client_context*) = 0; + void start_streaming_applier( const wsrep::id&, const wsrep::transaction_id&, @@ -442,6 +444,20 @@ namespace wsrep int debug_log_level_; }; + class client_deleter + { + public: + client_deleter(wsrep::server_context& server_context) + : server_context_(server_context) + { } + void operator()(wsrep::client_context* client_context) + { + server_context_.release_client_context(client_context); + } + private: + wsrep::server_context& server_context_; + }; + static inline std::string to_string(enum wsrep::server_context::state state) { switch (state) diff --git a/src/server_context.cpp b/src/server_context.cpp index 5ecb619..82d247f 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -246,7 +246,7 @@ int wsrep::server_context::on_apply( sac->after_command_after_result(); stop_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()); - delete sac; + release_client_context(sac); } } else diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 36068ce..b70c176 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -719,18 +719,23 @@ int wsrep::transaction_context::certify_fragment( // Client context to store fragment in separate transaction // Switch temporarily to sr_transaction_context, switch back // to original when this goes out of scope - std::auto_ptr sr_client_context( - client_context_.server_context().local_client_context()); + // std::auto_ptr sr_client_context( + // client_context_.server_context().local_client_context()); + wsrep::scoped_client_context sr_client_context_scope( + client_context_.server_context().local_client_context(), + wsrep::client_deleter(client_context_.server_context())); + wsrep::client_context& sr_client_context( + sr_client_context_scope.client_context()); wsrep::client_context_switch client_context_switch( client_context_, - *sr_client_context); + sr_client_context); - wsrep::unique_lock sr_lock(sr_client_context->mutex()); + wsrep::unique_lock sr_lock(sr_client_context.mutex()); wsrep::transaction_context& sr_transaction_context( - sr_client_context->transaction_); + sr_client_context.transaction_); sr_transaction_context.state(sr_lock, s_certifying); sr_lock.unlock(); - if (sr_client_context->append_fragment( + if (sr_client_context.append_fragment( sr_transaction_context, flags_, wsrep::const_buffer(data.data(), data.size()))) { @@ -755,7 +760,7 @@ int wsrep::transaction_context::certify_fragment( sr_transaction_context.certified_ = true; sr_transaction_context.state(sr_lock, s_committing); sr_lock.unlock(); - if (sr_client_context->commit()) + if (sr_client_context.commit()) { ret = 1; } @@ -764,7 +769,7 @@ int wsrep::transaction_context::certify_fragment( sr_lock.lock(); sr_transaction_context.state(sr_lock, s_must_abort); sr_lock.unlock(); - sr_client_context->rollback(); + sr_client_context.rollback(); ret = 1; break; } diff --git a/test/mock_server_context.hpp b/test/mock_server_context.hpp index 9f05234..0585d78 100644 --- a/test/mock_server_context.hpp +++ b/test/mock_server_context.hpp @@ -41,6 +41,11 @@ namespace wsrep *this, client_service_, ++last_client_id_, wsrep::client_context::m_applier); } + void release_client_context(wsrep::client_context* client_context) + { + delete client_context; + } + void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) WSREP_OVERRIDE