diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a9dbcdc..d0d7b5b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ add_library(trrep + logger.cpp provider.cpp client_context.cpp server_context.cpp @@ -15,6 +16,7 @@ target_link_libraries(trrep wsrep pthread dl) add_executable(trrep_test mock_client_context.cpp mock_utils.cpp + client_context_test.cpp server_context_test.cpp transaction_context_test.cpp trrep_test.cpp diff --git a/src/client_context.cpp b/src/client_context.cpp index ed3c393..de7f540 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -33,30 +33,29 @@ int trrep::client_context::before_command() } } state(lock, s_exec); - if (transaction_.state() == trrep::transaction_context::s_must_abort) + if (transaction_.state() == trrep::transaction_context::s_must_abort || + transaction_.state() == trrep::transaction_context::s_aborted) { return 1; } return 0; } -int trrep::client_context::after_command() +void trrep::client_context::after_command() { - int ret(0); trrep::unique_lock lock(mutex_); if (transaction_.state() == trrep::transaction_context::s_must_abort) { lock.unlock(); rollback(transaction_); lock.lock(); - ret = 1; } state(lock, s_idle); - return ret; } int trrep::client_context::before_statement() { + trrep::unique_lock lock(mutex_); #if 0 /*! * \todo It might be beneficial to implement timed wait for @@ -68,18 +67,25 @@ int trrep::client_context::before_statement() return 1; } #endif // 0 + + if (transaction_.state() == trrep::transaction_context::s_must_abort) + { + lock.unlock(); + rollback(transaction_); + lock.lock(); + return 1; + } return 0; } -int trrep::client_context::after_statement() +void trrep::client_context::after_statement() { #if 0 /*! * \todo Check for replay state, do rollback if requested. */ #endif // 0 - transaction_.after_statement(); - return 0; + (void)transaction_.after_statement(); } // Private diff --git a/src/client_context.hpp b/src/client_context.hpp index 56d17fd..5e0c61a 100644 --- a/src/client_context.hpp +++ b/src/client_context.hpp @@ -148,7 +148,7 @@ namespace trrep * If overridden, the implementation should call base * class metods after any implementation specifict operations. */ - virtual int after_command(); + virtual void after_command(); /*! * Before statement execution operations. @@ -174,7 +174,7 @@ namespace trrep * If overridden by the implementation, base class method * should be called after any implementation specific operations. */ - virtual int after_statement(); + virtual void after_statement(); int start_transaction(const trrep::transaction_id& id) { @@ -314,6 +314,7 @@ namespace trrep trrep::transaction_context&, const trrep::data&); friend class client_context_switch; + friend class client_applier_mode; friend class transaction_context; /*! @@ -383,8 +384,7 @@ namespace trrep /*! * Replay the transaction. */ - virtual int replay(trrep::unique_lock&, - trrep::transaction_context& tc) = 0; + virtual int replay(trrep::transaction_context& tc) = 0; /*! @@ -465,6 +465,24 @@ namespace trrep client_context& orig_context_; client_context& current_context_; }; + + class client_applier_mode + { + public: + client_applier_mode(trrep::client_context& client) + : client_(client) + , orig_mode_(client.mode_) + { + client_.mode_ = trrep::client_context::m_applier; + } + ~client_applier_mode() + { + client_.mode_ = orig_mode_; + } + private: + trrep::client_context& client_; + enum trrep::client_context::mode orig_mode_; + }; } #endif // TRREP_CLIENT_CONTEXT_HPP diff --git a/src/client_context_test.cpp b/src/client_context_test.cpp new file mode 100644 index 0000000..5a5dab1 --- /dev/null +++ b/src/client_context_test.cpp @@ -0,0 +1,29 @@ + +// +// Copyright (C) 2018 Codership Oy +// + +#include "mock_client_context.hpp" +#include "mock_server_context.hpp" +#include "mock_utils.hpp" + +#include + +BOOST_AUTO_TEST_CASE(client_context_test_error_codes) +{ + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_async); + trrep::mock_client_context cc(sc, + trrep::client_id(1), + trrep::client_context::m_applier, + false); + + trrep::transaction_context& tc(cc.transaction()); + BOOST_REQUIRE(tc.active() == false); + tc.start_transaction(1); + trrep_mock::bf_abort_unordered(cc, tc); + + BOOST_REQUIRE(cc.before_command()); + BOOST_REQUIRE(cc.before_statement()); + cc.after_statement(); + cc.after_command(); +} diff --git a/src/dbms_simulator.cpp b/src/dbms_simulator.cpp index 0e65fbb..10297d2 100644 --- a/src/dbms_simulator.cpp +++ b/src/dbms_simulator.cpp @@ -16,17 +16,19 @@ #include "provider.hpp" #include "condition_variable.hpp" #include "view.hpp" +#include "logger.hpp" #include #include #include -#include #include #include #include #include -// #include +#include +#include +#include class dbms_server; @@ -46,6 +48,83 @@ struct dbms_simulator_params { } }; +class dbms_storage_engine +{ +public: + dbms_storage_engine() + : mutex_() + , transactions_() + , alg_freq_(1) + , bf_aborts_() + { } + + class transaction + { + public: + transaction(dbms_storage_engine& se) + : se_(se) + , txc_() + { + } + void start(trrep::transaction_context* txc) + { + trrep::unique_lock lock(se_.mutex_); + if (se_.transactions_.insert(txc).second == false) + { + ::abort(); + } + txc_ = txc; + + } + + ~transaction() + { + if (txc_) + { + trrep::unique_lock lock(se_.mutex_); + se_.transactions_.erase(txc_); + } + } + + transaction(const transaction&) = delete; + transaction& operator=(const transaction&) = delete; + + private: + dbms_storage_engine& se_; + trrep::transaction_context* txc_; + }; + + void bf_abort_some(const trrep::transaction_context& txc) + { + trrep::unique_lock lock(mutex_); + if ((std::rand() % alg_freq_) == 0) + { + if (transactions_.empty() == false) + { + auto* victim_txc(*transactions_.begin()); + trrep::unique_lock victim_txc_lock( + victim_txc->mutex()); + lock.unlock(); + if (victim_txc->bf_abort(victim_txc_lock, txc)) + { + trrep::log() << "BF aborted " << victim_txc->id().get(); + ++bf_aborts_; + } + } + } + } + + long long bf_aborts() + { + return bf_aborts_; + } +private: + trrep::default_mutex mutex_; + std::unordered_set transactions_; + size_t alg_freq_; + std::atomic bf_aborts_; +}; + class dbms_simulator { public: @@ -65,21 +144,7 @@ public: const std::string& req, const wsrep_gtid_t& gtid, bool); const dbms_simulator_params& params() const { return params_; } - std::string stats() const - { - size_t transactions(params_.n_servers * params_.n_clients - * params_.n_transactions); - auto duration(std::chrono::duration( - clients_stop_ - clients_start_).count()); - std::ostringstream os; - os << "Number of transactions: " << transactions - << "\n" - << "Seconds: " << duration - << " \n" - << "Transactions per second: " << transactions/duration - << "\n"; - return os.str(); - } + std::string stats() const; private: std::string server_port(size_t i) const { @@ -93,7 +158,7 @@ private: const dbms_simulator_params& params_; std::map> servers_; std::chrono::time_point clients_start_; - std::chrono::time_point clients_stop_; + std::chrono::time_point clients_stop_; }; class dbms_client; @@ -110,6 +175,7 @@ public: name, id, address, name + "_data", trrep::server_context::rm_async) , simulator_(simulator) + , storage_engine_() , mutex_() , cond_() , last_client_id_(0) @@ -162,6 +228,15 @@ public: return (last_transaction_id_.fetch_add(1) + 1); } + dbms_storage_engine& storage_engine() { return storage_engine_; } + + int apply_to_storage_engine(const trrep::transaction_context& txc, + const trrep::data&) + { + storage_engine_.bf_abort_some(txc); + return 0; + } + void start_clients(); void stop_clients(); void client_thread(const std::shared_ptr& client); @@ -170,6 +245,7 @@ private: void start_client(size_t id); dbms_simulator& simulator_; + dbms_storage_engine storage_engine_; trrep::default_mutex mutex_; trrep::default_condition_variable cond_; std::atomic last_client_id_; @@ -192,7 +268,10 @@ public: , n_transactions_(n_transactions) { } - ~dbms_client() { } + ~dbms_client() + { + trrep::unique_lock lock(mutex_); + } void start() { @@ -205,10 +284,9 @@ public: private: bool do_2pc() const override { return false; } - int apply(trrep::transaction_context&, const trrep::data&) override + int apply(trrep::transaction_context& txc, const trrep::data& data) override { - // std::cerr << "applying" << "\n"; - return 0; + return server_.apply_to_storage_engine(txc, data); } int commit(trrep::transaction_context& transaction_context) override { @@ -216,26 +294,24 @@ private: ret = transaction_context.before_commit(); ret = ret || transaction_context.ordered_commit(); ret = ret || transaction_context.after_commit(); - // std::cerr << "commit" << "\n"; return 0; } int rollback(trrep::transaction_context& transaction_context) override { - std::cerr << "rollback: " << transaction_context.id().get() - << "state: " << trrep::to_string(transaction_context.state()) - << "\n"; + trrep::log() << "rollback: " << transaction_context.id().get() + << "state: " + << trrep::to_string(transaction_context.state()); transaction_context.before_rollback(); transaction_context.after_rollback(); return 0; } void will_replay(trrep::transaction_context&) override { } - int replay(trrep::unique_lock& lock, - trrep::transaction_context& tc) override + int replay(trrep::transaction_context& txc) override { - tc.state(lock, trrep::transaction_context::s_replaying); - tc.state(lock, trrep::transaction_context::s_committed); - return 0; + trrep::log() << "replay: " << txc.id().get(); + trrep::client_applier_mode applier_mode(*this); + return provider().replay(&txc.ws_handle(), this); } void wait_for_replayers(trrep::unique_lock&) const override { } @@ -248,59 +324,75 @@ private: void run_one_transaction() { - before_command(); - before_statement(); - start_transaction(server_.next_transaction_id()); - after_statement(); + int err(before_command()); + dbms_storage_engine::transaction se_trx(server_.storage_engine()); + if (err == 0) + { + err = before_statement(); + if (err == 0) + { + err = start_transaction(server_.next_transaction_id()); + se_trx.start(&transaction()); + } + after_statement(); + } after_command(); - - int err(0); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); for (int i(0); i < 1 && err == 0; ++i) { std::ostringstream os; - before_command(); - before_statement(); - int data(std::rand() % 10000000); - os << data; - trrep::key key; - key.append_key_part("dbms", 4); - wsrep_conn_id_t client_id(id().get()); - key.append_key_part(&client_id, sizeof(client_id)); - key.append_key_part(&data, sizeof(data)); - err = append_key(key); - err = err || append_data(trrep::data(os.str().c_str(), os.str().size())); - after_statement(); + err = before_command(); + if (err == 0) + { + err = before_statement(); + if (err == 0) + { + int data(std::rand() % 10000000); + os << data; + trrep::key key; + key.append_key_part("dbms", 4); + wsrep_conn_id_t client_id(id().get()); + key.append_key_part(&client_id, sizeof(client_id)); + key.append_key_part(&data, sizeof(data)); + err = append_key(key); + err = append_data(trrep::data(os.str().c_str(), os.str().size())); + } + after_statement(); + } after_command(); } - - before_command(); - before_statement(); - // std::cout << "append_data: " << err << "\n"; - if (err == 0 && do_2pc()) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (err == 0) { - err = err || before_prepare(); - // std::cout << "before_prepare: " << err << "\n"; - err = err || after_prepare(); - // std::cout << "after_prepare: " << err << "\n"; + err = before_command(); + if (err == 0) + { + err = before_statement(); + + if (err == 0 && do_2pc()) + { + err = err || before_prepare(); + err = err || after_prepare(); + } + err = err || before_commit(); + err = err || ordered_commit(); + err = err || after_commit(); + after_statement(); + } + after_command(); } - err = err || before_commit(); - // std::cout << "before_commit: " << err << "\n"; - err = err || ordered_commit(); - // std::cout << "ordered_commit: " << err << "\n"; - err = err || after_commit(); - // std::cout << "after_commit: " << err << "\n"; - after_statement(); - after_command(); + assert(transaction().state() == trrep::transaction_context::s_committed + || + transaction().state() == trrep::transaction_context::s_aborted); } void report_progress(size_t i) const { if ((i % 100) == 0) { - std::cout << "client: " << id().get() - << " transactions: " << i - << " " << 100*double(i)/n_transactions_ << "%" - << std::endl; + trrep::log() << "client: " << id().get() + << " transactions: " << i + << " " << 100*double(i)/n_transactions_ << "%"; } } trrep::default_mutex mutex_; @@ -316,7 +408,7 @@ void dbms_server::applier_thread() dbms_client applier(*this, client_id, trrep::client_context::m_applier, 0); wsrep_status_t ret(provider().run_applier(&applier)); - std::cout << "Applier thread exited with error code " << ret << "\n"; + trrep::log() << "Applier thread exited with error code " << ret; } trrep::client_context* dbms_server::local_client_context() @@ -362,10 +454,10 @@ void dbms_server::start_client(size_t id) void dbms_simulator::start() { - std::cout << "Provider: " << params_.wsrep_provider << "\n"; + trrep::log() << "Provider: " << params_.wsrep_provider; std::string cluster_address(build_cluster_address()); - std::cout << "Cluster address: " << cluster_address << "\n"; + trrep::log() << "Cluster address: " << cluster_address; for (size_t i(0); i < params_.n_servers; ++i) { std::ostringstream name_os; @@ -406,7 +498,7 @@ void dbms_simulator::start() } // Start client threads - std::cout << "####################### Starting client load" << "\n"; + trrep::log() << "####################### Starting client load"; clients_start_ = std::chrono::steady_clock::now(); for (auto& i : servers_) { @@ -423,20 +515,20 @@ void dbms_simulator::stop() server.stop_clients(); } clients_stop_ = std::chrono::steady_clock::now(); - std::cout << "######## Stats ############\n"; - std::cout << stats(); - std::cout << "######## Stats ############\n"; + trrep::log() << "######## Stats ############"; + trrep::log() << stats(); + trrep::log() << "######## Stats ############"; // REMOVEME: Temporary shortcut // exit(0); for (auto& i : servers_) { dbms_server& server(*i.second); - std::cout << "Status for server: " << server.id() << "\n"; + trrep::log() << "Status for server: " << server.id(); auto status(server.provider().status()); for_each(status.begin(), status.end(), [](const trrep::provider::status_variable& sv) { - std::cout << sv.name() << " = " << sv.value() << "\n"; + trrep::log() << sv.name() << " = " << sv.value(); }); server.disconnect(); @@ -445,6 +537,29 @@ void dbms_simulator::stop() } } +std::string dbms_simulator::stats() const +{ + size_t transactions(params_.n_servers * params_.n_clients + * params_.n_transactions); + auto duration(std::chrono::duration( + clients_stop_ - clients_start_).count()); + long long bf_aborts(0); + for (const auto& s : servers_) + { + bf_aborts += s.second->storage_engine().bf_aborts(); + } + std::ostringstream os; + os << "Number of transactions: " << transactions + << "\n" + << "Seconds: " << duration + << " \n" + << "Transactions per second: " << transactions/duration + << "\n" + << "BF aborts: " + << bf_aborts; + return os.str(); +} + void dbms_simulator::donate_sst(dbms_server& server, const std::string& req, const wsrep_gtid_t& gtid, @@ -461,7 +576,7 @@ void dbms_simulator::donate_sst(dbms_server& server, } if (bypass == false) { - std::cout << "SST " << server.id() << " -> " << id << "\n"; + trrep::log() << "SST " << server.id() << " -> " << id; } i->second->sst_received(gtid); server.sst_sent(gtid); @@ -514,7 +629,7 @@ int main(int argc, char** argv) if (vm.count("help")) { - std::cout << desc << "\n"; + std::cerr << desc << "\n"; return 1; } @@ -526,17 +641,17 @@ int main(int argc, char** argv) } catch (const std::exception& e) { - std::cerr << "Caught exception: " << e.what(); + trrep::log() << "Caught exception: " << e.what(); } stats = sim.stats(); } catch (const std::exception& e) { - std::cerr << e.what() << "\n"; + trrep::log() << e.what(); return 1; } - std::cout << "Stats:\n" << stats << "\n"; + trrep::log() << "Stats:\n" << stats << "\n"; return 0; } diff --git a/src/lock.hpp b/src/lock.hpp index 2084cb7..d37ebdb 100644 --- a/src/lock.hpp +++ b/src/lock.hpp @@ -24,8 +24,10 @@ namespace trrep } ~unique_lock() { - assert(locked_); - mutex_.unlock(); + if (locked_) + { + unlock(); + } } void lock() diff --git a/src/logger.cpp b/src/logger.cpp new file mode 100644 index 0000000..c53da18 --- /dev/null +++ b/src/logger.cpp @@ -0,0 +1,8 @@ + +#include "logger.hpp" + +#include + +std::ostream& trrep::log::os_ = std::cout; +static trrep::default_mutex log_mutex_; +trrep::mutex& trrep::log::mutex_ = log_mutex_; diff --git a/src/logger.hpp b/src/logger.hpp new file mode 100644 index 0000000..11882ae --- /dev/null +++ b/src/logger.hpp @@ -0,0 +1,39 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_LOGGER_HPP +#define TRREP_LOGGER_HPP + +#include "mutex.hpp" +#include "lock.hpp" + +#include +#include + +namespace trrep +{ + class log + { + public: + log() + : oss_() + { } + ~log() + { + trrep::unique_lock lock(mutex_); + os_ << oss_.str() << "\n"; + } + template + std::ostream& operator<<(const T& val) + { + return (oss_ << val); + } + private: + std::ostringstream oss_; + static trrep::mutex& mutex_; + static std::ostream& os_; + }; +} + +#endif // TRREP_LOGGER_HPP diff --git a/src/mock_client_context.hpp b/src/mock_client_context.hpp index 385a2ec..0805df4 100644 --- a/src/mock_client_context.hpp +++ b/src/mock_client_context.hpp @@ -37,10 +37,11 @@ namespace trrep int rollback(trrep::transaction_context&); bool do_2pc() const { return do_2pc_; } void will_replay(trrep::transaction_context&) TRREP_OVERRIDE { } - int replay(trrep::unique_lock& lock, - trrep::transaction_context& tc) TRREP_OVERRIDE + int replay(trrep::transaction_context& tc) TRREP_OVERRIDE { - tc.state(lock, trrep::transaction_context::s_replaying); + trrep::unique_lock lock(mutex_); + tc.state(lock, trrep::transaction_context::s_committing); + tc.state(lock, trrep::transaction_context::s_ordered_commit); tc.state(lock, trrep::transaction_context::s_committed); return 0; } diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index 982da81..80fe8c8 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -103,6 +103,8 @@ namespace trrep const wsrep_trx_meta_t*) { return 0;} int release(wsrep_ws_handle_t*) { return 0; } + int replay(wsrep_ws_handle_t*, void*) { ::abort(); /* not impl */} + // Methods to modify mock state // Inject BF abort event into the provider. diff --git a/src/provider.hpp b/src/provider.hpp index 63eb1c8..af808cd 100644 --- a/src/provider.hpp +++ b/src/provider.hpp @@ -53,15 +53,16 @@ namespace trrep certify(wsrep_conn_id_t, wsrep_ws_handle_t*, uint32_t, wsrep_trx_meta_t*) = 0; - //! - //! BF abort a transaction inside provider. - //! - //! @param[in] bf_seqno Seqno of the aborter transaction - //! @param[in] victim_txt Transaction identifier of the victim - //! @param[out] victim_seqno Sequence number of the victim transaction - //! or WSREP_SEQNO_UNDEFINED if the victim was not ordered - //! - //! @return wsrep_status_t + /*! + * BF abort a transaction inside provider. + * + * @param[in] bf_seqno Seqno of the aborter transaction + * @param[in] victim_txt Transaction identifier of the victim + * @param[out] victim_seqno Sequence number of the victim transaction + * or WSREP_SEQNO_UNDEFINED if the victim was not ordered + * + * @return wsrep_status_t + */ virtual wsrep_status_t bf_abort(wsrep_seqno_t bf_seqno, wsrep_trx_id_t victim_trx, wsrep_seqno_t* victim_seqno) = 0; @@ -72,6 +73,13 @@ namespace trrep const wsrep_trx_meta_t*) = 0; virtual int release(wsrep_ws_handle_t*) = 0; + /*! + * Replay a transaction. + * + * @return Zero in case of success, non-zero on failure. + */ + virtual int replay(wsrep_ws_handle_t* ws_handle, void* applier_ctx) = 0; + virtual int sst_sent(const wsrep_gtid_t&, int) = 0; virtual int sst_received(const wsrep_gtid_t&, int) = 0; diff --git a/src/server_context.cpp b/src/server_context.cpp index 907d2f8..eaa2959 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -6,6 +6,7 @@ #include "client_context.hpp" #include "transaction_context.hpp" #include "view.hpp" +#include "logger.hpp" #include "compiler.hpp" // Todo: refactor into provider factory @@ -15,7 +16,6 @@ #include #include - #include namespace @@ -116,7 +116,7 @@ namespace assert(client_context->mode() == trrep::client_context::m_applier); trrep::data data(buf->ptr, buf->len); - if (client_context->start_transaction(*wsh, *meta, flags)) + if (client_context->transaction().state() != trrep::transaction_context::s_replaying && client_context->start_transaction(*wsh, *meta, flags)) { ret = WSREP_CB_FAILURE; } @@ -174,7 +174,7 @@ namespace int trrep::server_context::load_provider(const std::string& provider_spec, const std::string& provider_options) { - std::cout << "Loading provider " << provider_spec << "\n"; + trrep::log() << "Loading provider " << provider_spec; if (provider_spec == "mock") { provider_ = new trrep::mock_provider; @@ -252,27 +252,27 @@ void trrep::server_context::wait_until_state( void trrep::server_context::on_connect() { - std::cout << "Server " << name_ << " connected to cluster" << "\n"; + trrep::log() << "Server " << name_ << " connected to cluster"; trrep::unique_lock lock(mutex_); state(lock, s_connected); } void trrep::server_context::on_view(const trrep::view& view) { - std::cout << "================================================\nView:\n" - << "id: " << view.id() << "\n" - << "status: " << view.status() << "\n" - << "own_index: " << view.own_index() << "\n" - << "final: " << view.final() << "\n" - << "members: \n"; + trrep::log() << "================================================\nView:\n" + << "id: " << view.id() << "\n" + << "status: " << view.status() << "\n" + << "own_index: " << view.own_index() << "\n" + << "final: " << view.final() << "\n" + << "members"; const std::vector& members(view.members()); for (std::vector::const_iterator i(members.begin()); i != members.end(); ++i) { - std::cout << "id: " << i->id() << " " - << "name: " << i->name() << "\n"; + trrep::log() << "id: " << i->id() << " " + << "name: " << i->name(); } - std::cout << "=================================================\n"; + trrep::log() << "================================================="; trrep::unique_lock lock(mutex_); if (view.final()) { @@ -282,7 +282,7 @@ void trrep::server_context::on_view(const trrep::view& view) void trrep::server_context::on_sync() { - std::cout << "Synced with group" << "\n"; + trrep::log() << "Server " << name_ << " synced with group"; trrep::unique_lock lock(mutex_); if (state_ != s_synced) { @@ -299,8 +299,11 @@ int trrep::server_context::on_apply( if (starts_transaction(transaction_context.flags()) && commits_transaction(transaction_context.flags())) { - assert(transaction_context.active() == false); - transaction_context.start_transaction(); + if (transaction_context.state() != trrep::transaction_context::s_replaying) + { + assert(transaction_context.active() == false); + transaction_context.start_transaction(); + } if (client_context.apply(transaction_context, data)) { ret = 1; @@ -360,8 +363,8 @@ void trrep::server_context::state( if (allowed[state_][state]) { - std::cout << "server " << name_ << " state change: " - << state_ << " -> " << state; + trrep::log() << "server " << name_ << " state change: " + << state_ << " -> " << state; state_ = state; cond_.notify_all(); while (state_waiters_[state_]) diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index ef22bd5..dbba2d0 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -7,9 +7,9 @@ #include "server_context.hpp" #include "key.hpp" #include "data.hpp" +#include "logger.hpp" #include "compiler.hpp" -#include // TODO: replace with proper logging utility #include #include @@ -33,7 +33,8 @@ trrep::transaction_context::transaction_context( trrep::transaction_context::~transaction_context() -{ } +{ +} int trrep::transaction_context::start_transaction( const trrep::transaction_id& id) @@ -179,8 +180,10 @@ int trrep::transaction_context::before_commit() trrep::unique_lock lock(client_context_.mutex()); debug_log_state("before_commit_enter"); - assert(state() == s_executing || state() == s_committing || - state() == s_must_abort); + assert(state() == s_executing || + state() == s_committing || + state() == s_must_abort || + state() == s_replaying); switch (client_context_.mode()) { @@ -246,6 +249,10 @@ int trrep::transaction_context::before_commit() state(lock, s_certifying); state(lock, s_committing); } + else if (state() == s_replaying) + { + state(lock, s_committing); + } } break; } @@ -405,7 +412,11 @@ int trrep::transaction_context::after_statement() case s_aborted: break; case s_must_replay: - ret = client_context_.replay(lock, *this); + state(lock, s_replaying); + lock.unlock(); + ret = client_context_.replay(*this); + lock.lock(); + provider_.release(&ws_handle_); break; default: assert(0); @@ -432,9 +443,62 @@ int trrep::transaction_context::after_statement() } debug_log_state("after_statement_leave"); + assert(ret == 0 || state() == s_aborted); return ret; } +bool trrep::transaction_context::bf_abort( + trrep::unique_lock& lock TRREP_UNUSED, + const trrep::transaction_context& txc) +{ + bool ret(false); + assert(lock.owns_lock()); + switch (state()) + { + case s_executing: + case s_preparing: + case s_certifying: + case s_committing: + { + wsrep_seqno_t victim_seqno(WSREP_SEQNO_UNDEFINED); + wsrep_status_t status(client_context_.provider().bf_abort( + txc.seqno(), id_.get(), &victim_seqno)); + switch (status) + { + case WSREP_OK: + trrep::log() << "Seqno " << txc.seqno() + << " succesfully BF aborted " << id_.get() + << " victim_seqno " << victim_seqno; + state(lock, s_must_abort); + ret = true; + break; + default: + trrep::log() << "Seqno " << txc.seqno() + << " failed to BF abort " << id_.get() + << " with status " << status + << " victim_seqno " << victim_seqno; + break; + } + break; + } + default: + trrep::log() << "BF abort not allowed in state " + << trrep::to_string(state()) << "\n"; + break; + } + + if (client_context_.server_context().rollback_mode() == trrep::server_context::rm_sync) + { + //! \todo Launch background rollbacker. + assert(0); + } + return ret; +} + +trrep::mutex& trrep::transaction_context::mutex() +{ + return client_context_.mutex(); +} // Private void trrep::transaction_context::state( @@ -455,13 +519,10 @@ void trrep::transaction_context::state( { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ - { 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */ + { 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0} /* re */ }; if (allowed[state_][next_state]) { - // std::cerr << "state transition: " << trrep::to_string(state_) - // << " -> " << trrep::to_string(next_state) - // << "\n"; state_hist_.push_back(state_); state_ = next_state; } @@ -471,7 +532,7 @@ void trrep::transaction_context::state( os << "unallowed state transition for transaction " << id_.get() << ": " << trrep::to_string(state_) << " -> " << trrep::to_string(next_state); - std::cerr << os.str() << "\n"; + trrep::log() << os.str(); throw trrep::runtime_error(os.str()); } } @@ -565,6 +626,7 @@ int trrep::transaction_context::certify_commit( state(lock, s_certifying); + flags(flags() | WSREP_FLAG_TRX_END); lock.unlock(); trrep::data data; @@ -586,7 +648,7 @@ int trrep::transaction_context::certify_commit( wsrep_status cert_ret(provider_.certify(client_context_.id().get(), &ws_handle_, - flags() | WSREP_FLAG_TRX_END, + flags(), &trx_meta_)); lock.lock(); @@ -594,7 +656,6 @@ int trrep::transaction_context::certify_commit( assert(state() == s_certifying || state() == s_must_abort); client_context_.debug_sync("wsrep_after_replication"); - // std::cout << "seqno: " << trx_meta_.gtid.seqno << "\n"; int ret(1); switch (cert_ret) { @@ -640,7 +701,10 @@ int trrep::transaction_context::certify_commit( // transaction or to determine failed certification status. assert(ordered()); client_context_.will_replay(*this); - state(lock, s_must_abort); + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } state(lock, s_must_replay); break; case WSREP_TRX_FAIL: @@ -653,7 +717,12 @@ int trrep::transaction_context::certify_commit( break; case WSREP_CONN_FAIL: case WSREP_NODE_FAIL: - state(lock, s_must_abort); + // Galera provider may return CONN_FAIL if the trx is + // BF aborted O_o + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } client_context_.override_error(trrep::e_error_during_commit); break; case WSREP_FATAL: @@ -687,6 +756,7 @@ void trrep::transaction_context::cleanup() { debug_log_state("cleanup_enter"); id_ = trrep::transaction_id::invalid(); + ws_handle_.trx_id = -1; if (is_streaming()) { state_ = s_executing; @@ -702,13 +772,13 @@ void trrep::transaction_context::cleanup() } void trrep::transaction_context::debug_log_state( - const std::string& context TRREP_UNUSED) - const + const std::string& context TRREP_UNUSED) const { #if 0 - std::cout << context - << ": client: " << client_context_.id().get() - << " trx: " << int64_t(id_.get()) - << " state: " << trrep::to_string(state_) << "\n"; + trrep::log() << context + << ": server: " << client_context_.server_context().name() + << ": client: " << client_context_.id().get() + << " trx: " << int64_t(id_.get()) + << " state: " << trrep::to_string(state_); #endif /* 0 */ } diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index 172c8ef..fd4424e 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -75,9 +75,13 @@ namespace trrep // fragment succeeded bool certified() { return certified_; } + wsrep_seqno_t seqno() const + { + return trx_meta_.gtid.seqno; + } // Return true if the last fragment was ordered by the // provider - bool ordered() { return (trx_meta_.gtid.seqno > 0); } + bool ordered() const { return (trx_meta_.gtid.seqno > 0); } bool is_streaming() const { @@ -125,20 +129,29 @@ namespace trrep int after_statement(); + bool bf_abort(trrep::unique_lock& lock, + const transaction_context& txc); + uint32_t flags() const { return flags_; } + + trrep::mutex& mutex(); + + wsrep_ws_handle_t& ws_handle() { return ws_handle_; } private: transaction_context(const transaction_context&); transaction_context operator=(const transaction_context&); + void flags(uint32_t flags) { flags_ = flags; } int certify_fragment(trrep::unique_lock&); int certify_commit(trrep::unique_lock&); void remove_fragments(); void clear_fragments(); void cleanup(); void debug_log_state(const std::string&) const; + trrep::provider& provider_; trrep::client_context& client_context_; trrep::transaction_id id_; diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index d7df699..739333c 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -93,6 +93,15 @@ wsrep_status_t trrep::wsrep_provider_v26::certify(wsrep_conn_id_t conn_id, return wsrep_->certify(wsrep_, conn_id, wsh, flags, meta); } +wsrep_status_t trrep::wsrep_provider_v26::bf_abort( + wsrep_seqno_t bf_seqno, + wsrep_trx_id_t victim_id, + wsrep_seqno_t *victim_seqno) +{ + return wsrep_->abort_certification( + wsrep_, bf_seqno, victim_id, victim_seqno); +} + wsrep_status_t trrep::wsrep_provider_v26::commit_order_enter( const wsrep_ws_handle_t* wsh, const wsrep_trx_meta_t* meta) @@ -112,6 +121,12 @@ int trrep::wsrep_provider_v26::release(wsrep_ws_handle_t* wsh) return (wsrep_->release(wsrep_, wsh) != WSREP_OK); } +int trrep::wsrep_provider_v26::replay(wsrep_ws_handle_t* wsh, + void* applier_ctx) +{ + return (wsrep_->replay_trx(wsrep_, wsh, applier_ctx) != WSREP_OK); +} + int trrep::wsrep_provider_v26::sst_sent(const wsrep_gtid_t& gtid, int err) { if (wsrep_->sst_sent(wsrep_, >id, err) != WSREP_OK) diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index d33f451..25a4c6f 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -31,13 +31,14 @@ namespace trrep wsrep_trx_meta_t*); wsrep_status_t bf_abort(wsrep_seqno_t, wsrep_trx_id_t, - wsrep_seqno_t*) { return WSREP_OK; } - int rollback(const wsrep_trx_id_t) { return 0; } + wsrep_seqno_t*); + int rollback(const wsrep_trx_id_t) { ::abort(); return 0; } wsrep_status commit_order_enter(const wsrep_ws_handle_t*, const wsrep_trx_meta_t*); int commit_order_leave(const wsrep_ws_handle_t*, const wsrep_trx_meta_t*); int release(wsrep_ws_handle_t*); + int replay(wsrep_ws_handle_t*, void*); int sst_sent(const wsrep_gtid_t&,int); int sst_received(const wsrep_gtid_t& gtid, int);