diff --git a/src/client_context.cpp b/src/client_context.cpp index 3399472..47200d6 100644 --- a/src/client_context.cpp +++ b/src/client_context.cpp @@ -5,6 +5,11 @@ #include "client_context.hpp" #include "transaction_context.hpp" +trrep::provider& trrep::client_context::provider() const +{ + return server_context_.provider(); +} + // TODO: This should be pure virtual method, implemented by // DBMS integration or mock classes only. int trrep::client_context::replay(trrep::transaction_context& tc) diff --git a/src/client_context.hpp b/src/client_context.hpp index ee06954..a8bc67b 100644 --- a/src/client_context.hpp +++ b/src/client_context.hpp @@ -5,7 +5,7 @@ #ifndef TRREP_CLIENT_CONTEXT_HPP #define TRREP_CLIENT_CONTEXT_HPP -#include "provider.hpp" +#include "server_context.hpp" #include "mutex.hpp" #include "lock.hpp" #include "data.hpp" @@ -13,7 +13,7 @@ namespace trrep { class server_context; - class transaction_context; + class provider; enum client_error { @@ -68,6 +68,7 @@ namespace trrep trrep::mutex& mutex() { return mutex_; } trrep::server_context& server_context() const { return server_context_; } + trrep::provider& provider() const; client_id id() const { return id_; } client_id id(const client_id& id) @@ -100,7 +101,8 @@ namespace trrep virtual int replay(trrep::transaction_context& tc); - virtual int apply(const trrep::data&) { return 0; } + virtual int apply(trrep::transaction_context&, + const trrep::data&) { return 0; } virtual void wait_for_replayers(trrep::unique_lock&) { } @@ -154,7 +156,6 @@ namespace trrep client_context& orig_context_; client_context& current_context_; }; - } #endif // TRREP_CLIENT_CONTEXT_HPP diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp new file mode 100644 index 0000000..3ceada4 --- /dev/null +++ b/src/mock_server_context.hpp @@ -0,0 +1,47 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_MOCK_SERVER_CONTEXT_HPP +#define TRREP_MOCK_SERVER_CONTEXT_HPP + +#include "server_context.hpp" +#include "mock_provider_impl.hpp" + +namespace trrep +{ + class mock_server_context : public trrep::server_context + { + public: + mock_server_context(const std::string& name, + const std::string& id, + enum trrep::server_context::rollback_mode rollback_mode) + : trrep::server_context(name, id, rollback_mode) + , mock_provider_impl_() + , provider_(&mock_provider_impl_) + , last_client_id_(0) + { } + trrep::provider& provider() const + { return provider_; } + trrep::mock_provider_impl& mock_provider() const + { return mock_provider_impl_; } + trrep::client_context* local_client_context() + { + return new trrep::client_context(*this, ++last_client_id_, + trrep::client_context::m_local); + } + + void on_connect() { } + void on_view() { } + void on_sync() { } + void on_apply(trrep::transaction_context&) { } + void on_commit(trrep::transaction_context&) { } + + private: + mutable trrep::mock_provider_impl mock_provider_impl_; + mutable trrep::provider provider_; + unsigned long long last_client_id_; + }; +} + +#endif // TRREP_MOCK_SERVER_CONTEXT_HPP diff --git a/src/server_context.cpp b/src/server_context.cpp index 977490b..74781a6 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -4,87 +4,88 @@ #include "server_context.hpp" #include "client_context.hpp" +#include "transaction_context.hpp" + +// Todo: refactor into provider factory +#include "mock_provider_impl.hpp" +#include "wsrep_provider_v26.hpp" #include -#define START_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_START) -#define COMMIT_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_END) -#define ROLLBACK_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_ROLLBACK) - -#if 0 +#include namespace { + + inline bool starts_transaction(uint32_t flags) + { + return (flags & WSREP_FLAG_TRX_START); + } + + inline bool commits_transaction(uint32_t flags) + { + return (flags & WSREP_FLAG_TRX_END); + } + + inline bool rolls_back_transaction(uint32_t flags) + { + return (flags & WSREP_FLAG_ROLLBACK); + } + static wsrep_cb_status_t apply_cb(void* ctx, const wsrep_ws_handle_t* wsh, uint32_t flags, const wsrep_buf_t* buf, const wsrep_trx_meta_t* meta, - wsrep_bool_t* exit_loop) + wsrep_bool_t* exit_loop __attribute__((unused))) { wsrep_cb_status_t ret(WSREP_CB_SUCCESS); trrep::client_context* client_context( reinterpret_cast(ctx)); assert(client_context); - assert(client_context.mode() == trrep::client_context::m_applier); + assert(client_context->mode() == trrep::client_context::m_applier); - const trrep::server_context& server_context( - client_context->server_context()); trrep::data data(buf->ptr, buf->len); - if (START_TRANSACTION(flags) && COMMIT_TRANSACTION(flags)) + if (starts_transaction(flags) && commits_transaction(flags)) { - trrep::transaction_context transasction_context( - server_context.provider() - *client_context); - assert(transaction_context->active() == false); - transaction_context->start_transaction(meta->stid.trx); - if (client_context.apply(transaction_context, data)) + trrep::transaction_context transaction_context( + *client_context, + *wsh, + *meta); + assert(transaction_context.active() == false); + transaction_context.start_transaction(meta->stid.trx); + if (client_context->apply(transaction_context, data)) { ret = WSREP_CB_FAILURE; } - else if (client_context.commit(transaction_context)) + else if (client_context->commit(transaction_context)) { ret = WSREP_CB_FAILURE; } } - - else if (START_TRANSACTION(flags)) - { - // First fragment of SR transaction - trrep::client_context* sr_client_context( - server_context.client_context(node_id, client_id, trx_id)); - assert(sr_client_context->transaction_context().active() == false); - } - else if (COMMIT_TRANSACTION(flags)) - { - // Final fragment of SR transaction - trrep::client_context* sr_client_context( - server_context.client_context(node_id, client_id, trx_id)); - assert(sr_client_context->transaction_context().active()); - if (data.size() > 0) - { - sr_client_context->apply(data); - } - sr_client_context->commit(); - } else { - // Middle fragment of SR transaction - trrep::client_context* sr_client_context( - server_context.client_context(node_id, client_id, trx_id)); - assert(sr_client_context->transaction_context().active()); - sr_client_context->apply(data); + // SR not implemented yet + assert(0); } - + return ret; } - return ret; } -#endif // 0 -trrep::client_context* trrep::server_context::local_client_context() +int trrep::server_context::load_provider(const std::string& provider_spec) { - return new trrep::client_context(*this, ++client_id_, - trrep::client_context::m_local); + if (provider_spec == "mock") + { + provider_ = new trrep::provider(new trrep::mock_provider_impl); + } + else + { + struct wsrep_init_args init_args; + init_args.apply_cb = &apply_cb; + provider_ = new trrep::provider( + new trrep::wsrep_provider_v26(&init_args)); + } + return 0; } diff --git a/src/server_context.hpp b/src/server_context.hpp index 0f72d65..ec85e5f 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -5,11 +5,16 @@ #ifndef TRREP_SERVER_CONTEXT_HPP #define TRREP_SERVER_CONTEXT_HPP +#include "exception.hpp" + +#include "wsrep_api.h" + #include namespace trrep { // Forward declarations + class provider; class client_context; class transaction_context; @@ -26,20 +31,62 @@ namespace trrep server_context(const std::string& name, const std::string& id, enum rollback_mode rollback_mode) - : name_(name) + : provider_() + , name_(name) , id_(id) , rollback_mode_(rollback_mode) - , client_id_() - { } - const std::string& name() const { return name_; } - const std::string& id() const { return id_; } - virtual client_context* local_client_context(); - virtual void on_connect() { } - virtual void on_view() { } - virtual void on_sync() { } - virtual void on_apply(trrep::transaction_context&) { } - virtual void on_commit(trrep::transaction_context&) { } + { } + + // + // Return server name + // + const std::string& name() const { return name_; } + + // + // Return server identifier + // + const std::string& id() const { return id_; } + + // + // Create client context which acts only locally, i.e. does + // not participate in replication. However, local client + // connection may execute transactions which require ordering, + // as when modifying local SR fragment storage requires + // strict commit ordering. + // + virtual client_context* local_client_context() = 0; + + // + // Load provider + // + // @return Zero on success, non-zero on error + // + int load_provider(const std::string&); + + // + // Return reference to provider + // + // @return Reference to provider + // @throw trrep::runtime_error if provider has not been loaded + // + virtual trrep::provider& provider() const + { + if (provider_ == 0) + { + throw trrep::runtime_error("provider not loaded"); + } + return *provider_; + } + + // + // + // + virtual void on_connect() = 0; + virtual void on_view() = 0; + virtual void on_sync() = 0; + virtual void on_apply(trrep::transaction_context&) = 0; + virtual void on_commit(trrep::transaction_context&) = 0; virtual bool statement_allowed_for_streaming( @@ -50,11 +97,14 @@ namespace trrep return false; } private: + + server_context(const server_context&); + server_context& operator=(const server_context&); + + trrep::provider* provider_; std::string name_; std::string id_; enum rollback_mode rollback_mode_; - // TODO: This should be part of server mock - unsigned long long client_id_; }; } diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 4cf150e..0cdf374 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -14,6 +14,39 @@ // Public +trrep::transaction_context::transaction_context( + trrep::client_context& client_context) + : provider_(client_context.provider()) + , client_context_(client_context) + , id_(transaction_id::invalid()) + , state_(s_executing) + , state_hist_() + , ws_handle_() + , trx_meta_() + , pa_unsafe_(false) + , certified_(false) + , fragments_() + , rollback_replicated_for_(false) +{ } + +trrep::transaction_context::transaction_context( + trrep::client_context& client_context, + const wsrep_ws_handle_t& ws_handle, + const wsrep_trx_meta_t& trx_meta) + : provider_(client_context.provider()) + , client_context_(client_context) + , id_(trx_meta.stid.trx) + , state_(s_executing) + , state_hist_() + , ws_handle_(ws_handle) + , trx_meta_(trx_meta) + , pa_unsafe_() + , certified_(true) + , fragments_() + , rollback_replicated_for_(false) +{ } + + trrep::transaction_context::~transaction_context() { if (state() != s_committed && state() != s_aborted) @@ -388,6 +421,8 @@ void trrep::transaction_context::state( int trrep::transaction_context::certify_fragment( trrep::unique_lock& lock) { + // This method is not fully implemented and tested yet. + throw trrep::not_implemented_error(); assert(lock.owns_lock()); assert(client_context_.mode() == trrep::client_context::m_replicating); @@ -426,8 +461,7 @@ int trrep::transaction_context::certify_fragment( trrep::client_context_switch client_context_switch( client_context_, *sr_client_context); - trrep::transaction_context sr_transaction_context(provider_, - *sr_client_context); + trrep::transaction_context sr_transaction_context(*sr_client_context); if (sr_client_context->append_fragment(sr_transaction_context, flags, data)) { lock.lock(); diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index 870d6af..c47e3d7 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -55,39 +55,10 @@ namespace trrep s_replaying }; static const int n_states = s_replaying + 1; - transaction_context(trrep::provider& provider, - trrep::client_context& client_context) - : provider_(provider) - , client_context_(client_context) - , id_(transaction_id::invalid()) - , state_(s_executing) - , state_hist_() - , ws_handle_() - , trx_meta_() - , pa_unsafe_(false) - , certified_(false) - , fragments_() - , rollback_replicated_for_(false) - { } - - transaction_context(trrep::provider& provider, - trrep::client_context& client_context, + transaction_context(trrep::client_context& client_context); + transaction_context(trrep::client_context& client_context, const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta) - : provider_(provider) - , client_context_(client_context) - , id_(trx_meta.stid.trx) - , state_(s_executing) - , state_hist_() - , ws_handle_(ws_handle) - , trx_meta_(trx_meta) - , pa_unsafe_() - , certified_(true) - , fragments_() - , rollback_replicated_for_(false) - { } - - + const wsrep_trx_meta_t& trx_meta); ~transaction_context(); #if 0 transaction_context(trrep::provider& provider, diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index 2fd7454..f95a563 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -4,7 +4,7 @@ #include "transaction_context.hpp" #include "client_context.hpp" -#include "server_context.hpp" +#include "mock_server_context.hpp" #include "provider.hpp" #include "mock_provider_impl.hpp" @@ -24,15 +24,14 @@ namespace } // BF abort method to abort transactions via provider - void bf_abort_provider(trrep::mock_provider_impl& provider, + void bf_abort_provider(trrep::mock_server_context& sc, const trrep::client_context& cc, const trrep::transaction_context& tc, wsrep_seqno_t seqno) { - provider.bf_abort(cc.id().get(), tc.id().get(), seqno); + sc.mock_provider().bf_abort(cc.id().get(), tc.id().get(), seqno); } - } // @@ -40,12 +39,10 @@ namespace // BOOST_AUTO_TEST_CASE(transaction_context_1pc) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", - trrep::server_context::rm_sync); + trrep::mock_server_context sc("s1", "s1", + trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -82,12 +79,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc) // BOOST_AUTO_TEST_CASE(transaction_context_2pc) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -131,12 +126,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc) // BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -174,12 +167,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit) // BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -217,12 +208,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare) // BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -265,12 +254,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare) // BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -282,7 +269,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); - bf_abort_provider(mock_provider, cc, tc, WSREP_SEQNO_UNDEFINED); + bf_abort_provider(sc, cc, tc, WSREP_SEQNO_UNDEFINED); // Run before commit BOOST_REQUIRE(tc.before_commit()); @@ -310,12 +297,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified // BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified) { - trrep::mock_provider_impl mock_provider; - trrep::provider provider(&mock_provider); - trrep::server_context sc("s1", "s1", + trrep::mock_server_context sc("s1", "s1", trrep::server_context::rm_sync); trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); - trrep::transaction_context tc(provider, cc); + trrep::transaction_context tc(cc); // Verify initial state BOOST_REQUIRE(tc.active() == false); @@ -327,7 +312,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified) BOOST_REQUIRE(tc.id() == trrep::transaction_id(1)); BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); - bf_abort_provider(mock_provider, cc, tc, 1); + bf_abort_provider(sc, cc, tc, 1); // Run before commit BOOST_REQUIRE(tc.before_commit()); diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp new file mode 100644 index 0000000..9a0ef63 --- /dev/null +++ b/src/wsrep_provider_v26.hpp @@ -0,0 +1,36 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_WSREP_PROVIDER_V26_HPP +#define TRREP_WSREP_PROVIDER_V26_HPP + +#include "provider_impl.hpp" + +#include + +namespace trrep +{ + class wsrep_provider_v26 : public trrep::provider_impl + { + public: + + wsrep_provider_v26(struct wsrep_init_args*) + { } + int start_transaction(wsrep_ws_handle_t*) { return 0; } + int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } + int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } + wsrep_status_t + certify(wsrep_conn_id_t, wsrep_ws_handle_t*, + uint32_t, + wsrep_trx_meta_t*) { return WSREP_OK; } + int rollback(const wsrep_trx_id_t) { return 0; } + wsrep_status commit_order_enter(wsrep_ws_handle_t*) { return WSREP_OK; } + int commit_order_leave(wsrep_ws_handle_t*) { return 0; } + int release(wsrep_ws_handle_t*) { return 0; } + + }; +} + + +#endif // TRREP_WSREP_PROVIDER_V26_HPP