diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index be779f5..e5ae421 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(trrep provider.cpp client_context.cpp + server_context.cpp transaction_context.cpp) add_executable(trrep_test diff --git a/src/applier.hpp b/src/applier.hpp new file mode 100644 index 0000000..25a81ee --- /dev/null +++ b/src/applier.hpp @@ -0,0 +1,16 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef TRREP_APPLIER_HPP +#define TRREP_APPLIER_HPP + +namespace trrep +{ + class applier + { + + }; +} + +#endif // TRREP_APPLIER_HPP diff --git a/src/client_context.hpp b/src/client_context.hpp index 80f38d7..ee06954 100644 --- a/src/client_context.hpp +++ b/src/client_context.hpp @@ -19,7 +19,8 @@ namespace trrep { e_success, e_error_during_commit, - e_deadlock_error + e_deadlock_error, + e_append_fragment_error }; class client_id @@ -40,8 +41,9 @@ namespace trrep public: enum mode { - m_local, - m_applier + m_local, // Operates in local only mode, no replication + m_replicating, // Generates write sets for replication + m_applier // Applying write sets from provider }; enum state @@ -64,9 +66,16 @@ namespace trrep virtual ~client_context() { } // Accessors trrep::mutex& mutex() { return mutex_; } - const trrep::server_context& server_context() const + trrep::server_context& server_context() const { return server_context_; } + client_id id() const { return id_; } + client_id id(const client_id& id) + { + assert(mode() == m_applier); + id_ = id; + } + enum mode mode() const { return mode_; } enum state state() const { return state_; } @@ -81,11 +90,18 @@ namespace trrep virtual int after_statement() { return 0; } + virtual int append_fragment(trrep::transaction_context&, + uint32_t, const trrep::data&) + { return 0; } + virtual int commit(trrep::transaction_context&) { return 0; } virtual int rollback(trrep::transaction_context&) { return 0; } virtual void will_replay(trrep::transaction_context&) { } virtual int replay(trrep::transaction_context& tc); + + virtual int apply(const trrep::data&) { return 0; } + virtual void wait_for_replayers(trrep::unique_lock&) { } virtual int prepare_data_for_replication( @@ -98,6 +114,7 @@ namespace trrep virtual void override_error(const trrep::client_error&) { } virtual bool killed() const { return 0; } virtual void abort() const { ::abort(); } + virtual void store_globals() { } // Debug helpers virtual void debug_sync(const std::string&) { @@ -117,6 +134,27 @@ namespace trrep enum mode mode_; enum state state_; }; + + + class client_context_switch + { + public: + client_context_switch(trrep::client_context& orig_context, + trrep::client_context& current_context) + : orig_context_(orig_context) + , current_context_(current_context) + { + current_context_.store_globals(); + } + ~client_context_switch() + { + orig_context_.store_globals(); + } + private: + client_context& orig_context_; + client_context& current_context_; + }; + } #endif // TRREP_CLIENT_CONTEXT_HPP diff --git a/src/mock_provider_impl.hpp b/src/mock_provider_impl.hpp index b1edbca..248519b 100644 --- a/src/mock_provider_impl.hpp +++ b/src/mock_provider_impl.hpp @@ -33,10 +33,10 @@ namespace trrep // Provider implemenatation interface int start_transaction(wsrep_ws_handle_t*) { return 0; } wsrep_status - certify_commit(wsrep_conn_id_t conn_id, - wsrep_ws_handle_t* ws_handle, - uint32_t flags, - wsrep_trx_meta_t* trx_meta) + certify(wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* ws_handle, + uint32_t flags, + wsrep_trx_meta_t* trx_meta) { assert(flags | WSREP_FLAG_TRX_END); if ((flags | WSREP_FLAG_TRX_END) == 0) diff --git a/src/provider.hpp b/src/provider.hpp index 9da1c2a..68deb63 100644 --- a/src/provider.hpp +++ b/src/provider.hpp @@ -27,10 +27,10 @@ namespace trrep { return impl_->append_key(wsh, key); } int append_data(wsrep_ws_handle_t* wsh, const wsrep_buf_t* buf) { return impl_->append_data(wsh, buf); } - wsrep_status certify_commit(wsrep_conn_id_t conn_id, - wsrep_ws_handle_t* wsh, - uint32_t flags, wsrep_trx_meta_t* trx_meta) - { return impl_->certify_commit(conn_id, wsh, flags, trx_meta); } + wsrep_status certify(wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* wsh, + uint32_t flags, wsrep_trx_meta_t* trx_meta) + { return impl_->certify(conn_id, wsh, flags, trx_meta); } int rollback(const wsrep_trx_id_t trx_id) { return impl_->rollback(trx_id); } wsrep_status commit_order_enter(wsrep_ws_handle_t* wsh) diff --git a/src/provider_impl.hpp b/src/provider_impl.hpp index 511558f..f2e9e25 100644 --- a/src/provider_impl.hpp +++ b/src/provider_impl.hpp @@ -16,10 +16,10 @@ namespace trrep virtual int start_transaction(wsrep_ws_handle_t*) = 0; virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0; virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0; - virtual wsrep_status - certify_commit(wsrep_conn_id_t, wsrep_ws_handle_t*, - uint32_t, - wsrep_trx_meta_t*) = 0; + virtual wsrep_status_t + certify(wsrep_conn_id_t, wsrep_ws_handle_t*, + uint32_t, + wsrep_trx_meta_t*) = 0; virtual int rollback(const wsrep_trx_id_t) = 0; virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0; virtual int commit_order_leave(wsrep_ws_handle_t*) = 0; diff --git a/src/server_context.cpp b/src/server_context.cpp new file mode 100644 index 0000000..977490b --- /dev/null +++ b/src/server_context.cpp @@ -0,0 +1,90 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "server_context.hpp" +#include "client_context.hpp" + +#include + +#define START_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_START) +#define COMMIT_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_END) +#define ROLLBACK_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_ROLLBACK) + +#if 0 + +namespace +{ + static wsrep_cb_status_t apply_cb(void* ctx, + const wsrep_ws_handle_t* wsh, + uint32_t flags, + const wsrep_buf_t* buf, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop) + { + wsrep_cb_status_t ret(WSREP_CB_SUCCESS); + + trrep::client_context* client_context( + reinterpret_cast(ctx)); + assert(client_context); + assert(client_context.mode() == trrep::client_context::m_applier); + + const trrep::server_context& server_context( + client_context->server_context()); + trrep::data data(buf->ptr, buf->len); + + if (START_TRANSACTION(flags) && COMMIT_TRANSACTION(flags)) + { + trrep::transaction_context transasction_context( + server_context.provider() + *client_context); + assert(transaction_context->active() == false); + transaction_context->start_transaction(meta->stid.trx); + if (client_context.apply(transaction_context, data)) + { + ret = WSREP_CB_FAILURE; + } + else if (client_context.commit(transaction_context)) + { + ret = WSREP_CB_FAILURE; + } + } + + else if (START_TRANSACTION(flags)) + { + // First fragment of SR transaction + trrep::client_context* sr_client_context( + server_context.client_context(node_id, client_id, trx_id)); + assert(sr_client_context->transaction_context().active() == false); + } + else if (COMMIT_TRANSACTION(flags)) + { + // Final fragment of SR transaction + trrep::client_context* sr_client_context( + server_context.client_context(node_id, client_id, trx_id)); + assert(sr_client_context->transaction_context().active()); + if (data.size() > 0) + { + sr_client_context->apply(data); + } + sr_client_context->commit(); + } + else + { + // Middle fragment of SR transaction + trrep::client_context* sr_client_context( + server_context.client_context(node_id, client_id, trx_id)); + assert(sr_client_context->transaction_context().active()); + sr_client_context->apply(data); + } + + } + return ret; +} +#endif // 0 + +trrep::client_context* trrep::server_context::local_client_context() +{ + return new trrep::client_context(*this, ++client_id_, + trrep::client_context::m_local); +} diff --git a/src/server_context.hpp b/src/server_context.hpp index a4cab90..0f72d65 100644 --- a/src/server_context.hpp +++ b/src/server_context.hpp @@ -10,6 +10,7 @@ namespace trrep { // Forward declarations + class client_context; class transaction_context; class server_context @@ -28,9 +29,11 @@ namespace trrep : name_(name) , id_(id) , rollback_mode_(rollback_mode) - { } + , client_id_() + { } const std::string& name() const { return name_; } const std::string& id() const { return id_; } + virtual client_context* local_client_context(); virtual void on_connect() { } virtual void on_view() { } @@ -50,6 +53,8 @@ namespace trrep std::string name_; std::string id_; enum rollback_mode rollback_mode_; + // TODO: This should be part of server mock + unsigned long long client_id_; }; } diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index ce30d31..4cf150e 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -10,9 +10,18 @@ #include // TODO: replace with proper logging utility #include +#include // Public +trrep::transaction_context::~transaction_context() +{ + if (state() != s_committed && state() != s_aborted) + { + client_context_.rollback(*this); + } +} + int trrep::transaction_context::append_key(const trrep::key& key) { @@ -31,7 +40,7 @@ int trrep::transaction_context::before_prepare() trrep::unique_lock lock(client_context_.mutex()); - assert(client_context_.mode() == trrep::client_context::m_local); + assert(client_context_.mode() == trrep::client_context::m_replicating); assert(state() == s_executing || state() == s_must_abort); if (state() == s_must_abort) @@ -39,6 +48,7 @@ int trrep::transaction_context::before_prepare() return 1; } + state(lock, s_preparing); if (is_streaming()) { client_context_.debug_suicide( @@ -58,8 +68,8 @@ int trrep::transaction_context::before_prepare() client_context_.debug_suicide( "crash_last_fragment_commit_after_fragment_removal"); } - assert(client_context_.mode() == trrep::client_context::m_local); - assert(state() == s_executing); + assert(client_context_.mode() == trrep::client_context::m_replicating); + assert(state() == s_preparing); return ret; } @@ -68,14 +78,14 @@ int trrep::transaction_context::after_prepare() int ret(1); trrep::unique_lock lock(client_context_.mutex()); - assert(client_context_.mode() == trrep::client_context::m_local); - assert(state() == s_executing || state() == s_must_abort); + assert(client_context_.mode() == trrep::client_context::m_replicating); + assert(state() == s_preparing || state() == s_must_abort); if (state() == s_must_abort) { return 1; } - if (state() == s_executing) + if (state() == s_preparing) { ret = certify_commit(lock); assert((ret == 0 || state() == s_committing) || @@ -88,7 +98,7 @@ int trrep::transaction_context::after_prepare() assert(state() == s_must_abort); client_context_.override_error(trrep::e_deadlock_error); } - assert(client_context_.mode() == trrep::client_context::m_local); + assert(client_context_.mode() == trrep::client_context::m_replicating); return ret; } @@ -97,9 +107,18 @@ int trrep::transaction_context::before_commit() int ret(1); trrep::unique_lock lock(client_context_.mutex()); + assert(state() == s_executing || state() == s_committing || + state() == s_must_abort); + switch (client_context_.mode()) { case trrep::client_context::m_local: + if (ordered()) + { + ret = provider_.commit_order_enter(&ws_handle_); + } + break; + case trrep::client_context::m_replicating: // Commit is one phase - before/after prepare was not called if (state() == s_executing) @@ -142,6 +161,7 @@ int trrep::transaction_context::before_commit() break; case trrep::client_context::m_applier: + assert(ordered()); ret = provider_.commit_order_enter(&ws_handle_); if (ret) { @@ -158,6 +178,7 @@ int trrep::transaction_context::ordered_commit() trrep::unique_lock lock(client_context_.mutex()); assert(state() == s_committing); + assert(ordered()); ret = provider_.commit_order_leave(&ws_handle_); // Should always succeed assert(ret == 0); @@ -176,6 +197,9 @@ int trrep::transaction_context::after_commit() switch (client_context_.mode()) { case trrep::client_context::m_local: + // Nothing to do + break; + case trrep::client_context::m_replicating: if (is_streaming()) { clear_fragments(); @@ -328,18 +352,19 @@ void trrep::transaction_context::state( { assert(lock.owns_lock()); static const char allowed[n_states][n_states] = - { /* ex ce co oc ct cf ma ab ad mr re from/to */ - { 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* ex */ - { 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ - { 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ - { 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ - { 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ - { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */ - { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ - { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ - { 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */ + { /* ex pr ce co oc ct cf ma ab ad mr re from/to */ + { 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* ex */ + { 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */ + { 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */ + { 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */ + { 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */ + { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */ + { 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ + { 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */ }; if (allowed[state_][next_state]) { @@ -360,9 +385,74 @@ void trrep::transaction_context::state( } } -int trrep::transaction_context::certify_fragment() +int trrep::transaction_context::certify_fragment( + trrep::unique_lock& lock) { - throw trrep::not_implemented_error(); + assert(lock.owns_lock()); + + assert(client_context_.mode() == trrep::client_context::m_replicating); + assert(rollback_replicated_for_ != id_); + + client_context_.wait_for_replayers(lock); + if (state() == s_must_abort) + { + client_context_.override_error(trrep::e_deadlock_error); + return 1; + } + + state(lock, s_certifying); + + lock.unlock(); + + uint32_t flags(0); + if (fragments_.empty()) + { + flags |= WSREP_FLAG_TRX_START; + } + + trrep::data data; + if (client_context_.prepare_data_for_replication(*this, data)) + { + lock.lock(); + state(lock, s_must_abort); + return 1; + } + + // Client context to store fragment in separate transaction + // Switch temporarily to sr_transaction_context, switch back + // to original when this goes out of scope + std::auto_ptr sr_client_context( + client_context_.server_context().local_client_context()); + trrep::client_context_switch client_context_switch( + client_context_, + *sr_client_context); + trrep::transaction_context sr_transaction_context(provider_, + *sr_client_context); + if (sr_client_context->append_fragment(sr_transaction_context, flags, data)) + { + lock.lock(); + state(lock, s_must_abort); + client_context_.override_error(trrep::e_append_fragment_error); + return 1; + } + + wsrep_status_t cert_ret(provider_.certify(client_context_.id().get(), + &sr_transaction_context.ws_handle_, + flags, + &sr_transaction_context.trx_meta_)); + int ret(0); + switch (cert_ret) + { + case WSREP_OK: + sr_client_context->commit(sr_transaction_context); + break; + default: + sr_client_context->rollback(sr_transaction_context); + ret = 1; + break; + } + + return ret; } int trrep::transaction_context::certify_commit( @@ -402,10 +492,10 @@ int trrep::transaction_context::certify_commit( return 1; } - wsrep_status cert_ret(provider_.certify_commit(client_context_.id().get(), - &ws_handle_, - flags() | WSREP_FLAG_TRX_END, - &trx_meta_)); + wsrep_status cert_ret(provider_.certify(client_context_.id().get(), + &ws_handle_, + flags() | WSREP_FLAG_TRX_END, + &trx_meta_)); lock.lock(); diff --git a/src/transaction_context.hpp b/src/transaction_context.hpp index 61093f9..870d6af 100644 --- a/src/transaction_context.hpp +++ b/src/transaction_context.hpp @@ -42,6 +42,7 @@ namespace trrep enum state { s_executing, + s_preparing, s_certifying, s_committing, s_ordered_commit, @@ -65,8 +66,29 @@ namespace trrep , trx_meta_() , pa_unsafe_(false) , certified_(false) + , fragments_() + , rollback_replicated_for_(false) { } + transaction_context(trrep::provider& provider, + trrep::client_context& client_context, + const wsrep_ws_handle_t& ws_handle, + const wsrep_trx_meta_t& trx_meta) + : provider_(provider) + , client_context_(client_context) + , id_(trx_meta.stid.trx) + , state_(s_executing) + , state_hist_() + , ws_handle_(ws_handle) + , trx_meta_(trx_meta) + , pa_unsafe_() + , certified_(true) + , fragments_() + , rollback_replicated_for_(false) + { } + + + ~transaction_context(); #if 0 transaction_context(trrep::provider& provider, trrep::client_context& client_context, @@ -148,7 +170,7 @@ namespace trrep if (pa_unsafe()) ret |= WSREP_FLAG_PA_UNSAFE; return ret; } - int certify_fragment(); + int certify_fragment(trrep::unique_lock&); int certify_commit(trrep::unique_lock&); void remove_fragments(); void clear_fragments(); @@ -162,6 +184,9 @@ namespace trrep wsrep_trx_meta_t trx_meta_; bool pa_unsafe_; bool certified_; + + std::vector fragments_; + trrep::transaction_id rollback_replicated_for_; }; static inline std::string to_string(enum trrep::transaction_context::state state) diff --git a/src/transaction_context_test.cpp b/src/transaction_context_test.cpp index f24cefe..2fd7454 100644 --- a/src/transaction_context_test.cpp +++ b/src/transaction_context_test.cpp @@ -44,7 +44,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc) trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state @@ -86,7 +86,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc) trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state @@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc) // Run before prepare BOOST_REQUIRE(tc.before_prepare() == 0); - BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_preparing); // Run after prepare BOOST_REQUIRE(tc.after_prepare() == 0); @@ -135,7 +135,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit) trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state @@ -172,13 +172,13 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit) // // Test a 2PC transaction which gets BF aborted before before_prepare // -BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_prepare) +BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare) { trrep::mock_provider_impl mock_provider; trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state @@ -215,13 +215,13 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_prepare) // // Test a 2PC transaction which gets BF aborted before before_prepare // -BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_after_prepare) +BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare) { trrep::mock_provider_impl mock_provider; trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state @@ -236,7 +236,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_after_prepare) // Run before prepare BOOST_REQUIRE(tc.before_prepare() == 0); - BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing); + BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_preparing); bf_abort_unordered(cc, tc); @@ -269,7 +269,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state @@ -314,7 +314,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified) trrep::provider provider(&mock_provider); trrep::server_context sc("s1", "s1", trrep::server_context::rm_sync); - trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local); + trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating); trrep::transaction_context tc(provider, cc); // Verify initial state