1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Minimize client_service interface for XA

Remove methods `is_xa()`, `is_xa_prepare()`, and `xid()` from
client_service interface. Instead, transactions are explicitly
assigned their xid, through at start of XA.
This commit is contained in:
Daniele Sciascia
2019-07-30 11:25:38 +02:00
parent 052247144f
commit 5d18ce3e75
7 changed files with 85 additions and 126 deletions

View File

@ -44,18 +44,6 @@ namespace db
return 0; return 0;
} }
void cleanup_transaction() override { } void cleanup_transaction() override { }
bool is_xa () const override
{
return false;
}
bool is_xa_prepare() const override
{
return false;
}
std::string xid() const override
{
return "";
}
size_t bytes_generated() const override size_t bytes_generated() const override
{ {
return 0; return 0;

View File

@ -73,24 +73,6 @@ namespace wsrep
*/ */
virtual void cleanup_transaction() = 0; virtual void cleanup_transaction() = 0;
//
// XA
//
/**
* Return true if the current transactions is XA
*/
virtual bool is_xa() const = 0;
/**
* Return true if the current statement is XA PREPARE
*/
virtual bool is_xa_prepare() const = 0;
/**
* Return a string representing the xid of the transaction
*/
virtual std::string xid() const = 0;
// //
// Streaming // Streaming
// //

View File

@ -305,36 +305,6 @@ namespace wsrep
return transaction_.assign_read_view(gtid); return transaction_.assign_read_view(gtid);
} }
/**
* Restores the client's transaction to prepared state
*
* The purpose of this method is to restore transaction state
* during recovery of a prepared XA transaction.
*/
int restore_prepared_transaction()
{
return transaction_.restore_to_prepared_state();
}
/**
* Terminate transaction with the given xid
*
* Sends a commit or rollback fragment to terminate the a
* transaction with the given xid. For the fragment to be
* sent, a streaming applier for the transaction must exist
* and the transaction must be in prepared state.
*
* @param xid the xid of the the transaction to terminate
* @param commit whether to send a commmit or rollback fragment
*
* @return Zero on success, non-zero on error. In case of error
* the client_state's current_error is set
*/
int commit_or_rollback_by_xid(const std::string& xid, bool commit)
{
return transaction_.commit_or_rollback_by_xid(xid, commit);
}
/** /**
* Append a key into transaction write set. * Append a key into transaction write set.
* *
@ -552,6 +522,52 @@ namespace wsrep
void wait_rollback_complete_and_acquire_ownership(); void wait_rollback_complete_and_acquire_ownership();
/** @} */ /** @} */
//
// XA
//
/**
* Assign transaction external id.
*
* Other than storing the xid, the transaction is marked as XA.
* This should be called when XA transaction is started.
*
* @param xid transaction id
*/
void assign_xid(const std::string& xid)
{
transaction_.assign_xid(xid);
}
/**
* Restores the client's transaction to prepared state
*
* The purpose of this method is to restore transaction state
* during recovery of a prepared XA transaction.
*/
int restore_xid(std::string& xid)
{
return transaction_.restore_to_prepared_state(xid);
}
/**
* Terminate transaction with the given xid
*
* Sends a commit or rollback fragment to terminate a
* transaction with the given xid. For the fragment to be
* sent, a streaming applier for the transaction must exist
* and the transaction must be in prepared state.
*
* @param xid the xid of the the transaction to terminate
* @param commit whether to send a commmit or rollback fragment
*
* @return Zero on success, non-zero on error. In case of error
* the client_state's current_error is set
*/
int commit_or_rollback_by_xid(const std::string& xid, bool commit)
{
return transaction_.commit_or_rollback_by_xid(xid, commit);
}
// //
// BF aborting // BF aborting
// //

View File

@ -124,23 +124,25 @@ namespace wsrep
bool is_xa() const bool is_xa() const
{ {
return client_service_.is_xa(); return !xid_.empty();
} }
bool is_xa_prepare() const void assign_xid(const std::string& xid)
{ {
return client_service_.is_xa_prepare(); assert(active());
assert(xid_.empty());
xid_ = xid;
} }
int restore_to_prepared_state();
int commit_or_rollback_by_xid(const std::string& xid, bool commit);
const std::string xid() const const std::string xid() const
{ {
return client_service_.xid(); return xid_;
} }
int restore_to_prepared_state(const std::string& xid);
int commit_or_rollback_by_xid(const std::string& xid, bool commit);
bool pa_unsafe() const { return pa_unsafe_; } bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
@ -239,7 +241,7 @@ namespace wsrep
// The call will adjust transaction state and set client_state // The call will adjust transaction state and set client_state
// error status accordingly. // error status accordingly.
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&); bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
int streaming_step(wsrep::unique_lock<wsrep::mutex>&); int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&); int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&); int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int append_sr_keys_for_commit(); int append_sr_keys_for_commit();
@ -271,7 +273,7 @@ namespace wsrep
wsrep::streaming_context streaming_context_; wsrep::streaming_context streaming_context_;
wsrep::sr_key_set sr_keys_; wsrep::sr_key_set sr_keys_;
wsrep::mutable_buffer apply_error_buf_; wsrep::mutable_buffer apply_error_buf_;
bool is_recovered_xa_; std::string xid_;
}; };
static inline const char* to_c_string(enum wsrep::transaction::state state) static inline const char* to_c_string(enum wsrep::transaction::state state)

