1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-31 18:24:25 +03:00

Support recovery of XA transactions

* Add method `restore_prepared_transaction` to `client_state` class
  which restores a transaction state from storage given its xid.
* Add method `commit_or_rollback_by_xid` to terminate prepared XA
  transactions by xid.
* Make sure that transactions in prepared state are not rolled back
  when their master fails/partitions away.
This commit is contained in:
Daniele Sciascia
2019-03-14 11:30:01 +01:00
parent 98af85498b
commit 052247144f
16 changed files with 210 additions and 31 deletions

View File

@ -130,8 +130,9 @@ static int apply_fragment(wsrep::server_state& server_state,
if (!apply_err)
{
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
const std::string xid(streaming_applier->transaction().xid());
ret = high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
ws_handle, ws_meta, data, xid);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}
@ -148,7 +149,6 @@ static int apply_fragment(wsrep::server_state& server_state,
return ret;
}
static int commit_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier,
@ -180,7 +180,7 @@ static int commit_fragment(wsrep::server_state& server_state,
const wsrep::transaction& trx(streaming_applier->transaction());
// Fragment removal for XA is going to happen in after_commit
if (!trx.is_xa())
if (trx.state() != wsrep::transaction::s_prepared)
{
streaming_applier->debug_crash(
"crash_apply_cb_before_fragment_removal");
@ -381,7 +381,8 @@ static int apply_write_set(wsrep::server_state& server_state,
else
{
sa->next_fragment(ws_meta);
ret = apply_fragment(high_priority_service,
ret = apply_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
@ -1256,6 +1257,22 @@ wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
return (i == streaming_appliers_.end() ? 0 : i->second);
}
// Look up a streaming applier by the xid of the transaction it is
// applying.
//
// The lookup is a linear scan over streaming_appliers_ performed while
// holding mutex_. Returns the first applier whose transaction xid
// compares equal to the given xid, or NULL if no applier matches.
wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
    const std::string& xid) const
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    // The iterator must be advanced on every pass: without it the scan
    // would spin forever on the first non-matching entry while holding
    // the mutex.
    for (streaming_appliers_map::const_iterator i(streaming_appliers_.begin());
         i != streaming_appliers_.end();
         ++i)
    {
        wsrep::high_priority_service* sa(i->second);
        if (sa->transaction().xid() == xid)
        {
            return sa;
        }
    }
    return NULL;
}
//////////////////////////////////////////////////////////////////////////////
// Private //
//////////////////////////////////////////////////////////////////////////////
@ -1430,13 +1447,19 @@ void wsrep::server_state::close_orphaned_sr_transactions(
streaming_appliers_map::iterator i(streaming_appliers_.begin());
while (i != streaming_appliers_.end())
{
// rollback SR on equal consecutive primary views or if its
// originator is not in the current view
if (equal_consecutive_views ||
(std::find_if(current_view_.members().begin(),
current_view_.members().end(),
server_id_cmp(i->first.first)) ==
current_view_.members().end()))
wsrep::high_priority_service* streaming_applier(i->second);
// Rollback SR on equal consecutive primary views or if its
// originator is not in the current view.
// Transactions in prepared state must be committed or
// rolled back explicitly, those are never rolled back here.
if ((streaming_applier->transaction().state() !=
wsrep::transaction::s_prepared) &&
(equal_consecutive_views ||
(std::find_if(current_view_.members().begin(),
current_view_.members().end(),
server_id_cmp(i->first.first)) ==
current_view_.members().end())))
{
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_server_state,
@ -1445,7 +1468,6 @@ void wsrep::server_state::close_orphaned_sr_transactions(
<< ", " << i->first.second);
wsrep::id server_id(i->first.first);
wsrep::transaction_id transaction_id(i->first.second);
wsrep::high_priority_service* streaming_applier(i->second);
int adopt_error;
if ((adopt_error = high_priority_service.adopt_transaction(
streaming_applier->transaction())))

View File

@ -108,6 +108,7 @@ wsrep::transaction::transaction(
, streaming_context_()
, sr_keys_()
, apply_error_buf_()
, is_recovered_xa_(false)
{ }
@ -425,7 +426,8 @@ int wsrep::transaction::before_commit()
assert((ret == 0 && state() == s_committing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
state() == s_cert_failed));
state() == s_cert_failed ||
state() == s_prepared));
}
else if (state() == s_executing)
{
@ -485,9 +487,8 @@ int wsrep::transaction::before_commit()
case wsrep::client_state::m_high_priority:
assert(certified());
assert(ordered());
if (is_xa())
if (state() == s_prepared)
{
assert(state() == s_prepared);
state(lock, s_committing);
}
@ -567,7 +568,7 @@ int wsrep::transaction::after_commit()
assert(client_state_.mode() == wsrep::client_state::m_local ||
client_state_.mode() == wsrep::client_state::m_high_priority);
if (is_xa())
if (is_xa() || is_recovered_xa_)
{
// XA fragment removal happens here,
// see comment in before_prepare
@ -1035,6 +1036,73 @@ void wsrep::transaction::after_replay(const wsrep::transaction& other)
clear_fragments();
}
int wsrep::transaction::restore_to_prepared_state()
{
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
assert(active());
assert(is_empty());
assert(state() == s_executing);
flags(flags() & ~wsrep::provider::flag::start_transaction);
state(lock, s_certifying);
state(lock, s_prepared);
is_recovered_xa_ = true;
return 0;
}
int wsrep::transaction::commit_or_rollback_by_xid(const std::string& xid,
bool commit)
{
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
wsrep::server_state& server_state(client_state_.server_state());
wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid));
if (!sa)
{
assert(sa);
client_state_.override_error(wsrep::e_error_during_commit);
return 1;
}
int flags(commit ?
wsrep::provider::flag::commit :
wsrep::provider::flag::rollback);
flags = flags | wsrep::provider::flag::pa_unsafe;
wsrep::stid stid(sa->transaction().server_id(),
sa->transaction().id(),
client_state_.id());
wsrep::ws_meta meta(stid);
const enum wsrep::provider::status cert_ret(
provider().certify(client_state_.id(),
ws_handle_,
flags,
meta));
if (cert_ret == wsrep::provider::success)
{
if (commit)
{
state(lock, s_certifying);
state(lock, s_committing);
state(lock, s_committed);
}
else
{
state(lock, s_aborting);
state(lock, s_aborted);
}
return 0;
}
else
{
client_state_.override_error(wsrep::e_error_during_commit);
wsrep::log_error() << "Failed to commit_or_rollback_by_xid,"
<< " xid: " << xid
<< " error: " << cert_ret;
return 1;
}
}
////////////////////////////////////////////////////////////////////////////////
// Private //
////////////////////////////////////////////////////////////////////////////////
@ -1164,13 +1232,13 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock)
break;
}
if (streaming_context_.fragment_size_exceeded() || client_service_.is_xa_prepare())
if (streaming_context_.fragment_size_exceeded() || is_xa_prepare())
{
// Some statements have no effect. Do not attempt to
// replicate a fragment if no data has been generated since
// last fragment replication. An XA PREPARE statement generates a
// fragment even if no data is currently pending replication.
if (bytes_to_replicate <= 0 && !client_service_.is_xa_prepare())
if (bytes_to_replicate <= 0 && !is_xa_prepare())
{
assert(bytes_to_replicate == 0);
return ret;
@ -1268,7 +1336,8 @@ int wsrep::transaction::certify_fragment(
server_id,
id(),
flags(),
wsrep::const_buffer(data.data(), data.size())))
wsrep::const_buffer(data.data(), data.size()),
xid()))
{
ret = 1;
error = wsrep::e_append_fragment_error;
@ -1538,7 +1607,14 @@ int wsrep::transaction::certify_commit(
{
client_state_.override_error(wsrep::e_error_during_commit,
cert_ret);
state(lock, s_must_abort);
if (is_xa())
{
state(lock, s_prepared);
}
else
{
state(lock, s_must_abort);
}
}
break;
case wsrep::provider::error_provider_failed:
@ -1670,6 +1746,7 @@ void wsrep::transaction::cleanup()
streaming_context_.cleanup();
client_service_.cleanup_transaction();
apply_error_buf_.clear();
is_recovered_xa_ = false;
debug_log_state("cleanup_leave");
}

View File

@ -209,7 +209,12 @@ namespace
: ws_meta_(ws_meta)
, trx_meta_()
, flags_(flags)
{ }
{
std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
sizeof(trx_meta_.stid.node.data));
trx_meta_.stid.conn = ws_meta.client_id().get();
trx_meta_.stid.trx = ws_meta.transaction_id().get();
}
~mutable_ws_meta()
{