diff --git a/src/client_state.cpp b/src/client_state.cpp index 1f32fac..52bb118 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -192,15 +192,22 @@ int wsrep::client_state::before_statement() int wsrep::client_state::after_statement() { - // wsrep::unique_lock lock(mutex_); + wsrep::unique_lock lock(mutex_); debug_log_state("after_statement: enter"); assert(state() == s_exec); assert(mode() == m_local); -#if 0 - /** - * @todo Check for replay state, do rollback if requested. - */ -#endif // 0 + + if (transaction_.active() && + transaction_.state() == wsrep::transaction::s_must_abort) + { + lock.unlock(); + client_service_.bf_rollback(); + lock.lock(); + assert(transaction_.state() == wsrep::transaction::s_aborted); + override_error(wsrep::e_deadlock_error); + } + lock.unlock(); + (void)transaction_.after_statement(); if (current_error() == wsrep::e_deadlock_error) { @@ -368,8 +375,10 @@ int wsrep::client_state::end_rsu() return ret; } + + /////////////////////////////////////////////////////////////////////////////// -// TOI // +// Misc // /////////////////////////////////////////////////////////////////////////////// int wsrep::client_state::sync_wait(int timeout) diff --git a/src/server_state.cpp b/src/server_state.cpp index 87792cc..53899ab 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -98,6 +98,7 @@ namespace high_priority_service.remove_fragments(ws_meta); high_priority_service.commit(ws_handle, ws_meta); + high_priority_service.after_apply(); return ret; } diff --git a/src/transaction.cpp b/src/transaction.cpp index ebacf47..6d33407 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -506,6 +506,7 @@ int wsrep::transaction::before_rollback() wsrep::unique_lock lock(client_state_.mutex()); debug_log_state("before_rollback_enter"); assert(state() == s_executing || + state() == s_preparing || state() == s_must_abort || // Background rollbacker or rollback initiated from SE state() == s_aborting || @@ -517,6 +518,10 @@ int wsrep::transaction::before_rollback() case wsrep::client_state::m_local: switch (state()) { + case s_preparing: + // Error detected during prepare phase + state(lock, s_must_abort); + // fall through case s_executing: // Voluntary rollback if (is_streaming())