From 85a03394cc3732bf4cd75b2c8ffa966c94e16465 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 6 Jun 2019 15:50:58 +0300 Subject: [PATCH] NBO applying - High priority interface method to apply NBO begin, separate from apply_toi() in order to avoid implementation to force interpreting ws_meta flags. - Method to put client_state into NBO mode when applying NBO begin. The client_state will process in m_local mode. - Unit tests for applying NBO --- dbsim/db_high_priority_service.cpp | 7 +++ dbsim/db_high_priority_service.hpp | 2 + include/wsrep/client_state.hpp | 12 ++++ include/wsrep/high_priority_service.hpp | 19 ++++++ src/client_state.cpp | 11 ++++ src/server_state.cpp | 6 +- test/client_state_fixture.hpp | 9 ++- test/mock_high_priority_service.cpp | 26 +++++++++ test/mock_high_priority_service.hpp | 15 +++++ test/nbo_test.cpp | 77 +++++++++++++++++++++++++ test/toi_test.cpp | 5 -- test/transaction_test.cpp | 2 + 12 files changed, 182 insertions(+), 9 deletions(-) diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 18e3475..db3e724 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -68,6 +68,13 @@ int db::high_priority_service::apply_toi( throw wsrep::not_implemented_error(); } +int db::high_priority_service::apply_nbo_begin( + const wsrep::ws_meta&, + const wsrep::const_buffer&) +{ + throw wsrep::not_implemented_error(); +} + int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index d29626c..b5f8d1c 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -50,6 +50,8 @@ namespace db int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) override; + int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&) + override; void adopt_apply_error(wsrep::mutable_buffer&) override; virtual void after_apply() override; void store_globals() override { } diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 422e59f..6d75aab 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -755,6 +755,18 @@ namespace wsrep */ int end_nbo_phase_one(); + /** + * Enter in NBO mode. This method should be called when the + * applier launches the asynchronous process to perform the + * operation. The purpose of the call is to adjust + * the state and set write set meta data. + * + * @param ws_meta Write set meta data. + * + * @return Zero in case of success, non-zero on failure. + */ + int enter_nbo_mode(const wsrep::ws_meta& ws_meta); + /** * Begin non-blocking operation phase two. The keys argument * passed to this call must contain the same keys which were diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 22ff818..67d109f 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -161,6 +161,25 @@ namespace wsrep const wsrep::const_buffer& ws, wsrep::mutable_buffer& err) = 0; + /** + * Apply NBO begin event. + * + * The responsibility of the implementation is to start + * an asynchronous process which will complete the operation. + * The call is done under total order isolation, and the + * isolation is released by the caller after the method + * returns. It is a responsibility of the asynchronous process + * to complete the second phase of NBO. + * + * @param ws_meta Write set meta data. + * @param data Buffer containing the command to execute. + * + * @return Zero in case of success, non-zero if the asynchronous + * process could not be started. + */ + virtual int apply_nbo_begin(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) = 0; + /** * Actions to take after applying a write set was completed. */ diff --git a/src/client_state.cpp b/src/client_state.cpp index 29d45cb..dd7ed01 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -500,6 +500,17 @@ int wsrep::client_state::end_nbo_phase_one() return ret; } +int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta) +{ + assert(state_ == s_exec); + assert(mode_ == m_local); + wsrep::unique_lock lock(mutex_); + toi_meta_ = ws_meta; + toi_mode_ = mode_; + mode(lock, m_nbo); + return 0; +} + int wsrep::client_state::begin_nbo_phase_two(const wsrep::key_array& keys) { assert(state_ == s_exec); diff --git a/src/server_state.cpp b/src/server_state.cpp index 75904e8..720df6e 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -463,8 +463,10 @@ static int apply_toi(wsrep::provider& provider, } else if (wsrep::starts_transaction(ws_meta.flags())) { - // NBO begin - throw wsrep::not_implemented_error(); + provider.commit_order_enter(ws_handle, ws_meta); + int ret(high_priority_service.apply_nbo_begin(ws_meta, data)); + provider.commit_order_leave(ws_handle, ws_meta); + return ret; } else if (wsrep::commits_transaction(ws_meta.flags())) { diff --git a/test/client_state_fixture.hpp b/test/client_state_fixture.hpp index 966dead..63b946f 100644 --- a/test/client_state_fixture.hpp +++ b/test/client_state_fixture.hpp @@ -137,8 +137,12 @@ namespace cc.open(cc.id()); BOOST_REQUIRE(cc.before_command() == 0); BOOST_REQUIRE(cc.before_statement() == 0); - wsrep::ws_handle ws_handle(wsrep::transaction_id(1), (void*)1); - wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)), + } + void start_transaction(wsrep::transaction_id id, + wsrep::seqno seqno) + { + wsrep::ws_handle ws_handle(id, (void*)1); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), seqno), wsrep::stid(sc.id(), wsrep::transaction_id(1), cc.id()), @@ -150,6 +154,7 @@ namespace BOOST_REQUIRE(tc.certified() == true); BOOST_REQUIRE(tc.ordered() == true); } + wsrep::mock_server_service server_service; wsrep::mock_server_state sc; wsrep::mock_client cc; diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index 1e33fba..a0327c5 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -96,6 +96,32 @@ int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&, return (fail_next_toi_ ? 1 : 0); } +int wsrep::mock_high_priority_service::apply_nbo_begin( + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer&) +{ + const int nbo_begin_flags(wsrep::provider::flag::isolation | + wsrep::provider::flag::start_transaction); + assert(ws_meta.flags() & nbo_begin_flags); + assert((ws_meta.flags() & ~nbo_begin_flags) == 0); + + if (fail_next_toi_) + { + return 1; + } + else + { + nbo_cs_ = std::unique_ptr( + new wsrep::mock_client(client_state_->server_state(), + wsrep::client_id(1), + wsrep::client_state::m_local)); + nbo_cs_->open(wsrep::client_id(1)); + nbo_cs_->before_command(); + nbo_cs_->before_statement(); + return nbo_cs_->enter_nbo_mode(ws_meta); + } +} + void wsrep::mock_high_priority_service::adopt_apply_error( wsrep::mutable_buffer& err) { diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index e66dd46..ca7fcf1 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -23,6 +23,8 @@ #include "wsrep/high_priority_service.hpp" #include "mock_client_state.hpp" +#include + namespace wsrep { class mock_high_priority_service : public wsrep::high_priority_service @@ -38,8 +40,11 @@ namespace wsrep , fail_next_toi_() , client_state_(client_state) , replaying_(replaying) + , nbo_cs_() { } + ~mock_high_priority_service() + { } int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; @@ -65,6 +70,8 @@ namespace wsrep int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; + int apply_nbo_begin(const wsrep::ws_meta&, + const wsrep::const_buffer&) WSREP_OVERRIDE; void adopt_apply_error(wsrep::mutable_buffer& err) WSREP_OVERRIDE; void after_apply() WSREP_OVERRIDE; void store_globals() WSREP_OVERRIDE { } @@ -84,11 +91,19 @@ namespace wsrep bool do_2pc_; bool fail_next_applying_; bool fail_next_toi_; + + wsrep::mock_client* nbo_cs() const { return nbo_cs_.get(); } + private: mock_high_priority_service(const mock_high_priority_service&); mock_high_priority_service& operator=(const mock_high_priority_service&); wsrep::mock_client_state* client_state_; bool replaying_; + + /* Client state associated to NBO processing. This should be + * stored elsewhere (like mock_server_state), but is kept + * here for convenience. */ + std::unique_ptr nbo_cs_; }; } diff --git a/test/nbo_test.cpp b/test/nbo_test.cpp index c735651..73eafc8 100644 --- a/test/nbo_test.cpp +++ b/test/nbo_test.cpp @@ -66,3 +66,80 @@ BOOST_FIXTURE_TEST_CASE(test_local_nbo, BOOST_REQUIRE(sc.provider().toi_start_transaction() == 1); BOOST_REQUIRE(sc.provider().toi_commit() == 1); } + +// This test case operates through server_state object in order to +// verify that the high priority service is called with appropriate +// arguments. +BOOST_FIXTURE_TEST_CASE(test_applying_nbo, + applying_client_fixture) +{ + + wsrep::mock_high_priority_service hps(sc, &cc, false); + wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1)); + const int nbo_begin_flags(wsrep::provider::flag::start_transaction | + wsrep::provider::flag::isolation); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), + wsrep::seqno(1)), + wsrep::stid(wsrep::id("s1"), + wsrep::transaction_id::undefined(), + wsrep::client_id(1)), + wsrep::seqno(0), + nbo_begin_flags); + std::string nbo_begin("nbo_begin"); + BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta, + wsrep::const_buffer(nbo_begin.data(), + nbo_begin.size())) == 0); + wsrep::mock_client* nbo_cs(hps.nbo_cs()); + BOOST_REQUIRE(nbo_cs); + BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local); + BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_nbo); + + // After this point the control is on local process and applier + // has released toi critical section. + wsrep::key key(wsrep::key::exclusive); + key.append_key_part("k1", 2); + key.append_key_part("k2", 2); + wsrep::key_array keys{key}; + // Starting phase two should put nbo_cs in toi mode. + BOOST_REQUIRE(nbo_cs->begin_nbo_phase_two(keys) == 0); + BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_nbo); + BOOST_REQUIRE(nbo_cs->in_toi()); + BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local); + // Ending phase two should make nbo_cs leave TOI mode and + // return to m_local mode. + BOOST_REQUIRE(nbo_cs->end_nbo_phase_two() == 0); + BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_local); + BOOST_REQUIRE(nbo_cs->in_toi() == false); + BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local); + + // There must have been one toi write set with commit flag. + BOOST_REQUIRE(sc.provider().toi_write_sets() == 1); + BOOST_REQUIRE(sc.provider().toi_start_transaction() == 0); + BOOST_REQUIRE(sc.provider().toi_commit() == 1); +} + +// This test case operates through server_state object in order to +// verify that the high priority service is called with appropriate +// arguments. The test checks that error is returned in the case if +// launching asynchronous process for NBO fails. +BOOST_FIXTURE_TEST_CASE(test_applying_nbo_fail, + applying_client_fixture) +{ + + wsrep::mock_high_priority_service hps(sc, &cc, false); + wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1)); + const int nbo_begin_flags(wsrep::provider::flag::start_transaction | + wsrep::provider::flag::isolation); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), + wsrep::seqno(1)), + wsrep::stid(wsrep::id("s1"), + wsrep::transaction_id::undefined(), + wsrep::client_id(1)), + wsrep::seqno(0), + nbo_begin_flags); + std::string nbo_begin("nbo_begin"); + hps.fail_next_toi_ = true; + BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta, + wsrep::const_buffer(nbo_begin.data(), + nbo_begin.size())) == 1); +} diff --git a/test/toi_test.cpp b/test/toi_test.cpp index c0df5ec..10b3662 100644 --- a/test/toi_test.cpp +++ b/test/toi_test.cpp @@ -50,11 +50,6 @@ BOOST_FIXTURE_TEST_CASE(test_toi_mode, BOOST_FIXTURE_TEST_CASE(test_toi_applying, applying_client_fixture) { - // Fixture opens a transaction, commit it first - BOOST_REQUIRE((cc.before_commit() || cc.ordered_commit() || - cc.after_commit()) == 0); - cc.after_applying(); - BOOST_REQUIRE(cc.toi_mode() == wsrep::client_state::m_local); wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)), wsrep::stid(sc.id(), diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 01a39ab..5a6d1bc 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -998,6 +998,8 @@ BOOST_FIXTURE_TEST_CASE( BOOST_FIXTURE_TEST_CASE(transaction_1pc_applying, applying_client_fixture) { + start_transaction(wsrep::transaction_id(1), + wsrep::seqno(1)); BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(cc.ordered_commit() == 0);