From ae68122d591af9f73ad490079696fbd433d0748d Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Fri, 15 Jun 2018 16:25:27 +0300 Subject: [PATCH] Refactored dbms simulator. Survives SST. --- dbsim/CMakeLists.txt | 15 ++-- dbsim/db_client.cpp | 34 ++++++-- dbsim/db_client.hpp | 12 +-- dbsim/db_client_context.hpp | 8 ++ dbsim/db_params.cpp | 46 +++++++++++ dbsim/db_params.hpp | 2 + dbsim/db_server.cpp | 49 +++++++----- dbsim/db_server.hpp | 28 +++---- dbsim/db_server_context.cpp | 48 ++++++++++++ dbsim/db_server_context.hpp | 12 ++- dbsim/db_simulator.cpp | 150 ++++++++++++++++++++++++++++++++++++ dbsim/db_simulator.hpp | 24 +++--- dbsim/dbsim.cpp | 16 ++++ 13 files changed, 370 insertions(+), 74 deletions(-) create mode 100644 dbsim/db_params.cpp create mode 100644 dbsim/db_server_context.cpp create mode 100644 dbsim/db_simulator.cpp create mode 100644 dbsim/dbsim.cpp diff --git a/dbsim/CMakeLists.txt b/dbsim/CMakeLists.txt index bbe1218..37f16be 100644 --- a/dbsim/CMakeLists.txt +++ b/dbsim/CMakeLists.txt @@ -2,13 +2,16 @@ # Copyright (C) 2018 Codership Oy # -add_executable(dbms_simulator +add_executable(dbsim db_client.cpp + db_client_service.cpp + db_params.cpp db_server.cpp + db_server_context.cpp + db_simulator.cpp db_storage_engine.cpp - - # db_client_service.cpp -# dbms_simulator.cpp + dbsim.cpp ) -target_link_libraries(dbms_simulator wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY}) -set_property(TARGET dbms_simulator PROPERTY CXX_STANDARD 14) + +target_link_libraries(dbsim wsrep-lib ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY}) +set_property(TARGET dbsim PROPERTY CXX_STANDARD 14) diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index 86ec97d..da5284c 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -11,13 +11,33 @@ db::client::client(db::server& server, const db::params& params) : mutex_() , params_(params) + , server_(server) , server_context_(server.server_context()) - , client_context_(mutex_, server_context_, client_service_, client_id, mode) + , client_context_(mutex_, this, server_context_, client_service_, client_id, mode) , client_service_(server_context_.provider(), client_context_) , se_trx_(server.storage_engine()) , stats_() { } +void db::client::start() +{ + for (size_t i(0); i < params_.n_transactions; ++i) + { + run_one_transaction(); + report_progress(i + 1); + } +} + +bool db::client::bf_abort(wsrep::seqno seqno) +{ + wsrep::unique_lock lock(mutex_); + return client_context_.bf_abort(lock, seqno); +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + template int db::client::client_command(F f) { @@ -54,7 +74,7 @@ void db::client::run_one_transaction() { // wsrep::log_debug() << "Start transaction"; err = client_context_.start_transaction( - server_context_.next_transaction_id()); + server_.next_transaction_id()); assert(err == 0); se_trx_.start(this); return err; @@ -124,8 +144,12 @@ void db::client::run_one_transaction() } } -bool db::client::bf_abort(wsrep::seqno seqno) +void db::client::report_progress(size_t i) const { - wsrep::unique_lock lock(mutex_); - return client_context_.bf_abort(lock, seqno); + if ((i % 1000) == 0) + { + wsrep::log_info() << "client: " << client_context_.id().get() + << " transactions: " << i + << " " << 100*double(i)/params_.n_transactions << "%"; + } } diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index 03f45d2..8cead7e 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -5,10 +5,11 @@ #ifndef WSREP_DB_CLIENT_HPP #define WSREP_DB_CLIENT_HPP -#include "db_client_context.hpp" #include "db_server_context.hpp" -#include "db_client_service.hpp" #include "db_storage_engine.hpp" +#include "db_client_context.hpp" +#include "db_client_service.hpp" + namespace db { @@ -27,26 +28,25 @@ namespace db , replays(0) { } }; - client(db::server&, wsrep::client_id, enum wsrep::client_context::mode, const db::params&); - bool bf_abort(wsrep::seqno); const struct stats stats() const { return stats_; } - void store_globals() { } void reset_globals() { } void start(); - + wsrep::client_context& client_context() { return client_context_; } private: friend class db::server_context; template int client_command(F f); void run_one_transaction(); void reset_error(); + void report_progress(size_t) const; wsrep::default_mutex mutex_; const db::params& params_; + db::server& server_; db::server_context& server_context_; db::client_context client_context_; db::client_service client_service_; diff --git a/dbsim/db_client_context.hpp b/dbsim/db_client_context.hpp index 7e51b27..4e5d39a 100644 --- a/dbsim/db_client_context.hpp +++ b/dbsim/db_client_context.hpp @@ -10,10 +10,12 @@ namespace db { + class client; class client_context : public wsrep::client_context { public: client_context(wsrep::mutex& mutex, + db::client* client, db::server_context& server_context, wsrep::client_service& client_service, const wsrep::client_id& client_id, @@ -23,14 +25,20 @@ namespace db client_service, client_id, mode) + , client_(client) , is_autocommit_(false) , do_2pc_(false) { } + db::client* client() { return client_; } void reset_globals() { } void store_globals() { } bool is_autocommit() const { return is_autocommit_; } bool do_2pc() const { return do_2pc_; } + private: + client_context(const client_context&); + client_context& operator=(const client_context&); + db::client* client_; bool is_autocommit_; bool do_2pc_; }; diff --git a/dbsim/db_params.cpp b/dbsim/db_params.cpp new file mode 100644 index 0000000..f3b2d60 --- /dev/null +++ b/dbsim/db_params.cpp @@ -0,0 +1,46 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "db_params.hpp" + +#include +#include + +db::params db::parse_args(int argc, char** argv) +{ + namespace po = boost::program_options; + db::params params; + po::options_description desc("Allowed options"); + desc.add_options() + ("help", "produce help message") + ("wsrep-provider", + po::value(¶ms.wsrep_provider)->required(), + "wsrep provider to load") + ("wsrep-provider-options", + po::value(¶ms.wsrep_provider_options), + "wsrep provider options") + ("servers", po::value(¶ms.n_servers)->required(), + "number of servers to start") + ("clients", po::value(¶ms.n_clients)->required(), + "number of clients to start per server") + ("transactions", po::value(¶ms.n_transactions), + "number of transactions run by a client") + ("rows", po::value(¶ms.n_rows), + "number of rows per table") + ("alg-freq", po::value(¶ms.alg_freq), + "ALG frequency") + ("debug-log-level", po::value(¶ms.debug_log_level), + "debug logging level: 0 - none, 1 - verbose") + ("fast-exit", po::value(¶ms.fast_exit), + "exit from simulation without graceful shutdown"); + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + if (vm.count("help")) + { + std::cerr << desc << "\n"; + exit(0); + } + return params; +} diff --git a/dbsim/db_params.hpp b/dbsim/db_params.hpp index c3b825a..8e3f8ee 100644 --- a/dbsim/db_params.hpp +++ b/dbsim/db_params.hpp @@ -33,6 +33,8 @@ namespace db , fast_exit(0) { } }; + + params parse_args(int argc, char** argv); } #endif // WSREP_DB_PARAMS_HPP diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index 1698e42..da7eff0 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -14,7 +14,7 @@ db::server::server(simulator& simulator, , storage_engine_(simulator_.params()) , mutex_() , cond_() - , server_context_(name, server_id, address, "dbsim_" + name + "_data") + , server_context_(*this, name, server_id, address, "dbsim_" + name + "_data") , last_client_id_(0) , last_transaction_id_(0) , appliers_() @@ -44,26 +44,7 @@ void db::server::stop_applier() appliers_.erase(appliers_.begin()); } -#if 0 -void db::server::on_sst_request(const std::string& req, - const wsrep::gtid& gtid, - bool bypass) -{ - simulator_.donate_sst(*this, req, gtid, bypass); -} - -wsrep::client_context* db::server::local_client_context() -{ - std::ostringstream id_os; - size_t client_id(++last_client_id_); - db::client* client(new db::client(*this, client_id, - wsrep::client_context::m_replicating, - simulator_.params())); - return &client->client_context_; -} - -#endif void db::server::start_clients() { size_t n_clients(simulator_.params().n_clients); @@ -105,3 +86,31 @@ void db::server::start_client(size_t id) boost::thread(&db::server::client_thread, this, client)); } +void db::server::donate_sst(const std::string& req, + const wsrep::gtid& gtid, + bool bypass) +{ + simulator_.sst(*this, req, gtid, bypass); +} + +wsrep::client_context* db::server::local_client_context() +{ + std::ostringstream id_os; + size_t client_id(++last_client_id_); + db::client* client(new db::client(*this, client_id, + wsrep::client_context::m_replicating, + simulator_.params())); + return &client->client_context(); +} + +wsrep::client_context* db::server::streaming_applier_client_context() +{ + throw wsrep::not_implemented_error(); +} + +void db::server::release_client_context(wsrep::client_context* client_context) +{ + db::client_context* db_client_context( + dynamic_cast(client_context)); + delete db_client_context->client(); +} diff --git a/dbsim/db_server.hpp b/dbsim/db_server.hpp index 63b6361..f6c40d0 100644 --- a/dbsim/db_server.hpp +++ b/dbsim/db_server.hpp @@ -27,32 +27,24 @@ namespace db const std::string& name, const std::string& id, const std::string& address); - - // Provider management - void applier_thread(); - void start_applier(); - void stop_applier(); - - - - db::storage_engine& storage_engine() { return storage_engine_; } - - int apply_to_storage_engine(const wsrep::transaction_context& txc, - const wsrep::const_buffer&) - { - storage_engine_.bf_abort_some(txc); - return 0; - } - void start_clients(); void stop_clients(); void client_thread(const std::shared_ptr& client); + db::storage_engine& storage_engine() { return storage_engine_; } db::server_context& server_context() { return server_context_; } - private: + wsrep::transaction_id next_transaction_id() + { + return (last_transaction_id_.fetch_add(1) + 1); + } + void donate_sst(const std::string&, const wsrep::gtid&, bool); + wsrep::client_context* local_client_context(); + wsrep::client_context* streaming_applier_client_context(); + void release_client_context(wsrep::client_context*); + private: void start_client(size_t id); db::simulator& simulator_; diff --git a/dbsim/db_server_context.cpp b/dbsim/db_server_context.cpp new file mode 100644 index 0000000..21d0234 --- /dev/null +++ b/dbsim/db_server_context.cpp @@ -0,0 +1,48 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "db_server_context.hpp" +#include "db_server.hpp" + +wsrep::client_context* db::server_context::local_client_context() +{ + return server_.local_client_context(); +} + +wsrep::client_context* db::server_context::streaming_applier_client_context() +{ + return server_.streaming_applier_client_context(); +} + +void db::server_context::release_client_context( + wsrep::client_context* client_context) +{ + server_.release_client_context(client_context); +} + +bool db::server_context::sst_before_init() const +{ + return false; +} + +std::string db::server_context::on_sst_required() +{ + return id(); +} + +void db::server_context::on_sst_request( + const std::string& request, const wsrep::gtid& gtid, bool bypass) +{ + server_.donate_sst(request, gtid, bypass); +} + +void db::server_context::background_rollback(wsrep::client_context&) +{ +} + +void db::server_context::log_dummy_write_set( + wsrep::client_context&, const wsrep::ws_meta& meta) +{ + wsrep::log_info() << "Dummy write set: " << meta.seqno(); +} diff --git a/dbsim/db_server_context.hpp b/dbsim/db_server_context.hpp index 72e1203..c6775ef 100644 --- a/dbsim/db_server_context.hpp +++ b/dbsim/db_server_context.hpp @@ -12,10 +12,12 @@ namespace db { + class server; class server_context : public wsrep::server_context { public: - server_context(const std::string& name, + server_context(db::server& server, + const std::string& name, const std::string& server_id, const std::string& address, const std::string& working_dir) @@ -29,7 +31,7 @@ namespace db wsrep::server_context::rm_async) , mutex_() , cond_() - , last_transaction_id_() + , server_(server) { } wsrep::client_context* local_client_context() override; wsrep::client_context* streaming_applier_client_context() override; @@ -40,14 +42,10 @@ namespace db void background_rollback(wsrep::client_context&) override; void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) override; - size_t next_transaction_id() - { - return (last_transaction_id_.fetch_add(1) + 1); - } private: wsrep::default_mutex mutex_; wsrep::default_condition_variable cond_; - std::atomic last_transaction_id_; + db::server& server_; }; } diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp new file mode 100644 index 0000000..7952198 --- /dev/null +++ b/dbsim/db_simulator.cpp @@ -0,0 +1,150 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "db_simulator.hpp" +#include +#include + +void db::simulator::start() +{ + wsrep::log() << "Provider: " << params_.wsrep_provider; + + std::string cluster_address(build_cluster_address()); + wsrep::log() << "Cluster address: " << cluster_address; + for (size_t i(0); i < params_.n_servers; ++i) + { + std::ostringstream name_os; + name_os << (i + 1); + std::ostringstream id_os; + id_os << (i + 1); + std::ostringstream address_os; + address_os << "127.0.0.1:" << server_port(i); + auto it(servers_.insert( + std::make_pair( + (i + 1), + std::make_unique( + *this, + name_os.str(), + id_os.str(), + address_os.str())))); + if (it.second == false) + { + throw wsrep::runtime_error("Failed to add server"); + } + boost::filesystem::path dir("dbsim_" + id_os.str() + "_data"); + boost::filesystem::create_directory(dir); + + db::server& server(*it.first->second); + server.server_context().debug_log_level(params_.debug_log_level); + std::string server_options(params_.wsrep_provider_options); + + if (server.server_context().load_provider( + params_.wsrep_provider, server_options)) + { + throw wsrep::runtime_error("Failed to load provider"); + } + if (server.server_context().connect("sim_cluster", cluster_address, "", + i == 0)) + { + throw wsrep::runtime_error("Failed to connect"); + } + server.start_applier(); + server.server_context().wait_until_state( + wsrep::server_context::s_synced); + } + + // Start client threads + wsrep::log() << "####################### Starting client load"; + clients_start_ = std::chrono::steady_clock::now(); + for (auto& i : servers_) + { + i.second->start_clients(); + } +} + +void db::simulator::stop() +{ + +} + +void db::simulator::sst(db::server& server, + const std::string& request, + const wsrep::gtid& gtid, + bool bypass) +{ + size_t id; + std::istringstream is(request); + is >> id; + wsrep::unique_lock lock(mutex_); + auto i(servers_.find(id)); + if (i == servers_.end()) + { + throw wsrep::runtime_error("Server " + request + " not found"); + } + if (bypass == false) + { + wsrep::log_info() << "SST " << server.server_context().id() << " -> " << id; + } + i->second->server_context().sst_received(gtid, 0); + server.server_context().sst_sent(gtid, 0); +} + +std::string db::simulator::stats() const +{ + size_t transactions(params_.n_servers * params_.n_clients + * params_.n_transactions); + auto duration(std::chrono::duration( + clients_stop_ - clients_start_).count()); + long long bf_aborts(0); + for (const auto& s : servers_) + { + bf_aborts += s.second->storage_engine().bf_aborts(); + } + std::ostringstream os; + os << "Number of transactions: " << transactions + << "\n" + << "Seconds: " << duration + << " \n" + << "Transactions per second: " << transactions/duration + << "\n" + << "BF aborts: " + << bf_aborts + << "\n" + << "Client commits: " << stats_.commits + << "\n" + << "Client aborts: " << stats_.aborts + << "\n" + << "Client replays: " << stats_.replays; + return os.str(); +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + +std::string db::simulator::server_port(size_t i) const +{ + std::ostringstream os; + os << (10000 + (i + 1)*10); + return os.str(); +} + +std::string db::simulator::build_cluster_address() const +{ + std::string ret; + if (params_.wsrep_provider.find("galera_smm") != std::string::npos) + { + ret += "gcomm://"; + } + + for (size_t i(0); i < params_.n_servers; ++i) + { + std::ostringstream sa_os; + sa_os << "127.0.0.1:"; + sa_os << server_port(i); + ret += sa_os.str(); + if (i < params_.n_servers - 1) ret += ","; + } + return ret; +} diff --git a/dbsim/db_simulator.hpp b/dbsim/db_simulator.hpp index 277648f..69b41cc 100644 --- a/dbsim/db_simulator.hpp +++ b/dbsim/db_simulator.hpp @@ -5,16 +5,21 @@ #ifndef WSREP_DB_SIMULATOR_HPP #define WSREP_DB_SIMULATOR_HPP +#include "wsrep/gtid.hpp" +#include "wsrep/mutex.hpp" +#include "wsrep/lock.hpp" + +#include "db_params.hpp" +#include "db_server.hpp" + #include #include #include - -#include "db_params.hpp" +#include namespace db { class server; - class simulator { public: @@ -29,23 +34,18 @@ namespace db void start(); void stop(); - void donate_sst(server&, - const std::string& req, const wsrep::gtid& gtid, bool); + void sst(db::server&, + const std::string&, const wsrep::gtid&, bool); const db::params& params() const { return params_; } std::string stats() const; private: - std::string server_port(size_t i) const - { - std::ostringstream os; - os << (10000 + (i + 1)*10); - return os.str(); - } + std::string server_port(size_t i) const; std::string build_cluster_address() const; wsrep::default_mutex mutex_; const db::params& params_; - std::map> servers_; + std::map> servers_; std::chrono::time_point clients_start_; std::chrono::time_point clients_stop_; public: diff --git a/dbsim/dbsim.cpp b/dbsim/dbsim.cpp new file mode 100644 index 0000000..ce07d29 --- /dev/null +++ b/dbsim/dbsim.cpp @@ -0,0 +1,16 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "db_params.hpp" +#include "db_simulator.hpp" + +int main(int argc, char** argv) +{ + db::params params(db::parse_args(argc, argv)); + db::simulator simulator(params); + simulator.start(); + simulator.stop(); + std::cout << simulator.stats() << std::endl; + return 0; +}