1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-08-06 15:02:41 +03:00

Support detach on transaction prepare

Split method `transaction::xa_detach()` in `before_xa_detach()` and
`after_xa_detach()`. This allows to handle bf abort or replay while
transaction is detaching and to distinguish if a transaction is bf
aborted before or after the DBMS has detached a transaction from its
storage engine.
This commit is contained in:
Daniele Sciascia
2022-10-10 11:14:35 +02:00
parent 5185ad3481
commit de2ead8d70
9 changed files with 187 additions and 37 deletions

View File

@@ -45,6 +45,11 @@ const wsrep::transaction& db::high_priority_service::transaction() const
return client_.client_state().transaction(); return client_.client_state().transaction();
} }
wsrep::client_state& db::high_priority_service::client_state() const
{
return client_.client_state();
}
int db::high_priority_service::adopt_transaction(const wsrep::transaction&) int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
{ {
throw wsrep::not_implemented_error(); throw wsrep::not_implemented_error();

View File

@@ -34,6 +34,7 @@ namespace db
const wsrep::ws_meta&) override; const wsrep::ws_meta&) override;
int next_fragment(const wsrep::ws_meta&) override; int next_fragment(const wsrep::ws_meta&) override;
const wsrep::transaction& transaction() const override; const wsrep::transaction& transaction() const override;
wsrep::client_state& client_state() const override;
int adopt_transaction(const wsrep::transaction&) override; int adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::ws_meta&, int apply_write_set(const wsrep::ws_meta&,
const wsrep::const_buffer&, const wsrep::const_buffer&,

View File

@@ -519,17 +519,31 @@ namespace wsrep
* transaction disconnects, and the transaction must not rollback. * transaction disconnects, and the transaction must not rollback.
* After this call, a different client may later attempt to terminate * After this call, a different client may later attempt to terminate
* the transaction by calling method commit_by_xid() or rollback_by_xid(). * the transaction by calling method commit_by_xid() or rollback_by_xid().
*
* @return Zero on success, non-zero if the transaction was BF aborted
*/
int before_xa_detach();
/**
* This method should be called to conclude the XA detach operation,
* after the DBMS has detached the transaction.
*
* @return Zero on success, non-zero if transaction was BF aborted
*/
int after_xa_detach();
/**
* @deprecated Use before_xa_detach() and after_xa_detach()
*/ */
void xa_detach(); void xa_detach();
/** /**
* Replay a XA transaction * Replay a XA transaction
* *
* Replay a XA transaction that is in s_idle state. * Replay a local XA transaction in s_idle state,
* or detached.
* This may happen if the transaction is BF aborted * This may happen if the transaction is BF aborted
* between prepare and commit. * between prepare and commit.
* Since the victim is idle, this method can be called
* by the BF aborter or the backround rollbacker.
*/ */
void xa_replay(); void xa_replay();

View File

@@ -63,6 +63,11 @@ namespace wsrep
*/ */
virtual const wsrep::transaction& transaction() const = 0; virtual const wsrep::transaction& transaction() const = 0;
/**
* Return the associated client_state object
*/
virtual wsrep::client_state& client_state() const = 0;
/** /**
* Adopt a transaction. * Adopt a transaction.
*/ */

View File

@@ -126,6 +126,11 @@ namespace wsrep
return !xid_.is_null(); return !xid_.is_null();
} }
/**
* Return true if the transaction has completed the prepare step.
*/
bool is_prepared_xa() const;
void assign_xid(const wsrep::xid& xid); void assign_xid(const wsrep::xid& xid);
const wsrep::xid& xid() const const wsrep::xid& xid() const
@@ -137,7 +142,9 @@ namespace wsrep
int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit); int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);
void xa_detach(); int before_xa_detach(wsrep::unique_lock<mutex>&);
int after_xa_detach(wsrep::unique_lock<mutex>&);
int xa_replay(wsrep::unique_lock<wsrep::mutex>&); int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
@@ -213,6 +220,11 @@ namespace wsrep
return bf_aborted_in_total_order_; return bf_aborted_in_total_order_;
} }
wsrep::seqno bf_seqno() const
{
return bf_seqno_;
}
int flags() const int flags() const
{ {
return flags_; return flags_;
@@ -270,6 +282,7 @@ namespace wsrep
enum wsrep::provider::status bf_abort_provider_status_; enum wsrep::provider::status bf_abort_provider_status_;
int bf_abort_client_state_; int bf_abort_client_state_;
bool bf_aborted_in_total_order_; bool bf_aborted_in_total_order_;
wsrep::seqno bf_seqno_;
wsrep::ws_handle ws_handle_; wsrep::ws_handle ws_handle_;
wsrep::ws_meta ws_meta_; wsrep::ws_meta ws_meta_;
int flags_; int flags_;

View File

@@ -23,6 +23,7 @@
#include "wsrep/server_state.hpp" #include "wsrep/server_state.hpp"
#include "wsrep/server_service.hpp" #include "wsrep/server_service.hpp"
#include "wsrep/client_service.hpp" #include "wsrep/client_service.hpp"
#include "wsrep/high_priority_service.hpp"
#include <unistd.h> // usleep() #include <unistd.h> // usleep()
#include <cassert> #include <cassert>
@@ -450,8 +451,9 @@ void wsrep::client_state::sync_rollback_complete()
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
debug_log_state("sync_rollback_complete: enter"); debug_log_state("sync_rollback_complete: enter");
assert(state_ == s_idle && mode_ == m_local && assert((state_ == s_idle && mode_ == m_local &&
transaction_.state() == wsrep::transaction::s_aborted); transaction_.state() == wsrep::transaction::s_aborted) ||
mode_ == m_high_priority);
set_rollbacker_active(false); set_rollbacker_active(false);
cond_.notify_all(); cond_.notify_all();
debug_log_state("sync_rollback_complete: leave"); debug_log_state("sync_rollback_complete: leave");
@@ -461,7 +463,7 @@ void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
debug_log_state("wait_rollback_complete_and_acquire_ownership: enter"); debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
if (state_ == s_idle) if (state_ == s_idle || mode_ == m_high_priority)
{ {
do_wait_rollback_complete_and_acquire_ownership(lock); do_wait_rollback_complete_and_acquire_ownership(lock);
} }
@@ -511,17 +513,72 @@ void wsrep::client_state::disable_streaming()
// XA // // XA //
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void wsrep::client_state::xa_detach() int wsrep::client_state::before_xa_detach()
{ {
int ret(0);
client_service_.debug_sync("wsrep_before_xa_detach_enter");
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local); assert(mode_ == m_local);
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting); assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
transaction_.xa_detach(); if (transaction_.state() == wsrep::transaction::s_must_abort)
{
transaction_.state(lock, wsrep::transaction::s_must_replay);
lock.unlock();
client_service_.bf_rollback();
lock.lock();
ret = 1;
}
else
{
ret = transaction_.before_xa_detach(lock);
}
}
client_service_.debug_sync("wsrep_before_xa_detach_leave");
return ret;
}
int wsrep::client_state::after_xa_detach()
{
int ret(0);
client_service_.debug_sync("wsrep_after_xa_detach_enter");
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(mode_ == m_local);
if (transaction_.state() == wsrep::transaction::s_must_abort)
{
wsrep::high_priority_service* sa(
server_state_.find_streaming_applier(transaction_.server_id(),
transaction_.id()));
assert(sa);
if (sa)
{
wsrep::client_state& cs(sa->client_state());
cs.transaction_.state(lock, wsrep::transaction::s_must_abort);
cs.transaction_.state(lock, wsrep::transaction::s_must_replay);
cs.set_rollbacker_active(true);
lock.unlock();
server_state_.server_service().background_rollback(
sa->client_state());
lock.lock();
}
}
ret = transaction_.after_xa_detach(lock);
}
client_service_.debug_sync("wsrep_after_xa_detach_leave");
return ret;
}
void wsrep::client_state::xa_detach()
{
before_xa_detach();
after_xa_detach();
} }
void wsrep::client_state::xa_replay() void wsrep::client_state::xa_replay()
{ {
assert(mode_ == m_local); assert((mode_ == m_local && state_ == s_idle) ||
assert(state_ == s_idle); (mode_ == m_high_priority));
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
transaction_.xa_replay(lock); transaction_.xa_replay(lock);
} }
@@ -1000,13 +1057,16 @@ void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
wsrep::unique_lock<wsrep::mutex>& lock) wsrep::unique_lock<wsrep::mutex>& lock)
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
assert(state_ == s_idle); assert(state_ == s_idle || mode_ == m_high_priority);
while (is_rollbacker_active()) while (is_rollbacker_active())
{ {
cond_.wait(lock); cond_.wait(lock);
} }
do_acquire_ownership(lock); do_acquire_ownership(lock);
if (state_ == s_idle)
{
state(lock, s_exec); state(lock, s_exec);
}
} }
void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid) void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)

