1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-16 02:01:44 +03:00

Applier side transaction state changes and tests

This commit is contained in:
Teemu Ollakka
2018-04-17 16:37:03 +03:00
parent ab795f7979
commit 468e66dea0
6 changed files with 212 additions and 80 deletions

View File

@ -34,8 +34,8 @@ namespace trrep
void on_connect() { }
void on_view() { }
void on_sync() { }
void on_apply(trrep::transaction_context&) { }
void on_commit(trrep::transaction_context&) { }
// void on_apply(trrep::transaction_context&) { }
// void on_commit(trrep::transaction_context&) { }
private:
mutable trrep::mock_provider_impl mock_provider_impl_;

View File

@ -47,29 +47,15 @@ namespace
assert(client_context->mode() == trrep::client_context::m_applier);
trrep::data data(buf->ptr, buf->len);
if (starts_transaction(flags) && commits_transaction(flags))
{
trrep::transaction_context transaction_context(
*client_context,
trrep::transaction_context transaction_context(*client_context,
*wsh,
*meta);
assert(transaction_context.active() == false);
transaction_context.start_transaction(meta->stid.trx);
if (client_context->apply(transaction_context, data))
*meta,
flags);
if (client_context->server_context().on_apply(
*client_context, transaction_context, data))
{
ret = WSREP_CB_FAILURE;
}
else if (client_context->commit(transaction_context))
{
ret = WSREP_CB_FAILURE;
}
}
else
{
// SR not implemented yet
assert(0);
}
return ret;
}
}
@ -89,3 +75,36 @@ int trrep::server_context::load_provider(const std::string& provider_spec)
}
return 0;
}
int trrep::server_context::on_apply(
trrep::client_context& client_context,
trrep::transaction_context& transaction_context,
const trrep::data& data)
{
int ret(0);
if (starts_transaction(transaction_context.flags()) &&
commits_transaction(transaction_context.flags()))
{
assert(transaction_context.active() == false);
transaction_context.start_transaction();
if (client_context.apply(transaction_context, data))
{
ret = 1;
}
else if (client_context.commit(transaction_context))
{
ret = 1;
}
}
else
{
// SR not implemented yet
assert(0);
}
if (ret)
{
client_context.rollback(transaction_context);
}
return ret;
}

View File

@ -17,6 +17,7 @@ namespace trrep
class provider;
class client_context;
class transaction_context;
class data;
class server_context
{
@ -85,9 +86,15 @@ namespace trrep
virtual void on_connect() = 0;
virtual void on_view() = 0;
virtual void on_sync() = 0;
virtual void on_apply(trrep::transaction_context&) = 0;
virtual void on_commit(trrep::transaction_context&) = 0;
//
// This method will be called by the applier thread when
// a remote write set is being applied. It is the responsibility
// of the caller to set up transaction context and data properly.
//
int on_apply(trrep::client_context& client_context,
trrep::transaction_context& transaction_context,
const trrep::data& data);
virtual bool statement_allowed_for_streaming(
const trrep::client_context&,

View File

@ -23,6 +23,7 @@ trrep::transaction_context::transaction_context(
, state_hist_()
, ws_handle_()
, trx_meta_()
, flags_()
, pa_unsafe_(false)
, certified_(false)
, fragments_()
@ -32,14 +33,16 @@ trrep::transaction_context::transaction_context(
trrep::transaction_context::transaction_context(
trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta)
const wsrep_trx_meta_t& trx_meta,
uint32_t flags)
: provider_(client_context.provider())
, client_context_(client_context)
, id_(trx_meta.stid.trx)
, id_(transaction_id::invalid())
, state_(s_executing)
, state_hist_()
, ws_handle_(ws_handle)
, trx_meta_(trx_meta)
, flags_(flags)
, pa_unsafe_()
, certified_(true)
, fragments_()
@ -55,6 +58,27 @@ trrep::transaction_context::~transaction_context()
}
}
int trrep::transaction_context::start_transaction(
const trrep::transaction_id& id)
{
assert(active() == false);
id_ = id;
ws_handle_.trx_id = id_.get();
flags_ |= WSREP_FLAG_TRX_START;
switch (client_context_.mode())
{
case trrep::client_context::m_local:
case trrep::client_context::m_applier:
return 0;
case trrep::client_context::m_replicating:
return provider_.start_transaction(&ws_handle_);
default:
assert(0);
return 1;
}
}
int trrep::transaction_context::append_key(const trrep::key& key)
{
@ -73,15 +97,19 @@ int trrep::transaction_context::before_prepare()
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(client_context_.mode() == trrep::client_context::m_replicating);
assert(state() == s_executing || state() == s_must_abort);
if (state() == s_must_abort)
{
assert(client_context_.mode() == trrep::client_context::m_replicating);
return 1;
}
state(lock, s_preparing);
switch (client_context_.mode())
{
case trrep::client_context::m_replicating:
if (is_streaming())
{
client_context_.debug_suicide(
@ -101,7 +129,12 @@ int trrep::transaction_context::before_prepare()
client_context_.debug_suicide(
"crash_last_fragment_commit_after_fragment_removal");
}
assert(client_context_.mode() == trrep::client_context::m_replicating);
break;
case trrep::client_context::m_local:
case trrep::client_context::m_applier:
break;
}
assert(state() == s_preparing);
return ret;
}
@ -111,13 +144,16 @@ int trrep::transaction_context::after_prepare()
int ret(1);
trrep::unique_lock<trrep::mutex> lock(client_context_.mutex());
assert(client_context_.mode() == trrep::client_context::m_replicating);
assert(state() == s_preparing || state() == s_must_abort);
if (state() == s_must_abort)
{
assert(client_context_.mode() == trrep::client_context::m_replicating);
return 1;
}
switch (client_context_.mode())
{
case trrep::client_context::m_replicating:
if (state() == s_preparing)
{
ret = certify_commit(lock);
@ -131,7 +167,14 @@ int trrep::transaction_context::after_prepare()
assert(state() == s_must_abort);
client_context_.override_error(trrep::e_deadlock_error);
}
assert(client_context_.mode() == trrep::client_context::m_replicating);
break;
case trrep::client_context::m_local:
case trrep::client_context::m_applier:
state(lock, s_certifying);
state(lock, s_committing);
ret = 0;
break;
}
return ret;
}
@ -191,7 +234,6 @@ int trrep::transaction_context::before_commit()
}
lock.lock();
}
break;
case trrep::client_context::m_applier:
assert(ordered());
@ -200,6 +242,15 @@ int trrep::transaction_context::before_commit()
{
state(lock, s_must_abort);
}
else
{
if (state() == s_executing)
{
// 1pc
state(lock, s_certifying);
}
state(lock, s_committing);
}
break;
}
return ret;

