diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index 8c61dd2..fd03b6f 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -10,6 +10,9 @@ #include "server_context.hpp" #include "client_context.hpp" +#include "transaction_context.hpp" +#include "key.hpp" +#include "data.hpp" #include "provider.hpp" #include "condition_variable.hpp" #include "view.hpp" @@ -25,24 +28,36 @@ class dbms_server; +struct dbms_simulator_params +{ + size_t n_servers; + size_t n_clients; + size_t n_transactions; + std::string wsrep_provider; + std::string wsrep_provider_options; + dbms_simulator_params() + : n_servers(0) + , n_clients(0) + , n_transactions(0) + , wsrep_provider() + , wsrep_provider_options() + { } +}; + class dbms_simulator { public: - dbms_simulator(size_t n_servers, - size_t n_clients, - const std::string& wsrep_provider, - const std::string& wsrep_provider_options) + dbms_simulator(const dbms_simulator_params& params) : mutex_() + , params_(params) , servers_() - , n_servers_(n_servers) - , n_clients_(n_clients) - , wsrep_provider_(wsrep_provider) - , wsrep_provider_options_(wsrep_provider_options) { } void start(); void stop(); void donate_sst(dbms_server&, const std::string& req, const wsrep_gtid_t& gtid, bool); + const dbms_simulator_params& params() const + { return params_; } private: std::string server_port(size_t i) const { @@ -53,11 +68,8 @@ private: std::string build_cluster_address() const; trrep::default_mutex mutex_; + const dbms_simulator_params& params_; std::map> servers_; - size_t n_servers_; - size_t n_clients_; - std::string wsrep_provider_; - std::string wsrep_provider_options_; }; class dbms_client; @@ -80,8 +92,10 @@ public: , cond_() , state_(s_disconnected) , last_client_id_(0) + , last_transaction_id_(0) , appliers_() , clients_() + , client_threads_() { } // Provider management @@ -136,6 +150,9 @@ public: void on_sync() { std::cerr << "Synced with group" << "\n"; + trrep::unique_lock lock(mutex_); + state_ = s_synced; + cond_.notify_all(); } std::string on_sst_request() @@ -159,6 +176,14 @@ public: provider().sst_received(gtid, 0); } + void wait_until_state(enum state state) + { + trrep::unique_lock lock(mutex_); + while (state_ != state) + { + cond_.wait(lock); + } + } void wait_until_connected() { trrep::unique_lock lock(mutex_); @@ -167,6 +192,7 @@ public: cond_.wait(lock); } } + void wait_until_disconnected() { trrep::unique_lock lock(mutex_); @@ -178,47 +204,123 @@ public: // Client context management trrep::client_context* local_client_context(); + + size_t next_transaction_id() + { + return (last_transaction_id_.fetch_add(1) + 1); + } + + void start_clients(); + void stop_clients(); + void client_thread(const std::shared_ptr& client); private: + + void start_client(size_t id); + dbms_simulator& simulator_; trrep::default_mutex mutex_; trrep::default_condition_variable cond_; enum state state_; std::atomic last_client_id_; + std::atomic last_transaction_id_; std::vector appliers_; std::map> clients_; + std::vector client_threads_; }; class dbms_client : public trrep::client_context { public: - dbms_client(dbms_server& server, const trrep::client_id& id, - enum trrep::client_context::mode mode) + dbms_client(dbms_server& server, + const trrep::client_id& id, + enum trrep::client_context::mode mode, + size_t n_transactions) : trrep::client_context(mutex_, server, id, mode) , mutex_() + , server_(server) + , n_transactions_(n_transactions) { } + + void start() + { + for (size_t i(0); i < n_transactions_; ++i) + { + run_one_transaction(); + } + } + bool do_2pc() const { return false; } int apply(trrep::transaction_context&, const trrep::data&) { + std::cerr << "applying" << "\n"; return 0; } - int commit(trrep::transaction_context&) + int commit(trrep::transaction_context& transaction_context) { + int ret(0); + ret = transaction_context.before_commit(); + ret = ret || transaction_context.ordered_commit(); + ret = ret || transaction_context.after_commit(); + std::cerr << "commit" << "\n"; return 0; } - int rollback(trrep::transaction_context&) + int rollback(trrep::transaction_context& transaction_context) { + // std::cerr << "rollback: " << transaction_context.id().get() << "\n"; + transaction_context.before_rollback(); + transaction_context.after_rollback(); return 0; } private: + + void run_one_transaction() + { + trrep::transaction_context trx(*this); + trx.start_transaction(server_.next_transaction_id()); + std::ostringstream os; + os << trx.id().get(); + trrep::key key; + key.append_key_part("dbms", 4); + wsrep_conn_id_t client_id(id().get()); + key.append_key_part(&client_id, sizeof(client_id)); + wsrep_trx_id_t trx_id(trx.id().get()); + + int err(0); + key.append_key_part(&trx_id, sizeof(trx_id)); + err = trx.append_key(key); + std::cout << "append_key: " << err << "\n"; + err = err || trx.append_data(trrep::data(os.str().c_str(), os.str().size())); + std::cout << "append_data: " << err << "\n"; + if (do_2pc()) + { + err = err || trx.before_prepare(); + std::cout << "before_prepare: " << err << "\n"; + err = err || trx.after_prepare(); + std::cout << "after_prepare: " << err << "\n"; + } + err = err || trx.before_commit(); + std::cout << "before_commit: " << err << "\n"; + err = err || trx.ordered_commit(); + std::cout << "ordered_commit: " << err << "\n"; + err = err || trx.after_commit(); + std::cout << "after_commit: " << err << "\n"; + trx.after_statement(); + + } + trrep::default_mutex mutex_; + dbms_server& server_; + const size_t n_transactions_; + }; // Server methods void dbms_server::applier_thread() { - dbms_client applier(*this, ++last_client_id_, - trrep::client_context::m_applier); + trrep::client_id client_id(last_client_id_.fetch_add(1) + 1); + dbms_client applier(*this, client_id, + trrep::client_context::m_applier, 0); wsrep_status_t ret(provider().run_applier(&applier)); std::cerr << "Applier thread exited with error code " << ret << "\n"; } @@ -226,22 +328,51 @@ void dbms_server::applier_thread() trrep::client_context* dbms_server::local_client_context() { std::ostringstream id_os; - size_t client_id(last_client_id_.fetch_add(1) + 1); - trrep::unique_lock lock(mutex_); - return clients_.insert(std::make_pair(client_id, - std::make_unique( - *this, client_id, - trrep::client_context::m_replicating))).first->second.get(); + size_t client_id(++last_client_id_); + return new dbms_client(*this, client_id, trrep::client_context::m_replicating, 0); } +void dbms_server::start_clients() +{ + size_t n_clients(simulator_.params().n_clients); + for (size_t i(0); i < n_clients; ++i) + { + start_client(i + 1); + } +} + +void dbms_server::stop_clients() +{ + for (auto& i : client_threads_) + { + i.join(); + } +} + +void dbms_server::client_thread(const std::shared_ptr& client) +{ + client->start(); +} + +void dbms_server::start_client(size_t id) +{ + auto client(std::make_shared( + *this, id, + trrep::client_context::m_replicating, + simulator_.params().n_transactions)); + client_threads_.push_back( + std::thread(&dbms_server::client_thread, this, client)); +} + + void dbms_simulator::start() { - std::cout << "Provider: " << wsrep_provider_ << "\n"; + std::cout << "Provider: " << params_.wsrep_provider << "\n"; std::string cluster_address(build_cluster_address()); std::cout << "Cluster address: " << cluster_address << "\n"; - for (size_t i(0); i < n_servers_; ++i) + for (size_t i(0); i < params_.n_servers; ++i) { std::ostringstream name_os; name_os << (i + 1); @@ -258,14 +389,23 @@ void dbms_simulator::start() boost::filesystem::create_directory(dir); dbms_server& server(*it.first->second); - std::string server_options(wsrep_provider_options_); + std::string server_options(params_.wsrep_provider_options); server_options += "; base_port=" + server_port(i); - server.load_provider(wsrep_provider_, server_options); + server.load_provider(params_.wsrep_provider, server_options); server.provider().connect("sim_cluster", cluster_address, "", i == 0); server.start_applier(); server.wait_until_connected(); + server.wait_until_state(dbms_server::s_synced); } + + // Start client threads + + for (auto& i : servers_) + { + i.second->start_clients(); + } + } void dbms_simulator::stop() @@ -273,6 +413,7 @@ void dbms_simulator::stop() for (auto& i : servers_) { dbms_server& server(*i.second); + server.stop_clients(); server.provider().disconnect(); server.wait_until_disconnected(); server.stop_applier(); @@ -303,13 +444,13 @@ void dbms_simulator::donate_sst(dbms_server& server, std::string dbms_simulator::build_cluster_address() const { std::string ret("gcomm://"); - for (size_t i(0); i < n_servers_; ++i) + for (size_t i(0); i < params_.n_servers; ++i) { std::ostringstream sa_os; sa_os << "127.0.0.1:"; sa_os << server_port(i); ret += sa_os.str(); - if (i < n_servers_ - 1) ret += ","; + if (i < params_.n_servers - 1) ret += ","; } return ret; } @@ -320,23 +461,22 @@ int main(int argc, char** argv) { try { - size_t n_servers; - size_t n_clients; - std::string wsrep_provider; - std::string wsrep_provider_options; + dbms_simulator_params params; po::options_description desc("Allowed options"); desc.add_options() ("help", "produce help message") ("wsrep-provider", - po::value(&wsrep_provider)->required(), + po::value(¶ms.wsrep_provider)->required(), "wsrep provider to load") ("wsrep-provider-options", - po::value(&wsrep_provider_options), + po::value(¶ms.wsrep_provider_options), "wsrep provider options") - ("servers", po::value(&n_servers)->required(), + ("servers", po::value(¶ms.n_servers)->required(), "number of servers to start") - ("clients", po::value(&n_clients)->required(), - "number of clients to start per server"); + ("clients", po::value(¶ms.n_clients)->required(), + "number of clients to start per server") + ("transactions", po::value(¶ms.n_transactions), + "number of transactions run by a client"); po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); po::notify(vm); @@ -347,7 +487,7 @@ int main(int argc, char** argv) return 1; } - dbms_simulator sim(n_servers, n_clients, wsrep_provider, wsrep_provider_options); + dbms_simulator sim(params); sim.start(); std::this_thread::sleep_for(std::chrono::seconds(5)); sim.stop(); diff --git a/src/mock_client_context.cpp b/src/mock_client_context.cpp index bfea2c9..0201702 100644 --- a/src/mock_client_context.cpp +++ b/src/mock_client_context.cpp @@ -7,7 +7,7 @@ int trrep::mock_client_context::apply( - trrep::transaction_context& transaction_context, + trrep::transaction_context& transaction_context __attribute__((unused)), const trrep::data& data __attribute__((unused))) { diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 12f87f9..76ed5f3 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -96,7 +96,7 @@ int trrep::transaction_context::before_prepare() int ret(0); trrep::unique_lock lock(client_context_.mutex()); - + debug_log_state(); assert(state() == s_executing || state() == s_must_abort); if (state() == s_must_abort) @@ -136,6 +136,7 @@ int trrep::transaction_context::before_prepare() } assert(state() == s_preparing); + debug_log_state(); return ret; } @@ -143,7 +144,7 @@ int trrep::transaction_context::after_prepare() { int ret(1); trrep::unique_lock lock(client_context_.mutex()); - + debug_log_state(); assert(state() == s_preparing || state() == s_must_abort); if (state() == s_must_abort) { @@ -175,6 +176,7 @@ int trrep::transaction_context::after_prepare() ret = 0; break; } + debug_log_state(); return ret; } @@ -183,6 +185,7 @@ int trrep::transaction_context::before_commit() int ret(1); trrep::unique_lock lock(client_context_.mutex()); + debug_log_state(); assert(state() == s_executing || state() == s_committing || state() == s_must_abort); @@ -253,6 +256,7 @@ int trrep::transaction_context::before_commit() } break; } + debug_log_state(); return ret; } @@ -261,12 +265,14 @@ int trrep::transaction_context::ordered_commit() int ret(1); trrep::unique_lock lock(client_context_.mutex()); + debug_log_state(); assert(state() == s_committing); assert(ordered()); ret = provider_.commit_order_leave(&ws_handle_); // Should always succeed assert(ret == 0); state(lock, s_ordered_commit); + debug_log_state(); return ret; } @@ -275,7 +281,7 @@ int trrep::transaction_context::after_commit() int ret(0); trrep::unique_lock lock(client_context_.mutex()); - + debug_log_state(); assert(state() == s_ordered_commit); switch (client_context_.mode()) @@ -295,13 +301,14 @@ int trrep::transaction_context::after_commit() } assert(ret == 0); state(lock, s_committed); + debug_log_state(); return ret; } int trrep::transaction_context::before_rollback() { trrep::unique_lock lock(client_context_.mutex()); - + debug_log_state(); assert(state() == s_executing || state() == s_must_abort || state() == s_cert_failed || @@ -347,13 +354,14 @@ int trrep::transaction_context::before_rollback() assert(0); break; } + debug_log_state(); return 0; } int trrep::transaction_context::after_rollback() { trrep::unique_lock lock(client_context_.mutex()); - + debug_log_state(); assert(state() == s_aborting || state() == s_must_replay); @@ -368,6 +376,7 @@ int trrep::transaction_context::after_rollback() // during actual rollback. If the transaction has been ordered, // releasing the commit ordering critical section should be // also postponed until all resources have been released. + debug_log_state(); return 0; } @@ -375,11 +384,12 @@ int trrep::transaction_context::after_statement() { int ret(0); trrep::unique_lock lock(client_context_.mutex()); - + debug_log_state(); assert(state() == s_executing || state() == s_committed || state() == s_aborted || state() == s_must_abort || + state() == s_cert_failed || state() == s_must_replay); switch (state()) @@ -394,7 +404,10 @@ int trrep::transaction_context::after_statement() } break; case s_must_abort: + case s_cert_failed: + lock.unlock(); ret = client_context_.rollback(*this); + lock.lock(); break; case s_aborted: break; @@ -425,6 +438,7 @@ int trrep::transaction_context::after_statement() cleanup(); } + debug_log_state(); return ret; } @@ -587,6 +601,7 @@ int trrep::transaction_context::certify_commit( assert(state() == s_certifying || state() == s_must_abort); client_context_.debug_sync("wsrep_after_replication"); + std::cout << "seqno: " << trx_meta_.gtid.seqno << "\n"; int ret(1); switch (cert_ret) { @@ -690,3 +705,10 @@ void trrep::transaction_context::cleanup() certified_ = false; pa_unsafe_ = false; } + +void trrep::transaction_context::debug_log_state() const +{ + std::cout << "client: " << client_context_.id().get() + << " trx: " << id_.get() + << " state: " << state_ << "\n"; +} diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index 0735513..4786232 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -133,6 +133,7 @@ namespace trrep void remove_fragments(); void clear_fragments(); void cleanup(); + void debug_log_state() const; trrep::provider& provider_; trrep::client_context& client_context_; trrep::transaction_id id_; diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index bfee5be..764eec9 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -68,6 +68,44 @@ wsrep_status_t trrep::wsrep_provider_v26::run_applier(void *applier_ctx) return wsrep_->recv(wsrep_, applier_ctx); } +int trrep::wsrep_provider_v26::append_key(wsrep_ws_handle_t* wsh, + const wsrep_key_t* key) +{ + return (wsrep_->append_key(wsrep_, wsh, key, 1, WSREP_KEY_EXCLUSIVE, true) + != WSREP_OK); +} + +int trrep::wsrep_provider_v26::append_data(wsrep_ws_handle_t* wsh, + const wsrep_buf_t* data) +{ + return (wsrep_->append_data(wsrep_, wsh, data, 1, WSREP_DATA_ORDERED, true) + != WSREP_OK); +} + +wsrep_status_t trrep::wsrep_provider_v26::certify(wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* wsh, + uint32_t flags, + wsrep_trx_meta_t* meta) +{ + return wsrep_->certify(wsrep_, conn_id, wsh, flags, meta); +} + +wsrep_status_t trrep::wsrep_provider_v26::commit_order_enter( + wsrep_ws_handle_t* wsh) +{ + return wsrep_->commit_order_enter(wsrep_, wsh); +} + +int trrep::wsrep_provider_v26::commit_order_leave(wsrep_ws_handle_t* wsh) +{ + return (wsrep_->commit_order_leave(wsrep_, wsh, 0) != WSREP_OK); +} + +int trrep::wsrep_provider_v26::release(wsrep_ws_handle_t* wsh) +{ + return (wsrep_->release(wsrep_, wsh) != WSREP_OK); +} + int trrep::wsrep_provider_v26::sst_sent(const wsrep_gtid_t& gtid, int err) { if (wsrep_->sst_sent(wsrep_, >id, err) != WSREP_OK) diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index c090d3f..d4433a5 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -23,19 +23,19 @@ namespace trrep wsrep_status_t run_applier(void*); int start_transaction(wsrep_ws_handle_t*) { return 0; } - int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } - int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } + int append_key(wsrep_ws_handle_t*, const wsrep_key_t*); + int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*); wsrep_status_t certify(wsrep_conn_id_t, wsrep_ws_handle_t*, uint32_t, - wsrep_trx_meta_t*) { return WSREP_OK; } + wsrep_trx_meta_t*); wsrep_status_t bf_abort(wsrep_seqno_t, wsrep_trx_id_t, wsrep_seqno_t*) { return WSREP_OK; } int rollback(const wsrep_trx_id_t) { return 0; } - wsrep_status commit_order_enter(wsrep_ws_handle_t*) { return WSREP_OK; } - int commit_order_leave(wsrep_ws_handle_t*) { return 0; } - int release(wsrep_ws_handle_t*) { return 0; } + wsrep_status commit_order_enter(wsrep_ws_handle_t*); + int commit_order_leave(wsrep_ws_handle_t*); + int release(wsrep_ws_handle_t*); int sst_sent(const wsrep_gtid_t&,int); int sst_received(const wsrep_gtid_t& gtid, int); private: