diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index 8cead7e..fbdee71 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -34,12 +34,17 @@ namespace db const db::params&); bool bf_abort(wsrep::seqno); const struct stats stats() const { return stats_; } - void store_globals() { } - void reset_globals() { } + void store_globals() + { + client_context_.store_globals(); + } + void reset_globals() + { } void start(); wsrep::client_context& client_context() { return client_context_; } private: friend class db::server_context; + friend class db::client_service; template int client_command(F f); void run_one_transaction(); void reset_error(); diff --git a/dbsim/db_client_context.hpp b/dbsim/db_client_context.hpp index 4e5d39a..2d46fce 100644 --- a/dbsim/db_client_context.hpp +++ b/dbsim/db_client_context.hpp @@ -31,7 +31,7 @@ namespace db { } db::client* client() { return client_; } void reset_globals() { } - void store_globals() { } + void store_globals() { wsrep::client_context::store_globals(); } bool is_autocommit() const { return is_autocommit_; } bool do_2pc() const { return do_2pc_; } diff --git a/dbsim/db_client_service.cpp b/dbsim/db_client_service.cpp index e652944..2442170 100644 --- a/dbsim/db_client_service.cpp +++ b/dbsim/db_client_service.cpp @@ -3,3 +3,49 @@ // #include "db_client_service.hpp" +#include "db_client.hpp" + +int db::client_service::apply(wsrep::client_context&, + const wsrep::const_buffer&) +{ + db::client* client(client_context_.client()); + client->se_trx_.start(client); + client->se_trx_.apply(client_context_.transaction()); + return 0; +} + +int db::client_service::commit(wsrep::client_context&, + const wsrep::ws_handle&, + const wsrep::ws_meta&) +{ + db::client* client(client_context_.client()); + int ret(client_context_.before_commit()); + if (ret == 0) client->se_trx_.commit(); + ret = ret || client_context_.ordered_commit(); + ret = ret || client_context_.after_commit(); + return ret; +} + +int db::client_service::rollback(wsrep::client_context&) +{ + db::client* client(client_context_.client()); + int ret(client_context_.before_rollback()); + assert(ret == 0); + client->se_trx_.rollback(); + ret = client_context_.after_rollback(); + assert(ret == 0); + return ret; +} +enum wsrep::provider::status +db::client_service::replay(wsrep::client_context&, + wsrep::transaction_context& transaction_context) +{ + wsrep::client_applier_mode applier_mode(client_context_); + auto ret(provider_.replay(transaction_context.ws_handle(), + &client_context_)); + if (ret == wsrep::provider::success) + { + ++client_context_.client()->stats_.replays; + } + return ret; +} diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 01ffabe..7bb4cd7 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -2,11 +2,10 @@ // Copyright (C) 2018 Codership Oy // -#ifndef WSREP_DB_CLIENT_SREVICE_HPP +#ifndef WSREP_DB_CLIENT_SERVICE_HPP #define WSREP_DB_CLIENT_SERVICE_HPP #include "wsrep/client_service.hpp" -#include "wsrep/client_context.hpp" #include "wsrep/transaction_context.hpp" #include "db_client_context.hpp" @@ -67,21 +66,12 @@ namespace db void remove_fragments(const wsrep::transaction_context&) override { } - int apply(wsrep::client_context&, const wsrep::const_buffer&) override - { - return 0; - } + int apply(wsrep::client_context&, const wsrep::const_buffer&) override; int commit(wsrep::client_context&, - const wsrep::ws_handle&, const wsrep::ws_meta&) override - { - return 0; - } + const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int rollback(wsrep::client_context&) override - { - return 0; - } + int rollback(wsrep::client_context&) override; void will_replay(const wsrep::transaction_context&) override { } @@ -90,8 +80,7 @@ namespace db wsrep::unique_lock&) override { } enum wsrep::provider::status replay(wsrep::client_context&, wsrep::transaction_context&) - override - { return wsrep::provider::success; } + override; int append_fragment(const wsrep::transaction_context&, int, const wsrep::const_buffer&) override diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index da7eff0..2f809fe 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -27,7 +27,10 @@ void db::server::applier_thread() wsrep::client_id client_id(last_client_id_.fetch_add(1) + 1); db::client applier(*this, client_id, wsrep::client_context::m_applier, simulator_.params()); - enum wsrep::provider::status ret(server_context_.provider().run_applier(&applier)); + wsrep::client_context* cc(static_cast( + &applier.client_context())); + enum wsrep::provider::status ret( + server_context_.provider().run_applier(cc)); wsrep::log() << "Applier thread exited with error code " << ret; } @@ -64,7 +67,7 @@ void db::server::stop_clients() { const struct db::client::stats& stats(i->stats()); simulator_.stats_.commits += stats.commits; - simulator_.stats_.aborts += stats.rollbacks; + simulator_.stats_.rollbacks += stats.rollbacks; simulator_.stats_.replays += stats.replays; } } diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index 7952198..6ae01ac 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -6,6 +6,70 @@ #include #include +void db::simulator::run() +{ + start(); + stop(); + std::flush(std::cerr); + std::cout << "Results:\n"; + std::cout << stats() << std::endl; +} + +void db::simulator::sst(db::server& server, + const std::string& request, + const wsrep::gtid& gtid, + bool bypass) +{ + size_t id; + std::istringstream is(request); + is >> id; + wsrep::unique_lock lock(mutex_); + auto i(servers_.find(id)); + if (i == servers_.end()) + { + throw wsrep::runtime_error("Server " + request + " not found"); + } + if (bypass == false) + { + wsrep::log_info() << "SST " << server.server_context().id() << " -> " << id; + } + i->second->server_context().sst_received(gtid, 0); + server.server_context().sst_sent(gtid, 0); +} + +std::string db::simulator::stats() const +{ + size_t transactions(params_.n_servers * params_.n_clients + * params_.n_transactions); + auto duration(std::chrono::duration( + clients_stop_ - clients_start_).count()); + long long bf_aborts(0); + for (const auto& s : servers_) + { + bf_aborts += s.second->storage_engine().bf_aborts(); + } + std::ostringstream os; + os << "Number of transactions: " << transactions + << "\n" + << "Seconds: " << duration + << " \n" + << "Transactions per second: " << transactions/duration + << "\n" + << "BF aborts: " + << bf_aborts + << "\n" + << "Client commits: " << stats_.commits + << "\n" + << "Client rollbacks: " << stats_.rollbacks + << "\n" + << "Client replays: " << stats_.replays; + return os.str(); +} + +//////////////////////////////////////////////////////////////////////////////// +// Private // +//////////////////////////////////////////////////////////////////////////////// + void db::simulator::start() { wsrep::log() << "Provider: " << params_.wsrep_provider; @@ -65,64 +129,38 @@ void db::simulator::start() void db::simulator::stop() { - -} - -void db::simulator::sst(db::server& server, - const std::string& request, - const wsrep::gtid& gtid, - bool bypass) -{ - size_t id; - std::istringstream is(request); - is >> id; - wsrep::unique_lock lock(mutex_); - auto i(servers_.find(id)); - if (i == servers_.end()) + for (auto& i : servers_) { - throw wsrep::runtime_error("Server " + request + " not found"); + db::server& server(*i.second); + server.stop_clients(); } - if (bypass == false) + clients_stop_ = std::chrono::steady_clock::now(); + wsrep::log() << "######## Stats ############"; + wsrep::log() << stats(); + wsrep::log() << "######## Stats ############"; + if (params_.fast_exit) { - wsrep::log_info() << "SST " << server.server_context().id() << " -> " << id; + exit(0); } - i->second->server_context().sst_received(gtid, 0); - server.server_context().sst_sent(gtid, 0); -} - -std::string db::simulator::stats() const -{ - size_t transactions(params_.n_servers * params_.n_clients - * params_.n_transactions); - auto duration(std::chrono::duration( - clients_stop_ - clients_start_).count()); - long long bf_aborts(0); - for (const auto& s : servers_) + for (auto& i : servers_) { - bf_aborts += s.second->storage_engine().bf_aborts(); + db::server& server(*i.second); + wsrep::log_info() << "Status for server: " + << server.server_context().id(); + auto status(server.server_context().provider().status()); + for_each(status.begin(), status.end(), + [](const wsrep::provider::status_variable& sv) + { + wsrep::log() << sv.name() << " = " << sv.value(); + }); + server.server_context().disconnect(); + server.server_context().wait_until_state( + wsrep::server_context::s_disconnected); + server.stop_applier(); + server.server_context().unload_provider(); } - std::ostringstream os; - os << "Number of transactions: " << transactions - << "\n" - << "Seconds: " << duration - << " \n" - << "Transactions per second: " << transactions/duration - << "\n" - << "BF aborts: " - << bf_aborts - << "\n" - << "Client commits: " << stats_.commits - << "\n" - << "Client aborts: " << stats_.aborts - << "\n" - << "Client replays: " << stats_.replays; - return os.str(); } -//////////////////////////////////////////////////////////////////////////////// -// Private // -//////////////////////////////////////////////////////////////////////////////// - std::string db::simulator::server_port(size_t i) const { std::ostringstream os; diff --git a/dbsim/db_simulator.hpp b/dbsim/db_simulator.hpp index 69b41cc..d386354 100644 --- a/dbsim/db_simulator.hpp +++ b/dbsim/db_simulator.hpp @@ -32,14 +32,15 @@ namespace db , stats_() { } - void start(); - void stop(); + void run(); void sst(db::server&, const std::string&, const wsrep::gtid&, bool); const db::params& params() const { return params_; } std::string stats() const; private: + void start(); + void stop(); std::string server_port(size_t i) const; std::string build_cluster_address() const; @@ -52,11 +53,11 @@ namespace db struct stats { long long commits; - long long aborts; + long long rollbacks; long long replays; stats() : commits(0) - , aborts(0) + , rollbacks(0) , replays(0) { } } stats_; diff --git a/dbsim/db_storage_engine.cpp b/dbsim/db_storage_engine.cpp index 040f2e2..5aec8f6 100644 --- a/dbsim/db_storage_engine.cpp +++ b/dbsim/db_storage_engine.cpp @@ -5,7 +5,7 @@ #include "db_storage_engine.hpp" #include "db_client.hpp" -void db::storage_engine::transaction::start(client* cc) +void db::storage_engine::transaction::start(db::client* cc) { wsrep::unique_lock lock(se_.mutex_); if (se_.transactions_.insert(cc).second == false) @@ -15,6 +15,13 @@ void db::storage_engine::transaction::start(client* cc) cc_ = cc; } +void db::storage_engine::transaction::apply( + const wsrep::transaction_context& transaction_context) +{ + assert(cc_); + se_.bf_abort_some(transaction_context); +} + void db::storage_engine::transaction::commit() { if (cc_) @@ -43,10 +50,19 @@ void db::storage_engine::bf_abort_some(const wsrep::transaction_context& txc) { if (transactions_.empty() == false) { - auto* victim_txc(*transactions_.begin()); - if (victim_txc->bf_abort(txc.seqno())) + for (auto victim : transactions_) { - ++bf_aborts_; + wsrep::client_context& cc(victim->client_context()); + wsrep::unique_lock lock(cc.mutex()); + if (cc.mode() == wsrep::client_context::m_replicating) + { + lock.unlock(); + if (victim->bf_abort(txc.seqno())) + { + ++bf_aborts_; + break; + } + } } } } diff --git a/dbsim/db_storage_engine.hpp b/dbsim/db_storage_engine.hpp index 583f1d0..bb50e4e 100644 --- a/dbsim/db_storage_engine.hpp +++ b/dbsim/db_storage_engine.hpp @@ -39,13 +39,15 @@ namespace db } bool active() const { return cc_ != nullptr; } void start(client* cc); + void apply(const wsrep::transaction_context&); void commit(); void rollback(); + db::client* client() { return cc_; } transaction(const transaction&) = delete; transaction& operator=(const transaction&) = delete; private: - storage_engine& se_; - client* cc_; + db::storage_engine& se_; + db::client* cc_; }; void bf_abort_some(const wsrep::transaction_context& tc); long long bf_aborts() const { return bf_aborts_; } diff --git a/dbsim/dbsim.cpp b/dbsim/dbsim.cpp index ce07d29..f58cf82 100644 --- a/dbsim/dbsim.cpp +++ b/dbsim/dbsim.cpp @@ -7,10 +7,6 @@ int main(int argc, char** argv) { - db::params params(db::parse_args(argc, argv)); - db::simulator simulator(params); - simulator.start(); - simulator.stop(); - std::cout << simulator.stats() << std::endl; + db::simulator(db::parse_args(argc, argv)).run(); return 0; } diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index a9afff6..87d79d3 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -126,6 +126,13 @@ namespace wsrep }; const static int state_max_ = s_quitting + 1; + + + void store_globals() + { + thread_id_ = wsrep::this_thread::get_id(); + } + /*! * Destructor. */ @@ -134,7 +141,6 @@ namespace wsrep assert(transaction_.active() == false); } - /*! * */ @@ -328,7 +334,8 @@ namespace wsrep int commit() { assert(mode_ == m_applier || mode_ == m_local); - return client_service_.commit(*this, + return client_service_.commit( + *this, transaction_.ws_handle(), transaction_.ws_meta()); } // @@ -553,13 +560,6 @@ namespace wsrep client_context(const client_context&); client_context& operator=(client_context&); - /* - * Friend declarations - */ - //friend int server_context::on_apply(client_context&, - // const wsrep::ws_handle&, - // const wsrep::ws_meta&, - // const wsrep::const_buffer&); friend class client_context_switch; friend class client_applier_mode; friend class client_toi_mode; diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index 63cb3dc..e5f7786 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -239,6 +239,8 @@ namespace wsrep int load_provider(const std::string& provider, const std::string& provider_options); + void unload_provider(); + /*! * Return reference to provider. * diff --git a/src/server_context.cpp b/src/server_context.cpp index 82d247f..74d6bf7 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -26,6 +26,12 @@ int wsrep::server_context::load_provider(const std::string& provider_spec, return (provider_ ? 0 : 1); } +void wsrep::server_context::unload_provider() +{ + delete provider_; + provider_ = 0; +} + int wsrep::server_context::connect(const std::string& cluster_name, const std::string& cluster_address, const std::string& state_donor, diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index b70c176..6b40391 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -74,6 +74,7 @@ int wsrep::transaction_context::start_transaction( id_ = ws_meta.transaction_id(); assert(client_context_.mode() == wsrep::client_context::m_applier); state_ = s_executing; + state_hist_.clear(); ws_handle_ = ws_handle; ws_meta_ = ws_meta; certified_ = true; @@ -946,6 +947,7 @@ void wsrep::transaction_context::clear_fragments() void wsrep::transaction_context::cleanup() { assert(is_streaming() == false); + assert(state() == s_committed || state() == s_aborted); debug_log_state("cleanup_enter"); id_ = wsrep::transaction_id::invalid(); ws_handle_ = wsrep::ws_handle(); diff --git a/test/mock_client_context.cpp b/test/mock_client_context.cpp index 5ef9719..b97bb76 100644 --- a/test/mock_client_context.cpp +++ b/test/mock_client_context.cpp @@ -7,8 +7,8 @@ int wsrep::mock_client_service::apply( - wsrep::client_context& client_context, - const wsrep::const_buffer& data __attribute__((unused))) + wsrep::client_context& client_context WSREP_UNUSED, + const wsrep::const_buffer&) { assert(client_context.transaction().state() == wsrep::transaction_context::s_executing ||