1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Mock server context

This commit is contained in:
Teemu Ollakka
2018-04-17 15:01:48 +03:00
parent 5c3d5ce24d
commit ab795f7979
9 changed files with 264 additions and 134 deletions

View File

@ -5,6 +5,11 @@
#include "client_context.hpp"
#include "transaction_context.hpp"
trrep::provider& trrep::client_context::provider() const
{
return server_context_.provider();
}
// TODO: This should be pure virtual method, implemented by
// DBMS integration or mock classes only.
int trrep::client_context::replay(trrep::transaction_context& tc)

View File

@ -5,7 +5,7 @@
#ifndef TRREP_CLIENT_CONTEXT_HPP
#define TRREP_CLIENT_CONTEXT_HPP
#include "provider.hpp"
#include "server_context.hpp"
#include "mutex.hpp"
#include "lock.hpp"
#include "data.hpp"
@ -13,7 +13,7 @@
namespace trrep
{
class server_context;
class transaction_context;
class provider;
enum client_error
{
@ -68,6 +68,7 @@ namespace trrep
trrep::mutex& mutex() { return mutex_; }
trrep::server_context& server_context() const
{ return server_context_; }
trrep::provider& provider() const;
client_id id() const { return id_; }
client_id id(const client_id& id)
@ -100,7 +101,8 @@ namespace trrep
virtual int replay(trrep::transaction_context& tc);
virtual int apply(const trrep::data&) { return 0; }
virtual int apply(trrep::transaction_context&,
const trrep::data&) { return 0; }
virtual void wait_for_replayers(trrep::unique_lock<trrep::mutex>&)
{ }
@ -154,7 +156,6 @@ namespace trrep
client_context& orig_context_;
client_context& current_context_;
};
}
#endif // TRREP_CLIENT_CONTEXT_HPP

View File

@ -0,0 +1,47 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_MOCK_SERVER_CONTEXT_HPP
#define TRREP_MOCK_SERVER_CONTEXT_HPP
#include "server_context.hpp"
#include "mock_provider_impl.hpp"
namespace trrep
{
class mock_server_context : public trrep::server_context
{
public:
mock_server_context(const std::string& name,
const std::string& id,
enum trrep::server_context::rollback_mode rollback_mode)
: trrep::server_context(name, id, rollback_mode)
, mock_provider_impl_()
, provider_(&mock_provider_impl_)
, last_client_id_(0)
{ }
trrep::provider& provider() const
{ return provider_; }
trrep::mock_provider_impl& mock_provider() const
{ return mock_provider_impl_; }
trrep::client_context* local_client_context()
{
return new trrep::client_context(*this, ++last_client_id_,
trrep::client_context::m_local);
}
void on_connect() { }
void on_view() { }
void on_sync() { }
void on_apply(trrep::transaction_context&) { }
void on_commit(trrep::transaction_context&) { }
private:
mutable trrep::mock_provider_impl mock_provider_impl_;
mutable trrep::provider provider_;
unsigned long long last_client_id_;
};
}
#endif // TRREP_MOCK_SERVER_CONTEXT_HPP

View File