View File

@ -108,7 +108,7 @@ wsrep::transaction::transaction(
, streaming_context_() , streaming_context_()
, sr_keys_() , sr_keys_()
, apply_error_buf_() , apply_error_buf_()
, is_recovered_xa_(false) , xid_()
{ } { }
@ -121,6 +121,7 @@ int wsrep::transaction::start_transaction(
{ {
debug_log_state("start_transaction enter"); debug_log_state("start_transaction enter");
assert(active() == false); assert(active() == false);
assert(is_xa() == false);
assert(flags() == 0); assert(flags() == 0);
server_id_ = client_state_.server_state().id(); server_id_ = client_state_.server_state().id();
id_ = id; id_ = id;
@ -323,7 +324,10 @@ int wsrep::transaction::before_prepare(
{ {
if (is_xa()) if (is_xa())
{ {
ret = streaming_step(lock); // Force fragment replication on XA prepare
flags(flags() | wsrep::provider::flag::prepare);
const bool force_streaming_step = true;
ret = streaming_step(lock, force_streaming_step);
if (ret == 0) if (ret == 0)
{ {
assert(state() == s_executing); assert(state() == s_executing);
@ -568,7 +572,7 @@ int wsrep::transaction::after_commit()
assert(client_state_.mode() == wsrep::client_state::m_local || assert(client_state_.mode() == wsrep::client_state::m_local ||
client_state_.mode() == wsrep::client_state::m_high_priority); client_state_.mode() == wsrep::client_state::m_high_priority);
if (is_xa() || is_recovered_xa_) if (is_xa())
{ {
// XA fragment removal happens here, // XA fragment removal happens here,
// see comment in before_prepare // see comment in before_prepare
@ -1036,7 +1040,7 @@ void wsrep::transaction::after_replay(const wsrep::transaction& other)
clear_fragments(); clear_fragments();
} }
int wsrep::transaction::restore_to_prepared_state() int wsrep::transaction::restore_to_prepared_state(const std::string& xid)
{ {
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
assert(active()); assert(active());
@ -1045,7 +1049,7 @@ int wsrep::transaction::restore_to_prepared_state()
flags(flags() & ~wsrep::provider::flag::start_transaction); flags(flags() & ~wsrep::provider::flag::start_transaction);
state(lock, s_certifying); state(lock, s_certifying);
state(lock, s_prepared); state(lock, s_prepared);
is_recovered_xa_ = true; xid_ = xid;
return 0; return 0;
} }
@ -1198,7 +1202,8 @@ bool wsrep::transaction::abort_or_interrupt(
return false; return false;
} }
int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock) int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
bool force)
{ {
assert(lock.owns_lock()); assert(lock.owns_lock());
assert(streaming_context_.fragment_size() || is_xa()); assert(streaming_context_.fragment_size() || is_xa());
@ -1232,18 +1237,19 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock)
break; break;
} }
if (streaming_context_.fragment_size_exceeded() || is_xa_prepare())
{
// Some statements have no effect. Do not atttempt to // Some statements have no effect. Do not atttempt to
// replicate a fragment if no data has been generated since // replicate a fragment if no data has been generated since
// last fragment replication. A XA PREPARE statement generates a // last fragment replication.
// fragment even if no data is currently pending replication. // We use `force = true` on XA prepare: a fragment will be
if (bytes_to_replicate <= 0 && !is_xa_prepare()) // generated even if no data is pending replication.
if (bytes_to_replicate <= 0 && !force)
{ {
assert(bytes_to_replicate == 0); assert(bytes_to_replicate == 0);
return ret; return ret;
} }
if (streaming_context_.fragment_size_exceeded() || force)
{
streaming_context_.reset_unit_counter(); streaming_context_.reset_unit_counter();
ret = certify_fragment(lock); ret = certify_fragment(lock);
} }
@ -1307,11 +1313,6 @@ int wsrep::transaction::certify_fragment(
flags(flags() | wsrep::provider::flag::implicit_deps); flags(flags() | wsrep::provider::flag::implicit_deps);
} }
if (is_xa_prepare())
{
flags(flags() | wsrep::provider::flag::prepare);
}
int ret(0); int ret(0);
enum wsrep::client_error error(wsrep::e_success); enum wsrep::client_error error(wsrep::e_success);
enum wsrep::provider::status cert_ret(wsrep::provider::success); enum wsrep::provider::status cert_ret(wsrep::provider::success);
@ -1746,7 +1747,7 @@ void wsrep::transaction::cleanup()
streaming_context_.cleanup(); streaming_context_.cleanup();
client_service_.cleanup_transaction(); client_service_.cleanup_transaction();
apply_error_buf_.clear(); apply_error_buf_.clear();
is_recovered_xa_ = false; xid_.clear();
debug_log_state("cleanup_leave"); debug_log_state("cleanup_leave");
} }

View File

@ -62,8 +62,6 @@ namespace wsrep
: wsrep::client_service() : wsrep::client_service()
, is_autocommit_() , is_autocommit_()
, do_2pc_() , do_2pc_()
, is_xa_()
, is_xa_prepare_()
// , fail_next_applying_() // , fail_next_applying_()
// , fail_next_toi_() // , fail_next_toi_()
, bf_abort_during_wait_() , bf_abort_during_wait_()
@ -131,21 +129,6 @@ namespace wsrep
void cleanup_transaction() WSREP_OVERRIDE { } void cleanup_transaction() WSREP_OVERRIDE { }
bool is_xa() const WSREP_OVERRIDE
{
return is_xa_;
}
bool is_xa_prepare() const WSREP_OVERRIDE
{
return is_xa_prepare_;
}
std::string xid() const WSREP_OVERRIDE
{
return "";
}
size_t bytes_generated() const WSREP_OVERRIDE size_t bytes_generated() const WSREP_OVERRIDE
{ {
return bytes_generated_; return bytes_generated_;
@ -197,8 +180,6 @@ namespace wsrep
// //
bool is_autocommit_; bool is_autocommit_;
bool do_2pc_; bool do_2pc_;
bool is_xa_;
bool is_xa_prepare_;
// bool fail_next_applying_; // bool fail_next_applying_;
// bool fail_next_toi_; // bool fail_next_toi_;
bool bf_abort_during_wait_; bool bf_abort_during_wait_;

View File

@ -8,14 +8,12 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa,
replicating_client_fixture_sync_rm) replicating_client_fixture_sync_rm)
{ {
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
cc.is_xa_ = true; cc.assign_xid("test xid");
BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1)); BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
cc.is_xa_prepare_ = true;
BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
@ -28,8 +26,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa,
BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().fragments() == 1); BOOST_REQUIRE(sc.provider().fragments() == 1);
cc.is_xa_prepare_ = false;
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing);
BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0);
@ -38,7 +34,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa,
BOOST_REQUIRE(tc.certified()); BOOST_REQUIRE(tc.certified());
BOOST_REQUIRE(cc.after_commit() == 0); BOOST_REQUIRE(cc.after_commit() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committed);
// XA PRERAPRE and XA COMMIT fragments // XA PREPARE and XA COMMIT fragments
BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().commit_fragments() == 1); BOOST_REQUIRE(sc.provider().commit_fragments() == 1);
@ -56,8 +52,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa,
BOOST_FIXTURE_TEST_CASE(transaction_xa_applying, BOOST_FIXTURE_TEST_CASE(transaction_xa_applying,
applying_client_fixture) applying_client_fixture)
{ {
cc.is_xa_ = true; cc.assign_xid("test xid");
cc.is_xa_prepare_ = true;
BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
@ -67,8 +62,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying,
BOOST_REQUIRE(cc.after_prepare() == 0); BOOST_REQUIRE(cc.after_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_prepared);
cc.is_xa_prepare_ = false;
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing);
BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0);
@ -92,7 +85,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr,
streaming_client_fixture_byte) streaming_client_fixture_byte)
{ {
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0); BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
cc.is_xa_ = true; cc.assign_xid("test xid");
cc.bytes_generated_ = 1; cc.bytes_generated_ = 1;
BOOST_REQUIRE(cc.after_row() == 0); BOOST_REQUIRE(cc.after_row() == 0);
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1); BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
@ -103,8 +96,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr,
BOOST_REQUIRE(tc.active()); BOOST_REQUIRE(tc.active());
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_executing);
cc.is_xa_prepare_ = true;
BOOST_REQUIRE(cc.before_prepare() == 0); BOOST_REQUIRE(cc.before_prepare() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
BOOST_REQUIRE(tc.ordered() == false); BOOST_REQUIRE(tc.ordered() == false);
@ -114,8 +105,6 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_sr,
// XA PREPARE fragment // XA PREPARE fragment
BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().fragments() == 2);
cc.is_xa_prepare_ = false;
BOOST_REQUIRE(cc.before_commit() == 0); BOOST_REQUIRE(cc.before_commit() == 0);
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing); BOOST_REQUIRE(tc.state() == wsrep::transaction::s_committing);
BOOST_REQUIRE(cc.ordered_commit() == 0); BOOST_REQUIRE(cc.ordered_commit() == 0);