diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index faa1571..e41cc85 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -63,6 +63,13 @@ int db::high_priority_service::apply_toi( throw wsrep::not_implemented_error(); } +int db::high_priority_service::apply_nbo_begin( + const wsrep::ws_meta&, + const wsrep::const_buffer&) +{ + throw wsrep::not_implemented_error(); +} + int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index 8f16234..6833a81 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -48,6 +48,8 @@ namespace db int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) override; + int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&) + override; void adopt_apply_error(wsrep::mutable_buffer&) override; virtual void after_apply() override; void store_globals() override { } diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 4f92db6..0bc5310 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -659,6 +659,18 @@ namespace wsrep */ int end_nbo_phase_one(); + /** + * Enter in NBO mode. This method should be called when the + * applier launches the asynchronous process to perform the + * operation. The purpose of the call is to adjust + * the state and set write set meta data. + * + * @param ws_meta Write set meta data. + * + * @return Zero in case of success, non-zero on failure. + */ + int enter_nbo_mode(const wsrep::ws_meta& ws_meta); + /** * Begin non-blocking operation phase two. The keys argument * passed to this call must contain the same keys which were diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index f48f87f..ec3bf8a 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -154,6 +154,25 @@ namespace wsrep const wsrep::const_buffer& ws, wsrep::mutable_buffer& err) = 0; + /** + * Apply NBO begin event. + * + * The responsibility of the implementation is to start + * an asynchronous process which will complete the operation. + * The call is done under total order isolation, and the + * isolation is released by the caller after the method + * returns. It is a responsibility of the asynchronous process + * to complete the second phase of NBO. + * + * @param ws_meta Write set meta data. + * @param data Buffer containing the command to execute. + * + * @return Zero in case of success, non-zero if the asynchronous + * process could not be started. + */ + virtual int apply_nbo_begin(const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer& data) = 0; + /** * Actions to take after applying a write set was completed. */ diff --git a/src/client_state.cpp b/src/client_state.cpp index 16f0b6d..687db02 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -499,6 +499,17 @@ int wsrep::client_state::end_nbo_phase_one() return ret; } +int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta) +{ + assert(state_ == s_exec); + assert(mode_ == m_local); + wsrep::unique_lock lock(mutex_); + toi_meta_ = ws_meta; + toi_mode_ = mode_; + mode(lock, m_nbo); + return 0; +} + int wsrep::client_state::begin_nbo_phase_two(const wsrep::key_array& keys) { assert(state_ == s_exec); diff --git a/src/server_state.cpp b/src/server_state.cpp index cde28ea..07eaae9 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -452,8 +452,10 @@ static int apply_toi(wsrep::provider& provider, } else if (wsrep::starts_transaction(ws_meta.flags())) { - // NBO begin - throw wsrep::not_implemented_error(); + provider.commit_order_enter(ws_handle, ws_meta); + int ret(high_priority_service.apply_nbo_begin(ws_meta, data)); + provider.commit_order_leave(ws_handle, ws_meta); + return ret; } else if (wsrep::commits_transaction(ws_meta.flags())) { diff --git a/test/mock_high_priority_service.cpp b/test/mock_high_priority_service.cpp index ee6f82c..7de4476 100644 --- a/test/mock_high_priority_service.cpp +++ b/test/mock_high_priority_service.cpp @@ -90,6 +90,32 @@ int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&, return (fail_next_toi_ ? 1 : 0); } +int wsrep::mock_high_priority_service::apply_nbo_begin( + const wsrep::ws_meta& ws_meta, + const wsrep::const_buffer&) +{ + const int nbo_begin_flags(wsrep::provider::flag::isolation | + wsrep::provider::flag::start_transaction); + assert(ws_meta.flags() & nbo_begin_flags); + assert((ws_meta.flags() & ~nbo_begin_flags) == 0); + + if (fail_next_toi_) + { + return 1; + } + else + { + nbo_cs_ = std::unique_ptr( + new wsrep::mock_client(client_state_->server_state(), + wsrep::client_id(1), + wsrep::client_state::m_local)); + nbo_cs_->open(wsrep::client_id(1)); + nbo_cs_->before_command(); + nbo_cs_->before_statement(); + return nbo_cs_->enter_nbo_mode(ws_meta); + } +} + void wsrep::mock_high_priority_service::adopt_apply_error( wsrep::mutable_buffer& err) { diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index c93a6a6..17f785b 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -23,6 +23,8 @@ #include "wsrep/high_priority_service.hpp" #include "mock_client_state.hpp" +#include + namespace wsrep { class mock_high_priority_service : public wsrep::high_priority_service @@ -38,8 +40,11 @@ namespace wsrep , fail_next_toi_() , client_state_(client_state) , replaying_(replaying) + , nbo_cs_() { } + ~mock_high_priority_service() + { } int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) WSREP_OVERRIDE; @@ -62,6 +67,8 @@ namespace wsrep int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, wsrep::mutable_buffer&) WSREP_OVERRIDE; + int apply_nbo_begin(const wsrep::ws_meta&, + const wsrep::const_buffer&) WSREP_OVERRIDE; void adopt_apply_error(wsrep::mutable_buffer& err) WSREP_OVERRIDE; void after_apply() WSREP_OVERRIDE; void store_globals() WSREP_OVERRIDE { } @@ -81,11 +88,19 @@ namespace wsrep bool do_2pc_; bool fail_next_applying_; bool fail_next_toi_; + + wsrep::mock_client* nbo_cs() const { return nbo_cs_.get(); } + private: mock_high_priority_service(const mock_high_priority_service&); mock_high_priority_service& operator=(const mock_high_priority_service&); wsrep::mock_client_state* client_state_; bool replaying_; + + /* Client state associated to NBO processing. This should be + * stored elsewhere (like mock_server_state), but is kept + * here for convenience. */ + std::unique_ptr nbo_cs_; }; } diff --git a/test/nbo_test.cpp b/test/nbo_test.cpp index 66aa49b..73eafc8 100644 --- a/test/nbo_test.cpp +++ b/test/nbo_test.cpp @@ -67,7 +67,79 @@ BOOST_FIXTURE_TEST_CASE(test_local_nbo, BOOST_REQUIRE(sc.provider().toi_commit() == 1); } +// This test case operates through server_state object in order to +// verify that the high priority service is called with appropriate +// arguments. BOOST_FIXTURE_TEST_CASE(test_applying_nbo, applying_client_fixture) { + + wsrep::mock_high_priority_service hps(sc, &cc, false); + wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1)); + const int nbo_begin_flags(wsrep::provider::flag::start_transaction | + wsrep::provider::flag::isolation); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), + wsrep::seqno(1)), + wsrep::stid(wsrep::id("s1"), + wsrep::transaction_id::undefined(), + wsrep::client_id(1)), + wsrep::seqno(0), + nbo_begin_flags); + std::string nbo_begin("nbo_begin"); + BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta, + wsrep::const_buffer(nbo_begin.data(), + nbo_begin.size())) == 0); + wsrep::mock_client* nbo_cs(hps.nbo_cs()); + BOOST_REQUIRE(nbo_cs); + BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local); + BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_nbo); + + // After this point the control is on local process and applier + // has released toi critical section. + wsrep::key key(wsrep::key::exclusive); + key.append_key_part("k1", 2); + key.append_key_part("k2", 2); + wsrep::key_array keys{key}; + // Starting phase two should put nbo_cs in toi mode. + BOOST_REQUIRE(nbo_cs->begin_nbo_phase_two(keys) == 0); + BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_nbo); + BOOST_REQUIRE(nbo_cs->in_toi()); + BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local); + // Ending phase two should make nbo_cs leave TOI mode and + // return to m_local mode. + BOOST_REQUIRE(nbo_cs->end_nbo_phase_two() == 0); + BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_local); + BOOST_REQUIRE(nbo_cs->in_toi() == false); + BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local); + + // There must have been one toi write set with commit flag. + BOOST_REQUIRE(sc.provider().toi_write_sets() == 1); + BOOST_REQUIRE(sc.provider().toi_start_transaction() == 0); + BOOST_REQUIRE(sc.provider().toi_commit() == 1); +} + +// This test case operates through server_state object in order to +// verify that the high priority service is called with appropriate +// arguments. The test checks that error is returned in the case if +// launching asynchronous process for NBO fails. +BOOST_FIXTURE_TEST_CASE(test_applying_nbo_fail, + applying_client_fixture) +{ + + wsrep::mock_high_priority_service hps(sc, &cc, false); + wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1)); + const int nbo_begin_flags(wsrep::provider::flag::start_transaction | + wsrep::provider::flag::isolation); + wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"), + wsrep::seqno(1)), + wsrep::stid(wsrep::id("s1"), + wsrep::transaction_id::undefined(), + wsrep::client_id(1)), + wsrep::seqno(0), + nbo_begin_flags); + std::string nbo_begin("nbo_begin"); + hps.fail_next_toi_ = true; + BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta, + wsrep::const_buffer(nbo_begin.data(), + nbo_begin.size())) == 1); }