@ -4,87 +4,88 @@
#include "server_context.hpp"
#include "client_context.hpp"
#include "transaction_context.hpp"
// Todo: refactor into provider factory
#include "mock_provider_impl.hpp"
#include "wsrep_provider_v26.hpp"
#include <wsrep_api.h>
#define START_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_START)
#define COMMIT_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_END)
#define ROLLBACK_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_ROLLBACK)
#if 0
#include <cassert>
namespace
{
inline bool starts_transaction(uint32_t flags)
{
return (flags & WSREP_FLAG_TRX_START);
}
inline bool commits_transaction(uint32_t flags)
{
return (flags & WSREP_FLAG_TRX_END);
}
inline bool rolls_back_transaction(uint32_t flags)
{
return (flags & WSREP_FLAG_ROLLBACK);
}
static wsrep_cb_status_t apply_cb(void* ctx,
const wsrep_ws_handle_t* wsh,
uint32_t flags,
const wsrep_buf_t* buf,
const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop)
wsrep_bool_t* exit_loop __attribute__((unused)))
{
wsrep_cb_status_t ret(WSREP_CB_SUCCESS);
trrep::client_context* client_context(
reinterpret_cast<trrep::client_context*>(ctx));
assert(client_context);
assert(client_context.mode() == trrep::client_context::m_applier);
assert(client_context->mode() == trrep::client_context::m_applier);
const trrep::server_context& server_context(
client_context->server_context());
trrep::data data(buf->ptr, buf->len);
if (START_TRANSACTION(flags) && COMMIT_TRANSACTION(flags))
if (starts_transaction(flags) && commits_transaction(flags))
{
trrep::transaction_context transasction_context(
server_context.provider()
*client_context);
assert(transaction_context->active() == false);
transaction_context->start_transaction(meta->stid.trx);
if (client_context.apply(transaction_context, data))
trrep::transaction_context transaction_context(
*client_context,
*wsh,
*meta);
assert(transaction_context.active() == false);
transaction_context.start_transaction(meta->stid.trx);
if (client_context->apply(transaction_context, data))
{
ret = WSREP_CB_FAILURE;
}
else if (client_context.commit(transaction_context))
else if (client_context->commit(transaction_context))
{
ret = WSREP_CB_FAILURE;
}
}
else if (START_TRANSACTION(flags))
{
// First fragment of SR transaction
trrep::client_context* sr_client_context(
server_context.client_context(node_id, client_id, trx_id));
assert(sr_client_context->transaction_context().active() == false);
}
else if (COMMIT_TRANSACTION(flags))
{
// Final fragment of SR transaction
trrep::client_context* sr_client_context(
server_context.client_context(node_id, client_id, trx_id));
assert(sr_client_context->transaction_context().active());
if (data.size() > 0)
{
sr_client_context->apply(data);
}
sr_client_context->commit();
}
else
{
// Middle fragment of SR transaction
trrep::client_context* sr_client_context(
server_context.client_context(node_id, client_id, trx_id));
assert(sr_client_context->transaction_context().active());
sr_client_context->apply(data);
// SR not implemented yet
assert(0);
}
return ret;
}
return ret;
}
#endif // 0
trrep::client_context* trrep::server_context::local_client_context()
int trrep::server_context::load_provider(const std::string& provider_spec)
{
return new trrep::client_context(*this, ++client_id_,
trrep::client_context::m_local);
if (provider_spec == "mock")
{
provider_ = new trrep::provider(new trrep::mock_provider_impl);
}
else
{
struct wsrep_init_args init_args;
init_args.apply_cb = &apply_cb;
provider_ = new trrep::provider(
new trrep::wsrep_provider_v26(&init_args));
}
return 0;
}

View File

@ -5,11 +5,16 @@
#ifndef TRREP_SERVER_CONTEXT_HPP
#define TRREP_SERVER_CONTEXT_HPP
#include "exception.hpp"
#include "wsrep_api.h"
#include <string>
namespace trrep
{
// Forward declarations
class provider;
class client_context;
class transaction_context;
@ -26,20 +31,62 @@ namespace trrep
server_context(const std::string& name,
const std::string& id,
enum rollback_mode rollback_mode)
: name_(name)
: provider_()
, name_(name)
, id_(id)
, rollback_mode_(rollback_mode)
, client_id_()
{ }
const std::string& name() const { return name_; }
const std::string& id() const { return id_; }
virtual client_context* local_client_context();
virtual void on_connect() { }
virtual void on_view() { }
virtual void on_sync() { }
virtual void on_apply(trrep::transaction_context&) { }
virtual void on_commit(trrep::transaction_context&) { }
{ }
//
// Return server name
//
const std::string& name() const { return name_; }
//
// Return server identifier
//
const std::string& id() const { return id_; }
//
// Create client context which acts only locally, i.e. does
// not participate in replication. However, local client
// connection may execute transactions which require ordering,
// as when modifying local SR fragment storage requires
// strict commit ordering.
//
virtual client_context* local_client_context() = 0;
//
// Load provider
//
// @return Zero on success, non-zero on error
//
int load_provider(const std::string&);
//
// Return reference to provider
//
// @return Reference to provider
// @throw trrep::runtime_error if provider has not been loaded
//
virtual trrep::provider& provider() const
{
if (provider_ == 0)
{
throw trrep::runtime_error("provider not loaded");
}
return *provider_;
}
//
//
//
virtual void on_connect() = 0;
virtual void on_view() = 0;
virtual void on_sync() = 0;
virtual void on_apply(trrep::transaction_context&) = 0;
virtual void on_commit(trrep::transaction_context&) = 0;
virtual bool statement_allowed_for_streaming(
@ -50,11 +97,14 @@ namespace trrep
return false;
}
private:
server_context(const server_context&);
server_context& operator=(const server_context&);
trrep::provider* provider_;
std::string name_;
std::string id_;
enum rollback_mode rollback_mode_;
// TODO: This should be part of server mock
unsigned long long client_id_;
};
}

View File

