1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Initial support for XA

Force fragment replication when XA transaction is prepared, with
prepare fragment. Commit fragment happens in before_commit().
Adjusted fragment removal, which cannot happen in atomically with the
executing transaction.
This commit is contained in:
Daniele Sciascia
2019-02-04 08:45:00 +01:00
parent 5c27d6dafa
commit 9c9323e2a5
4 changed files with 122 additions and 26 deletions

View File

@ -177,11 +177,20 @@ static int commit_fragment(wsrep::server_state& server_state,
{
assert(err.size() == 0);
}
streaming_applier->debug_crash(
"crash_apply_cb_before_fragment_removal");
ret = ret || streaming_applier->remove_fragments(ws_meta);
streaming_applier->debug_crash(
"crash_apply_cb_after_fragment_removal");
const wsrep::transaction& trx(streaming_applier->transaction());
// Fragment removal for XA is going to happen in after_commit
if (!trx.is_xa())
{
streaming_applier->debug_crash(
"crash_apply_cb_before_fragment_removal");
ret = ret || streaming_applier->remove_fragments(ws_meta);
streaming_applier->debug_crash(
"crash_apply_cb_after_fragment_removal");
}
streaming_applier->debug_crash(
"crash_commit_cb_before_last_fragment_commit");
ret = ret || streaming_applier->commit(ws_handle, ws_meta);
@ -427,7 +436,7 @@ static int apply_write_set(wsrep::server_state& server_state,
}
if (ret)
{
wsrep::log_info() << "Failed to apply write set: " << ws_meta;
wsrep::log_error() << "Failed to apply write set: " << ws_meta;
}
return ret;
}

View File

@ -288,8 +288,11 @@ int wsrep::transaction::before_prepare(
wsrep::provider::error_not_allowed);
ret = 1;
}
else
else if (!is_xa())
{
// Note: we can't remove fragments here for XA,
// the transaction has already issued XA END and
// is in IDLE state, no more changes allowed!
ret = client_service_.remove_fragments();
if (ret)
{
@ -305,13 +308,25 @@ int wsrep::transaction::before_prepare(
ret = 1;
}
}
if (ret == 0)
{
ret = certify_commit(lock);
assert((ret == 0 && state() == s_preparing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
if (is_xa())
{
ret = streaming_step(lock);
if (ret == 0)
{
assert(state() == s_executing);
}
}
else
{
ret = certify_commit(lock);
assert((ret == 0 && state() == s_preparing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
}
if (ret)
{
@ -325,7 +340,14 @@ int wsrep::transaction::before_prepare(
case wsrep::client_state::m_high_priority:
// Note: fragment removal is done from applying
// context for high priority mode.
state(lock, s_preparing);
if (is_xa())
{
assert(state() == s_executing);
}
else
{
state(lock, s_preparing);
}
break;
default:
assert(0);
@ -333,6 +355,7 @@ int wsrep::transaction::before_prepare(
}
assert(state() == s_preparing ||
(state() == s_executing && is_xa()) ||
(ret && (state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed ||
@ -347,18 +370,26 @@ int wsrep::transaction::after_prepare(
assert(lock.owns_lock());
debug_log_state("after_prepare_enter");
assert(certified() && ordered());
assert(state() == s_preparing || state() == s_must_abort);
if (state() == s_must_abort)
if (is_xa())
{
assert(client_state_.mode() == wsrep::client_state::m_local);
state(lock, s_must_replay);
return 1;
// TODO XA consider adding state s_prepared
assert(state() == s_executing);
assert(client_state_.mode() == wsrep::client_state::m_local ||
(certified() && ordered()));
}
else
{
assert(certified() && ordered());
assert(state() == s_preparing || state() == s_must_abort);
state(lock, s_committing);
if (state() == s_must_abort)
{
assert(client_state_.mode() == wsrep::client_state::m_local);
state(lock, s_must_replay);
return 1;
}
state(lock, s_committing);
}
debug_log_state("after_prepare_leave");
return 0;
}
@ -380,7 +411,19 @@ int wsrep::transaction::before_commit()
switch (client_state_.mode())
{
case wsrep::client_state::m_local:
if (state() == s_executing)
if (state() == s_executing && is_xa())
{
ret = certify_commit(lock);
assert((ret == 0 && state() == s_preparing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
if (ret == 0)
{
state(lock, s_committing);
}
}
else if (state() == s_executing)
{
ret = before_prepare(lock) || after_prepare(lock);
assert((ret == 0 && state() == s_committing)
@ -438,6 +481,14 @@ int wsrep::transaction::before_commit()
case wsrep::client_state::m_high_priority:
assert(certified());
assert(ordered());
if (is_xa())
{
assert(state() == s_executing);
// one more reason to add prepared state?
state(lock, s_preparing);
state(lock, s_committing);
}
if (state() == s_executing || state() == s_replaying)
{
ret = before_prepare(lock) || after_prepare(lock);
@ -513,6 +564,25 @@ int wsrep::transaction::after_commit()
{
assert(client_state_.mode() == wsrep::client_state::m_local ||
client_state_.mode() == wsrep::client_state::m_high_priority);
if (is_xa())
{
// XA fragment removal happens here,
// see comment in before_prepare
lock.unlock();
scoped_storage_service<storage_service_deleter>
sr_scope(
client_service_,
server_service_.storage_service(client_service_),
storage_service_deleter(server_service_));
wsrep::storage_service& storage_service(
sr_scope.storage_service());
storage_service.adopt_transaction(*this);
storage_service.remove_fragments();
storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
lock.lock();
}
if (client_state_.mode() == wsrep::client_state::m_local)
{
lock.unlock();
@ -1050,7 +1120,7 @@ bool wsrep::transaction::abort_or_interrupt(
int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
assert(streaming_context_.fragment_size());
assert(streaming_context_.fragment_size() || is_xa());
if (client_service_.bytes_generated() <
streaming_context_.bytes_certified())
@ -1153,6 +1223,11 @@ int wsrep::transaction::certify_fragment(
flags(flags() | wsrep::provider::flag::implicit_deps);
}
if (is_xa())
{
flags(flags() | wsrep::provider::flag::prepare);
}
int ret(0);
enum wsrep::client_error error(wsrep::e_success);
enum wsrep::provider::status cert_ret(wsrep::provider::success);
@ -1604,6 +1679,7 @@ void wsrep::transaction::debug_log_state(
<< ", sr_rb: " << streaming_context_.rolled_back()
<< "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
<< " thread_id: " << client_state_.owning_thread_id_
<< "\n query: " << client_service_.query()
<< "");
}