View File

@@ -101,6 +101,7 @@ wsrep::transaction::transaction(
, bf_abort_provider_status_() , bf_abort_provider_status_()
, bf_abort_client_state_() , bf_abort_client_state_()
, bf_aborted_in_total_order_() , bf_aborted_in_total_order_()
, bf_seqno_()
, ws_handle_() , ws_handle_()
, ws_meta_() , ws_meta_()
, flags_() , flags_()
@@ -716,10 +717,17 @@ int wsrep::transaction::before_rollback()
break; break;
case wsrep::client_state::m_high_priority: case wsrep::client_state::m_high_priority:
// Rollback by rollback write set or BF abort // Rollback by rollback write set or BF abort
assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting); switch (state_)
if (state_ != s_aborting)
{ {
case s_executing:
case s_prepared:
state(lock, s_aborting); state(lock, s_aborting);
break;
case s_must_replay:
case s_aborting:
break;
default:
assert(0);
} }
break; break;
default: default:
@@ -929,7 +937,7 @@ void wsrep::transaction::after_command_must_abort(
if (is_xa() && is_streaming()) if (is_xa() && is_streaming())
{ {
xa_replay(lock); xa_replay_commit(lock);
} }
else else
{ {
@@ -1004,6 +1012,7 @@ bool wsrep::transaction::bf_abort(
<< " successfully BF aborted " << id_ << " successfully BF aborted " << id_
<< " victim_seqno " << victim_seqno); << " victim_seqno " << victim_seqno);
bf_abort_state_ = state_at_enter; bf_abort_state_ = state_at_enter;
bf_seqno_ = bf_seqno;
state(lock, s_must_abort); state(lock, s_must_abort);
ret = true; ret = true;
break; break;
@@ -1117,8 +1126,16 @@ void wsrep::transaction::assign_xid(const wsrep::xid& xid)
xid_ = xid; xid_ = xid;
} }
bool wsrep::transaction::is_prepared_xa() const
{
return (state() == s_prepared)
|| (is_xa() && state() == s_replaying)
|| (state() == s_must_abort && bf_abort_state_ == s_prepared);
}
int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid)
{ {
debug_log_state("restore_to_prepared_state enter");
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
assert(active()); assert(active());
assert(is_empty()); assert(is_empty());
@@ -1134,6 +1151,7 @@ int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid)
} }
state(lock, s_prepared); state(lock, s_prepared);
xid_ = xid; xid_ = xid;
debug_log_state("restore_to_prepared_state leave");
return 0; return 0;
} }
@@ -1166,12 +1184,16 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
client_state_.id()); client_state_.id());
wsrep::ws_meta meta(stid); wsrep::ws_meta meta(stid);
lock.unlock();
client_service_.debug_sync("wsrep_before_certification");
const enum wsrep::provider::status cert_ret( const enum wsrep::provider::status cert_ret(
provider().certify(client_state_.id(), provider().certify(client_state_.id(),
ws_handle_, ws_handle_,
flags(), flags(),
meta)); meta));
lock.lock();
int ret; int ret;
if (cert_ret == wsrep::provider::success) if (cert_ret == wsrep::provider::success)
{ {
@@ -1201,25 +1223,38 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
return ret; return ret;
} }
void wsrep::transaction::xa_detach() int wsrep::transaction::before_xa_detach(wsrep::unique_lock<wsrep::mutex>& lock)
{ {
debug_log_state("xa_detach enter"); debug_log_state("before_xa_detach enter");
assert(state() == s_prepared || assert(lock.owns_lock());
assert (state() == s_prepared ||
client_state_.state() == wsrep::client_state::s_quitting); client_state_.state() == wsrep::client_state::s_quitting);
if (state() == s_prepared) if (state() == s_prepared)
{ {
wsrep::server_state& server_state(client_state_.server_state()); wsrep::server_state& server_state(client_state_.server_state());
lock.unlock();
client_service_.debug_sync("wsrep_before_xa_detach_convert_client");
server_state.convert_streaming_client_to_applier(&client_state_); server_state.convert_streaming_client_to_applier(&client_state_);
client_service_.store_globals(); client_service_.store_globals();
client_service_.cleanup_transaction(); client_service_.cleanup_transaction();
lock.lock();
} }
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); debug_log_state("before_xa_detach leave");
streaming_context_.cleanup(); return 0;
}
int wsrep::transaction::after_xa_detach(wsrep::unique_lock<wsrep::mutex>& lock)
{
assert(lock.owns_lock());
debug_log_state("after_xa_detach enter");
assert(state() == s_must_abort || state() == s_prepared);
state(lock, s_aborting); state(lock, s_aborting);
state(lock, s_aborted); state(lock, s_aborted);
provider().release(ws_handle_); provider().release(ws_handle_);
streaming_context_.cleanup();
cleanup(); cleanup();
debug_log_state("xa_detach leave"); debug_log_state("after_xa_detach leave");
return 0;
} }
void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock) void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock)
@@ -1228,15 +1263,22 @@ void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock
assert(is_xa()); assert(is_xa());
assert(is_streaming()); assert(is_streaming());
assert(state() == s_must_replay); assert(state() == s_must_replay);
assert(bf_aborted());
state(lock, s_replaying); state(lock, s_replaying);
enum wsrep::provider::status status; enum wsrep::provider::status status;
wsrep::server_state& server_state(client_state_.server_state()); wsrep::server_state& server_state(client_state_.server_state());
// Convert to streaming applier an replay. If transaction
// is detaching, streaming applier has been created
// already. Perhaps we should have a better way to tell
// if a transaction is detaching.
lock.unlock(); lock.unlock();
if (client_state_.mode() == wsrep::client_state::m_local &&
!server_state.find_streaming_applier(server_id_, id_))
{
server_state.convert_streaming_client_to_applier(&client_state_); server_state.convert_streaming_client_to_applier(&client_state_);
}
status = client_service_.replay_unordered(); status = client_service_.replay_unordered();
client_service_.store_globals(); client_service_.store_globals();
lock.lock(); lock.lock();
@@ -1251,10 +1293,13 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
{ {
debug_log_state("xa_replay enter"); debug_log_state("xa_replay enter");
xa_replay_common(lock); xa_replay_common(lock);
if (client_state_.mode() == wsrep::client_state::m_local)
{
state(lock, s_aborted); state(lock, s_aborted);
streaming_context_.cleanup(); streaming_context_.cleanup();
provider().release(ws_handle_); provider().release(ws_handle_);
cleanup(); cleanup();
}
client_service_.signal_replayed(); client_service_.signal_replayed();
debug_log_state("xa_replay leave"); debug_log_state("xa_replay leave");
return 0; return 0;
@@ -1263,15 +1308,19 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock) int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
{ {
debug_log_state("xa_replay_commit enter"); debug_log_state("xa_replay_commit enter");
enum wsrep::provider::status status(wsrep::provider::success);
xa_replay_common(lock); xa_replay_common(lock);
if (client_state_.mode() == wsrep::client_state::m_local)
{
lock.unlock(); lock.unlock();
enum wsrep::provider::status status(client_service_.commit_by_xid()); status = client_service_.commit_by_xid();
lock.lock(); lock.lock();
}
int ret(1); int ret(1);
switch (status) switch (status)
{ {
case wsrep::provider::success: case wsrep::provider::success:
state(lock, s_committed); state(lock, s_aborted);
streaming_context_.cleanup(); streaming_context_.cleanup();
provider().release(ws_handle_); provider().release(ws_handle_);
cleanup(); cleanup();
@@ -2087,6 +2136,7 @@ void wsrep::transaction::cleanup()
bf_abort_provider_status_ = wsrep::provider::success; bf_abort_provider_status_ = wsrep::provider::success;
bf_abort_client_state_ = 0; bf_abort_client_state_ = 0;
bf_aborted_in_total_order_ = false; bf_aborted_in_total_order_ = false;
bf_seqno_ = wsrep::seqno::undefined();
ws_meta_ = wsrep::ws_meta(); ws_meta_ = wsrep::ws_meta();
flags_ = 0; flags_ = 0;
certified_ = false; certified_ = false;

View File

@@ -85,7 +85,7 @@ namespace wsrep
bool is_replaying() const WSREP_OVERRIDE { return replaying_; } bool is_replaying() const WSREP_OVERRIDE { return replaying_; }
void debug_crash(const char*) WSREP_OVERRIDE { /* Not in unit tests*/} void debug_crash(const char*) WSREP_OVERRIDE { /* Not in unit tests*/}
wsrep::client_state& client_state() wsrep::client_state& client_state() const WSREP_OVERRIDE
{ {
return *client_state_; return *client_state_;
} }

View File

@@ -63,7 +63,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_detach_commit_by_xid,
BOOST_REQUIRE(sc.provider().fragments() == 1); BOOST_REQUIRE(sc.provider().fragments() == 1);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
cc1.xa_detach(); cc1.before_xa_detach();
cc1.after_xa_detach();
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted);
BOOST_REQUIRE(cc1.after_statement() == 0); BOOST_REQUIRE(cc1.after_statement() == 0);
@@ -97,7 +98,8 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_detach_rollback_by_xid,
BOOST_REQUIRE(sc.provider().fragments() == 1); BOOST_REQUIRE(sc.provider().fragments() == 1);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
cc1.xa_detach(); cc1.before_xa_detach();
cc1.after_xa_detach();
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted);
BOOST_REQUIRE(cc1.after_statement() == 0); BOOST_REQUIRE(cc1.after_statement() == 0);