diff --git a/dbsim/CMakeLists.txt b/dbsim/CMakeLists.txt index b0cd756..30245e5 100644 --- a/dbsim/CMakeLists.txt +++ b/dbsim/CMakeLists.txt @@ -5,6 +5,7 @@ add_executable(dbsim db_client.cpp db_client_service.cpp + db_high_priority_service.cpp db_params.cpp db_server.cpp db_server_service.cpp diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index b331e29..2057d9c 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -11,7 +11,7 @@ #include "db_storage_engine.hpp" #include "db_client_state.hpp" #include "db_client_service.hpp" - +#include "db_high_priority_service.hpp" namespace db { @@ -47,6 +47,7 @@ namespace db private: friend class db::server_state; friend class db::client_service; + friend class db::high_priority_service; template int client_command(F f); void run_one_transaction(); void reset_error(); diff --git a/dbsim/db_client_service.cpp b/dbsim/db_client_service.cpp index fa4c4f4..24aa0d4 100644 --- a/dbsim/db_client_service.cpp +++ b/dbsim/db_client_service.cpp @@ -3,20 +3,9 @@ // #include "db_client_service.hpp" +#include "db_high_priority_service.hpp" #include "db_client.hpp" -int db::client_service::apply_write_set(const wsrep::const_buffer&) -{ - db::client* client(client_state_.client()); - client->se_trx_.start(client); - client->se_trx_.apply(client_state_.transaction()); - return 0; -} - -int db::client_service::apply_toi(const wsrep::const_buffer&) -{ - return 0; -} int db::client_service::commit(const wsrep::ws_handle&, const wsrep::ws_meta&) @@ -44,9 +33,11 @@ enum wsrep::provider::status db::client_service::replay() { wsrep::high_priority_context high_priority_context(client_state_); + db::high_priority_service high_priority_service( + client_state_.client()->server_, client_state_.client()); auto ret(client_state_.provider().replay( client_state_.transaction().ws_handle(), - &client_state_)); + &high_priority_service)); if (ret == wsrep::provider::success) { ++client_state_.client()->stats_.replays; diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index ecf174a..72b72c4 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -69,9 +69,9 @@ namespace db void remove_fragments() override { } - int apply_write_set(const wsrep::const_buffer&) override; + // int apply_write_set(const wsrep::const_buffer&) override; - int apply_toi(const wsrep::const_buffer&) override; + // int apply_toi(const wsrep::const_buffer&) override; int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp new file mode 100644 index 0000000..df64295 --- /dev/null +++ b/dbsim/db_high_priority_service.cpp @@ -0,0 +1,67 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "db_high_priority_service.hpp" +#include "db_server.hpp" +#include "db_client.hpp" + +int db::high_priority_service::apply(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) +{ + return server_.server_state().on_apply(*this, ws_handle, ws_meta, data); +} + +int db::high_priority_service::start_transaction( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + return client_->client_state().start_transaction(ws_handle, ws_meta); +} + +void db::high_priority_service::adopt_transaction(const wsrep::transaction&) +{ + throw wsrep::not_implemented_error(); +} + +int db::high_priority_service::apply_write_set(const wsrep::const_buffer&) +{ + client_->se_trx_.start(client_); + client_->se_trx_.apply(client_->client_state().transaction()); + return 0; +} + +int db::high_priority_service::apply_toi(const wsrep::const_buffer&) +{ + throw wsrep::not_implemented_error(); +} + +int db::high_priority_service::commit() +{ + int ret(client_->client_state_.before_commit()); + if (ret == 0) client_->se_trx_.commit(); + ret = ret || client_->client_state_.ordered_commit(); + ret = ret || client_->client_state_.after_commit(); + return ret; +} + +int db::high_priority_service::rollback() +{ + int ret(client_->client_state_.before_rollback()); + assert(ret == 0); + client_->se_trx_.rollback(); + ret = client_->client_state_.after_rollback(); + assert(ret == 0); + return ret; +} + +void db::high_priority_service::after_apply() +{ + client_->client_state_.after_statement(); +} + +bool db::high_priority_service::is_replaying() const +{ + return (client_->client_state_.transaction().state() == wsrep::transaction::s_replaying); +} diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp new file mode 100644 index 0000000..e250729 --- /dev/null +++ b/dbsim/db_high_priority_service.hpp @@ -0,0 +1,46 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_DB_HIGH_PRIORITY_SERVICE_HPP +#define WSREP_DB_HIGH_PRIORITY_SERVICE_HPP + +#include "wsrep/high_priority_service.hpp" + +namespace db +{ + class server; + class client; + class high_priority_service : wsrep::high_priority_service + { + public: + high_priority_service(db::server& server, db::client* client) + : server_(server) + , client_(client) + { } + + int apply(const wsrep::ws_handle&, const wsrep::ws_meta&, + const wsrep::const_buffer&) override; + int start_transaction(const wsrep::ws_handle&, + const wsrep::ws_meta&) override; + void adopt_transaction(const wsrep::transaction&) override; + int apply_write_set(const wsrep::const_buffer&) override; + int commit() override; + int rollback() override; + int apply_toi(const wsrep::const_buffer&) override; + void after_apply() override; + void store_globals() override { } + void reset_globals() override { } + int log_dummy_write_set(const wsrep::ws_handle&, + const wsrep::ws_meta&) + { return 0; } + bool is_replaying() const; + private: + high_priority_service(const high_priority_service&); + high_priority_service& operator=(const high_priority_service&); + db::server& server_; + db::client* client_; + }; +} + +#endif // WSREP_DB_HIGH_PRIORITY_SERVICE_HPP diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index 002f253..b9cae18 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -4,6 +4,7 @@ #include "db_server.hpp" #include "db_server_service.hpp" +#include "db_high_priority_service.hpp" #include "db_client.hpp" #include "db_simulator.hpp" @@ -34,11 +35,17 @@ void db::server::applier_thread() wsrep::client_state::m_high_priority, simulator_.params()); wsrep::client_state* cc(static_cast( - &applier.client_state())); + &applier.client_state())); + db::high_priority_service hps(*this, &applier); cc->open(cc->id()); + cc->before_command(); enum wsrep::provider::status ret( - server_state_.provider().run_applier(cc)); + server_state_.provider().run_applier(&hps)); wsrep::log_info() << "Applier thread exited with error code " << ret; + cc->after_command_before_result(); + cc->after_command_after_result(); + cc->close(); + cc->cleanup(); } void db::server::start_applier() @@ -112,15 +119,15 @@ wsrep::client_state* db::server::local_client_state() simulator_.params())); return &client->client_state(); } - -wsrep::client_state* db::server::streaming_applier_client_state() -{ - throw wsrep::not_implemented_error(); -} - void db::server::release_client_state(wsrep::client_state* client_state) { db::client_state* db_client_state( dynamic_cast(client_state)); delete db_client_state->client(); } + +wsrep::high_priority_service* db::server::streaming_applier_service() +{ + throw wsrep::not_implemented_error(); +} + diff --git a/dbsim/db_server.hpp b/dbsim/db_server.hpp index ad89a9d..24e10bc 100644 --- a/dbsim/db_server.hpp +++ b/dbsim/db_server.hpp @@ -42,9 +42,8 @@ namespace db } void donate_sst(const std::string&, const wsrep::gtid&, bool); wsrep::client_state* local_client_state(); - wsrep::client_state* streaming_applier_client_state(); void release_client_state(wsrep::client_state*); - + wsrep::high_priority_service* streaming_applier_service(); private: void start_client(size_t id); diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index bedef53..18116f3 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -6,6 +6,7 @@ #include "db_server.hpp" #include "wsrep/logger.hpp" +#include "wsrep/high_priority_service.hpp" db::server_service::server_service(db::server& server) : server_(server) @@ -16,17 +17,23 @@ wsrep::client_state* db::server_service::local_client_state() return server_.local_client_state(); } -wsrep::client_state* db::server_service::streaming_applier_client_state() -{ - return server_.streaming_applier_client_state(); -} - void db::server_service::release_client_state( wsrep::client_state* client_state) { server_.release_client_state(client_state); } +wsrep::high_priority_service* db::server_service::streaming_applier_service() +{ + return server_.streaming_applier_service(); +} + +void db::server_service::release_high_priority_service( + wsrep::high_priority_service *high_priority_service) +{ + delete high_priority_service; +} + bool db::server_service::sst_before_init() const { return true; diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index 19c1ca8..4274b73 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -16,8 +16,10 @@ namespace db public: server_service(db::server& server); wsrep::client_state* local_client_state() override; - wsrep::client_state* streaming_applier_client_state() override; void release_client_state(wsrep::client_state*) override; + wsrep::high_priority_service* streaming_applier_service() override; + void release_high_priority_service(wsrep::high_priority_service*) override; + bool sst_before_init() const override; int start_sst(const std::string&, const wsrep::gtid&, bool) override; std::string sst_request() override; diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index f97f791..64823cc 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -6,16 +6,12 @@ * * This file will define a `callback` abstract interface for a * DBMS client session service. The interface will define methods - * which will be called by the wsrep-lib under certain circumstances, - * for example when a transaction rollback is required by internal - * wsrep-lib operation or applier client needs to apply a write set. + * which will be called by the wsrep-lib. */ #ifndef WSREP_CLIENT_SERVICE_HPP #define WSREP_CLIENT_SERVICE_HPP -#include "transaction_termination_service.hpp" - #include "buffer.hpp" #include "provider.hpp" #include "mutex.hpp" @@ -24,7 +20,7 @@ namespace wsrep { class transaction; - class client_service : public wsrep::transaction_termination_service + class client_service { public: client_service() { } @@ -80,28 +76,15 @@ namespace wsrep virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; virtual void remove_fragments() = 0; + virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; // - // Applying interface + // Rollback // - /** - * Apply a write set. - * - * A write set applying happens always - * as a part of the transaction. The caller must start a - * new transaction before applying a write set and must - * either commit to make changes persistent or roll back. + * Roll back all transactions which are currently active + * in the client session. */ - virtual int apply_write_set(const wsrep::const_buffer&) = 0; - - /** - * Apply a TOI operation. - * - * TOI operation is a standalone operation and should not - * be executed as a part of a transaction. - */ - virtual int apply_toi(const wsrep::const_buffer&) = 0; - + virtual int rollback() = 0; // // Interface to global server state diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 560ea61..2773c5e 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -391,7 +391,7 @@ namespace wsrep return transaction_.start_replaying(ws_meta); } - void adopt_transaction(wsrep::transaction& transaction) + void adopt_transaction(const wsrep::transaction& transaction) { assert(mode_ == m_high_priority); transaction_.start_transaction(transaction.id()); diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp new file mode 100644 index 0000000..4953de7 --- /dev/null +++ b/include/wsrep/high_priority_service.hpp @@ -0,0 +1,99 @@ +// +// Copyright (C) 2018 Codership Oy +// + +/** @file high_priority_service.hpp + * + * Interface for services for applying high priority transactions. + */ +#ifndef WSREP_HIGH_PRIORITY_SERVICE_HPP +#define WSREP_HIGH_PRIORITY_SERVICE_HPP + +namespace wsrep +{ + class ws_handle; + class ws_meta; + class const_buffer; + class transaction; + class high_priority_service + { + public: + virtual ~high_priority_service() { } + + virtual int apply(const ws_handle&, const ws_meta&, + const const_buffer&) = 0; + /** + * Start a new transaction + */ + virtual int start_transaction(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + + /** + * Adopt a transaction. + */ + virtual void adopt_transaction(const wsrep::transaction&) = 0; + /** + * Apply a write set. + * + * A write set applying happens always + * as a part of the transaction. The caller must start a + * new transaction before applying a write set and must + * either commit to make changes persistent or roll back. + */ + virtual int apply_write_set(const wsrep::const_buffer&) = 0; + + /** + * Commit a transaction. + */ + virtual int commit() = 0; + + /** + * Roll back a transaction + */ + virtual int rollback() = 0; + + /** + * Apply a TOI operation. + * + * TOI operation is a standalone operation and should not + * be executed as a part of a transaction. + */ + virtual int apply_toi(const wsrep::const_buffer&) = 0; + + /** + * Actions to take after applying a write set was completed. + */ + virtual void after_apply() = 0; + + virtual void store_globals() = 0; + virtual void reset_globals() = 0; + + virtual int log_dummy_write_set(const ws_handle&, const ws_meta&) = 0; + + virtual bool is_replaying() const = 0; + private: + }; + + class high_priority_switch + { + public: + high_priority_switch(high_priority_service& orig_service, + high_priority_service& current_service) + : orig_service_(orig_service) + , current_service_(current_service) + { + orig_service_.reset_globals(); + current_service_.store_globals(); + } + ~high_priority_switch() + { + current_service_.reset_globals(); + orig_service_.store_globals(); + } + private: + high_priority_service& orig_service_; + high_priority_service& current_service_; + }; +} + +#endif // WSREP_HIGH_PRIORITY_SERVICE_HPP diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index d7ca9b8..c5deb3b 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -20,6 +20,7 @@ namespace wsrep { class client_state; + class high_priority_service; class ws_meta; class gtid; class view; @@ -36,19 +37,21 @@ namespace wsrep * @return Pointer to Client State. */ virtual wsrep::client_state* local_client_state() = 0; - + virtual void release_client_state(wsrep::client_state*) = 0; /** * Create an applier state for streaming transaction applying. * * @return Pointer to streaming applier client state. */ - virtual wsrep::client_state* streaming_applier_client_state() = 0; + virtual wsrep::high_priority_service* + streaming_applier_service() = 0; /** * Release a client state allocated by either local_client_state() * or streaming_applier_client_state(). */ - virtual void release_client_state(wsrep::client_state*) = 0; + virtual void release_high_priority_service( + wsrep::high_priority_service*) = 0; /** * Perform a background rollback for a transaction. diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index dd78bd5..7a7036b 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -215,15 +215,16 @@ namespace wsrep void start_streaming_applier( const wsrep::id&, const wsrep::transaction_id&, - wsrep::client_state* client_state); + wsrep::high_priority_service*); void stop_streaming_applier( const wsrep::id&, const wsrep::transaction_id&); /** * Return reference to streaming applier. */ - client_state* find_streaming_applier(const wsrep::id&, - const wsrep::transaction_id&) const; + wsrep::high_priority_service* find_streaming_applier( + const wsrep::id&, + const wsrep::transaction_id&) const; /** * Load WSRep provider. * @@ -429,13 +430,13 @@ namespace wsrep * * @todo Make this private, allow calls for provider implementations * only. - * @param client_state Applier client context. + * @param high_priority_service High priority applier service. * @param transaction Transaction context. * @param data Write set data * * @return Zero on success, non-zero on failure. */ - int on_apply(wsrep::client_state& client_state, + int on_apply(wsrep::high_priority_service& high_priority_service, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data); @@ -555,7 +556,7 @@ namespace wsrep size_t pause_count_; wsrep::seqno pause_seqno_; bool desynced_on_pause_; - typedef std::map, wsrep::client_state*> streaming_appliers_map; + typedef std::map, wsrep::high_priority_service*> streaming_appliers_map; streaming_appliers_map streaming_appliers_; wsrep::provider* provider_; std::string name_; diff --git a/include/wsrep/transaction_termination_service.hpp b/include/wsrep/transaction_termination_service.hpp deleted file mode 100644 index 886a377..0000000 --- a/include/wsrep/transaction_termination_service.hpp +++ /dev/null @@ -1,21 +0,0 @@ -// -// Copyright (C) 2018 Codership Oy -// - - -#ifndef WSREP_TRANSACTION_TERMINATION_SERVICE_HPP -#define WSREP_TRANSACTION_TERMINATION_SERVICE_HPP - -namespace wsrep -{ - class ws_handle; - class ws_meta; - class transaction_termination_service - { - public: - virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; - virtual int rollback() = 0; - }; -} - -#endif // WSREP_TRANSACTION_TERMINATION_SERVICE_HPP diff --git a/src/server_state.cpp b/src/server_state.cpp index 620c7bf..6b89562 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -3,7 +3,7 @@ // #include "wsrep/server_state.hpp" -#include "wsrep/client_state.hpp" +#include "wsrep/high_priority_service.hpp" #include "wsrep/transaction.hpp" #include "wsrep/view.hpp" #include "wsrep/logger.hpp" @@ -56,126 +56,95 @@ namespace } int apply_write_set(wsrep::server_state& server_state, - wsrep::client_state& client_state, + wsrep::high_priority_service& high_priority_service, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) { int ret(0); - const wsrep::transaction& txc(client_state.transaction()); - assert(client_state.mode() == wsrep::client_state::m_high_priority); - - bool not_replaying(txc.state() != - wsrep::transaction::s_replaying); - if (wsrep::starts_transaction(ws_meta.flags()) && wsrep::commits_transaction(ws_meta.flags())) { - if (not_replaying) - { - client_state.before_command(); - client_state.before_statement(); - assert(txc.active() == false); - client_state.start_transaction(ws_handle, ws_meta); - } - else - { - client_state.start_replaying(ws_meta); - } - - if (client_state.client_service().apply_write_set(data)) + if (high_priority_service.start_transaction(ws_handle, ws_meta)) { ret = 1; } - else if (client_state.client_service().commit(ws_handle, ws_meta)) + else if (high_priority_service.apply_write_set(data)) + { + ret = 1; + } + else if (high_priority_service.commit()) { ret = 1; } - if (ret) { - client_state.client_service().rollback(); + high_priority_service.rollback(); } - - if (not_replaying) - { - client_state.after_statement(); - client_state.after_command_before_result(); - client_state.after_command_after_result(); - } - assert(ret || - txc.state() == wsrep::transaction::s_committed); + high_priority_service.after_apply(); } else if (wsrep::starts_transaction(ws_meta.flags())) { - assert(not_replaying); assert(server_state.find_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()) == 0); - wsrep::client_state* sac( - server_state.server_service().streaming_applier_client_state()); + wsrep::high_priority_service* sa( + server_state.server_service().streaming_applier_service()); server_state.start_streaming_applier( - ws_meta.server_id(), ws_meta.transaction_id(), sac); - sac->start_transaction(ws_handle, ws_meta); + ws_meta.server_id(), ws_meta.transaction_id(), sa); + sa->start_transaction(ws_handle, ws_meta); { - // TODO: Client context switch will be ultimately - // implemented by the application. Therefore need to - // introduce a virtual method call which takes - // both original and sac client contexts as argument - // and a functor which does the applying. - wsrep::client_state_switch sw(client_state, *sac); - sac->before_command(); - sac->before_statement(); - sac->client_service().apply_write_set(data); - sac->after_statement(); - sac->after_command_before_result(); - sac->after_command_after_result(); + wsrep::high_priority_switch sw(high_priority_service, *sa); + sa->apply_write_set(data); + sa->after_apply(); } - server_state.server_service().log_dummy_write_set(client_state, ws_meta); + high_priority_service.log_dummy_write_set(ws_handle, ws_meta); } else if (ws_meta.flags() == 0) { - wsrep::client_state* sac( + wsrep::high_priority_service* sa( server_state.find_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id())); - if (sac == 0) + if (sa == 0) { // It is possible that rapid group membership changes // may cause streaming transaction be rolled back before // commit fragment comes in. Although this is a valid // situation, log a warning if a sac cannot be found as // it may be an indication of a bug too. + assert(0); wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() << ": " << ws_meta.transaction_id(); } else { - wsrep::client_state_switch(client_state, *sac); - sac->before_command(); - sac->before_statement(); - ret = sac->client_service().apply_write_set(data); - sac->after_statement(); - sac->after_command_before_result(); - sac->after_command_after_result(); + wsrep::high_priority_switch(high_priority_service, *sa); + ret = sa->apply_write_set(data); + sa->after_apply(); } - server_state.server_service().log_dummy_write_set( - client_state, ws_meta); + high_priority_service.log_dummy_write_set( + ws_handle, ws_meta); } else if (wsrep::commits_transaction(ws_meta.flags())) { - if (not_replaying) + if (high_priority_service.is_replaying()) { - wsrep::client_state* sac( + ret = high_priority_service.apply_write_set(data) || + high_priority_service.commit(); + } + else + { + wsrep::high_priority_service* sa( server_state.find_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id())); - assert(sac); - if (sac == 0) + if (sa == 0) { // It is possible that rapid group membership changes // may cause streaming transaction be rolled back before // commit fragment comes in. Although this is a valid // situation, log a warning if a sac cannot be found as // it may be an indication of a bug too. + assert(0); wsrep::log_warning() << "Could not find applier context for " << ws_meta.server_id() @@ -183,40 +152,25 @@ namespace } else { - wsrep::client_state_switch(client_state, *sac); - sac->before_command(); - sac->before_statement(); - ret = sac->client_service().commit(ws_handle, ws_meta); - sac->after_statement(); - sac->after_command_before_result(); - sac->after_command_after_result(); + wsrep::high_priority_switch(high_priority_service, *sa); + ret = sa->commit(); + sa->after_apply(); server_state.stop_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()); - server_state.server_service().release_client_state(sac); + server_state.server_service().release_high_priority_service(sa); } } - else - { - ret = client_state.start_replaying(ws_meta) || - client_state.client_service().apply_write_set( - wsrep::const_buffer()) || - client_state.client_service().commit(ws_handle, ws_meta); - } } else { // SR fragment applying not implemented yet assert(0); } - if (not_replaying) - { - assert(txc.active() == false); - } return ret; } int apply_toi(wsrep::provider& provider, - wsrep::client_state& client_state, + wsrep::high_priority_service& high_priority_service, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) @@ -226,9 +180,7 @@ namespace { // Regular toi provider.commit_order_enter(ws_handle, ws_meta); - client_state.enter_toi(ws_meta); - int ret(client_state.client_service().apply_toi(data)); - client_state.leave_toi(); + int ret(high_priority_service.apply_toi(data)); provider.commit_order_leave(ws_handle, ws_meta); return ret; } @@ -668,7 +620,7 @@ void wsrep::server_state::on_sync() } int wsrep::server_state::on_apply( - wsrep::client_state& client_state, + wsrep::high_priority_service& high_priority_service, const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data) @@ -682,7 +634,8 @@ int wsrep::server_state::on_apply( } if (is_toi(ws_meta.flags())) { - return apply_toi(provider(), client_state, ws_handle, ws_meta, data); + return apply_toi(provider(), high_priority_service, + ws_handle, ws_meta, data); } else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags())) { @@ -692,7 +645,7 @@ int wsrep::server_state::on_apply( } else { - return apply_write_set(*this, client_state, + return apply_write_set(*this, high_priority_service, ws_handle, ws_meta, data); } } @@ -700,14 +653,13 @@ int wsrep::server_state::on_apply( void wsrep::server_state::start_streaming_applier( const wsrep::id& server_id, const wsrep::transaction_id& transaction_id, - wsrep::client_state* client_state) + wsrep::high_priority_service* sa) { if (streaming_appliers_.insert( std::make_pair(std::make_pair(server_id, transaction_id), - client_state)).second == false) + sa)).second == false) { wsrep::log_error() << "Could not insert streaming applier"; - delete client_state; throw wsrep::fatal_error(); } } @@ -730,7 +682,7 @@ void wsrep::server_state::stop_streaming_applier( } } -wsrep::client_state* wsrep::server_state::find_streaming_applier( +wsrep::high_priority_service* wsrep::server_state::find_streaming_applier( const wsrep::id& server_id, const wsrep::transaction_id& transaction_id) const { diff --git a/src/transaction.cpp b/src/transaction.cpp index deecd44..ff8ab8e 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -5,6 +5,7 @@ #include "wsrep/transaction.hpp" #include "wsrep/client_state.hpp" #include "wsrep/server_state.hpp" +#include "wsrep/high_priority_service.hpp" #include "wsrep/key.hpp" #include "wsrep/logger.hpp" #include "wsrep/compiler.hpp" @@ -70,15 +71,22 @@ int wsrep::transaction::start_transaction( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - assert(ws_meta.flags()); - assert(active() == false); - id_ = ws_meta.transaction_id(); - assert(client_state_.mode() == wsrep::client_state::m_high_priority); - state_ = s_executing; - state_hist_.clear(); - ws_handle_ = ws_handle; - ws_meta_ = ws_meta; - certified_ = true; + if (state() != s_replaying) + { + assert(ws_meta.flags()); + assert(active() == false); + id_ = ws_meta.transaction_id(); + assert(client_state_.mode() == wsrep::client_state::m_high_priority); + state_ = s_executing; + state_hist_.clear(); + ws_handle_ = ws_handle; + ws_meta_ = ws_meta; + certified_ = true; + } + else + { + start_replaying(ws_meta); + } return 0; } @@ -341,7 +349,14 @@ int wsrep::transaction::ordered_commit() assert(state() == s_committing); assert(ordered()); int ret(provider().commit_order_leave(ws_handle_, ws_meta_)); - // Should always succeed + // Should always succeed: + // 1) If before commit before succeeds, the transaction handle + // in the provider is guaranteed to exist and the commit + // has been ordered + // 2) The transaction which has been ordered for commit cannot be BF + // aborted anymore + // 3) The provider should always guarantee that the transactions which + // have been ordered for commit can finish committing. assert(ret == 0); state(lock, s_ordered_commit); debug_log_state("ordered_commit_leave"); @@ -939,11 +954,11 @@ int wsrep::transaction::certify_commit( void wsrep::transaction::streaming_rollback() { assert(streaming_context_.rolled_back() == false); - wsrep::client_state* sac( - server_service_.streaming_applier_client_state()); + wsrep::high_priority_service* sa( + server_service_.streaming_applier_service()); client_state_.server_state().start_streaming_applier( - client_state_.server_state().id(), id(), sac); - sac->adopt_transaction(*this); + client_state_.server_state().id(), id(), sa); + sa->adopt_transaction(*this); streaming_context_.cleanup(); // Replicate rollback fragment provider().rollback(id_.get()); diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 87d084e..870d4b2 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -5,7 +5,7 @@ #include "wsrep_provider_v26.hpp" #include "wsrep/server_state.hpp" -#include "wsrep/client_state.hpp" +#include "wsrep/high_priority_service.hpp" #include "wsrep/view.hpp" #include "wsrep/exception.hpp" #include "wsrep/logger.hpp" @@ -355,10 +355,9 @@ namespace const wsrep_trx_meta_t* meta, wsrep_bool_t* exit_loop __attribute__((unused))) { - wsrep::client_state* client_state( - reinterpret_cast(ctx)); - assert(client_state); - assert(client_state->mode() == wsrep::client_state::m_high_priority); + wsrep::high_priority_service* high_priority_service( + reinterpret_cast(ctx)); + assert(high_priority_service); wsrep::const_buffer data(buf->ptr, buf->len); wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); @@ -373,8 +372,7 @@ namespace map_flags_from_native(flags)); try { - if (client_state->server_state().on_apply( - *client_state, ws_handle, ws_meta, data)) + if (high_priority_service->apply(ws_handle, ws_meta, data)) { return WSREP_CB_FAILURE; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6c37933..d34d19f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,6 +3,7 @@ # add_executable(wsrep-lib_test + mock_high_priority_service.cpp mock_client_state.cpp test_utils.cpp id_test.cpp diff --git a/test/mock_client_state.cpp b/test/mock_client_state.cpp index 93ca4ad..7e1a82a 100644 --- a/test/mock_client_state.cpp +++ b/test/mock_client_state.cpp @@ -4,24 +4,7 @@ #include "wsrep/transaction.hpp" #include "mock_client_state.hpp" - - -int wsrep::mock_client_service::apply_write_set( - const wsrep::const_buffer&) - -{ - assert(client_state_.toi_meta().seqno().is_undefined()); - assert(client_state_.transaction().state() == wsrep::transaction::s_executing || - client_state_.transaction().state() == wsrep::transaction::s_replaying); - return (fail_next_applying_ ? 1 : 0); -} - -int wsrep::mock_client_service::apply_toi(const wsrep::const_buffer&) -{ - assert(client_state_.transaction().active() == false); - assert(client_state_.toi_meta().seqno().is_undefined() == false); - return (fail_next_toi_ ? 1 : 0); -} +#include "mock_high_priority_service.hpp" int wsrep::mock_client_service::commit( const wsrep::ws_handle&, const wsrep::ws_meta&) @@ -61,3 +44,16 @@ int wsrep::mock_client_service::rollback() } return ret; } + +enum wsrep::provider::status +wsrep::mock_client_service::replay() WSREP_OVERRIDE +{ + wsrep::mock_high_priority_service hps(client_state_.server_state(), + &client_state_, true); + enum wsrep::provider::status ret( + client_state_.provider().replay( + client_state_.transaction().ws_handle(), + &hps)); + ++replays_; + return ret; +} diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index aaf701f..e091353 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -46,8 +46,8 @@ namespace wsrep : wsrep::client_service() , is_autocommit_() , do_2pc_() - , fail_next_applying_() - , fail_next_toi_() + // , fail_next_applying_() + // , fail_next_toi_() , bf_abort_during_wait_() , error_during_prepare_data_() , killed_before_certify_() @@ -89,16 +89,7 @@ namespace wsrep void will_replay() WSREP_OVERRIDE { } enum wsrep::provider::status - replay() WSREP_OVERRIDE - { - enum wsrep::provider::status ret( - client_state_.provider().replay( - client_state_.transaction().ws_handle(), - &client_state_)); - ++replays_; - return ret; - } - + replay() WSREP_OVERRIDE; void wait_for_replayers( wsrep::unique_lock& lock) WSREP_OVERRIDE @@ -172,8 +163,8 @@ namespace wsrep // bool is_autocommit_; bool do_2pc_; - bool fail_next_applying_; - bool fail_next_toi_; + // bool fail_next_applying_; + // bool fail_next_toi_; bool bf_abort_during_wait_; bool error_during_prepare_data_; bool killed_before_certify_; diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp new file mode 100644 index 0000000..27adfaa --- /dev/null +++ b/test/mock_high_priority_service.cpp @@ -0,0 +1,67 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "mock_high_priority_service.hpp" +#include "mock_server_state.hpp" + +int wsrep::mock_high_priority_service::apply( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) WSREP_OVERRIDE +{ + return server_state_.on_apply(*this, ws_handle, ws_meta, data); +} + +int wsrep::mock_high_priority_service::start_transaction( + const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) +{ + return client_state_->start_transaction(ws_handle, ws_meta); +} + +void wsrep::mock_high_priority_service::adopt_transaction( + const wsrep::transaction& transaction) +{ + client_state_->adopt_transaction(transaction); +} + +int wsrep::mock_high_priority_service::apply_write_set( + const wsrep::const_buffer&) +{ + assert(client_state_->toi_meta().seqno().is_undefined()); + assert(client_state_->transaction().state() == wsrep::transaction::s_executing || + client_state_->transaction().state() == wsrep::transaction::s_replaying); + return (fail_next_applying_ ? 1 : 0); +} + +int wsrep::mock_high_priority_service::commit() +{ + + int ret(0); + if (client_state_->client_service().do_2pc()) + { + ret = client_state_->before_prepare() || + client_state_->after_prepare(); + } + return (ret || client_state_->before_commit() || + client_state_->ordered_commit() || + client_state_->after_commit()); +} + +int wsrep::mock_high_priority_service::rollback() +{ + return (client_state_->before_rollback() || + client_state_->after_rollback()); +} + +int wsrep::mock_high_priority_service::apply_toi(const wsrep::const_buffer&) +{ + assert(client_state_->transaction().active() == false); + assert(client_state_->toi_meta().seqno().is_undefined() == false); + return (fail_next_toi_ ? 1 : 0); +} + +void wsrep::mock_high_priority_service::after_apply() +{ + client_state_->after_statement(); +} diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp new file mode 100644 index 0000000..604a458 --- /dev/null +++ b/test/mock_high_priority_service.hpp @@ -0,0 +1,62 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_MOCK_HIGH_PRIORITY_SERVICE_HPP +#define WSREP_MOCK_HIGH_PRIORITY_SERVICE_HPP + +#include "wsrep/high_priority_service.hpp" +#include "mock_client_state.hpp" + +namespace wsrep +{ + class mock_high_priority_service : public wsrep::high_priority_service + { + public: + mock_high_priority_service( + wsrep::server_state& server_state, + wsrep::mock_client_state* client_state, + bool replaying) + : wsrep::high_priority_service() + , fail_next_applying_() + , fail_next_toi_() + , server_state_(server_state) + , client_state_(client_state) + , replaying_(replaying) + { } + + int apply(const wsrep::ws_handle&, + const wsrep::ws_meta&, + const wsrep::const_buffer&) WSREP_OVERRIDE; + + int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; + + void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; + int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE; + int commit() WSREP_OVERRIDE; + int rollback() WSREP_OVERRIDE; + int apply_toi(const wsrep::const_buffer&) WSREP_OVERRIDE; + void after_apply() WSREP_OVERRIDE; + void store_globals() WSREP_OVERRIDE { } + void reset_globals() WSREP_OVERRIDE { } + int log_dummy_write_set(const wsrep::ws_handle&, + const wsrep::ws_meta&) + WSREP_OVERRIDE { return 0; } + bool is_replaying() const WSREP_OVERRIDE { return replaying_; } + wsrep::mock_client_state* client_state() + { + return client_state_; + } + bool fail_next_applying_; + bool fail_next_toi_; + private: + mock_high_priority_service(const mock_high_priority_service&); + mock_high_priority_service& operator=(const mock_high_priority_service&); + wsrep::server_state& server_state_; + wsrep::mock_client_state* client_state_; + bool replaying_; + }; +} + +#endif // WSREP_MOCK_HIGH_PRIORITY_SERVICE_HPP diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 5de04d1..4ba2b8e 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -8,6 +8,7 @@ #include "wsrep/provider.hpp" #include "wsrep/logger.hpp" #include "wsrep/buffer.hpp" +#include "wsrep/high_priority_service.hpp" #include #include @@ -159,11 +160,13 @@ namespace wsrep return release_result_; } - enum wsrep::provider::status replay(const wsrep::ws_handle&, + enum wsrep::provider::status replay(const wsrep::ws_handle& ws_handle, void* ctx) { - wsrep::client_state& cc( - *static_cast(ctx)); + wsrep::mock_high_priority_service& high_priority_service( + *static_cast(ctx)); + wsrep::mock_client_state& cc( + *high_priority_service.client_state()); wsrep::high_priority_context high_priority_context(cc); const wsrep::transaction& tc(cc.transaction()); wsrep::ws_meta ws_meta; @@ -191,8 +194,8 @@ namespace wsrep return replay_result_; } - if (server_state_.on_apply(cc, tc.ws_handle(), ws_meta, - wsrep::const_buffer())) + if (high_priority_service.apply(ws_handle, ws_meta, + wsrep::const_buffer())) { return wsrep::provider::error_fatal; } diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 95dfaed..861d697 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -7,6 +7,7 @@ #include "wsrep/server_state.hpp" #include "mock_client_state.hpp" +#include "mock_high_priority_service.hpp" #include "mock_provider.hpp" #include "wsrep/compiler.hpp" @@ -41,20 +42,37 @@ namespace wsrep ret->open(ret->id()); return ret; } - wsrep::client_state* streaming_applier_client_state() - { - wsrep::client_state* ret( - new wsrep::mock_client( - *this, ++last_client_id_, - wsrep::client_state::m_high_priority)); - ret->open(ret->id()); - return ret; - } - void release_client_state(wsrep::client_state* client_state) { delete client_state; } + + wsrep::high_priority_service* streaming_applier_service() + { + wsrep::mock_client* cs(new wsrep::mock_client( + *this, ++last_client_id_, + wsrep::client_state::m_high_priority)); + wsrep::mock_high_priority_service* ret( + new wsrep::mock_high_priority_service(*this, cs, false)); + cs->open(cs->id()); + cs->before_command(); + return ret; + } + + void release_high_priority_service( + wsrep::high_priority_service *high_priority_service) + WSREP_OVERRIDE + { + mock_high_priority_service* mhps( + dynamic_cast(high_priority_service)); + wsrep::client_state* cs(mhps->client_state()); + cs->after_command_before_result(); + cs->after_command_after_result(); + cs->close(); + cs->cleanup(); + delete cs; + delete mhps; + } void bootstrap() WSREP_OVERRIDE { } void log_message(enum wsrep::log::level level, const char* message) { diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 9287230..b544b3b 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -16,6 +16,7 @@ namespace , cc(ss, wsrep::client_id(1), wsrep::client_state::m_high_priority) + , hps(ss, &cc, false) , ws_handle(1, (void*)1) , ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), wsrep::stid(wsrep::id("1"), 1, 1), @@ -24,9 +25,11 @@ namespace wsrep::provider::flag::commit) { cc.open(cc.id()); + cc.before_command(); } wsrep::mock_server_state ss; wsrep::mock_client cc; + wsrep::mock_high_priority_service hps; wsrep::ws_handle ws_handle; wsrep::ws_meta ws_meta; }; @@ -56,7 +59,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc, applying_server_fixture) { char buf[1] = { 1 }; - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer(buf, 1)) == 0); const wsrep::transaction& txc(cc.transaction()); // ::abort(); @@ -70,7 +73,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc, applying_server_fixture) { char buf[1] = { 1 }; - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer(buf, 1)) == 0); const wsrep::transaction& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction::s_committed); @@ -81,9 +84,9 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc, BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback, applying_server_fixture) { - cc.fail_next_applying_ = true; + hps.fail_next_applying_ = true; char buf[1] = { 1 }; - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer(buf, 1)) == 1); const wsrep::transaction& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted); @@ -94,9 +97,9 @@ BOOST_FIXTURE_TEST_CASE(server_state_applying_1pc_rollback, BOOST_FIXTURE_TEST_CASE(server_state_applying_2pc_rollback, applying_server_fixture) { - cc.fail_next_applying_ = true; + hps.fail_next_applying_ = true; char buf[1] = { 1 }; - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer(buf, 1)) == 1); const wsrep::transaction& txc(cc.transaction()); BOOST_REQUIRE(txc.state() == wsrep::transaction::s_aborted); @@ -109,13 +112,16 @@ BOOST_AUTO_TEST_CASE(server_state_streaming) wsrep::mock_client cc(ss, wsrep::client_id(1), wsrep::client_state::m_high_priority); + cc.debug_log_level(1); + wsrep::mock_high_priority_service hps(ss, &cc, false); wsrep::ws_handle ws_handle(1, (void*)1); wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), wsrep::stid(wsrep::id("1"), 1, 1), wsrep::seqno(0), wsrep::provider::flag::start_transaction); cc.open(cc.id()); - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(cc.before_command() == 0); + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer("1", 1)) == 0); BOOST_REQUIRE(ss.find_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id())); @@ -123,13 +129,13 @@ BOOST_AUTO_TEST_CASE(server_state_streaming) wsrep::stid(wsrep::id("1"), 1, 1), wsrep::seqno(1), 0); - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer("1", 1)) == 0); ws_meta = wsrep::ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)), wsrep::stid(wsrep::id("1"), 1, 1), wsrep::seqno(1), wsrep::provider::flag::commit); - BOOST_REQUIRE(ss.on_apply(cc, ws_handle, ws_meta, + BOOST_REQUIRE(ss.on_apply(hps, ws_handle, ws_meta, wsrep::const_buffer("1", 1)) == 0); BOOST_REQUIRE(ss.find_streaming_applier( ws_meta.server_id(), ws_meta.transaction_id()) == 0); diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index dc40a70..5793f81 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -250,7 +250,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( // Run before commit BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_must_abort); - BOOST_REQUIRE(cc.current_error() == wsrep::e_error_during_commit); + BOOST_REQUIRE(cc.current_error() == wsrep::e_size_exceeded_error); BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(tc.ordered() == false);