diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index b6f2136..09a72e9 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -42,6 +42,7 @@ #include "provider.hpp" #include "transaction_context.hpp" #include "client_id.hpp" +#include "client_service.hpp" #include "mutex.hpp" #include "lock.hpp" #include "buffer.hpp" @@ -133,6 +134,15 @@ namespace wsrep assert(transaction_.active() == false); } + + /*! + * + */ + bool do_2pc() const + { + return client_service_.do_2pc(); + } + /*! * Method which should be called before the client * starts processing the command received from the application. @@ -210,29 +220,42 @@ namespace wsrep */ enum after_statement_result after_statement(); + // + // Replicating interface + // int start_transaction(const wsrep::transaction_id& id) { assert(state_ == s_exec); return transaction_.start_transaction(id); } - int start_transaction(const wsrep::ws_handle& wsh, - const wsrep::ws_meta& meta) + int append_key(const wsrep::key& key) { - assert(mode_ == m_applier); - return transaction_.start_transaction(wsh, meta); + assert(mode_ == m_replicating); + assert(state_ == s_exec); + return transaction_.append_key(key); } - int start_replaying(const wsrep::ws_meta& ws_meta) + int append_data(const wsrep::const_buffer& data) { - assert(mode_ == m_applier); - return transaction_.start_replaying(ws_meta); + assert(mode_ == m_replicating); + assert(state_ == s_exec); + return transaction_.append_data(data); } - void adopt_transaction(wsrep::transaction_context& transaction) + int prepare_data_for_replication(const wsrep::transaction_context& tc) { - transaction_.start_transaction(transaction.id()); - transaction_.streaming_context_ = transaction.streaming_context_; + return client_service_.prepare_data_for_replication(*this, tc); + } + + // + // Streaming interface + // + int after_row() + { + assert(mode_ == m_replicating); + assert(state_ == s_exec); + return transaction_.after_row(); } int enable_streaming( @@ -240,6 +263,7 @@ namespace wsrep fragment_unit, size_t fragment_size) { + assert(mode_ == m_replicating); if (transaction_.active() && transaction_.streaming_context_.fragment_unit() != fragment_unit) @@ -253,24 +277,64 @@ namespace wsrep fragment_unit, fragment_size); return 0; } - int append_key(const wsrep::key& key) + + size_t bytes_generated() const { - assert(state_ == s_exec); - return transaction_.append_key(key); + assert(mode_ == m_replicating); + return client_service_.bytes_generated(); } - int append_data(const wsrep::const_buffer& data) + int prepare_fragment_for_replication( + const wsrep::transaction_context& tc, + wsrep::mutable_buffer& mb) { - assert(state_ == s_exec); - return transaction_.append_data(data); + return client_service_.prepare_fragment_for_replication( + *this, tc, mb); } - - int after_row() + int append_fragment(const wsrep::transaction_context& tc, + int flags, + const wsrep::const_buffer& buf) { - assert(state_ == s_exec); - return transaction_.after_row(); + return client_service_.append_fragment(tc, flags, buf); } + /*! + * Remove fragments from the fragment storage. If the + * storage is transactional, this should be done within + * the same transaction which is committing. + */ + void remove_fragments() + { + client_service_.remove_fragments(transaction_); + } + + // + // Applying interface + // + + int start_transaction(const wsrep::ws_handle& wsh, + const wsrep::ws_meta& meta) + { + assert(mode_ == m_applier); + return transaction_.start_transaction(wsh, meta); + } + + int apply(const wsrep::const_buffer& data) + { + assert(mode_ == m_applier); + return client_service_.apply(*this, data); + } + + int commit() + { + assert(mode_ == m_applier); + return client_service_.commit(*this, + transaction_.ws_handle(), transaction_.ws_meta()); + } + // + // Commit ordering + // + int before_prepare() { wsrep::unique_lock lock(mutex_); @@ -302,22 +366,96 @@ namespace wsrep assert(state_ == s_exec); return transaction_.after_commit(); } + + // + // Rollback + // + int rollback() + { + return client_service_.rollback(*this); + } + int before_rollback() { assert(state_ == s_idle || state_ == s_exec || state_ == s_result); return transaction_.before_rollback(); } + int after_rollback() { assert(state_ == s_idle || state_ == s_exec || state_ == s_result); return transaction_.after_rollback(); } + // + // BF aborting + // int bf_abort(wsrep::unique_lock& lock, wsrep::seqno bf_seqno) { + assert(mode_ == m_replicating); return transaction_.bf_abort(lock, bf_seqno); } + + // + // Replaying + // + + + int start_replaying(const wsrep::ws_meta& ws_meta) + { + assert(mode_ == m_applier); + return transaction_.start_replaying(ws_meta); + } + + void adopt_transaction(wsrep::transaction_context& transaction) + { + assert(mode_ == m_applier); + transaction_.start_transaction(transaction.id()); + transaction_.streaming_context_ = transaction.streaming_context_; + } + + enum wsrep::provider::status replay( + wsrep::transaction_context& tc) + { + return client_service_.replay(tc); + } + + // + // + // + + void will_replay(const wsrep::transaction_context& tc) + { + client_service_.will_replay(tc); + } + void wait_for_replayers(wsrep::unique_lock& lock) + { + client_service_.wait_for_replayers(*this, lock); + } + + bool interrupted() const + { + return client_service_.interrupted(); + } + + void emergency_shutdown() + { + client_service_.emergency_shutdown(); + } + + // + // Debug interface + // + void debug_sync(const char* sync_point) + { + client_service_.debug_sync(*this, sync_point); + } + + void debug_crash(const char* crash_point) + { + client_service_.debug_crash(crash_point); + } /*! * Get reference to the client mutex. * @@ -395,11 +533,13 @@ namespace wsrep */ client_context(wsrep::mutex& mutex, wsrep::server_context& server_context, + wsrep::client_service& client_service, const client_id& id, enum mode mode) : thread_id_(wsrep::this_thread::get_id()) , mutex_(mutex) , server_context_(server_context) + , client_service_(client_service) , id_(id) , mode_(mode) , state_(s_idle) @@ -431,117 +571,12 @@ namespace wsrep */ void state(wsrep::unique_lock& lock, enum state state); - virtual bool is_autocommit() const = 0; - /*! - * Virtual method to return true if the client operates - * in two phase commit mode. - * - * \return True if two phase commit is required, false otherwise. - */ - virtual bool do_2pc() const = 0; - - - /*! - * Append SR fragment to the transaction. - */ - virtual int append_fragment(const wsrep::transaction_context&, - int, const wsrep::const_buffer&) = 0; - - virtual void remove_fragments(const wsrep::transaction_context&) = 0; - /*! - * This method applies a write set give in data buffer. - * This must be implemented by the DBMS integration. - * - * \return Zero on success, non-zero on applying failure. - */ - virtual int apply(const wsrep::const_buffer& data) = 0; - - /*! - * Virtual method which will be called - * in order to commit the transaction into - * storage engine. - * - * \return Zero on success, non-zero on failure. - */ - virtual int commit() = 0; - - /*! - * Rollback the transaction. - * - * This metod must be implemented by DBMS integration. - * - * \return Zero on success, no-zero on failure. - */ - virtual int rollback() = 0; - - /*! - * Notify a implementation that the client is about - * to replay the transaction. - */ - virtual void will_replay(wsrep::transaction_context&) = 0; - - /*! - * Replay the transaction. - */ - virtual enum wsrep::provider::status - replay(wsrep::transaction_context& tc) = 0; - - - /*! - * Wait until all of the replaying transactions have been committed. - */ - virtual void wait_for_replayers(wsrep::unique_lock&) = 0; - - virtual int prepare_data_for_replication( - const wsrep::transaction_context&) = 0; - - virtual size_t bytes_generated() const = 0; - virtual int prepare_fragment_for_replication( - const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0; - /*! - * Return true if the current client operation was killed. - */ - virtual bool killed() const = 0; - - /*! - * Abort server operation on fatal error. This should be used - * only for critical conditions which would sacrifice data - * consistency. - */ - virtual void abort() = 0; - - public: - /*! - * Set up thread global variables for client connection. - */ - virtual void store_globals() - { - thread_id_ = wsrep::this_thread::get_id(); - } - private: - - /*! - * Enter debug synchronization point. - */ - virtual void debug_sync(const char*) = 0; - - /*! - * - */ - virtual void debug_suicide(const char*) = 0; - - /*! - * Notify the implementation about an error. - */ - virtual void on_error(enum wsrep::client_error error) = 0; - /*! - * - */ void override_error(enum wsrep::client_error error); wsrep::thread::id thread_id_; wsrep::mutex& mutex_; wsrep::server_context& server_context_; + wsrep::client_service& client_service_; client_id id_; enum mode mode_; enum state state_; @@ -566,11 +601,11 @@ namespace wsrep : orig_context_(orig_context) , current_context_(current_context) { - current_context_.store_globals(); + current_context_.client_service_.store_globals(); } ~client_context_switch() { - orig_context_.store_globals(); + orig_context_.client_service_.store_globals(); } private: client_context& orig_context_; diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index c66b7d2..4afdc81 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -9,16 +9,22 @@ * which will be called by the wsrep-lib under certain circumstances, * for example when a transaction rollback is required by internal * wsrep-lib operation or applier client needs to apply a write set. - * - * \todo Figure out better name for this interface. */ +#include "buffer.hpp" + namespace wsrep { class client_service { public: + client_service(wsrep::provider& provider) + : provider_(provider) { } + /*! + * + */ virtual bool is_autocommit() const = 0; + /*! * Return true if two pahase commit is required for transaction * to commit. @@ -26,7 +32,8 @@ namespace wsrep virtual bool do_2pc() const = 0; /*! - * Return true if the current transaction has been interrupted. + * Return true if the current transaction has been interrupted + * by the DBMS. */ virtual bool interrupted() const = 0; @@ -45,28 +52,30 @@ namespace wsrep /*! * Set up a data for replication. */ - virtual int prepare_data_for_replication(wsrep::data& data) = 0; + virtual int prepare_data_for_replication(wsrep::client_context&, const wsrep::transaction_context&) = 0; + // + // Streaming + // + virtual size_t bytes_generated() const = 0; + virtual int prepare_fragment_for_replication( + wsrep::client_context&, const wsrep::transaction_context&, wsrep::mutable_buffer&) = 0; + virtual void remove_fragments(const wsrep::transaction_context&) = 0; /*! * Apply a write set. */ - virtual int apply() = 0; - - /*! - * Prepare a transaction for commit. - */ - virtual int prepare() = 0; + virtual int apply(wsrep::client_context&, const wsrep::const_buffer&) = 0; /*! * Commit transaction. */ - virtual int commit() = 0; + virtual int commit(wsrep::client_context&, const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; /*! * Roll back transaction. */ - virtual int rollback() = 0; + virtual int rollback(wsrep::client_context&) = 0; /*! * Forcefully shut down the DBMS process or replication system. @@ -84,7 +93,7 @@ namespace wsrep * \todo This should not be visible to DBMS level, should be * handled internally by wsrep-lib. */ - virtual void will_replay() = 0; + virtual void will_replay(const wsrep::transaction_context&) = 0; /*! * Replay the current transaction. The implementation must put @@ -94,7 +103,7 @@ namespace wsrep * \todo This should not be visible to DBMS level, should be * handled internally by wsrep-lib. */ - virtual int replay() = 0; + virtual enum wsrep::provider::status replay(wsrep::transaction_context&) = 0; /*! * Wait until all replaying transactions have been finished @@ -103,13 +112,32 @@ namespace wsrep * \todo This should not be visible to DBMS level, should be * handled internally by wsrep-lib. */ - virtual void wait_for_replayers() = 0; + virtual void wait_for_replayers(wsrep::client_context&, wsrep::unique_lock&) = 0; // Streaming replication /*! * Append a write set fragment into fragment storage. */ - virtual int append_fragment() = 0; + virtual int append_fragment(const wsrep::transaction_context&, int flag, const wsrep::const_buffer&) = 0; + + // + // Debug interface + // + /*! + * Enter debug sync point. + * + * @params sync_point Name of the debug sync point. + */ + virtual void debug_sync(wsrep::client_context&, const char* sync_point) = 0; + + /*! + * Forcefully kill the process if the crash_point has + * been enabled. + */ + virtual void debug_crash(const char* crash_point) = 0; + + protected: + wsrep::provider& provider_; }; /*! @@ -119,18 +147,6 @@ namespace wsrep class client_debug_callback { public: - /*! - * Enter debug sync point. - * - * @params sync_point Name of the debug sync point. - */ - void debug_sync(const char* sync_point) = 0; - - /*! - * Forcefully kill the process if the suicide_point has - * been enabled. - */ - void debug_suicide(const char* suicide_point) = 0; }; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 08860b6..f3c5763 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -241,7 +241,9 @@ namespace wsrep /*! * Replay a transaction. * - * @return Zero in case of success, non-zero on failure. + * \todo Inspect if the ws_handle could be made const + * + * \return Zero in case of success, non-zero on failure. */ virtual enum status replay( wsrep::ws_handle& ws_handle, void* applier_ctx) = 0; diff --git a/src/client_context.cpp b/src/client_context.cpp index c3e9fe4..4ab38ff 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -58,7 +58,7 @@ int wsrep::client_context::before_command() wsrep::server_context::rm_async); override_error(wsrep::e_deadlock_error); lock.unlock(); - rollback(); + client_service_.rollback(*this); (void)transaction_.after_statement(); lock.lock(); assert(transaction_.state() == @@ -96,7 +96,7 @@ void wsrep::client_context::after_command_before_result() { override_error(wsrep::e_deadlock_error); lock.unlock(); - rollback(); + client_service_.rollback(*this); (void)transaction_.after_statement(); lock.lock(); assert(transaction_.state() == wsrep::transaction_context::s_aborted); @@ -116,7 +116,7 @@ void wsrep::client_context::after_command_after_result() transaction_.state() == wsrep::transaction_context::s_must_abort) { lock.unlock(); - rollback(); + client_service_.rollback(*this); lock.lock(); assert(transaction_.state() == wsrep::transaction_context::s_aborted); override_error(wsrep::e_deadlock_error); @@ -170,7 +170,7 @@ wsrep::client_context::after_statement() (void)transaction_.after_statement(); if (current_error() == wsrep::e_deadlock_error) { - if (is_autocommit()) + if (client_service_.is_autocommit()) { debug_log_state("after_statement: may_retry"); return asr_may_retry; diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 8cb90c1..4bb52b3 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -160,7 +160,7 @@ int wsrep::transaction_context::before_prepare( case wsrep::client_context::m_replicating: if (is_streaming()) { - client_context_.debug_suicide( + client_context_.debug_crash( "crash_last_fragment_commit_before_fragment_removal"); lock.unlock(); if (client_context_.server_context().statement_allowed_for_streaming( @@ -171,10 +171,10 @@ int wsrep::transaction_context::before_prepare( } else { - client_context_.remove_fragments(*this); + client_context_.remove_fragments(); } lock.lock(); - client_context_.debug_suicide( + client_context_.debug_crash( "crash_last_fragment_commit_after_fragment_removal"); } break; @@ -182,7 +182,7 @@ int wsrep::transaction_context::before_prepare( case wsrep::client_context::m_applier: if (is_streaming()) { - client_context_.remove_fragments(*this); + client_context_.remove_fragments(); } break; default: @@ -522,7 +522,7 @@ int wsrep::transaction_context::after_statement() ret = 1; break; default: - client_context_.abort(); + client_context_.emergency_shutdown(); break; } lock.lock(); @@ -811,7 +811,7 @@ int wsrep::transaction_context::certify_commit( return 1; } - if (client_context_.killed()) + if (client_context_.interrupted()) { lock.lock(); client_context_.override_error(wsrep::e_interrupted_error); @@ -901,7 +901,7 @@ int wsrep::transaction_context::certify_commit( case wsrep::provider::error_fatal: client_context_.override_error(wsrep::e_error_during_commit); state(lock, s_must_abort); - client_context_.abort(); + client_context_.emergency_shutdown(); break; case wsrep::provider::error_not_implemented: case wsrep::provider::error_not_allowed: diff --git a/test/client_context_fixture.hpp b/test/client_context_fixture.hpp index e9a2336..c132bef 100644 --- a/test/client_context_fixture.hpp +++ b/test/client_context_fixture.hpp @@ -17,7 +17,7 @@ namespace { replicating_client_fixture_sync_rm() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, wsrep::client_id(1), + , cc(sc, sc.client_service(), wsrep::client_id(1), wsrep::client_context::m_replicating) , tc(cc.transaction()) { @@ -36,7 +36,7 @@ namespace { replicating_client_fixture_async_rm() : sc("s1", "s1", wsrep::server_context::rm_async) - , cc(sc, wsrep::client_id(1), + , cc(sc, sc.client_service(), wsrep::client_id(1), wsrep::client_context::m_replicating) , tc(cc.transaction()) { @@ -55,10 +55,11 @@ namespace { replicating_client_fixture_2pc() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, wsrep::client_id(1), - wsrep::client_context::m_replicating, false, true) + , cc(sc, sc.client_service(), wsrep::client_id(1), + wsrep::client_context::m_replicating) , tc(cc.transaction()) { + sc.client_service().do_2pc_ = true; BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state @@ -74,10 +75,11 @@ namespace { replicating_client_fixture_autocommit() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, wsrep::client_id(1), - wsrep::client_context::m_replicating, true) + , cc(sc, sc.client_service(), wsrep::client_id(1), + wsrep::client_context::m_replicating) , tc(cc.transaction()) { + sc.client_service().is_autocommit_ = true; BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); // Verify initial state @@ -94,7 +96,7 @@ namespace applying_client_fixture() : sc("s1", "s1", wsrep::server_context::rm_async) - , cc(sc, + , cc(sc, sc.client_service(), wsrep::client_id(1), wsrep::client_context::m_applier) , tc(cc.transaction()) @@ -122,11 +124,12 @@ namespace applying_client_fixture_2pc() : sc("s1", "s1", wsrep::server_context::rm_async) - , cc(sc, + , cc(sc, sc.client_service(), wsrep::client_id(1), - wsrep::client_context::m_applier, false, true) + wsrep::client_context::m_applier) , tc(cc.transaction()) { + sc.client_service().do_2pc_ = true; BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); wsrep::ws_handle ws_handle(1, (void*)1); @@ -149,7 +152,7 @@ namespace { streaming_client_fixture_row() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, wsrep::client_id(1), + , cc(sc, sc.client_service(), wsrep::client_id(1), wsrep::client_context::m_replicating) , tc(cc.transaction()) { @@ -169,7 +172,7 @@ namespace { streaming_client_fixture_byte() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, wsrep::client_id(1), + , cc(sc, sc.client_service(), wsrep::client_id(1), wsrep::client_context::m_replicating) , tc(cc.transaction()) { @@ -189,7 +192,7 @@ namespace { streaming_client_fixture_statement() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, wsrep::client_id(1), + , cc(sc, sc.client_service(), wsrep::client_id(1), wsrep::client_context::m_replicating) , tc(cc.transaction()) { diff --git a/test/mock_client_context.cpp b/test/mock_client_context.cpp index dfa2116..5ef9719 100644 --- a/test/mock_client_context.cpp +++ b/test/mock_client_context.cpp @@ -6,47 +6,49 @@ #include "mock_client_context.hpp" -int wsrep::mock_client_context::apply( +int wsrep::mock_client_service::apply( + wsrep::client_context& client_context, const wsrep::const_buffer& data __attribute__((unused))) { - assert(transaction_.state() == wsrep::transaction_context::s_executing || - transaction_.state() == wsrep::transaction_context::s_replaying); + assert(client_context.transaction().state() == wsrep::transaction_context::s_executing || + client_context.transaction().state() == wsrep::transaction_context::s_replaying); return (fail_next_applying_ ? 1 : 0); } -int wsrep::mock_client_context::commit() +int wsrep::mock_client_service::commit(wsrep::client_context& client_context, const wsrep::ws_handle&, const wsrep::ws_meta&) { int ret(0); if (do_2pc()) { - if (before_prepare()) + if (client_context.before_prepare()) { ret = 1; } - else if (after_prepare()) + else if (client_context.after_prepare()) { ret = 1; } } if (ret == 0 && - (transaction_.before_commit() || - transaction_.ordered_commit() || - transaction_.after_commit())) + (client_context.before_commit() || + client_context.ordered_commit() || + client_context.after_commit())) { ret = 1; } return ret; } -int wsrep::mock_client_context::rollback() +int wsrep::mock_client_service::rollback( + wsrep::client_context& client_context) { int ret(0); - if (transaction_.before_rollback()) + if (client_context.before_rollback()) { ret = 1; } - else if (transaction_.after_rollback()) + else if (client_context.after_rollback()) { ret = 1; } diff --git a/test/mock_client_context.hpp b/test/mock_client_context.hpp index 16c0823..1388bd9 100644 --- a/test/mock_client_context.hpp +++ b/test/mock_client_context.hpp @@ -2,8 +2,8 @@ // Copyright (C) 2018 Codership Oy // -#ifndef WSREP_FAKE_CLIENT_CONTEXT_HPP -#define WSREP_FAKE_CLIENT_CONTEXT_HPP +#ifndef WSREP_MOCK_CLIENT_CONTEXT_HPP +#define WSREP_MOCK_CLIENT_CONTEXT_HPP #include "wsrep/client_context.hpp" #include "wsrep/mutex.hpp" @@ -17,16 +17,35 @@ namespace wsrep { public: mock_client_context(wsrep::server_context& server_context, + wsrep::client_service& client_service, const wsrep::client_id& id, - enum wsrep::client_context::mode mode, - bool is_autocommit = false, - bool do_2pc = false) - : wsrep::client_context(mutex_, server_context, id, mode) + enum wsrep::client_context::mode mode) + : wsrep::client_context(mutex_, server_context, client_service, id, mode) // Note: Mutex is initialized only after passed // to client_context constructor. , mutex_() - , is_autocommit_(is_autocommit) - , do_2pc_(do_2pc) + { } + ~mock_client_context() + { + if (transaction().active()) + { + (void)rollback(); + } + } + private: + wsrep::default_mutex mutex_; + public: + private: + }; + + + class mock_client_service : public wsrep::client_service + { + public: + mock_client_service(wsrep::provider& provider) + : wsrep::client_service(provider) + , is_autocommit_() + , do_2pc_() , fail_next_applying_() , bf_abort_during_wait_() , error_during_prepare_data_() @@ -37,43 +56,58 @@ namespace wsrep , replays_() , aborts_() { } - ~mock_client_context() - { - if (transaction().active()) - { - (void)rollback(); - } - } - int apply(const wsrep::const_buffer&); - int commit(); - int rollback(); - bool is_autocommit() const { return is_autocommit_; } - bool do_2pc() const { return do_2pc_; } + + int apply(wsrep::client_context&, const wsrep::const_buffer&) WSREP_OVERRIDE; + + int commit(wsrep::client_context&, const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; + + int rollback(wsrep::client_context&) WSREP_OVERRIDE; + + bool is_autocommit() const WSREP_OVERRIDE + { return is_autocommit_; } + + bool do_2pc() const WSREP_OVERRIDE + { return do_2pc_; } + + bool interrupted() const WSREP_OVERRIDE + { return false; } + + void reset_globals() WSREP_OVERRIDE { } + void emergency_shutdown() WSREP_OVERRIDE { } + int append_fragment(const wsrep::transaction_context&, int, const wsrep::const_buffer&) WSREP_OVERRIDE { return 0; } void remove_fragments(const wsrep::transaction_context& ) WSREP_OVERRIDE { } - void will_replay(wsrep::transaction_context&) WSREP_OVERRIDE { } + void will_replay(const wsrep::transaction_context&) + WSREP_OVERRIDE { } + enum wsrep::provider::status replay(wsrep::transaction_context& tc) WSREP_OVERRIDE { enum wsrep::provider::status ret( - provider().replay(tc.ws_handle(), this)); + provider_.replay(tc.ws_handle(), this)); ++replays_; return ret; } - void wait_for_replayers(wsrep::unique_lock& lock) + + void wait_for_replayers( + wsrep::client_context& client_context, + wsrep::unique_lock& lock) WSREP_OVERRIDE { lock.unlock(); if (bf_abort_during_wait_) { - wsrep_test::bf_abort_unordered(*this); + wsrep_test::bf_abort_unordered(client_context); } lock.lock(); } + int prepare_data_for_replication( + wsrep::client_context& client_context, const wsrep::transaction_context&) WSREP_OVERRIDE { if (error_during_prepare_data_) @@ -82,7 +116,7 @@ namespace wsrep } static const char buf[1] = { 1 }; wsrep::const_buffer data = wsrep::const_buffer(buf, 1); - return transaction_.append_data(data); + return client_context.append_data(data); } size_t bytes_generated() const @@ -90,8 +124,10 @@ namespace wsrep return bytes_generated_; } - int prepare_fragment_for_replication(const wsrep::transaction_context&, - wsrep::mutable_buffer& buffer) + int prepare_fragment_for_replication( + wsrep::client_context& client_context, + const wsrep::transaction_context&, + wsrep::mutable_buffer& buffer) WSREP_OVERRIDE { if (error_during_prepare_data_) @@ -101,39 +137,40 @@ namespace wsrep static const char buf[1] = { 1 }; buffer.push_back(&buf[0], &buf[1]); wsrep::const_buffer data(buffer.data(), buffer.size()); - return transaction_.append_data(data); + return client_context.append_data(data); } bool killed() const WSREP_OVERRIDE { return killed_before_certify_; } void abort() WSREP_OVERRIDE { ++aborts_; } + void store_globals() WSREP_OVERRIDE { } - void debug_sync(const char* sync_point) WSREP_OVERRIDE + + void debug_sync(wsrep::client_context& client_context, + const char* sync_point) WSREP_OVERRIDE { if (sync_point_enabled_ == sync_point) { switch (sync_point_action_) { case spa_bf_abort_unordered: - wsrep_test::bf_abort_unordered(*this); + wsrep_test::bf_abort_unordered(client_context); break; case spa_bf_abort_ordered: - wsrep_test::bf_abort_ordered(*this); + wsrep_test::bf_abort_ordered(client_context); break; } } } - void debug_suicide(const char*) WSREP_OVERRIDE + + void debug_crash(const char*) WSREP_OVERRIDE { // Not going to do this while unit testing } - void on_error(enum wsrep::client_error) { } - size_t replays() const { return replays_; } - size_t aborts() const { return aborts_; } + + // + // Knobs to tune the behavior // - private: - wsrep::default_mutex mutex_; - public: bool is_autocommit_; bool do_2pc_; bool fail_next_applying_; @@ -147,10 +184,18 @@ namespace wsrep spa_bf_abort_ordered } sync_point_action_; size_t bytes_generated_; + + // + // Verifying the state + // + size_t replays() const { return replays_; } + size_t aborts() const { return aborts_; } + private: size_t replays_; size_t aborts_; }; + } -#endif // WSREP_FAKE_CLIENT_CONTEXT_HPP +#endif // WSREP_MOCK_CLIENT_CONTEXT_HPP diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 6f552be..79d216c 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -13,6 +13,8 @@ #include #include // todo: proper logging +#include + namespace wsrep { class mock_provider : public wsrep::provider @@ -128,16 +130,27 @@ namespace wsrep return 0; } enum wsrep::provider::status - commit_order_enter(const wsrep::ws_handle&, - const wsrep::ws_meta&) - { return commit_order_enter_result_; } + commit_order_enter(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) + { + BOOST_REQUIRE(ws_handle.opaque()); + BOOST_REQUIRE(ws_meta.seqno().nil() == false); + return commit_order_enter_result_; + } - int commit_order_leave(const wsrep::ws_handle&, - const wsrep::ws_meta&) - { return commit_order_leave_result_;} + int commit_order_leave(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) + { + BOOST_REQUIRE(ws_handle.opaque()); + BOOST_REQUIRE(ws_meta.seqno().nil() == false); + return commit_order_leave_result_; + } - int release(wsrep::ws_handle&) - { return release_result_; } + int release(wsrep::ws_handle& ) + { + // BOOST_REQUIRE(ws_handle.opaque()); + return release_result_; + } enum wsrep::provider::status replay(wsrep::ws_handle&, void* ctx) { diff --git a/test/mock_server_context.hpp b/test/mock_server_context.hpp index 3cd5073..a98c406 100644 --- a/test/mock_server_context.hpp +++ b/test/mock_server_context.hpp @@ -24,21 +24,25 @@ namespace wsrep , mutex_() , cond_() , provider_(*this) + , client_service_(provider_) , last_client_id_(0) { } wsrep::mock_provider& provider() const { return provider_; } wsrep::client_context* local_client_context() { - return new wsrep::mock_client_context(*this, ++last_client_id_, - wsrep::client_context::m_local); + return new wsrep::mock_client_context( + *this, client_service_, ++last_client_id_, + wsrep::client_context::m_local); } wsrep::client_context* streaming_applier_client_context() { return new wsrep::mock_client_context( - *this, ++last_client_id_, wsrep::client_context::m_applier); + *this, client_service_, ++last_client_id_, + wsrep::client_context::m_applier); } - void log_dummy_write_set(wsrep::client_context&, const wsrep::ws_meta&) + void log_dummy_write_set(wsrep::client_context&, + const wsrep::ws_meta&) WSREP_OVERRIDE { // @@ -61,11 +65,15 @@ namespace wsrep // void sst_received(const wsrep_gtid_t&, int) WSREP_OVERRIDE { } // void on_apply(wsrep::transaction_context&) { } // void on_commit(wsrep::transaction_context&) { } - + wsrep::mock_client_service& client_service() + { + return client_service_; + } private: wsrep::default_mutex mutex_; wsrep::default_condition_variable cond_; mutable wsrep::mock_provider provider_; + wsrep::mock_client_service client_service_; unsigned long long last_client_id_; }; } diff --git a/test/server_context_test.cpp b/test/server_context_test.cpp index 399e078..9f3d90a 100644 --- a/test/server_context_test.cpp +++ b/test/server_context_test.cpp @@ -13,10 +13,9 @@ namespace applying_server_fixture() : sc("s1", "s1", wsrep::server_context::rm_sync) - , cc(sc, + , cc(sc, sc.client_service(), wsrep::client_id(1), - wsrep::client_context::m_applier, - false) + wsrep::client_context::m_applier) , ws_handle(1, (void*)1) , ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), wsrep::stid(wsrep::id("1"), 1, 1), @@ -62,7 +61,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc, BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc_rollback, applying_server_fixture) { - cc.fail_next_applying_ = true; + sc.client_service().fail_next_applying_ = true; char buf[1] = { 1 }; BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, wsrep::const_buffer(buf, 1)) == 1); @@ -75,7 +74,7 @@ BOOST_FIXTURE_TEST_CASE(server_context_applying_1pc_rollback, BOOST_FIXTURE_TEST_CASE(server_context_applying_2pc_rollback, applying_server_fixture) { - cc.fail_next_applying_ = true; + sc.client_service().fail_next_applying_ = true; char buf[1] = { 1 }; BOOST_REQUIRE(sc.on_apply(cc, ws_handle, ws_meta, wsrep::const_buffer(buf, 1)) == 1); @@ -88,9 +87,9 @@ BOOST_AUTO_TEST_CASE(server_context_streaming) wsrep::mock_server_context sc("s1", "s1", wsrep::server_context::rm_sync); wsrep::mock_client_context cc(sc, + sc.client_service(), wsrep::client_id(1), - wsrep::client_context::m_applier, - false); + wsrep::client_context::m_applier); wsrep::ws_handle ws_handle(1, (void*)1); wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), wsrep::stid(wsrep::id("1"), 1, 1), diff --git a/test/transaction_context_test.cpp b/test/transaction_context_test.cpp index 9c3de8c..8740d0c 100644 --- a/test/transaction_context_test.cpp +++ b/test/transaction_context_test.cpp @@ -198,6 +198,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( transaction_context_1pc_bf_during_commit_wait_for_replayers, T, replicating_fixtures, T) { + wsrep::mock_server_context& sc(T::sc); wsrep::mock_client_context& cc(T::cc); const wsrep::transaction_context& tc(T::tc); @@ -207,7 +208,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.bf_abort_during_wait_ = true; + sc.client_service().bf_abort_during_wait_ = true; // Run before commit BOOST_REQUIRE(cc.before_commit()); @@ -236,6 +237,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( transaction_context_1pc_error_during_prepare_data, T, replicating_fixtures, T) { + wsrep::mock_server_context& sc(T::sc); wsrep::mock_client_context& cc(T::cc); const wsrep::transaction_context& tc(T::tc); @@ -245,7 +247,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.error_during_prepare_data_ = true; + sc.client_service().error_during_prepare_data_ = true; // Run before commit BOOST_REQUIRE(cc.before_commit()); @@ -275,6 +277,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( transaction_context_1pc_killed_before_certify, T, replicating_fixtures, T) { + wsrep::mock_server_context& sc(T::sc); wsrep::mock_client_context& cc(T::cc); const wsrep::transaction_context& tc(T::tc); @@ -284,7 +287,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.killed_before_certify_ = true; + sc.client_service().killed_before_certify_ = true; // Run before commit BOOST_REQUIRE(cc.before_commit()); @@ -386,7 +389,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_success); - BOOST_REQUIRE(cc.replays() == 1); + BOOST_REQUIRE(sc.client_service().replays() == 1); } // @@ -407,8 +410,8 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.sync_point_enabled_ = "wsrep_before_certification"; - cc.sync_point_action_ = wsrep::mock_client_context::spa_bf_abort_unordered; + sc.client_service().sync_point_enabled_ = "wsrep_before_certification"; + sc.client_service().sync_point_action_ = wsrep::mock_client_service::spa_bf_abort_unordered; sc.provider().certify_result_ = wsrep::provider::error_certification_failed; BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_cert_failed); @@ -669,7 +672,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); BOOST_REQUIRE(cc.current_error() == wsrep::e_error_during_commit); - BOOST_REQUIRE(cc.aborts() == 1); + BOOST_REQUIRE(sc.client_service().aborts() == 1); } // @@ -721,7 +724,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( transaction_context_1pc_bf_abort_before_certify_regain_lock, T, replicating_fixtures, T) { - // wsrep::mock_server_context& sc(T::sc); + wsrep::mock_server_context& sc(T::sc); wsrep::mock_client_context& cc(T::cc); const wsrep::transaction_context& tc(T::tc); @@ -731,8 +734,8 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_executing); - cc.sync_point_enabled_ = "wsrep_after_certification"; - cc.sync_point_action_ = wsrep::mock_client_context::spa_bf_abort_ordered; + sc.client_service().sync_point_enabled_ = "wsrep_after_certification"; + sc.client_service().sync_point_action_ = wsrep::mock_client_service::spa_bf_abort_ordered; // Run before commit BOOST_REQUIRE(cc.before_commit()); BOOST_REQUIRE(tc.state() == wsrep::transaction_context::s_must_replay); @@ -747,7 +750,7 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( // Cleanup after statement cc.after_statement(); - BOOST_REQUIRE(cc.replays() == 1); + BOOST_REQUIRE(sc.client_service().replays() == 1); BOOST_REQUIRE(tc.active() == false); BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); @@ -1126,7 +1129,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_byte_streaming_1pc_commit, streaming_client_fixture_byte) { BOOST_REQUIRE(cc.start_transaction(1) == 0); - cc.bytes_generated_ = 1; + sc.client_service().bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0); @@ -1145,10 +1148,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_context_byte_batch_streaming_1pc_commit, cc.enable_streaming( wsrep::streaming_context::bytes, 2) == 0); BOOST_REQUIRE(cc.start_transaction(1) == 0); - cc.bytes_generated_ = 1; + sc.client_service().bytes_generated_ = 1; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 0); - cc.bytes_generated_ = 2; + sc.client_service().bytes_generated_ = 2; BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(tc.streaming_context_.fragments_certified() == 1); BOOST_REQUIRE(cc.before_commit() == 0);