mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-28 20:02:00 +03:00
Support for detaching prepared XA transactions
Add support for detaching XA transactions. This is useful for handling the case where the DBMS client has a transaction in prepared state and disconnects. Before disconnect, the DBMS calls the newly introduced client_state::xa_detach(), to cleanup the local transaction and convert it to a high priority transaction. The DBMS may later attempt to terminate the transaction through client_state::commit_by_xid() or client_state::rollback_by_xid(). Also in this patch: - Fix client_state::close() so that it does not rollback transactions in prepared state - Changed class wsrep::xid representation to hold enough information so that DBMS can convert to its native representation - Fix potential infinite loop in server_state::find_streaming_applier(wsrep:xid&) - Append SR keys on prepare fragment and make it pa_unsafe - Handle one phase commit (simply fall back to two phase) - Do not rollback prepared streaming clients in server_state::close_orphaned_transactions()
This commit is contained in:
@ -66,6 +66,10 @@ namespace wsrep
|
|||||||
: buffer_()
|
: buffer_()
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
|
mutable_buffer(const mutable_buffer& b)
|
||||||
|
: buffer_(b.buffer_)
|
||||||
|
{ }
|
||||||
|
|
||||||
void resize(size_t s) { buffer_.resize(s); }
|
void resize(size_t s) { buffer_.resize(s); }
|
||||||
|
|
||||||
void clear()
|
void clear()
|
||||||
@ -111,6 +115,11 @@ namespace wsrep
|
|||||||
buffer_ = other.buffer_;
|
buffer_ = other.buffer_;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool operator==(const mutable_buffer& other) const
|
||||||
|
{
|
||||||
|
return buffer_ == other.buffer_;
|
||||||
|
}
|
||||||
private:
|
private:
|
||||||
std::vector<char> buffer_;
|
std::vector<char> buffer_;
|
||||||
};
|
};
|
||||||
|
@ -589,6 +589,21 @@ namespace wsrep
|
|||||||
return transaction_.commit_or_rollback_by_xid(xid, false);
|
return transaction_.commit_or_rollback_by_xid(xid, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Detach a prepared XA transaction
|
||||||
|
*
|
||||||
|
* This method cleans up a local XA transaction in prepared state
|
||||||
|
* and converts it to high priority mode.
|
||||||
|
* This can be used to handle the case where the client of a XA
|
||||||
|
* transaction disconnects, and the transaction must not rollback.
|
||||||
|
* After this call, a different client may later attempt to terminate
|
||||||
|
* the transaction by calling method commit_by_xid() or rollback_by_xid().
|
||||||
|
*/
|
||||||
|
void xa_detach()
|
||||||
|
{
|
||||||
|
transaction_.xa_detach();
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// BF aborting
|
// BF aborting
|
||||||
//
|
//
|
||||||
|
@ -125,17 +125,17 @@ namespace wsrep
|
|||||||
|
|
||||||
bool is_xa() const
|
bool is_xa() const
|
||||||
{
|
{
|
||||||
return !xid_.empty();
|
return !xid_.is_null();
|
||||||
}
|
}
|
||||||
|
|
||||||
void assign_xid(const wsrep::xid& xid)
|
void assign_xid(const wsrep::xid& xid)
|
||||||
{
|
{
|
||||||
assert(active());
|
assert(active());
|
||||||
assert(xid_.empty());
|
assert(!is_xa());
|
||||||
xid_ = xid;
|
xid_ = xid;
|
||||||
}
|
}
|
||||||
|
|
||||||
const wsrep::xid xid() const
|
const wsrep::xid& xid() const
|
||||||
{
|
{
|
||||||
return xid_;
|
return xid_;
|
||||||
}
|
}
|
||||||
@ -144,6 +144,8 @@ namespace wsrep
|
|||||||
|
|
||||||
int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);
|
int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);
|
||||||
|
|
||||||
|
void xa_detach();
|
||||||
|
|
||||||
bool pa_unsafe() const { return pa_unsafe_; }
|
bool pa_unsafe() const { return pa_unsafe_; }
|
||||||
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
|
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
|
||||||
|
|
||||||
|
@ -21,23 +21,80 @@
|
|||||||
#define WSREP_XID_HPP
|
#define WSREP_XID_HPP
|
||||||
|
|
||||||
#include <iosfwd>
|
#include <iosfwd>
|
||||||
#include <string>
|
#include "buffer.hpp"
|
||||||
|
|
||||||
namespace wsrep
|
namespace wsrep
|
||||||
{
|
{
|
||||||
class xid
|
class xid
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
xid() : xid_() { }
|
xid()
|
||||||
xid(const std::string& str) : xid_(str) { }
|
: format_id_(-1)
|
||||||
xid(const char* s) : xid_(s) { }
|
, gtrid_len_(0)
|
||||||
bool empty() const { return xid_.empty(); }
|
, bqual_len_(0)
|
||||||
void clear() { xid_.clear(); }
|
, data_()
|
||||||
bool operator==(const xid& other) const { return xid_ == other.xid_; }
|
{ }
|
||||||
|
|
||||||
|
xid(long format_id, long gtrid_len,
|
||||||
|
long bqual_len, const char* data)
|
||||||
|
: format_id_(format_id)
|
||||||
|
, gtrid_len_(gtrid_len)
|
||||||
|
, bqual_len_(bqual_len)
|
||||||
|
, data_()
|
||||||
|
{
|
||||||
|
const long len = gtrid_len_ + bqual_len_;
|
||||||
|
if (len > 0)
|
||||||
|
{
|
||||||
|
data_.push_back(data, data + len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
xid(const xid& xid)
|
||||||
|
: format_id_(xid.format_id_)
|
||||||
|
, gtrid_len_(xid.gtrid_len_)
|
||||||
|
, bqual_len_(xid.bqual_len_)
|
||||||
|
, data_(xid.data_)
|
||||||
|
{ }
|
||||||
|
|
||||||
|
bool is_null() const
|
||||||
|
{
|
||||||
|
return format_id_ == -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void clear()
|
||||||
|
{
|
||||||
|
format_id_ = -1;
|
||||||
|
data_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
xid& operator= (const xid& other)
|
||||||
|
{
|
||||||
|
format_id_ = other.format_id_;
|
||||||
|
gtrid_len_ = other.gtrid_len_;
|
||||||
|
bqual_len_ = other.bqual_len_;
|
||||||
|
data_ = other.data_;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator==(const xid& other) const
|
||||||
|
{
|
||||||
|
if (format_id_ != other.format_id_ ||
|
||||||
|
gtrid_len_ != other.gtrid_len_ ||
|
||||||
|
bqual_len_ != other.bqual_len_ ||
|
||||||
|
data_.size() != other.data_.size())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return data_ == other.data_;
|
||||||
|
}
|
||||||
|
|
||||||
friend std::string to_string(const wsrep::xid& xid);
|
friend std::string to_string(const wsrep::xid& xid);
|
||||||
friend std::ostream& operator<<(std::ostream& os, const wsrep::xid& xid);
|
friend std::ostream& operator<<(std::ostream& os, const wsrep::xid& xid);
|
||||||
private:
|
protected:
|
||||||
std::string xid_;
|
long format_id_;
|
||||||
|
long gtrid_len_;
|
||||||
|
long bqual_len_;
|
||||||
|
mutable_buffer data_;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::string to_string(const wsrep::xid& xid);
|
std::string to_string(const wsrep::xid& xid);
|
||||||
|
@ -50,7 +50,9 @@ void wsrep::client_state::close()
|
|||||||
debug_log_state("close: enter");
|
debug_log_state("close: enter");
|
||||||
state(lock, s_quitting);
|
state(lock, s_quitting);
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
if (transaction_.active())
|
if (transaction_.active() &&
|
||||||
|
(mode_ != m_local ||
|
||||||
|
transaction_.state() != wsrep::transaction::s_prepared))
|
||||||
{
|
{
|
||||||
client_service_.bf_rollback();
|
client_service_.bf_rollback();
|
||||||
transaction_.after_statement();
|
transaction_.after_statement();
|
||||||
|
@ -1297,6 +1297,7 @@ wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
|
|||||||
{
|
{
|
||||||
return sa;
|
return sa;
|
||||||
}
|
}
|
||||||
|
i++;
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -1425,6 +1426,19 @@ void wsrep::server_state::recover_streaming_appliers_if_not_recovered(
|
|||||||
streaming_appliers_recovered_ = true;
|
streaming_appliers_recovered_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class transaction_state_cmp
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
transaction_state_cmp(const enum wsrep::transaction::state s) : state_(s)
|
||||||
|
{ }
|
||||||
|
bool operator()(std::pair<const wsrep::client_id, wsrep::client_state*>& vt) const
|
||||||
|
{
|
||||||
|
return vt.second->transaction().state() == state_;
|
||||||
|
}
|
||||||
|
private:
|
||||||
|
enum wsrep::transaction::state state_;
|
||||||
|
};
|
||||||
|
|
||||||
void wsrep::server_state::close_orphaned_sr_transactions(
|
void wsrep::server_state::close_orphaned_sr_transactions(
|
||||||
wsrep::unique_lock<wsrep::mutex>& lock,
|
wsrep::unique_lock<wsrep::mutex>& lock,
|
||||||
wsrep::high_priority_service& high_priority_service)
|
wsrep::high_priority_service& high_priority_service)
|
||||||
@ -1445,9 +1459,13 @@ void wsrep::server_state::close_orphaned_sr_transactions(
|
|||||||
|
|
||||||
if (current_view_.own_index() == -1 || equal_consecutive_views)
|
if (current_view_.own_index() == -1 || equal_consecutive_views)
|
||||||
{
|
{
|
||||||
while (streaming_clients_.empty() == false)
|
streaming_clients_map::iterator i;
|
||||||
|
transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared);
|
||||||
|
while ((i = std::find_if_not(streaming_clients_.begin(),
|
||||||
|
streaming_clients_.end(),
|
||||||
|
prepared_state_cmp))
|
||||||
|
!= streaming_clients_.end())
|
||||||
{
|
{
|
||||||
streaming_clients_map::iterator i(streaming_clients_.begin());
|
|
||||||
wsrep::client_id client_id(i->first);
|
wsrep::client_id client_id(i->first);
|
||||||
wsrep::transaction_id transaction_id(i->second->transaction().id());
|
wsrep::transaction_id transaction_id(i->second->transaction().id());
|
||||||
// It is safe to unlock the server state temporarily here.
|
// It is safe to unlock the server state temporarily here.
|
||||||
|
@ -326,6 +326,8 @@ int wsrep::transaction::before_prepare(
|
|||||||
{
|
{
|
||||||
// Force fragment replication on XA prepare
|
// Force fragment replication on XA prepare
|
||||||
flags(flags() | wsrep::provider::flag::prepare);
|
flags(flags() | wsrep::provider::flag::prepare);
|
||||||
|
flags(flags() | wsrep::provider::flag::pa_unsafe);
|
||||||
|
append_sr_keys_for_commit();
|
||||||
const bool force_streaming_step = true;
|
const bool force_streaming_step = true;
|
||||||
ret = streaming_step(lock, force_streaming_step);
|
ret = streaming_step(lock, force_streaming_step);
|
||||||
if (ret == 0)
|
if (ret == 0)
|
||||||
@ -423,15 +425,27 @@ int wsrep::transaction::before_commit()
|
|||||||
switch (client_state_.mode())
|
switch (client_state_.mode())
|
||||||
{
|
{
|
||||||
case wsrep::client_state::m_local:
|
case wsrep::client_state::m_local:
|
||||||
if (state() == s_prepared)
|
if (is_xa() &&
|
||||||
|
(state() == s_prepared || state() == s_executing))
|
||||||
{
|
{
|
||||||
assert(is_xa());
|
ret = 0;
|
||||||
ret = certify_commit(lock);
|
if (state() == s_executing)
|
||||||
assert((ret == 0 && state() == s_committing) ||
|
{
|
||||||
(state() == s_must_abort ||
|
// XA COMMIT in one phase
|
||||||
state() == s_must_replay ||
|
// This optimization is not supported...
|
||||||
state() == s_cert_failed ||
|
// fall back to prepare + commit
|
||||||
state() == s_prepared));
|
ret = before_prepare(lock) || after_prepare(lock);
|
||||||
|
}
|
||||||
|
if (!ret)
|
||||||
|
{
|
||||||
|
assert(state() == s_prepared);
|
||||||
|
ret = certify_commit(lock);
|
||||||
|
assert((ret == 0 && state() == s_committing) ||
|
||||||
|
(state() == s_must_abort ||
|
||||||
|
state() == s_must_replay ||
|
||||||
|
state() == s_cert_failed ||
|
||||||
|
state() == s_prepared));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (state() == s_executing)
|
else if (state() == s_executing)
|
||||||
{
|
{
|
||||||
@ -1084,6 +1098,20 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void wsrep::transaction::xa_detach()
|
||||||
|
{
|
||||||
|
assert(state() == s_prepared);
|
||||||
|
wsrep::server_state& server_state(client_state_.server_state());
|
||||||
|
server_state.convert_streaming_client_to_applier(&client_state_);
|
||||||
|
client_service_.cleanup_transaction();
|
||||||
|
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
|
||||||
|
streaming_context_.cleanup();
|
||||||
|
state(lock, s_aborting);
|
||||||
|
state(lock, s_aborted);
|
||||||
|
provider().release(ws_handle_);
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// Private //
|
// Private //
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
@ -1470,7 +1498,7 @@ int wsrep::transaction::certify_commit(
|
|||||||
state(lock, s_certifying);
|
state(lock, s_certifying);
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
if (is_streaming())
|
if (is_streaming() && !is_xa())
|
||||||
{
|
{
|
||||||
append_sr_keys_for_commit();
|
append_sr_keys_for_commit();
|
||||||
flags(flags() | wsrep::provider::flag::pa_unsafe);
|
flags(flags() | wsrep::provider::flag::pa_unsafe);
|
||||||
|
@ -22,10 +22,10 @@
|
|||||||
|
|
||||||
std::string wsrep::to_string(const wsrep::xid& xid)
|
std::string wsrep::to_string(const wsrep::xid& xid)
|
||||||
{
|
{
|
||||||
return xid.xid_;
|
return std::string(xid.data_.data(), xid.data_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::xid& xid)
|
std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::xid& xid)
|
||||||
{
|
{
|
||||||
return os << xid.xid_;
|
return os << to_string(xid);
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,10 @@
|
|||||||
BOOST_FIXTURE_TEST_CASE(transaction_xa,
|
BOOST_FIXTURE_TEST_CASE(transaction_xa,
|
||||||
replicating_client_fixture_sync_rm)
|
replicating_client_fixture_sync_rm)
|
||||||
{
|
{
|
||||||
|
wsrep::xid xid(1, 9, 0, "test xid");
|
||||||
|
|
||||||
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
|
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
|
||||||
cc.assign_xid("test xid");
|
cc.assign_xid(xid);
|
||||||
|
|
||||||
BOOST_REQUIRE(tc.active());
|
BOOST_REQUIRE(tc.active());
|
||||||
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
|
BOOST_REQUIRE(tc.id() == wsrep::transaction_id(1));
|
||||||
@ -52,8 +54,10 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa,
|
|||||||
BOOST_FIXTURE_TEST_CASE(transaction_xa_applying,
|
BOOST_FIXTURE_TEST_CASE(transaction_xa_applying,
|
||||||
applying_client_fixture)
|
applying_client_fixture)
|
||||||
{
|
{
|
||||||
|
wsrep::xid xid(1, 9, 0, "test xid");
|
||||||
|
|
||||||
start_transaction(wsrep::transaction_id(1), wsrep::seqno(1));
|
start_transaction(wsrep::transaction_id(1), wsrep::seqno(1));
|
||||||
cc.assign_xid("test xid");
|
cc.assign_xid(xid);
|
||||||
|
|
||||||
BOOST_REQUIRE(cc.before_prepare() == 0);
|
BOOST_REQUIRE(cc.before_prepare() == 0);
|
||||||
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
|
BOOST_REQUIRE(tc.state() == wsrep::transaction::s_preparing);
|
||||||
@ -85,8 +89,11 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_applying,
|
|||||||
BOOST_FIXTURE_TEST_CASE(transaction_xa_sr,
|
BOOST_FIXTURE_TEST_CASE(transaction_xa_sr,
|
||||||
streaming_client_fixture_byte)
|
streaming_client_fixture_byte)
|
||||||
{
|
{
|
||||||
|
wsrep::xid xid(1, 9, 0, "test xid");
|
||||||
|
|
||||||
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
|
BOOST_REQUIRE(cc.start_transaction(wsrep::transaction_id(1)) == 0);
|
||||||
cc.assign_xid("test xid");
|
cc.assign_xid(xid);
|
||||||
|
|
||||||
cc.bytes_generated_ = 1;
|
cc.bytes_generated_ = 1;
|
||||||
BOOST_REQUIRE(cc.after_row() == 0);
|
BOOST_REQUIRE(cc.after_row() == 0);
|
||||||
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
|
BOOST_REQUIRE(tc.streaming_context().fragments_certified() == 1);
|
||||||
|
Reference in New Issue
Block a user