mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-06-16 02:01:44 +03:00
NBO applying
- High priority interface method to apply NBO begin, separate from apply_toi() in order to avoid implementation to force interpreting ws_meta flags. - Method to put client_state into NBO mode when applying NBO begin. The client_state will process in m_local mode. - Unit tests for applying NBO
This commit is contained in:
@ -68,6 +68,13 @@ int db::high_priority_service::apply_toi(
|
||||
throw wsrep::not_implemented_error();
|
||||
}
|
||||
|
||||
int db::high_priority_service::apply_nbo_begin(
|
||||
const wsrep::ws_meta&,
|
||||
const wsrep::const_buffer&)
|
||||
{
|
||||
throw wsrep::not_implemented_error();
|
||||
}
|
||||
|
||||
int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
|
||||
const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
|
@ -50,6 +50,8 @@ namespace db
|
||||
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) override;
|
||||
int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&,
|
||||
wsrep::mutable_buffer&) override;
|
||||
int apply_nbo_begin(const wsrep::ws_meta&, const wsrep::const_buffer&)
|
||||
override;
|
||||
void adopt_apply_error(wsrep::mutable_buffer&) override;
|
||||
virtual void after_apply() override;
|
||||
void store_globals() override { }
|
||||
|
@ -755,6 +755,18 @@ namespace wsrep
|
||||
*/
|
||||
int end_nbo_phase_one();
|
||||
|
||||
/**
|
||||
* Enter in NBO mode. This method should be called when the
|
||||
* applier launches the asynchronous process to perform the
|
||||
* operation. The purpose of the call is to adjust
|
||||
* the state and set write set meta data.
|
||||
*
|
||||
* @param ws_meta Write set meta data.
|
||||
*
|
||||
* @return Zero in case of success, non-zero on failure.
|
||||
*/
|
||||
int enter_nbo_mode(const wsrep::ws_meta& ws_meta);
|
||||
|
||||
/**
|
||||
* Begin non-blocking operation phase two. The keys argument
|
||||
* passed to this call must contain the same keys which were
|
||||
|
@ -161,6 +161,25 @@ namespace wsrep
|
||||
const wsrep::const_buffer& ws,
|
||||
wsrep::mutable_buffer& err) = 0;
|
||||
|
||||
/**
|
||||
* Apply NBO begin event.
|
||||
*
|
||||
* The responsibility of the implementation is to start
|
||||
* an asynchronous process which will complete the operation.
|
||||
* The call is done under total order isolation, and the
|
||||
* isolation is released by the caller after the method
|
||||
* returns. It is a responsibility of the asynchronous process
|
||||
* to complete the second phase of NBO.
|
||||
*
|
||||
* @param ws_meta Write set meta data.
|
||||
* @param data Buffer containing the command to execute.
|
||||
*
|
||||
* @return Zero in case of success, non-zero if the asynchronous
|
||||
* process could not be started.
|
||||
*/
|
||||
virtual int apply_nbo_begin(const wsrep::ws_meta& ws_meta,
|
||||
const wsrep::const_buffer& data) = 0;
|
||||
|
||||
/**
|
||||
* Actions to take after applying a write set was completed.
|
||||
*/
|
||||
|
@ -500,6 +500,17 @@ int wsrep::client_state::end_nbo_phase_one()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta)
|
||||
{
|
||||
assert(state_ == s_exec);
|
||||
assert(mode_ == m_local);
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
toi_meta_ = ws_meta;
|
||||
toi_mode_ = mode_;
|
||||
mode(lock, m_nbo);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int wsrep::client_state::begin_nbo_phase_two(const wsrep::key_array& keys)
|
||||
{
|
||||
assert(state_ == s_exec);
|
||||
|
@ -463,8 +463,10 @@ static int apply_toi(wsrep::provider& provider,
|
||||
}
|
||||
else if (wsrep::starts_transaction(ws_meta.flags()))
|
||||
{
|
||||
// NBO begin
|
||||
throw wsrep::not_implemented_error();
|
||||
provider.commit_order_enter(ws_handle, ws_meta);
|
||||
int ret(high_priority_service.apply_nbo_begin(ws_meta, data));
|
||||
provider.commit_order_leave(ws_handle, ws_meta);
|
||||
return ret;
|
||||
}
|
||||
else if (wsrep::commits_transaction(ws_meta.flags()))
|
||||
{
|
||||
|
@ -137,8 +137,12 @@ namespace
|
||||
cc.open(cc.id());
|
||||
BOOST_REQUIRE(cc.before_command() == 0);
|
||||
BOOST_REQUIRE(cc.before_statement() == 0);
|
||||
wsrep::ws_handle ws_handle(wsrep::transaction_id(1), (void*)1);
|
||||
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(1)),
|
||||
}
|
||||
void start_transaction(wsrep::transaction_id id,
|
||||
wsrep::seqno seqno)
|
||||
{
|
||||
wsrep::ws_handle ws_handle(id, (void*)1);
|
||||
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), seqno),
|
||||
wsrep::stid(sc.id(),
|
||||
wsrep::transaction_id(1),
|
||||
cc.id()),
|
||||
@ -150,6 +154,7 @@ namespace
|
||||
BOOST_REQUIRE(tc.certified() == true);
|
||||
BOOST_REQUIRE(tc.ordered() == true);
|
||||
}
|
||||
|
||||
wsrep::mock_server_service server_service;
|
||||
wsrep::mock_server_state sc;
|
||||
wsrep::mock_client cc;
|
||||
|
@ -96,6 +96,32 @@ int wsrep::mock_high_priority_service::apply_toi(const wsrep::ws_meta&,
|
||||
return (fail_next_toi_ ? 1 : 0);
|
||||
}
|
||||
|
||||
int wsrep::mock_high_priority_service::apply_nbo_begin(
|
||||
const wsrep::ws_meta& ws_meta,
|
||||
const wsrep::const_buffer&)
|
||||
{
|
||||
const int nbo_begin_flags(wsrep::provider::flag::isolation |
|
||||
wsrep::provider::flag::start_transaction);
|
||||
assert(ws_meta.flags() & nbo_begin_flags);
|
||||
assert((ws_meta.flags() & ~nbo_begin_flags) == 0);
|
||||
|
||||
if (fail_next_toi_)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
nbo_cs_ = std::unique_ptr<wsrep::mock_client>(
|
||||
new wsrep::mock_client(client_state_->server_state(),
|
||||
wsrep::client_id(1),
|
||||
wsrep::client_state::m_local));
|
||||
nbo_cs_->open(wsrep::client_id(1));
|
||||
nbo_cs_->before_command();
|
||||
nbo_cs_->before_statement();
|
||||
return nbo_cs_->enter_nbo_mode(ws_meta);
|
||||
}
|
||||
}
|
||||
|
||||
void wsrep::mock_high_priority_service::adopt_apply_error(
|
||||
wsrep::mutable_buffer& err)
|
||||
{
|
||||
|
@ -23,6 +23,8 @@
|
||||
#include "wsrep/high_priority_service.hpp"
|
||||
#include "mock_client_state.hpp"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace wsrep
|
||||
{
|
||||
class mock_high_priority_service : public wsrep::high_priority_service
|
||||
@ -38,8 +40,11 @@ namespace wsrep
|
||||
, fail_next_toi_()
|
||||
, client_state_(client_state)
|
||||
, replaying_(replaying)
|
||||
, nbo_cs_()
|
||||
{ }
|
||||
|
||||
~mock_high_priority_service()
|
||||
{ }
|
||||
int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&)
|
||||
WSREP_OVERRIDE;
|
||||
|
||||
@ -65,6 +70,8 @@ namespace wsrep
|
||||
int apply_toi(const wsrep::ws_meta&,
|
||||
const wsrep::const_buffer&,
|
||||
wsrep::mutable_buffer&) WSREP_OVERRIDE;
|
||||
int apply_nbo_begin(const wsrep::ws_meta&,
|
||||
const wsrep::const_buffer&) WSREP_OVERRIDE;
|
||||
void adopt_apply_error(wsrep::mutable_buffer& err) WSREP_OVERRIDE;
|
||||
void after_apply() WSREP_OVERRIDE;
|
||||
void store_globals() WSREP_OVERRIDE { }
|
||||
@ -84,11 +91,19 @@ namespace wsrep
|
||||
bool do_2pc_;
|
||||
bool fail_next_applying_;
|
||||
bool fail_next_toi_;
|
||||
|
||||
wsrep::mock_client* nbo_cs() const { return nbo_cs_.get(); }
|
||||
|
||||
private:
|
||||
mock_high_priority_service(const mock_high_priority_service&);
|
||||
mock_high_priority_service& operator=(const mock_high_priority_service&);
|
||||
wsrep::mock_client_state* client_state_;
|
||||
bool replaying_;
|
||||
|
||||
/* Client state associated to NBO processing. This should be
|
||||
* stored elsewhere (like mock_server_state), but is kept
|
||||
* here for convenience. */
|
||||
std::unique_ptr<wsrep::mock_client> nbo_cs_;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -66,3 +66,80 @@ BOOST_FIXTURE_TEST_CASE(test_local_nbo,
|
||||
BOOST_REQUIRE(sc.provider().toi_start_transaction() == 1);
|
||||
BOOST_REQUIRE(sc.provider().toi_commit() == 1);
|
||||
}
|
||||
|
||||
// This test case operates through server_state object in order to
|
||||
// verify that the high priority service is called with appropriate
|
||||
// arguments.
|
||||
BOOST_FIXTURE_TEST_CASE(test_applying_nbo,
|
||||
applying_client_fixture)
|
||||
{
|
||||
|
||||
wsrep::mock_high_priority_service hps(sc, &cc, false);
|
||||
wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1));
|
||||
const int nbo_begin_flags(wsrep::provider::flag::start_transaction |
|
||||
wsrep::provider::flag::isolation);
|
||||
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"),
|
||||
wsrep::seqno(1)),
|
||||
wsrep::stid(wsrep::id("s1"),
|
||||
wsrep::transaction_id::undefined(),
|
||||
wsrep::client_id(1)),
|
||||
wsrep::seqno(0),
|
||||
nbo_begin_flags);
|
||||
std::string nbo_begin("nbo_begin");
|
||||
BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta,
|
||||
wsrep::const_buffer(nbo_begin.data(),
|
||||
nbo_begin.size())) == 0);
|
||||
wsrep::mock_client* nbo_cs(hps.nbo_cs());
|
||||
BOOST_REQUIRE(nbo_cs);
|
||||
BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local);
|
||||
BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_nbo);
|
||||
|
||||
// After this point the control is on local process and applier
|
||||
// has released toi critical section.
|
||||
wsrep::key key(wsrep::key::exclusive);
|
||||
key.append_key_part("k1", 2);
|
||||
key.append_key_part("k2", 2);
|
||||
wsrep::key_array keys{key};
|
||||
// Starting phase two should put nbo_cs in toi mode.
|
||||
BOOST_REQUIRE(nbo_cs->begin_nbo_phase_two(keys) == 0);
|
||||
BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_nbo);
|
||||
BOOST_REQUIRE(nbo_cs->in_toi());
|
||||
BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local);
|
||||
// Ending phase two should make nbo_cs leave TOI mode and
|
||||
// return to m_local mode.
|
||||
BOOST_REQUIRE(nbo_cs->end_nbo_phase_two() == 0);
|
||||
BOOST_REQUIRE(nbo_cs->mode() == wsrep::client_state::m_local);
|
||||
BOOST_REQUIRE(nbo_cs->in_toi() == false);
|
||||
BOOST_REQUIRE(nbo_cs->toi_mode() == wsrep::client_state::m_local);
|
||||
|
||||
// There must have been one toi write set with commit flag.
|
||||
BOOST_REQUIRE(sc.provider().toi_write_sets() == 1);
|
||||
BOOST_REQUIRE(sc.provider().toi_start_transaction() == 0);
|
||||
BOOST_REQUIRE(sc.provider().toi_commit() == 1);
|
||||
}
|
||||
|
||||
// This test case operates through server_state object in order to
|
||||
// verify that the high priority service is called with appropriate
|
||||
// arguments. The test checks that error is returned in the case if
|
||||
// launching asynchronous process for NBO fails.
|
||||
BOOST_FIXTURE_TEST_CASE(test_applying_nbo_fail,
|
||||
applying_client_fixture)
|
||||
{
|
||||
|
||||
wsrep::mock_high_priority_service hps(sc, &cc, false);
|
||||
wsrep::ws_handle ws_handle(wsrep::transaction_id::undefined(), (void*)(1));
|
||||
const int nbo_begin_flags(wsrep::provider::flag::start_transaction |
|
||||
wsrep::provider::flag::isolation);
|
||||
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("cluster1"),
|
||||
wsrep::seqno(1)),
|
||||
wsrep::stid(wsrep::id("s1"),
|
||||
wsrep::transaction_id::undefined(),
|
||||
wsrep::client_id(1)),
|
||||
wsrep::seqno(0),
|
||||
nbo_begin_flags);
|
||||
std::string nbo_begin("nbo_begin");
|
||||
hps.fail_next_toi_ = true;
|
||||
BOOST_REQUIRE(sc.on_apply(hps, ws_handle, ws_meta,
|
||||
wsrep::const_buffer(nbo_begin.data(),
|
||||
nbo_begin.size())) == 1);
|
||||
}
|
||||
|
@ -50,11 +50,6 @@ BOOST_FIXTURE_TEST_CASE(test_toi_mode,
|
||||
BOOST_FIXTURE_TEST_CASE(test_toi_applying,
|
||||
applying_client_fixture)
|
||||
{
|
||||
// Fixture opens a transaction, commit it first
|
||||
BOOST_REQUIRE((cc.before_commit() || cc.ordered_commit() ||
|
||||
cc.after_commit()) == 0);
|
||||
cc.after_applying();
|
||||
|
||||
BOOST_REQUIRE(cc.toi_mode() == wsrep::client_state::m_local);
|
||||
wsrep::ws_meta ws_meta(wsrep::gtid(wsrep::id("1"), wsrep::seqno(2)),
|
||||
wsrep::stid(sc.id(),
|
||||
|
@ -998,6 +998,8 @@ BOOST_FIXTURE_TEST_CASE(
|
||||
BOOST_FIXTURE_TEST_CASE(transaction_1pc_applying,
|
||||
applying_client_fixture)
|
||||
{
|
||||
start_transaction(wsrep::transaction_id(1),
|
||||
wsrep::seqno(1));
|
||||
BOOST_REQUIRE(cc.before_commit() == 0);
|
||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing);
|
||||
BOOST_REQUIRE(cc.ordered_commit() == 0);
|
||||
|
Reference in New Issue
Block a user