1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Fixes for XA transactions with streaming enabled

Changes mostly related to handling of XA PREPARE fragments
This commit is contained in:
Leandro Pacheco
2019-02-11 11:38:17 +01:00
committed by Daniele Sciascia
parent 54b0eeee45
commit 36346beab4
12 changed files with 66 additions and 6 deletions

View File

@ -48,6 +48,10 @@ namespace db
{ {
return false; return false;
} }
bool is_xa_prepare () const override
{
return false;
}
size_t bytes_generated() const override size_t bytes_generated() const override
{ {
return 0; return 0;

View File

@ -35,6 +35,11 @@ int db::high_priority_service::start_transaction(
return client_.client_state().start_transaction(ws_handle, ws_meta); return client_.client_state().start_transaction(ws_handle, ws_meta);
} }
int db::high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta)
{
return client_.client_state().next_fragment(ws_meta);
}
const wsrep::transaction& db::high_priority_service::transaction() const const wsrep::transaction& db::high_priority_service::transaction() const
{ {
return client_.client_state().transaction(); return client_.client_state().transaction();

View File

@ -32,6 +32,7 @@ namespace db
high_priority_service(db::server& server, db::client& client); high_priority_service(db::server& server, db::client& client);
int start_transaction(const wsrep::ws_handle&, int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) override; const wsrep::ws_meta&) override;
int next_fragment(const wsrep::ws_meta&) override;
const wsrep::transaction& transaction() const override; const wsrep::transaction& transaction() const override;
int adopt_transaction(const wsrep::transaction&) override; int adopt_transaction(const wsrep::transaction&) override;
int apply_write_set(const wsrep::ws_meta&, int apply_write_set(const wsrep::ws_meta&,

View File

@ -78,6 +78,11 @@ namespace wsrep
*/ */
virtual bool is_xa() const = 0; virtual bool is_xa() const = 0;
/**
* Return true if the current statement is XA PREPARE
*/
virtual bool is_xa_prepare() const = 0;
// //
// Streaming // Streaming
// //

View File

@ -436,6 +436,14 @@ namespace wsrep
return transaction_.start_transaction(wsh, meta); return transaction_.start_transaction(wsh, meta);
} }
int next_fragment(const wsrep::ws_meta& meta)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(mode_ == m_high_priority);
return transaction_.next_fragment(meta);
}
/** @name Commit ordering interface */ /** @name Commit ordering interface */
/** @{ */ /** @{ */
int before_prepare() int before_prepare()

View File

@ -51,6 +51,11 @@ namespace wsrep
virtual int start_transaction(const wsrep::ws_handle&, virtual int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0; const wsrep::ws_meta&) = 0;
/**
* Start the next fragment of current transaction
*/
virtual int next_fragment(const wsrep::ws_meta&) = 0;
/** /**
* Return transaction object associated to high priority * Return transaction object associated to high priority
* service state. * service state.

View File

@ -126,6 +126,11 @@ namespace wsrep
return client_service_.is_xa(); return client_service_.is_xa();
} }
bool is_xa_prepare() const
{
return client_service_.is_xa_prepare();
}
bool pa_unsafe() const { return pa_unsafe_; } bool pa_unsafe() const { return pa_unsafe_; }
void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; }
@ -137,6 +142,8 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle& ws_handle, int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta); const wsrep::ws_meta& ws_meta);
int next_fragment(const wsrep::ws_meta& ws_meta);
void adopt(const transaction& transaction); void adopt(const transaction& transaction);
void fragment_applied(wsrep::seqno seqno); void fragment_applied(wsrep::seqno seqno);

View File

@ -359,7 +359,7 @@ static int apply_write_set(wsrep::server_state& server_state,
ws_meta, ws_meta,
data); data);
} }
else if (ws_meta.flags() == 0) else if (ws_meta.flags() == 0 || wsrep::prepares_transaction(ws_meta.flags()))
{ {
wsrep::high_priority_service* sa( wsrep::high_priority_service* sa(
server_state.find_streaming_applier( server_state.find_streaming_applier(
@ -380,8 +380,8 @@ static int apply_write_set(wsrep::server_state& server_state,
} }
else else
{ {
ret = apply_fragment(server_state, sa->next_fragment(ws_meta);
high_priority_service, ret = apply_fragment(high_priority_service,
sa, sa,
ws_handle, ws_handle,
ws_meta, ws_meta,
@ -421,6 +421,7 @@ static int apply_write_set(wsrep::server_state& server_state,
else else
{ {
// Commit fragment consumes sa // Commit fragment consumes sa
sa->next_fragment(ws_meta);
ret = commit_fragment(server_state, ret = commit_fragment(server_state,
high_priority_service, high_priority_service,
sa, sa,

View File

@ -176,6 +176,16 @@ int wsrep::transaction::start_transaction(
return 0; return 0;
} }
int wsrep::transaction::next_fragment(
const wsrep::ws_meta& ws_meta)
{
debug_log_state("next_fragment enter");
ws_meta_ = ws_meta;
debug_log_state("next_fragment leave");
return 0;
}
void wsrep::transaction::adopt(const wsrep::transaction& transaction) void wsrep::transaction::adopt(const wsrep::transaction& transaction)
{ {
debug_log_state("adopt enter"); debug_log_state("adopt enter");
@ -1151,12 +1161,12 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock)
break; break;
} }
if (streaming_context_.fragment_size_exceeded()) if (streaming_context_.fragment_size_exceeded() || client_service_.is_xa_prepare())
{ {
// Some statements have no effect. Do not atttempt to // Some statements have no effect. Do not atttempt to
// replicate a fragment if no data has been generated // replicate a fragment if no data has been generated
// since last fragment replication. // since last fragment replication, and statement is not XA PREPARE.
if (bytes_to_replicate <= 0) if (bytes_to_replicate <= 0 && !client_service_.is_xa_prepare())
{ {
assert(bytes_to_replicate == 0); assert(bytes_to_replicate == 0);
return ret; return ret;
@ -1414,6 +1424,7 @@ int wsrep::transaction::certify_commit(
} }
flags(flags() | wsrep::provider::flag::commit); flags(flags() | wsrep::provider::flag::commit);
flags(flags() & ~wsrep::provider::flag::prepare);
if (client_service_.prepare_data_for_replication()) if (client_service_.prepare_data_for_replication())
{ {

View File

@ -134,6 +134,11 @@ namespace wsrep
return false; return false;
} }
bool is_xa_prepare() const WSREP_OVERRIDE
{
return false;
}
size_t bytes_generated() const WSREP_OVERRIDE size_t bytes_generated() const WSREP_OVERRIDE
{ {
return bytes_generated_; return bytes_generated_;

View File

@ -27,6 +27,12 @@ int wsrep::mock_high_priority_service::start_transaction(
return client_state_->start_transaction(ws_handle, ws_meta); return client_state_->start_transaction(ws_handle, ws_meta);
} }
int wsrep::mock_high_priority_service::next_fragment(
const wsrep::ws_meta& ws_meta)
{
return client_state_->next_fragment(ws_meta);
}
int wsrep::mock_high_priority_service::adopt_transaction( int wsrep::mock_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction) const wsrep::transaction& transaction)
{ {

View File

@ -43,6 +43,8 @@ namespace wsrep
int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&)
WSREP_OVERRIDE; WSREP_OVERRIDE;
int next_fragment(const wsrep::ws_meta&) WSREP_OVERRIDE;
const wsrep::transaction& transaction() const WSREP_OVERRIDE const wsrep::transaction& transaction() const WSREP_OVERRIDE
{ return client_state_->transaction(); } { return client_state_->transaction(); }
int adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; int adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;