diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index 02c5ee4..480556c 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -135,7 +135,7 @@ void db::client::run_one_transaction() err = err || client_state_.after_prepare(); } err = err || client_state_.before_commit(); - if (err == 0) se_trx_.commit(); + if (err == 0) se_trx_.commit(transaction.ws_meta().gtid()); err = err || client_state_.ordered_commit(); err = err || client_state_.after_commit(); if (err) diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 57a982e..bb7c4c5 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -66,7 +66,7 @@ int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, { client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true); int ret(client_.client_state_.before_commit()); - if (ret == 0) client_.se_trx_.commit(); + if (ret == 0) client_.se_trx_.commit(ws_meta.gtid()); ret = ret || client_.client_state_.ordered_commit(); ret = ret || client_.client_state_.after_commit(); return ret; diff --git a/dbsim/db_params.cpp b/dbsim/db_params.cpp index dbe11b8..7493151 100644 --- a/dbsim/db_params.cpp +++ b/dbsim/db_params.cpp @@ -73,15 +73,23 @@ db::params db::parse_args(int argc, char** argv) "debug logging level: 0 - none, 1 - verbose") ("fast-exit", po::value(¶ms.fast_exit), "exit from simulation without graceful shutdown"); - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); - if (vm.count("help")) + try + { + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + if (vm.count("help")) + { + std::cerr << desc << "\n"; + exit(0); + } + + validate_params(params); + } + catch (...) { std::cerr << desc << "\n"; - exit(0); + exit(1); } - - validate_params(params); return params; } diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index b919299..10fdf8a 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -26,7 +26,6 @@ db::server_service::server_service(db::server& server) : server_(server) - , logged_view_() { } wsrep::storage_service* db::server_service::storage_service( @@ -110,7 +109,7 @@ void db::server_service::log_view(wsrep::high_priority_service*, const wsrep::view& v) { wsrep::log_info() << "View:\n" << v; - logged_view_ = v; + server_.storage_engine().store_view(v); } void db::server_service::recover_streaming_appliers( @@ -126,22 +125,23 @@ void db::server_service::recover_streaming_appliers( wsrep::view db::server_service::get_view(wsrep::client_service&, const wsrep::id& own_id) { - int const my_idx(logged_view_.member_index(own_id)); + wsrep::view stored_view(server_.storage_engine().get_view()); + int const my_idx(stored_view.member_index(own_id)); wsrep::view my_view( - logged_view_.state_id(), - logged_view_.view_seqno(), - logged_view_.status(), - logged_view_.capabilities(), + stored_view.state_id(), + stored_view.view_seqno(), + stored_view.status(), + stored_view.capabilities(), my_idx, - logged_view_.protocol_version(), - logged_view_.members() + stored_view.protocol_version(), + stored_view.members() ); return my_view; } wsrep::gtid db::server_service::get_position(wsrep::client_service&) { - throw wsrep::not_implemented_error(); + return server_.storage_engine().get_position(); } void db::server_service::log_state_change( diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index c979be7..d68c367 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -59,7 +59,6 @@ namespace db void debug_sync(const char*) override; private: db::server& server_; - wsrep::view logged_view_; }; } diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index 594b847..003616c 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -62,6 +62,9 @@ void db::simulator::sst(db::server& server, wsrep::log_info() << "SST " << server.server_state().name() << " -> " << request; + i->second->storage_engine().store_position(gtid); + i->second->storage_engine().store_view( + server.storage_engine().get_view()); } db::client dummy(*(i->second), wsrep::client_id(-1), @@ -146,11 +149,16 @@ void db::simulator::start() { throw wsrep::runtime_error("Failed to connect"); } + wsrep::log_debug() << "main: Starting applier"; server.start_applier(); + wsrep::log_debug() << "main: Waiting initializing state"; server.server_state().wait_until_state(wsrep::server_state::s_initializing); + wsrep::log_debug() << "main: Calling initialized"; server.server_state().initialized(); + wsrep::log_debug() << "main: Waiting for synced state"; server.server_state().wait_until_state( wsrep::server_state::s_synced); + wsrep::log_debug() << "main: Server synced"; } // Start client threads diff --git a/dbsim/db_storage_engine.cpp b/dbsim/db_storage_engine.cpp index 24fca2c..2be0fe5 100644 --- a/dbsim/db_storage_engine.cpp +++ b/dbsim/db_storage_engine.cpp @@ -37,12 +37,13 @@ void db::storage_engine::transaction::apply( se_.bf_abort_some(transaction); } -void db::storage_engine::transaction::commit() +void db::storage_engine::transaction::commit(const wsrep::gtid& gtid) { if (cc_) { wsrep::unique_lock lock(se_.mutex_); se_.transactions_.erase(cc_); + se_.store_position(gtid); } cc_ = nullptr; } @@ -80,3 +81,38 @@ void db::storage_engine::bf_abort_some(const wsrep::transaction& txc) } } } + +void db::storage_engine::store_position(const wsrep::gtid& gtid) +{ + validate_position(gtid); + position_ = gtid; +} + +wsrep::gtid db::storage_engine::get_position() const +{ + return position_; +} + +void db::storage_engine::store_view(const wsrep::view& view) +{ + view_ = view; +} + +wsrep::view db::storage_engine::get_view() const +{ + return view_; +} + +void db::storage_engine::validate_position(const wsrep::gtid& gtid) const +{ + using std::rel_ops::operator<=; + if (position_.id() == gtid.id() && gtid.seqno() <= position_.seqno()) + { + std::ostringstream os; + os << "Invalid position submitted, position seqno " + << position_.seqno() + << " is greater than submitted seqno " + << gtid.seqno(); + throw wsrep::runtime_error(os.str()); + } +} diff --git a/dbsim/db_storage_engine.hpp b/dbsim/db_storage_engine.hpp index 30e5d75..ba64925 100644 --- a/dbsim/db_storage_engine.hpp +++ b/dbsim/db_storage_engine.hpp @@ -39,6 +39,8 @@ namespace db , transactions_() , alg_freq_(params.alg_freq) , bf_aborts_() + , position_() + , view_() { } class transaction @@ -55,7 +57,7 @@ namespace db bool active() const { return cc_ != nullptr; } void start(client* cc); void apply(const wsrep::transaction&); - void commit(); + void commit(const wsrep::gtid&); void rollback(); db::client* client() { return cc_; } transaction(const transaction&) = delete; @@ -66,11 +68,18 @@ namespace db }; void bf_abort_some(const wsrep::transaction& tc); long long bf_aborts() const { return bf_aborts_; } + void store_position(const wsrep::gtid& gtid); + wsrep::gtid get_position() const; + void store_view(const wsrep::view& view); + wsrep::view get_view() const; private: + void validate_position(const wsrep::gtid& gtid) const; wsrep::default_mutex mutex_; std::unordered_set transactions_; size_t alg_freq_; std::atomic bf_aborts_; + wsrep::gtid position_; + wsrep::view view_; }; }