View File

@ -58,20 +58,9 @@ namespace trrep
transaction_context(trrep::client_context& client_context);
transaction_context(trrep::client_context& client_context,
const wsrep_ws_handle_t& ws_handle,
const wsrep_trx_meta_t& trx_meta);
const wsrep_trx_meta_t& trx_meta,
uint32_t flags);
~transaction_context();
#if 0
transaction_context(trrep::provider& provider,
trrep::client_context& client_context,
const transaction_id& id)
: provider_(provider)
, client_context_(client_context)
, id_(id)
, state_(s_executing)
, ws_handle_()
, gtid_()
{ }
#endif
// Accessors
trrep::transaction_id id() const
{ return id_; }
@ -101,14 +90,15 @@ namespace trrep
bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
//
int start_transaction(const trrep::transaction_id& id)
int start_transaction()
{
assert(active() == false);
id_ = id;
ws_handle_.trx_id = id_.get();
return provider_.start_transaction(&ws_handle_);
assert(trx_meta_.stid.trx != transaction_id::invalid());
return start_transaction(trx_meta_.stid.trx);
}
int start_transaction(const trrep::transaction_id& id);
int append_key(const trrep::key&);
int append_data(const trrep::data&);
@ -133,14 +123,11 @@ namespace trrep
int after_statement();
private:
uint32_t flags() const
{
uint32_t ret(0);
if (is_streaming() == false) ret |= WSREP_FLAG_TRX_START;
if (pa_unsafe()) ret |= WSREP_FLAG_PA_UNSAFE;
return ret;
return flags_;
}
private:
int certify_fragment(trrep::unique_lock<trrep::mutex>&);
int certify_commit(trrep::unique_lock<trrep::mutex>&);
void remove_fragments();
@ -153,6 +140,7 @@ namespace trrep
std::vector<enum state> state_hist_;
wsrep_ws_handle_t ws_handle_;
wsrep_trx_meta_t trx_meta_;
uint32_t flags_;
bool pa_unsafe_;
bool certified_;
@ -165,6 +153,7 @@ namespace trrep
switch (state)
{
case trrep::transaction_context::s_executing: return "executing";
case trrep::transaction_context::s_preparing: return "preparing";
case trrep::transaction_context::s_certifying: return "certifying";
case trrep::transaction_context::s_committing: return "committing";
case trrep::transaction_context::s_ordered_commit: return "ordered_commit";

View File

@ -32,6 +32,23 @@ namespace
sc.mock_provider().bf_abort(cc.id().get(), tc.id().get(), seqno);
}
trrep::transaction_context applying_transaction(
trrep::client_context& cc,
trrep::transaction_id id,
wsrep_seqno_t seqno,
uint32_t flags)
{
wsrep_ws_handle_t ws_handle = { id.get(), 0 };
wsrep_trx_meta_t meta = {
{ {1 }, seqno }, /* gtid */
{ { static_cast<uint8_t>(cc.id().get()) }, id.get(), cc.id().get() }, /* stid */
seqno - 1
};
trrep::transaction_context ret(cc, ws_handle, meta, flags);
return ret;
}
}
//
@ -332,3 +349,52 @@ BOOST_AUTO_TEST_CASE(transaction_context_1pc_bf_during_before_commit_certified)
BOOST_REQUIRE(tc.ordered() == false);
BOOST_REQUIRE(tc.certified() == false);
}
BOOST_AUTO_TEST_CASE(transaction_context_1pc_applying)
{
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc,
trrep::client_id(1),
trrep::client_context::m_applier);
trrep::transaction_context tc(applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.start_transaction() == 0);
BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true);
BOOST_REQUIRE(tc.ordered() == true);
BOOST_REQUIRE(tc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing);
BOOST_REQUIRE(tc.ordered_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_ordered_commit);
BOOST_REQUIRE(tc.after_commit() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committed);
}
BOOST_AUTO_TEST_CASE(transaction_context_2pc_applying)
{
trrep::mock_server_context sc("s1", "s1",
trrep::server_context::rm_sync);
trrep::client_context cc(sc,
trrep::client_id(1),
trrep::client_context::m_applier);
trrep::transaction_context tc(applying_transaction(
cc, 1, 1,
WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END));
BOOST_REQUIRE(tc.active() == false);
BOOST_REQUIRE(tc.start_transaction() == 0);
BOOST_REQUIRE(tc.active() == true);
BOOST_REQUIRE(tc.certified() == true);
BOOST_REQUIRE(tc.ordered() == true);
BOOST_REQUIRE(tc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_preparing);
BOOST_REQUIRE(tc.after_prepare() == 0);
BOOST_REQUIRE(tc.state() == trrep::transaction_context::s_committing);
}