diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index fd03b6f..92856cb 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -51,13 +51,29 @@ public: : mutex_() , params_(params) , servers_() + , clients_start_() + , clients_stop_() { } + ~dbms_simulator() + { + } void start(); void stop(); void donate_sst(dbms_server&, const std::string& req, const wsrep_gtid_t& gtid, bool); const dbms_simulator_params& params() const { return params_; } + std::string stats() const + { + std::ostringstream os; + os << "Number of transactions: " << + (params_.n_servers * params_.n_clients * params_.n_transactions) + << "\n" + << "Seconds: " + << std::chrono::duration(clients_stop_ - clients_start_).count() + << "\n"; + return os.str(); + } private: std::string server_port(size_t i) const { @@ -70,6 +86,8 @@ private: trrep::default_mutex mutex_; const dbms_simulator_params& params_; std::map> servers_; + std::chrono::time_point clients_start_; + std::chrono::time_point clients_stop_; }; class dbms_client; @@ -84,8 +102,10 @@ public: s_synced }; dbms_server(dbms_simulator& simulator, - const std::string& name, const std::string& id) - : trrep::server_context(name, id, name + "_data", + const std::string& name, + const std::string& id, + const std::string& address) + : trrep::server_context(name, id, address, name + "_data", trrep::server_context::rm_async) , simulator_(simulator) , mutex_() @@ -252,7 +272,7 @@ public: bool do_2pc() const { return false; } int apply(trrep::transaction_context&, const trrep::data&) { - std::cerr << "applying" << "\n"; + // std::cerr << "applying" << "\n"; return 0; } int commit(trrep::transaction_context& transaction_context) @@ -261,12 +281,14 @@ public: ret = transaction_context.before_commit(); ret = ret || transaction_context.ordered_commit(); ret = ret || transaction_context.after_commit(); - std::cerr << "commit" << "\n"; + // std::cerr << "commit" << "\n"; return 0; } int rollback(trrep::transaction_context& transaction_context) { - // std::cerr << "rollback: " << transaction_context.id().get() << "\n"; + std::cerr << "rollback: " << transaction_context.id().get() + << "state: " << trrep::to_string(transaction_context.state()) + << "\n"; transaction_context.before_rollback(); transaction_context.after_rollback(); return 0; @@ -288,22 +310,22 @@ private: int err(0); key.append_key_part(&trx_id, sizeof(trx_id)); err = trx.append_key(key); - std::cout << "append_key: " << err << "\n"; + // std::cout << "append_key: " << err << "\n"; err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size())); - std::cout << "append_data: " << err << "\n"; + // std::cout << "append_data: " << err << "\n"; if (do_2pc()) { err = err || trx.before_prepare(); - std::cout << "before_prepare: " << err << "\n"; + // std::cout << "before_prepare: " << err << "\n"; err = err || trx.after_prepare(); - std::cout << "after_prepare: " << err << "\n"; + // std::cout << "after_prepare: " << err << "\n"; } err = err || trx.before_commit(); - std::cout << "before_commit: " << err << "\n"; + // std::cout << "before_commit: " << err << "\n"; err = err || trx.ordered_commit(); - std::cout << "ordered_commit: " << err << "\n"; + // std::cout << "ordered_commit: " << err << "\n"; err = err || trx.after_commit(); - std::cout << "after_commit: " << err << "\n"; + // std::cout << "after_commit: " << err << "\n"; trx.after_statement(); } @@ -378,9 +400,16 @@ void dbms_simulator::start() name_os << (i + 1); std::ostringstream id_os; id_os << (i + 1); - auto it(servers_.insert(std::make_pair((i + 1), - std::make_unique( - *this, name_os.str(), id_os.str())))); + std::ostringstream address_os; + address_os << "127.0.0.1:" << server_port(i); + auto it(servers_.insert( + std::make_pair( + (i + 1), + std::make_unique( + *this, + name_os.str(), + id_os.str(), + address_os.str())))); if (it.second == false) { throw trrep::runtime_error("Failed to add server"); @@ -390,10 +419,16 @@ void dbms_simulator::start() dbms_server& server(*it.first->second); std::string server_options(params_.wsrep_provider_options); - server_options += "; base_port=" + server_port(i); - server.load_provider(params_.wsrep_provider, server_options); - server.provider().connect("sim_cluster", cluster_address, "", - i == 0); + // server_options += "; base_port=" + server_port(i); + if (server.load_provider(params_.wsrep_provider, server_options)) + { + throw trrep::runtime_error("Failed to load provider"); + } + if (server.provider().connect("sim_cluster", cluster_address, "", + i == 0)) + { + throw trrep::runtime_error("Failed to connect"); + } server.start_applier(); server.wait_until_connected(); server.wait_until_state(dbms_server::s_synced); @@ -401,6 +436,7 @@ void dbms_simulator::start() // Start client threads + clients_start_ = std::chrono::steady_clock::now(); for (auto& i : servers_) { i.second->start_clients(); @@ -414,9 +450,23 @@ void dbms_simulator::stop() { dbms_server& server(*i.second); server.stop_clients(); + } + clients_stop_ = std::chrono::steady_clock::now(); + for (auto& i : servers_) + { + dbms_server& server(*i.second); + std::cout << "Status for server: " << server.id() << "\n"; + auto status(server.provider().status()); + for_each(status.begin(), status.end(), + [](const trrep::provider::status_variable& sv) + { + std::cout << sv.name() << " = " << sv.value() << "\n"; + }); + server.provider().disconnect(); server.wait_until_disconnected(); server.stop_applier(); + } } @@ -443,7 +493,8 @@ void dbms_simulator::donate_sst(dbms_server& server, } std::string dbms_simulator::build_cluster_address() const { - std::string ret("gcomm://"); + std::string ret; + // std::string ret("gcomm://"); for (size_t i(0); i < params_.n_servers; ++i) { std::ostringstream sa_os; @@ -459,6 +510,7 @@ namespace po = boost::program_options; int main(int argc, char** argv) { + std::string stats; try { dbms_simulator_params params; @@ -489,8 +541,8 @@ int main(int argc, char** argv) dbms_simulator sim(params); sim.start(); - std::this_thread::sleep_for(std::chrono::seconds(5)); sim.stop(); + stats = sim.stats(); } catch (const std::exception& e) { @@ -498,5 +550,7 @@ int main(int argc, char** argv) return 1; } + std::cout << "Stats:\n" << stats << "\n"; + return 0; } diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index cc52067..982da81 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -96,9 +96,11 @@ namespace trrep int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } int rollback(const wsrep_trx_id_t) { return 0; } - wsrep_status_t commit_order_enter(wsrep_ws_handle_t*) + wsrep_status_t commit_order_enter(const wsrep_ws_handle_t*, + const wsrep_trx_meta_t*) { return WSREP_OK; } - int commit_order_leave(wsrep_ws_handle_t*) { return 0;} + int commit_order_leave(const wsrep_ws_handle_t*, + const wsrep_trx_meta_t*) { return 0;} int release(wsrep_ws_handle_t*) { return 0; } // Methods to modify mock state @@ -125,6 +127,11 @@ namespace trrep int sst_sent(const wsrep_gtid_t&, int) { return 0; } int sst_received(const wsrep_gtid_t&, int) { return 0; } + + std::vector status() const + { + return std::vector(); + } private: wsrep_uuid_t group_id_; wsrep_uuid_t node_id_; diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp index d593cd3..d0a8173 100644 --- a/src/mock_server_context.hpp +++ b/src/mock_server_context.hpp @@ -18,7 +18,7 @@ namespace trrep mock_server_context(const std::string& name, const std::string& id, enum trrep::server_context::rollback_mode rollback_mode) - : trrep::server_context(name, id, "./", rollback_mode) + : trrep::server_context(name, id, "", "./", rollback_mode) , provider_() , last_client_id_(0) { } diff --git a/src/provider.hpp b/src/provider.hpp index 16c3cac..63eb1c8 100644 --- a/src/provider.hpp +++ b/src/provider.hpp @@ -10,6 +10,7 @@ #include #include +#include namespace trrep { @@ -18,6 +19,20 @@ namespace trrep { public: + class status_variable + { + public: + status_variable(const std::string& name, + const std::string& value) + : name_(name) + , value_(value) + { } + const std::string& name() const { return name_; } + const std::string& value() const { return value_; } + private: + std::string name_; + std::string value_; + }; virtual ~provider() { } // Provider state management virtual int connect(const std::string& cluster_name, @@ -51,12 +66,16 @@ namespace trrep wsrep_trx_id_t victim_trx, wsrep_seqno_t* victim_seqno) = 0; virtual int rollback(const wsrep_trx_id_t) = 0; - virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0; - virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; + virtual wsrep_status commit_order_enter(const wsrep_ws_handle_t*, + const wsrep_trx_meta_t*) = 0; + virtual int commit_order_leave(const wsrep_ws_handle_t*, + const wsrep_trx_meta_t*) = 0; virtual int release(wsrep_ws_handle_t*) = 0; virtual int sst_sent(const wsrep_gtid_t&, int) = 0; virtual int sst_received(const wsrep_gtid_t&, int) = 0; + + virtual std::vector status() const = 0; // Factory method static provider* make_provider(const std::string& provider); }; diff --git a/src/server_context.cpp b/src/server_context.cpp index 504c08e..2a98608 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -37,7 +37,7 @@ namespace void* app_ctx, const wsrep_view_info_t* view __attribute((unused))) { - assert(app_ctx != 0); + assert(app_ctx); trrep::server_context& server_context( *reinterpret_cast(app_ctx)); // @@ -182,7 +182,7 @@ int trrep::server_context::load_provider(const std::string& provider_spec, memset(&init_args, 0, sizeof(init_args)); init_args.app_ctx = this; init_args.node_name = name_.c_str(); - init_args.node_address = ""; + init_args.node_address = address_.c_str(); init_args.node_incoming = ""; init_args.data_dir = working_dir_.c_str(); init_args.options = provider_options.c_str(); @@ -240,6 +240,7 @@ int trrep::server_context::on_apply( assert(0); } + transaction_context.after_statement(); if (ret) { client_context.rollback(transaction_context); diff --git a/src/server_context.hpp b/src/server_context.hpp index 0e0f1ef..855063f 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -32,11 +32,13 @@ namespace trrep server_context(const std::string& name, const std::string& id, + const std::string& address, const std::string& working_dir, enum rollback_mode rollback_mode) : provider_() , name_(name) , id_(id) + , address_(address) , working_dir_(working_dir) , rollback_mode_(rollback_mode) { } @@ -52,6 +54,11 @@ namespace trrep // const std::string& id() const { return id_; } + // + // Return server group communication address + // + const std::string& address() const { return address_; } + // // Create client context which acts only locally, i.e. does // not participate in replication. However, local client @@ -119,6 +126,7 @@ namespace trrep trrep::provider* provider_; std::string name_; std::string id_; + std::string address_; std::string working_dir_; enum rollback_mode rollback_mode_; }; diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 76ed5f3..8a26388 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -52,9 +52,16 @@ trrep::transaction_context::transaction_context( trrep::transaction_context::~transaction_context() { - if (state() != s_committed && state() != s_aborted) + if (active()) { - client_context_.rollback(*this); + try + { + (void)client_context_.rollback(*this); + } + catch (...) + { + // TODO: Log warning + } } } @@ -194,7 +201,7 @@ int trrep::transaction_context::before_commit() case trrep::client_context::m_local: if (ordered()) { - ret = provider_.commit_order_enter(&ws_handle_); + ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_); } break; case trrep::client_context::m_replicating: @@ -222,7 +229,7 @@ int trrep::transaction_context::before_commit() if (ret == 0) { lock.unlock(); - switch(provider_.commit_order_enter(&ws_handle_)) + switch(provider_.commit_order_enter(&ws_handle_, &trx_meta_)) { case WSREP_OK: break; @@ -240,7 +247,7 @@ int trrep::transaction_context::before_commit() break; case trrep::client_context::m_applier: assert(ordered()); - ret = provider_.commit_order_enter(&ws_handle_); + ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_); if (ret) { state(lock, s_must_abort); @@ -268,7 +275,7 @@ int trrep::transaction_context::ordered_commit() debug_log_state(); assert(state() == s_committing); assert(ordered()); - ret = provider_.commit_order_leave(&ws_handle_); + ret = provider_.commit_order_leave(&ws_handle_, &trx_meta_); // Should always succeed assert(ret == 0); state(lock, s_ordered_commit); @@ -427,8 +434,8 @@ int trrep::transaction_context::after_statement() { if (ordered()) { - ret = provider_.commit_order_enter(&ws_handle_); - if (ret == 0) provider_.commit_order_leave(&ws_handle_); + ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_); + if (ret == 0) provider_.commit_order_leave(&ws_handle_, &trx_meta_); } provider_.release(&ws_handle_); } @@ -466,9 +473,9 @@ void trrep::transaction_context::state( }; if (allowed[state_][next_state]) { - std::cerr << "state transition: " << trrep::to_string(state_) - << " -> " << trrep::to_string(next_state) - << "\n"; + // std::cerr << "state transition: " << trrep::to_string(state_) + // << " -> " << trrep::to_string(next_state) + // << "\n"; state_hist_.push_back(state_); state_ = next_state; } @@ -601,7 +608,7 @@ int trrep::transaction_context::certify_commit( assert(state() == s_certifying || state() == s_must_abort); client_context_.debug_sync("wsrep_after_replication"); - std::cout << "seqno: " << trx_meta_.gtid.seqno << "\n"; + // std::cout << "seqno: " << trx_meta_.gtid.seqno << "\n"; int ret(1); switch (cert_ret) { @@ -692,9 +699,9 @@ void trrep::transaction_context::clear_fragments() void trrep::transaction_context::cleanup() { - std::cerr << "Cleanup transaction " - << client_context_.id().get() - << ": " << id_.get() << "\n"; + // std::cerr << "Cleanup transaction " + // << client_context_.id().get() + // << ": " << id_.get() << "\n"; id_ = trrep::transaction_id::invalid(); state_ = s_executing; state_hist_.clear(); @@ -708,7 +715,7 @@ void trrep::transaction_context::cleanup() void trrep::transaction_context::debug_log_state() const { - std::cout << "client: " << client_context_.id().get() - << " trx: " << id_.get() - << " state: " << state_ << "\n"; + // std::cout << "client: " << client_context_.id().get() + // << " trx: " << id_.get() + // << " state: " << state_ << "\n"; } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 764eec9..d7df699 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -7,7 +7,10 @@ #include +#include + #include +#include trrep::wsrep_provider_v26::wsrep_provider_v26( const char* path, @@ -91,14 +94,17 @@ wsrep_status_t trrep::wsrep_provider_v26::certify(wsrep_conn_id_t conn_id, } wsrep_status_t trrep::wsrep_provider_v26::commit_order_enter( - wsrep_ws_handle_t* wsh) + const wsrep_ws_handle_t* wsh, + const wsrep_trx_meta_t* meta) { - return wsrep_->commit_order_enter(wsrep_, wsh); + return wsrep_->commit_order_enter(wsrep_, wsh, meta); } -int trrep::wsrep_provider_v26::commit_order_leave(wsrep_ws_handle_t* wsh) +int trrep::wsrep_provider_v26::commit_order_leave( + const wsrep_ws_handle_t* wsh, + const wsrep_trx_meta_t* meta) { - return (wsrep_->commit_order_leave(wsrep_, wsh, 0) != WSREP_OK); + return (wsrep_->commit_order_leave(wsrep_, wsh, meta, 0) != WSREP_OK); } int trrep::wsrep_provider_v26::release(wsrep_ws_handle_t* wsh) @@ -123,3 +129,43 @@ int trrep::wsrep_provider_v26::sst_received(const wsrep_gtid_t& gtid, int err) } return 0; } + +std::vector +trrep::wsrep_provider_v26::status() const +{ + std::vector ret; + struct wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_)); + struct wsrep_stats_var* i(stats); + if (i) + { + while (i->name) + { + switch (i->type) + { + case WSREP_VAR_STRING: + ret.push_back(status_variable(i->name, i->value._string)); + break; + case WSREP_VAR_INT64: + { + std::ostringstream os; + os << i->value._int64; + ret.push_back(status_variable(i->name, os.str())); + break; + } + case WSREP_VAR_DOUBLE: + { + std::ostringstream os; + os << i->value._double; + ret.push_back(status_variable(i->name, os.str())); + break; + } + default: + assert(0); + break; + } + ++i; + } + wsrep_->stats_free(wsrep_, stats); + } + return ret; +} diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index d4433a5..d33f451 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -33,11 +33,15 @@ namespace trrep wsrep_trx_id_t, wsrep_seqno_t*) { return WSREP_OK; } int rollback(const wsrep_trx_id_t) { return 0; } - wsrep_status commit_order_enter(wsrep_ws_handle_t*); - int commit_order_leave(wsrep_ws_handle_t*); + wsrep_status commit_order_enter(const wsrep_ws_handle_t*, + const wsrep_trx_meta_t*); + int commit_order_leave(const wsrep_ws_handle_t*, + const wsrep_trx_meta_t*); int release(wsrep_ws_handle_t*); int sst_sent(const wsrep_gtid_t&,int); int sst_received(const wsrep_gtid_t& gtid, int); + + std::vector status() const; private: wsrep_provider_v26(const wsrep_provider_v26&); wsrep_provider_v26& operator=(const wsrep_provider_v26);