1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-30 07:23:07 +03:00

Various changes

* Added preparing state to transaction_context to better distinguish
  between 1PC and 2PC processing
* Changed client_context m_local to denote local only client mode,
  added m_replicating for replicating mode
* Initial unfinished and untested fragment certification
This commit is contained in:
Teemu Ollakka
2018-04-17 11:21:11 +03:00
parent 3b428ff0b7
commit 5c3d5ce24d
11 changed files with 319 additions and 54 deletions

View File

@ -7,6 +7,7 @@
add_library(trrep
provider.cpp
client_context.cpp
server_context.cpp
transaction_context.cpp)
add_executable(trrep_test

16
src/applier.hpp Normal file
View File

@ -0,0 +1,16 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef TRREP_APPLIER_HPP
#define TRREP_APPLIER_HPP
namespace trrep
{
class applier
{
};
}
#endif // TRREP_APPLIER_HPP

View File

@ -19,7 +19,8 @@ namespace trrep
{
e_success,
e_error_during_commit,
e_deadlock_error
e_deadlock_error,
e_append_fragment_error
};
class client_id
@ -40,8 +41,9 @@ namespace trrep
public:
enum mode
{
m_local,
m_applier
m_local, // Operates in local only mode, no replication
m_replicating, // Generates write sets for replication
m_applier // Applying write sets from provider
};
enum state
@ -64,9 +66,16 @@ namespace trrep
virtual ~client_context() { }
// Accessors
trrep::mutex& mutex() { return mutex_; }
const trrep::server_context& server_context() const
trrep::server_context& server_context() const
{ return server_context_; }
client_id id() const { return id_; }
client_id id(const client_id& id)
{
assert(mode() == m_applier);
id_ = id;
}
enum mode mode() const { return mode_; }
enum state state() const { return state_; }
@ -81,11 +90,18 @@ namespace trrep
virtual int after_statement() { return 0; }
virtual int append_fragment(trrep::transaction_context&,
uint32_t, const trrep::data&)
{ return 0; }
virtual int commit(trrep::transaction_context&) { return 0; }
virtual int rollback(trrep::transaction_context&) { return 0; }
virtual void will_replay(trrep::transaction_context&) { }
virtual int replay(trrep::transaction_context& tc);
virtual int apply(const trrep::data&) { return 0; }
virtual void wait_for_replayers(trrep::unique_lock<trrep::mutex>&)
{ }
virtual int prepare_data_for_replication(
@ -98,6 +114,7 @@ namespace trrep
virtual void override_error(const trrep::client_error&) { }
virtual bool killed() const { return 0; }
virtual void abort() const { ::abort(); }
virtual void store_globals() { }
// Debug helpers
virtual void debug_sync(const std::string&)
{
@ -117,6 +134,27 @@ namespace trrep
enum mode mode_;
enum state state_;
};
class client_context_switch
{
public:
client_context_switch(trrep::client_context& orig_context,
trrep::client_context& current_context)
: orig_context_(orig_context)
, current_context_(current_context)
{
current_context_.store_globals();
}
~client_context_switch()
{
orig_context_.store_globals();
}
private:
client_context& orig_context_;
client_context& current_context_;
};
}
#endif // TRREP_CLIENT_CONTEXT_HPP

View File

@ -33,10 +33,10 @@ namespace trrep
// Provider implemenatation interface
int start_transaction(wsrep_ws_handle_t*) { return 0; }
wsrep_status
certify_commit(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* ws_handle,
uint32_t flags,
wsrep_trx_meta_t* trx_meta)
certify(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* ws_handle,
uint32_t flags,
wsrep_trx_meta_t* trx_meta)
{
assert(flags | WSREP_FLAG_TRX_END);
if ((flags | WSREP_FLAG_TRX_END) == 0)

View File

@ -27,10 +27,10 @@ namespace trrep
{ return impl_->append_key(wsh, key); }
int append_data(wsrep_ws_handle_t* wsh, const wsrep_buf_t* buf)
{ return impl_->append_data(wsh, buf); }
wsrep_status certify_commit(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* wsh,
uint32_t flags, wsrep_trx_meta_t* trx_meta)
{ return impl_->certify_commit(conn_id, wsh, flags, trx_meta); }
wsrep_status certify(wsrep_conn_id_t conn_id,
wsrep_ws_handle_t* wsh,
uint32_t flags, wsrep_trx_meta_t* trx_meta)
{ return impl_->certify(conn_id, wsh, flags, trx_meta); }
int rollback(const wsrep_trx_id_t trx_id)
{ return impl_->rollback(trx_id); }
wsrep_status commit_order_enter(wsrep_ws_handle_t* wsh)

View File

@ -16,10 +16,10 @@ namespace trrep
virtual int start_transaction(wsrep_ws_handle_t*) = 0;
virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0;
virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0;
virtual wsrep_status
certify_commit(wsrep_conn_id_t, wsrep_ws_handle_t*,
uint32_t,
wsrep_trx_meta_t*) = 0;
virtual wsrep_status_t
certify(wsrep_conn_id_t, wsrep_ws_handle_t*,
uint32_t,
wsrep_trx_meta_t*) = 0;
virtual int rollback(const wsrep_trx_id_t) = 0;
virtual wsrep_status commit_order_enter(wsrep_ws_handle_t*) = 0;
virtual int commit_order_leave(wsrep_ws_handle_t*) = 0;

90
src/server_context.cpp Normal file
View File

@ -0,0 +1,90 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#include "server_context.hpp"
#include "client_context.hpp"
#include <wsrep_api.h>
#define START_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_START)
#define COMMIT_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_TRX_END)
#define ROLLBACK_TRANSACTION(flags_) ((flags_) & WSREP_FLAG_ROLLBACK)
#if 0
namespace
{
static wsrep_cb_status_t apply_cb(void* ctx,
const wsrep_ws_handle_t* wsh,
uint32_t flags,
const wsrep_buf_t* buf,
const wsrep_trx_meta_t* meta,
wsrep_bool_t* exit_loop)
{
wsrep_cb_status_t ret(WSREP_CB_SUCCESS);
trrep::client_context* client_context(
reinterpret_cast<trrep::client_context*>(ctx));
assert(client_context);
assert(client_context.mode() == trrep::client_context::m_applier);
const trrep::server_context& server_context(
client_context->server_context());
trrep::data data(buf->ptr, buf->len);
if (START_TRANSACTION(flags) && COMMIT_TRANSACTION(flags))
{
trrep::transaction_context transasction_context(
server_context.provider()
*client_context);
assert(transaction_context->active() == false);
transaction_context->start_transaction(meta->stid.trx);
if (client_context.apply(transaction_context, data))
{
ret = WSREP_CB_FAILURE;
}
else if (client_context.commit(transaction_context))
{
ret = WSREP_CB_FAILURE;
}
}
else if (START_TRANSACTION(flags))
{
// First fragment of SR transaction
trrep::client_context* sr_client_context(
server_context.client_context(node_id, client_id, trx_id));
assert(sr_client_context->transaction_context().active() == false);
}
else if (COMMIT_TRANSACTION(flags))
{
// Final fragment of SR transaction
trrep::client_context* sr_client_context(
server_context.client_context(node_id, client_id, trx_id));
assert(sr_client_context->transaction_context().active());
if (data.size() > 0)
{
sr_client_context->apply(data);
}
sr_client_context->commit();
}
else
{
// Middle fragment of SR transaction
trrep::client_context* sr_client_context(
server_context.client_context(node_id, client_id, trx_id));
assert(sr_client_context->transaction_context().active());
sr_client_context->apply(data);
}
}
return ret;
}
#endif // 0
trrep::client_context* trrep::server_context::local_client_context()
{
return new trrep::client_context(*this, ++client_id_,
trrep::client_context::m_local);
}

View File

@ -10,6 +10,7 @@
namespace trrep
{
// Forward declarations
class client_context;
class transaction_context;
class server_context
@ -28,9 +29,11 @@ namespace trrep
: name_(name)
, id_(id)
, rollback_mode_(rollback_mode)
{ }
, client_id_()
{ }
const std::string& name() const { return name_; }
const std::string& id() const { return id_; }
virtual client_context* local_client_context();
virtual void on_connect() { }
virtual void on_view() { }
@ -50,6 +53,8 @@ namespace trrep
std::string name_;
std::string id_;
enum rollback_mode rollback_mode_;
// TODO: This should be part of server mock
unsigned long long client_id_;
};
}

View File

@ -10,9 +10,18 @@
#include <iostream> // TODO: replace with proper logging utility
#include <sstream>
#include <memory>
// Public
trrep::transaction_context::~transaction_context()
{
if (state() != s_committed && state() != s_aborted)
{
client_context_.rollback(*this);
}
}
int trrep::transaction_context::append_key(const trrep::key& key)
{
@ -31,7 +40,7 @@ int trrep::transaction_context::before_prepare()
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(client_context_.mode() == trrep::client_context::m_local);
assert(client_context_.mode() == trrep::client_context::m_replicating);
assert(state() == s_executing || state() == s_must_abort);
if (state() == s_must_abort)
@ -39,6 +48,7 @@ int trrep::transaction_context::before_prepare()
return 1;
}
state(lock, s_preparing);
if (is_streaming())
{
client_context_.debug_suicide(
@ -58,8 +68,8 @@ int trrep::transaction_context::before_prepare()
client_context_.debug_suicide(
"crash_last_fragment_commit_after_fragment_removal");
}
assert(client_context_.mode() == trrep::client_context::m_local);
assert(state() == s_executing);
assert(client_context_.mode() == trrep::client_context::m_replicating);
assert(state() == s_preparing);
return ret;
}
@ -68,14 +78,14 @@ int trrep::transaction_context::after_prepare()
int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(client_context_.mode() == trrep::client_context::m_local);
assert(state() == s_executing || state() == s_must_abort);
assert(client_context_.mode() == trrep::client_context::m_replicating);
assert(state() == s_preparing || state() == s_must_abort);
if (state() == s_must_abort)
{
return 1;
}
if (state() == s_executing)
if (state() == s_preparing)
{
ret = certify_commit(lock);
assert((ret == 0 || state() == s_committing) ||
@ -88,7 +98,7 @@ int trrep::transaction_context::after_prepare()
assert(state() == s_must_abort);
client_context_.override_error(trrep::e_deadlock_error);
}
assert(client_context_.mode() == trrep::client_context::m_local);
assert(client_context_.mode() == trrep::client_context::m_replicating);
return ret;
}
@ -97,9 +107,18 @@ int trrep::transaction_context::before_commit()
int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_executing || state() == s_committing ||
state() == s_must_abort);
switch (client_context_.mode())
{
case trrep::client_context::m_local:
if (ordered())
{
ret = provider_.commit_order_enter(&ws_handle_);
}
break;
case trrep::client_context::m_replicating:
// Commit is one phase - before/after prepare was not called
if (state() == s_executing)
@ -142,6 +161,7 @@ int trrep::transaction_context::before_commit()
break;
case trrep::client_context::m_applier:
assert(ordered());
ret = provider_.commit_order_enter(&ws_handle_);
if (ret)
{
@ -158,6 +178,7 @@ int trrep::transaction_context::ordered_commit()
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(state() == s_committing);
assert(ordered());
ret = provider_.commit_order_leave(&ws_handle_);
// Should always succeed
assert(ret == 0);
@ -176,6 +197,9 @@ int trrep::transaction_context::after_commit()
switch (client_context_.mode())
{
case trrep::client_context::m_local:
// Nothing to do
break;
case trrep::client_context::m_replicating:
if (is_streaming())
{
clear_fragments();
@ -328,18 +352,19 @@ void trrep::transaction_context::state(
{
assert(lock.owns_lock());
static const char allowed[n_states][n_states] =
{ /* ex ce co oc ct cf ma ab ad mr re from/to */
{ 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* ex */
{ 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
{ 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
{ 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
{ 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
{ 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */
{ 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
{ 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */
{ /* ex pr ce co oc ct cf ma ab ad mr re from/to */
{ 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* ex */
{ 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0}, /* pr */
{ 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
{ 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
{ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
{ 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
{ 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0}, /* ma */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
{ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} /* re */
};
if (allowed[state_][next_state])
{
@ -360,9 +385,74 @@ void trrep::transaction_context::state(
}
}
int trrep::transaction_context::certify_fragment()
int trrep::transaction_context::certify_fragment(
trrep::unique_lock<trrep::mutex>& lock)
{
throw trrep::not_implemented_error();
assert(lock.owns_lock());
assert(client_context_.mode() == trrep::client_context::m_replicating);
assert(rollback_replicated_for_ != id_);
client_context_.wait_for_replayers(lock);
if (state() == s_must_abort)
{
client_context_.override_error(trrep::e_deadlock_error);
return 1;
}
state(lock, s_certifying);
lock.unlock();
uint32_t flags(0);
if (fragments_.empty())
{
flags |= WSREP_FLAG_TRX_START;
}
trrep::data data;
if (client_context_.prepare_data_for_replication(*this, data))
{
lock.lock();
state(lock, s_must_abort);
return 1;
}
// Client context to store fragment in separate transaction
// Switch temporarily to sr_transaction_context, switch back
// to original when this goes out of scope
std::auto_ptr<trrep::client_context> sr_client_context(
client_context_.server_context().local_client_context());
trrep::client_context_switch client_context_switch(
client_context_,
*sr_client_context);
trrep::transaction_context sr_transaction_context(provider_,
*sr_client_context);
if (sr_client_context->append_fragment(sr_transaction_context, flags, data))
{
lock.lock();
state(lock, s_must_abort);
client_context_.override_error(trrep::e_append_fragment_error);
return 1;
}
wsrep_status_t cert_ret(provider_.certify(client_context_.id().get(),
&sr_transaction_context.ws_handle_,
flags,
&sr_transaction_context.trx_meta_));
int ret(0);
switch (cert_ret)
{
case WSREP_OK:
sr_client_context->commit(sr_transaction_context);
break;
default:
sr_client_context->rollback(sr_transaction_context);
ret = 1;
break;
}
return ret;
}
int trrep::transaction_context::certify_commit(
@ -402,10 +492,10 @@ int trrep::transaction_context::certify_commit(
return 1;
}
wsrep_status cert_ret(provider_.certify_commit(client_context_.id().get(),
&ws_handle_,
flags() | WSREP_FLAG_TRX_END,
&trx_meta_));
wsrep_status cert_ret(provider_.certify(client_context_.id().get(),
&ws_handle_,
flags() | WSREP_FLAG_TRX_END,
&trx_meta_));
lock.lock();

View File

@ -42,6 +42,7 @@ namespace trrep
enum state
{
s_executing,
s_preparing,
s_certifying,
s_committing,
s_ordered_commit,
@ -65,8 +66,29 @@ namespace trrep
, trx_meta_()
, pa_unsafe_(false)
, certified_(false)
, fragments_()
, rollback_replicated_for_(false)
{ }
transaction_context(trrep::provider& provider,
trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta)
: provider_(provider)
, client_context_(client_context)
, id_(trx_meta.stid.trx)
, state_(s_executing)
, state_hist_()
, ws_handle_(ws_handle)
, trx_meta_(trx_meta)
, pa_unsafe_()
, certified_(true)
, fragments_()
, rollback_replicated_for_(false)
{ }
~transaction_context();
#if 0
transaction_context(trrep::provider& provider,
trrep::client_context& client_context,
@ -148,7 +170,7 @@ namespace trrep
if (pa_unsafe()) ret |= WSREP_FLAG_PA_UNSAFE;
return ret;
}
int certify_fragment();
int certify_fragment(trrep::unique_lock<trrep::mutex>&);
int certify_commit(trrep::unique_lock<trrep::mutex>&);
void remove_fragments();
void clear_fragments();
@ -162,6 +184,9 @@ namespace trrep
wsrep_trx_meta_t trx_meta_;
bool pa_unsafe_;
bool certified_;
std::vector<wsrep_gtid_t> fragments_;
trrep::transaction_id rollback_replicated_for_;
};
static inline std::string to_string(enum trrep::transaction_context::state state)

View File

@ -44,7 +44,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc)
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state
@ -86,7 +86,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc)
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state
@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_2pc)
// Run before prepare
BOOST_REQUIRE(tc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_preparing);
// Run after prepare
BOOST_REQUIRE(tc.after_prepare() == 0);
@ -135,7 +135,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit)
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state
@ -172,13 +172,13 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_commit)
//
// Test a 2PC transaction which gets BF aborted before before_prepare
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_prepare)
BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_before_prepare)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state
@ -215,13 +215,13 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_before_prepare)
//
// Test a 2PC transaction which gets BF aborted before before_prepare
//
BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_after_prepare)
BOOST_AUTO_TEST_CASE(transaction_context_2pc_bf_before_after_prepare)
{
trrep::mock_provider_impl mock_provider;
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state
@ -236,7 +236,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_before_after_prepare)
// Run before prepare
BOOST_REQUIRE(tc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_executing);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_preparing);
bf_abort_unordered(cc, tc);
@ -269,7 +269,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_uncertified
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state
@ -314,7 +314,7 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified)
trrep::provider provider(&mock_provider);
trrep::server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_local);
trrep::client_context cc(sc, trrep::client_id(1), trrep::client_context::m_replicating);
trrep::transaction_context tc(provider, cc);
// Verify initial state