Mirror of https://github.com/codership/wsrep-lib.git

Replay prepared transactions found BFed after prepare

Handle the case where prepare is BF aborted after it has replicated a
fragment and before the command finishes, in the
after_command_before_result() and after_command_after_result() hooks.
Author: Daniele Sciascia
Date: 2021-08-12 10:56:00 +02:00
parent 4f1c201c9d
commit 7d6641764b
4 changed files with 135 additions and 40 deletions
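The scenario, in short: an XA transaction has replicated a fragment during prepare, a brute force (BF) abort then lands before the client command completes, and the transaction must be replayed rather than simply rolled back. A minimal sketch of the call sequence, modeled on the new test cases at the end of this diff (cc, sc and wsrep_test::bf_abort_unordered() are fixtures and helpers from the wsrep-lib unit test suite):

    wsrep::xid xid(1, 1, 1, "id");
    cc.start_transaction(wsrep::transaction_id(1));
    cc.assign_xid(xid);
    cc.before_prepare();
    cc.after_prepare();                  // a fragment has been replicated
    wsrep_test::bf_abort_unordered(cc);  // BF abort arrives after prepare
    cc.after_command_before_result();    // must-abort is handled here ...
    cc.after_command_after_result();     // ... or here, ending in replay
    // afterwards cc.unordered_replays() == 1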


@@ -194,6 +194,8 @@ namespace wsrep
         int after_statement();
+        void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&);
+
         void after_applying();
         bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
@@ -251,6 +253,8 @@ namespace wsrep
         int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
         void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
         int replay(wsrep::unique_lock<wsrep::mutex>&);
+        void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&);
+        int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&);
         void cleanup();
         void debug_log_state(const char*) const;
         void debug_log_key_append(const wsrep::key& key) const;


@@ -177,20 +177,19 @@ void wsrep::client_state::after_command_before_result()
     if (transaction_.active() &&
         transaction_.state() == wsrep::transaction::s_must_abort)
     {
-        override_error(wsrep::e_deadlock_error);
-        lock.unlock();
-        client_service_.bf_rollback();
+        transaction_.after_command_must_abort(lock);
         // If keep current error is set, the result will be propagated
         // back to client with some future command, so keep the transaction
         // open here so that error handling can happen in before_command()
         // hook.
         if (not keep_command_error_)
         {
+            lock.unlock();
             (void)transaction_.after_statement();
+            lock.lock();
         }
-        lock.lock();
         assert(transaction_.state() == wsrep::transaction::s_aborted);
         assert(current_error() != wsrep::e_success);
     }
     state(lock, s_result);
     debug_log_state("after_command_before_result: leave");
@@ -205,11 +204,8 @@ void wsrep::client_state::after_command_after_result()
     if (transaction_.active() &&
         transaction_.state() == wsrep::transaction::s_must_abort)
     {
-        lock.unlock();
-        client_service_.bf_rollback();
-        lock.lock();
+        transaction_.after_command_must_abort(lock);
         assert(transaction_.state() == wsrep::transaction::s_aborted);
-        override_error(wsrep::e_deadlock_error);
     }
     else if (transaction_.active() == false && not keep_command_error_)
     {


@@ -835,7 +835,13 @@ int wsrep::transaction::after_statement()
         break;
     case s_must_abort:
     case s_cert_failed:
-        client_state_.override_error(wsrep::e_deadlock_error);
+        // Error may be set already. For example, if fragment size
+        // exceeded the maximum size in certify_fragment(), then
+        // we already have wsrep::e_error_during_commit
+        if (client_state_.current_error() == wsrep::e_success)
+        {
+            client_state_.override_error(wsrep::e_deadlock_error);
+        }
         lock.unlock();
         ret = client_service_.bf_rollback();
         lock.lock();
@@ -849,7 +855,7 @@ int wsrep::transaction::after_statement()
     {
         if (is_xa() && !ordered())
         {
-            ret = xa_replay(lock);
+            ret = xa_replay_commit(lock);
         }
         else
         {
@@ -900,6 +906,34 @@ int wsrep::transaction::after_statement()
     return ret;
 }
 
+void wsrep::transaction::after_command_must_abort(
+    wsrep::unique_lock<wsrep::mutex>& lock)
+{
+    debug_log_state("after_command_must_abort enter");
+    assert(active());
+    assert(state_ == s_must_abort);
+
+    if (is_xa() && is_streaming())
+    {
+        state(lock, s_must_replay);
+    }
+
+    lock.unlock();
+    client_service_.bf_rollback();
+    lock.lock();
+
+    if (is_xa() && is_streaming())
+    {
+        xa_replay(lock);
+    }
+    else
+    {
+        client_state_.override_error(wsrep::e_deadlock_error);
+    }
+
+    debug_log_state("after_command_must_abort leave");
+}
+
 void wsrep::transaction::after_applying()
 {
     wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
@@ -1173,9 +1207,8 @@ void wsrep::transaction::xa_detach()
     debug_log_state("xa_detach leave");
 }
 
-int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
+void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock)
 {
-    debug_log_state("xa_replay enter");
     assert(lock.owns_lock());
     assert(is_xa());
     assert(is_streaming());
@@ -1197,42 +1230,49 @@ int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
     {
         client_service_.emergency_shutdown();
     }
+}
+
+int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+    debug_log_state("xa_replay enter");
+    xa_replay_common(lock);
+    state(lock, s_aborted);
+    streaming_context_.cleanup();
+    provider().release(ws_handle_);
+    cleanup();
+    client_service_.signal_replayed();
+    debug_log_state("xa_replay leave");
+    return 0;
+}
+
+int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
+{
+    debug_log_state("xa_replay_commit enter");
+    xa_replay_common(lock);
+    lock.unlock();
+    enum wsrep::provider::status status(client_service_.commit_by_xid());
+    lock.lock();
     int ret(1);
-    if (bf_abort_client_state_ == wsrep::client_state::s_idle)
+    switch (status)
     {
-        state(lock, s_aborted);
+    case wsrep::provider::success:
+        state(lock, s_committed);
         streaming_context_.cleanup();
         provider().release(ws_handle_);
         cleanup();
         ret = 0;
-    }
-    else
-    {
-        lock.unlock();
-        enum wsrep::provider::status status(client_service_.commit_by_xid());
-        lock.lock();
-        switch (status)
-        {
-        case wsrep::provider::success:
-            state(lock, s_committed);
-            streaming_context_.cleanup();
-            provider().release(ws_handle_);
-            cleanup();
-            ret = 0;
-            break;
-        default:
-            log_warning() << "Failed to commit by xid during replay";
-            // Commit by xid failed, return a commit
-            // error and let the client retry
-            state(lock, s_preparing);
-            state(lock, s_prepared);
-            client_state_.override_error(wsrep::e_error_during_commit, status);
-        }
+        break;
+    default:
+        log_warning() << "Failed to commit by xid during replay";
+        // Commit by xid failed, return a commit
+        // error and let the client retry
+        state(lock, s_preparing);
+        state(lock, s_prepared);
+        client_state_.override_error(wsrep::e_error_during_commit, status);
     }
     client_service_.signal_replayed();
-    debug_log_state("xa_replay leave");
+    debug_log_state("xa_replay_commit leave");
     return ret;
 }


@@ -145,7 +145,62 @@ BOOST_FIXTURE_TEST_CASE(transaction_xa_replay,
     BOOST_REQUIRE(cc.unordered_replays() == 1);
-    // xa_replay() createa a streaming applier, clean it up
+    // xa_replay() creates a streaming applier, clean it up
     wsrep::mock_high_priority_service* hps(
         static_cast<wsrep::mock_high_priority_service*>(
             sc.find_streaming_applier(sc.id(), wsrep::transaction_id(1))));
     BOOST_REQUIRE(hps);
     hps->rollback(wsrep::ws_handle(), wsrep::ws_meta());
     hps->after_apply();
     sc.stop_streaming_applier(sc.id(), wsrep::transaction_id(1));
     server_service.release_high_priority_service(hps);
 }
+
+BOOST_FIXTURE_TEST_CASE(transaction_xa_replay_after_command_before_result,
+                        replicating_client_fixture_sync_rm)
+{
+    wsrep::xid xid(1, 1, 1, "id");
+    cc.start_transaction(wsrep::transaction_id(1));
+    cc.assign_xid(xid);
+    cc.before_prepare();
+    cc.after_prepare();
+    BOOST_REQUIRE(cc.state() == wsrep::client_state::s_exec);
+    wsrep_test::bf_abort_unordered(cc);
+    cc.after_command_before_result();
+    cc.after_command_after_result();
+    BOOST_REQUIRE(cc.unordered_replays() == 1);
+    // xa_replay() creates a streaming applier, clean it up
+    wsrep::mock_high_priority_service* hps(
+        static_cast<wsrep::mock_high_priority_service*>(
+            sc.find_streaming_applier(sc.id(), wsrep::transaction_id(1))));
+    BOOST_REQUIRE(hps);
+    hps->rollback(wsrep::ws_handle(), wsrep::ws_meta());
+    hps->after_apply();
+    sc.stop_streaming_applier(sc.id(), wsrep::transaction_id(1));
+    server_service.release_high_priority_service(hps);
+}
+
+BOOST_FIXTURE_TEST_CASE(transaction_xa_replay_after_command_after_result,
+                        replicating_client_fixture_sync_rm)
+{
+    wsrep::xid xid(1, 1, 1, "id");
+    cc.start_transaction(wsrep::transaction_id(1));
+    cc.assign_xid(xid);
+    cc.before_prepare();
+    cc.after_prepare();
+    cc.after_command_before_result();
+    BOOST_REQUIRE(cc.state() == wsrep::client_state::s_result);
+    wsrep_test::bf_abort_unordered(cc);
+    cc.after_command_after_result();
+    BOOST_REQUIRE(cc.unordered_replays() == 1);
+    // xa_replay() creates a streaming applier, clean it up
+    wsrep::mock_high_priority_service* hps(
+        static_cast<wsrep::mock_high_priority_service*>(
+            sc.find_streaming_applier(sc.id(), wsrep::transaction_id(1))));