1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Incorrect assertion and state handling in after_replay().

If the transaction fails during replay because of certification
failure, the provider will return control to applier without
terminating the transaction and transaction remains in
s_replaying.

Fixed transaction::after_statement() to handle the state changes
correctly if certification failure is returned from replay.
Replaying was extracted to separate private method from
after_statement(). Removed transaction::after_replay() as it
seems now unnecessary and it bypassed state change sanity checks.

Allowed replaying -> committed transaction transition to handle
the situation where DBMS allocates a new context and client_state
to do the replay.
This commit is contained in:
Teemu Ollakka
2019-12-27 18:27:16 +02:00
parent 90157ed1b0
commit 76f7249b8d
3 changed files with 59 additions and 55 deletions

View File

@ -640,8 +640,6 @@ namespace wsrep
/** /**
* Clone enough state from another transaction so that replaing will * Clone enough state from another transaction so that replaing will
* be possible with a transaction contained in this client state. * be possible with a transaction contained in this client state.
* Method after_replay() must be used to inject the state after
* replaying back to this client state.
* *
* @param transaction Transaction which is to be replied in this * @param transaction Transaction which is to be replied in this
* client state * client state
@ -652,16 +650,6 @@ namespace wsrep
transaction_.clone_for_replay(transaction); transaction_.clone_for_replay(transaction);
} }
/**
* Copy state from another transaction context after replay.
*
* @param transaction Transaction which was used for replaying.
*/
void after_replay(const wsrep::transaction& transaction)
{
transaction_.after_replay(transaction);
}
/** @name Non-transactional operations */ /** @name Non-transactional operations */
/** @{*/ /** @{*/

View File

@ -199,8 +199,6 @@ namespace wsrep
void clone_for_replay(const wsrep::transaction& other); void clone_for_replay(const wsrep::transaction& other);
void after_replay(const wsrep::transaction& other);
bool bf_aborted() const bool bf_aborted() const
{ {
return (bf_abort_client_state_ != 0); return (bf_abort_client_state_ != 0);
@ -248,6 +246,7 @@ namespace wsrep
int append_sr_keys_for_commit(); int append_sr_keys_for_commit();
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&); int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&); void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
int replay(wsrep::unique_lock<wsrep::mutex>&);
void clear_fragments(); void clear_fragments();
void cleanup(); void cleanup();
void debug_log_state(const char*) const; void debug_log_state(const char*) const;

View File

@ -808,37 +808,7 @@ int wsrep::transaction::after_statement()
// Fall through // Fall through
case s_must_replay: case s_must_replay:
{ {
state(lock, s_replaying); ret = replay(lock);
// Need to remember streaming state before replay, entering
// after_commit() after succesful replay will clear
// fragments.
const bool was_streaming(is_streaming());
lock.unlock();
enum wsrep::provider::status replay_ret(client_service_.replay());
if (was_streaming)
{
client_state_.server_state_.stop_streaming_client(&client_state_);
}
switch (replay_ret)
{
case wsrep::provider::success:
provider().release(ws_handle_);
break;
case wsrep::provider::error_certification_failed:
client_state_.override_error(
wsrep::e_deadlock_error);
ret = 1;
break;
default:
client_service_.emergency_shutdown();
break;
}
lock.lock();
if (ret)
{
wsrep::log_info() << "Replay ret " << replay_ret;
state(lock, s_aborted);
}
break; break;
} }
case s_aborted: case s_aborted:
@ -886,10 +856,12 @@ void wsrep::transaction::after_applying()
assert(state_ == s_executing || assert(state_ == s_executing ||
state_ == s_prepared || state_ == s_prepared ||
state_ == s_committed || state_ == s_committed ||
state_ == s_aborted); state_ == s_aborted ||
state_ == s_replaying);
if (state_ != s_executing && state_ != s_prepared) if (state_ != s_executing && state_ != s_prepared)
{ {
if (state_ == s_replaying) state(lock, s_aborted);
cleanup(); cleanup();
} }
else else
@ -1032,14 +1004,6 @@ void wsrep::transaction::clone_for_replay(const wsrep::transaction& other)
state_ = s_replaying; state_ = s_replaying;
} }
void wsrep::transaction::after_replay(const wsrep::transaction& other)
{
// Other must have been terminated
assert(other.state() == s_committed || other.state() == s_aborted);
state_ = other.state();
clear_fragments();
}
int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid) int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid)
{ {
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_); wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
@ -1154,7 +1118,7 @@ void wsrep::transaction::state(
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
{ 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0} /* re */
}; };
if (!allowed[state_][next_state]) if (!allowed[state_][next_state])
@ -1721,6 +1685,59 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lo
debug_log_state("streaming_rollback leave"); debug_log_state("streaming_rollback leave");
} }
int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
{
int ret(0);
state(lock, s_replaying);
// Need to remember streaming state before replay, entering
// after_commit() after succesful replay will clear
// fragments.
const bool was_streaming(is_streaming());
lock.unlock();
client_service_.debug_sync("wsrep_before_replay");
enum wsrep::provider::status replay_ret(client_service_.replay());
if (was_streaming)
{
client_state_.server_state_.stop_streaming_client(&client_state_);
}
lock.lock();
switch (replay_ret)
{
case wsrep::provider::success:
if (state() == s_replaying)
{
// Replay was done by using different client state, adjust state
// to committed.
state(lock, s_committed);
}
if (is_streaming())
{
clear_fragments();
}
provider().release(ws_handle_);
break;
case wsrep::provider::error_certification_failed:
client_state_.override_error(
wsrep::e_deadlock_error);
if (is_streaming())
{
client_service_.remove_fragments();
clear_fragments();
}
state(lock, s_aborted);
ret = 1;
break;
default:
client_service_.emergency_shutdown();
break;
}
WSREP_LOG_DEBUG(client_state_.debug_log_level(),
wsrep::log::debug_level_transaction,
"replay returned" << replay_ret);
return ret;
}
void wsrep::transaction::clear_fragments() void wsrep::transaction::clear_fragments()
{ {
streaming_context_.cleanup(); streaming_context_.cleanup();