@ -14,6 +14,39 @@
// Public
trrep::transaction_context::transaction_context(
trrep::client_context& client_context)
: provider_(client_context.provider())
, client_context_(client_context)
, id_(transaction_id::invalid())
, state_(s_executing)
, state_hist_()
, ws_handle_()
, trx_meta_()
, pa_unsafe_(false)
, certified_(false)
, fragments_()
, rollback_replicated_for_(false)
{ }
trrep::transaction_context::transaction_context(
trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta)
: provider_(client_context.provider())
, client_context_(client_context)
, id_(trx_meta.stid.trx)
, state_(s_executing)
, state_hist_()
, ws_handle_(ws_handle)
, trx_meta_(trx_meta)
, pa_unsafe_()
, certified_(true)
, fragments_()
, rollback_replicated_for_(false)
{ }
trrep::transaction_context::~transaction_context()
{
if (state() != s_committed && state() != s_aborted)
@ -388,6 +421,8 @@ void trrep::transaction_context::state(
int trrep::transaction_context::certify_fragment(
trrep::unique_lock<trrep::mutex>& lock)
{
// This method is not fully implemented and tested yet.
throw trrep::not_implemented_error();
assert(lock.owns_lock());
assert(client_context_.mode() == trrep::client_context::m_replicating);
@ -426,8 +461,7 @@ int trrep::transaction_context::certify_fragment(
trrep::client_context_switch client_context_switch(
client_context_,
*sr_client_context);
trrep::transaction_context sr_transaction_context(provider_,
*sr_client_context);
trrep::transaction_context sr_transaction_context(*sr_client_context);
if (sr_client_context->append_fragment(sr_transaction_context, flags, data))
{
lock.lock();

View File

@ -55,39 +55,10 @@ namespace trrep
s_replaying
};
static const int n_states = s_replaying + 1;
transaction_context(trrep::provider& provider,
trrep::client_context& client_context)
: provider_(provider)
, client_context_(client_context)
, id_(transaction_id::invalid())
, state_(s_executing)
, state_hist_()
, ws_handle_()
, trx_meta_()
, pa_unsafe_(false)
, certified_(false)
, fragments_()
, rollback_replicated_for_(false)
{ }
transaction_context(trrep::provider& provider,
trrep::client_context& client_context,
transaction_context(trrep::client_context& client_context);
transaction_context(trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta)
: provider_(provider)
, client_context_(client_context)
, id_(trx_meta.stid.trx)
, state_(s_executing)
, state_hist_()
, ws_handle_(ws_handle)
, trx_meta_(trx_meta)
, pa_unsafe_()
, certified_(true)
, fragments_()
, rollback_replicated_for_(false)
{ }
const wsrep_trx_meta_t& trx_meta);
~transaction_context();
#if 0
transaction_context(trrep::provider& provider,

View File

@ -4,7 +4,7 @@
#include "transaction_context.hpp"
#include "client_context.hpp"
#include "server_context.hpp"
#include "mock_server_context.hpp"
#include "provider.hpp"
#include "mock_provider_impl.hpp"
@ -24,15 +24,14 @@ namespace
}
// BF abort method to abort transactions via provider
void bf_abort_provider(trrep::mock_provider_impl& provider,
void bf_abort_provider(trrep::mock_server_context& sc,
const trrep::client_context& cc,
const trrep::transaction_context& tc,
wsrep_seqno_t seqno)
{
provider.bf_abort(cc.id().get(), tc.id().get(), seqno);
sc.mock_provider().bf_abort(cc.id().get(), tc.id().get(), seqno);
}
}
//
@ -40,12 +39,10 @@ namespace
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -82,12 +79,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc)
//
BOOST_AUTO_TEST_CASE(transaction_context_2pc)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -131,12 +126,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc)
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -174,12 +167,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit)
//
BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -217,12 +208,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare)
//
BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -265,12 +254,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare)
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -282,7 +269,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_provider(mock_provider, cc, tc, WSREP_SEQNO_UNDEFINED);
bf_abort_provider(sc, cc, tc, WSREP_SEQNO_UNDEFINED);
// Run before commit
BOOST_REQUIRE(tc.before_commit());
@ -310,12 +297,10 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
trrep::transaction_context tc(cc);
// Verify initial state
BOOST_REQUIRE(tc.active() == false);
@ -327,7 +312,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified)
BOOST_REQUIRE(tc.id() == trrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
bf_abort_provider(mock_provider, cc, tc, 1);
bf_abort_provider(sc, cc, tc, 1);
// Run before commit
BOOST_REQUIRE(tc.before_commit());

View File

@ -0,0 +1,36 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_WSREP_PROVIDER_V26_HPP
#define TRREP_WSREP_PROVIDER_V26_HPP
#include "provider_impl.hpp"
#include <wsrep_api.h>
namespace trrep
{
class wsrep_provider_v26 : public trrep::provider_impl
{
public:
wsrep_provider_v26(struct wsrep_init_args*)
{ }
int start_transaction(wsrep_ws_handle_t*) { return 0; }
int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; }
int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; }
wsrep_status_t
certify(wsrep_conn_id_t, wsrep_ws_handle_t*,
uint32_t,
wsrep_trx_meta_t*) { return WSREP_OK; }
int rollback(const wsrep_trx_id_t) { return 0; }
wsrep_status commit_order_enter(wsrep_ws_handle_t*) { return WSREP_OK; }
int commit_order_leave(wsrep_ws_handle_t*) { return 0; }
int release(wsrep_ws_handle_t*) { return 0; }
};
}
#endif // TRREP_WSREP_PROVIDER_V26_HPP