From 468e66dea06b0c9ca6cf9686f7e1ac6f02894107 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Tue, 17 Apr 2018 16:37:03 +0300 Subject: [PATCH] Applier side transaction state changes and tests --- src/mock_server_context.hpp | 4 +- src/server_context.cpp | 61 ++++++++++------ src/server_context.hpp | 11 ++- src/transaction_context.cpp | 117 ++++++++++++++++++++++--------- src/transaction_context.hpp | 33 +++------ src/transaction_context_test.cpp | 66 +++++++++++++++++ 6 files changed, 212 insertions(+), 80 deletions(-) diff --git a/src/mock_server_context.hpp b/src/mock_server_context.hpp index 3ceada4..c8ca3bb 100644 --- a/src/mock_server_context.hpp +++ b/src/mock_server_context.hpp @@ -34,8 +34,8 @@ namespace trrep void on_connect() { } void on_view() { } void on_sync() { } - void on_apply(trrep::transaction_context&) { } - void on_commit(trrep::transaction_context&) { } + // void on_apply(trrep::transaction_context&) { } + // void on_commit(trrep::transaction_context&) { } private: mutable trrep::mock_provider_impl mock_provider_impl_; diff --git a/src/server_context.cpp b/src/server_context.cpp index 74781a6..5f76570 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -47,28 +47,14 @@ namespace assert(client_context->mode() == trrep::client_context::m_applier); trrep::data data(buf->ptr, buf->len); - - if (starts_transaction(flags) && commits_transaction(flags)) + trrep::transaction_context transaction_context(*client_context, + *wsh, + *meta, + flags); + if (client_context->server_context().on_apply( + *client_context, transaction_context, data)) { - trrep::transaction_context transaction_context( - *client_context, - *wsh, - *meta); - assert(transaction_context.active() == false); - transaction_context.start_transaction(meta->stid.trx); - if (client_context->apply(transaction_context, data)) - { - ret = WSREP_CB_FAILURE; - } - else if (client_context->commit(transaction_context)) - { - ret = WSREP_CB_FAILURE; - } - } - else - { - // SR not implemented yet - assert(0); + ret = WSREP_CB_FAILURE; } return ret; } @@ -89,3 +75,36 @@ int trrep::server_context::load_provider(const std::string& provider_spec) } return 0; } + +int trrep::server_context::on_apply( + trrep::client_context& client_context, + trrep::transaction_context& transaction_context, + const trrep::data& data) +{ + int ret(0); + if (starts_transaction(transaction_context.flags()) && + commits_transaction(transaction_context.flags())) + { + assert(transaction_context.active() == false); + transaction_context.start_transaction(); + if (client_context.apply(transaction_context, data)) + { + ret = 1; + } + else if (client_context.commit(transaction_context)) + { + ret = 1; + } + } + else + { + // SR not implemented yet + assert(0); + } + + if (ret) + { + client_context.rollback(transaction_context); + } + return ret; +} diff --git a/src/server_context.hpp b/src/server_context.hpp index ec85e5f..1c3ce3f 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -17,6 +17,7 @@ namespace trrep class provider; class client_context; class transaction_context; + class data; class server_context { @@ -85,9 +86,15 @@ namespace trrep virtual void on_connect() = 0; virtual void on_view() = 0; virtual void on_sync() = 0; - virtual void on_apply(trrep::transaction_context&) = 0; - virtual void on_commit(trrep::transaction_context&) = 0; + // + // This method will be called by the applier thread when + // a remote write set is being applied. It is the responsibility + // of the caller to set up transaction context and data properly. + // + int on_apply(trrep::client_context& client_context, + trrep::transaction_context& transaction_context, + const trrep::data& data); virtual bool statement_allowed_for_streaming( const trrep::client_context&, diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index 0cdf374..2eef5b5 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -23,6 +23,7 @@ trrep::transaction_context::transaction_context( , state_hist_() , ws_handle_() , trx_meta_() + , flags_() , pa_unsafe_(false) , certified_(false) , fragments_() @@ -32,14 +33,16 @@ trrep::transaction_context::transaction_context( trrep::transaction_context::transaction_context( trrep::client_context& client_context, const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta) + const wsrep_trx_meta_t& trx_meta, + uint32_t flags) : provider_(client_context.provider()) , client_context_(client_context) - , id_(trx_meta.stid.trx) + , id_(transaction_id::invalid()) , state_(s_executing) , state_hist_() , ws_handle_(ws_handle) , trx_meta_(trx_meta) + , flags_(flags) , pa_unsafe_() , certified_(true) , fragments_() @@ -55,6 +58,27 @@ trrep::transaction_context::~transaction_context() } } +int trrep::transaction_context::start_transaction( + const trrep::transaction_id& id) +{ + assert(active() == false); + id_ = id; + ws_handle_.trx_id = id_.get(); + flags_ |= WSREP_FLAG_TRX_START; + switch (client_context_.mode()) + { + case trrep::client_context::m_local: + case trrep::client_context::m_applier: + return 0; + case trrep::client_context::m_replicating: + return provider_.start_transaction(&ws_handle_); + default: + assert(0); + return 1; + } +} + + int trrep::transaction_context::append_key(const trrep::key& key) { @@ -73,35 +97,44 @@ int trrep::transaction_context::before_prepare() trrep::unique_lock lock(client_context_.mutex()); - assert(client_context_.mode() == trrep::client_context::m_replicating); assert(state() == s_executing || state() == s_must_abort); if (state() == s_must_abort) { + assert(client_context_.mode() == trrep::client_context::m_replicating); return 1; } state(lock, s_preparing); - if (is_streaming()) + + switch (client_context_.mode()) { - client_context_.debug_suicide( - "crash_last_fragment_commit_before_fragment_removal"); - lock.unlock(); - if (client_context_.server_context().statement_allowed_for_streaming( - client_context_, *this)) + case trrep::client_context::m_replicating: + if (is_streaming()) { - client_context_.override_error(trrep::e_error_during_commit); - ret = 1; + client_context_.debug_suicide( + "crash_last_fragment_commit_before_fragment_removal"); + lock.unlock(); + if (client_context_.server_context().statement_allowed_for_streaming( + client_context_, *this)) + { + client_context_.override_error(trrep::e_error_during_commit); + ret = 1; + } + else + { + remove_fragments(); + } + lock.lock(); + client_context_.debug_suicide( + "crash_last_fragment_commit_after_fragment_removal"); } - else - { - remove_fragments(); - } - lock.lock(); - client_context_.debug_suicide( - "crash_last_fragment_commit_after_fragment_removal"); + break; + case trrep::client_context::m_local: + case trrep::client_context::m_applier: + break; } - assert(client_context_.mode() == trrep::client_context::m_replicating); + assert(state() == s_preparing); return ret; } @@ -111,27 +144,37 @@ int trrep::transaction_context::after_prepare() int ret(1); trrep::unique_lock lock(client_context_.mutex()); - assert(client_context_.mode() == trrep::client_context::m_replicating); assert(state() == s_preparing || state() == s_must_abort); if (state() == s_must_abort) { + assert(client_context_.mode() == trrep::client_context::m_replicating); return 1; } - if (state() == s_preparing) + switch (client_context_.mode()) { - ret = certify_commit(lock); - assert((ret == 0 || state() == s_committing) || - (state() == s_must_abort || - state() == s_must_replay || - state() == s_cert_failed)); + case trrep::client_context::m_replicating: + if (state() == s_preparing) + { + ret = certify_commit(lock); + assert((ret == 0 || state() == s_committing) || + (state() == s_must_abort || + state() == s_must_replay || + state() == s_cert_failed)); + } + else + { + assert(state() == s_must_abort); + client_context_.override_error(trrep::e_deadlock_error); + } + break; + case trrep::client_context::m_local: + case trrep::client_context::m_applier: + state(lock, s_certifying); + state(lock, s_committing); + ret = 0; + break; } - else - { - assert(state() == s_must_abort); - client_context_.override_error(trrep::e_deadlock_error); - } - assert(client_context_.mode() == trrep::client_context::m_replicating); return ret; } @@ -191,7 +234,6 @@ int trrep::transaction_context::before_commit() } lock.lock(); } - break; case trrep::client_context::m_applier: assert(ordered()); @@ -200,6 +242,15 @@ int trrep::transaction_context::before_commit() { state(lock, s_must_abort); } + else + { + if (state() == s_executing) + { + // 1pc + state(lock, s_certifying); + } + state(lock, s_committing); + } break; } return ret; diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index c47e3d7..0735513 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -58,20 +58,9 @@ namespace trrep transaction_context(trrep::client_context& client_context); transaction_context(trrep::client_context& client_context, const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta); + const wsrep_trx_meta_t& trx_meta, + uint32_t flags); ~transaction_context(); -#if 0 - transaction_context(trrep::provider& provider, - trrep::client_context& client_context, - const transaction_id& id) - : provider_(provider) - , client_context_(client_context) - , id_(id) - , state_(s_executing) - , ws_handle_() - , gtid_() - { } -#endif // Accessors trrep::transaction_id id() const { return id_; } @@ -101,14 +90,15 @@ namespace trrep bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } // - int start_transaction(const trrep::transaction_id& id) + int start_transaction() { assert(active() == false); - id_ = id; - ws_handle_.trx_id = id_.get(); - return provider_.start_transaction(&ws_handle_); + assert(trx_meta_.stid.trx != transaction_id::invalid()); + return start_transaction(trx_meta_.stid.trx); } + int start_transaction(const trrep::transaction_id& id); + int append_key(const trrep::key&); int append_data(const trrep::data&); @@ -133,14 +123,11 @@ namespace trrep int after_statement(); - private: uint32_t flags() const { - uint32_t ret(0); - if (is_streaming() == false) ret |= WSREP_FLAG_TRX_START; - if (pa_unsafe()) ret |= WSREP_FLAG_PA_UNSAFE; - return ret; + return flags_; } + private: int certify_fragment(trrep::unique_lock&); int certify_commit(trrep::unique_lock&); void remove_fragments(); @@ -153,6 +140,7 @@ namespace trrep std::vector state_hist_; wsrep_ws_handle_t ws_handle_; wsrep_trx_meta_t trx_meta_; + uint32_t flags_; bool pa_unsafe_; bool certified_; @@ -165,6 +153,7 @@ namespace trrep switch (state) { case trrep::transaction_context::s_executing: return "executing"; + case trrep::transaction_context::s_preparing: return "preparing"; case trrep::transaction_context::s_certifying: return "certifying"; case trrep::transaction_context::s_committing: return "committing"; case trrep::transaction_context::s_ordered_commit: return "ordered_commit"; diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index f95a563..4332eb7 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -32,6 +32,23 @@ namespace sc.mock_provider().bf_abort(cc.id().get(), tc.id().get(), seqno); } + trrep::transaction_context applying_transaction( + trrep::client_context& cc, + trrep::transaction_id id, + wsrep_seqno_t seqno, + uint32_t flags) + { + wsrep_ws_handle_t ws_handle = { id.get(), 0 }; + wsrep_trx_meta_t meta = { + { {1 }, seqno }, /* gtid */ + { { static_cast(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */ + seqno - 1 + }; + trrep::transaction_context ret(cc, ws_handle, meta, flags); + return ret; + } + + } // @@ -332,3 +349,52 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified) BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.certified() == false); } + +BOOST_AUTO_TEST_CASE(transaction_context_1pc_applying) +{ + trrep::mock_server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, + trrep::client_id(1), + trrep::client_context::m_applier); + trrep::transaction_context tc(applying_transaction( + cc, 1, 1, + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); + + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.start_transaction() == 0); + BOOST_REQUIRE(tc.active() == true); + BOOST_REQUIRE(tc.certified() == true); + BOOST_REQUIRE(tc.ordered() == true); + + BOOST_REQUIRE(tc.before_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing); + BOOST_REQUIRE(tc.ordered_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit); + BOOST_REQUIRE(tc.after_commit() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed); +} + +BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying) +{ + trrep::mock_server_context sc("s1", "s1", + trrep::server_context::rm_sync); + trrep::client_context cc(sc, + trrep::client_id(1), + trrep::client_context::m_applier); + trrep::transaction_context tc(applying_transaction( + cc, 1, 1, + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END)); + + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(tc.start_transaction() == 0); + BOOST_REQUIRE(tc.active() == true); + BOOST_REQUIRE(tc.certified() == true); + BOOST_REQUIRE(tc.ordered() == true); + + BOOST_REQUIRE(tc.before_prepare() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_preparing); + BOOST_REQUIRE(tc.after_prepare() == 0); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing); + +}