diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index b2c751a..1239f92 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -33,7 +33,9 @@ namespace db public: client_service(db::client& client); - bool interrupted() const override { return false; } + bool interrupted(wsrep::unique_lock&) + const override + { return false; } void reset_globals() override { } void store_globals() override { } int prepare_data_for_replication() override diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 3a455d4..5ca04fe 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -43,9 +43,12 @@ namespace wsrep /** * Return true if the current transaction has been interrupted - * by the DBMS. + * by the DBMS. The lock which is passed to interrupted call + * will always have underlying mutex locked. + * + * @param lock Lock object grabbed by the client_state */ - virtual bool interrupted() const = 0; + virtual bool interrupted(wsrep::unique_lock& lock) const = 0; /** * Reset possible global or thread local parameters associated diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index eb31a0d..5284180 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -197,6 +197,12 @@ namespace wsrep wsrep::provider& provider(); void flags(int flags) { flags_ = flags; } + // Return true if the transaction must abort, is aborting, + // or has been aborted, or has been interrupted by DBMS + // as indicated by client_service::interrupted() call. + // The call will adjust transaction state and set client_state + // error status accordingly. + bool abort_or_interrupt(wsrep::unique_lock&); int streaming_step(wsrep::unique_lock&); int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); diff --git a/src/server_state.cpp b/src/server_state.cpp index 4d92ccf..b7a13c1 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -1153,7 +1153,7 @@ void wsrep::server_state::state( { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */ { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */ { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */ - { 1, 0, 0, 0, 0, 1, 0, 0, 1}, /* dor */ + { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */ { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */ { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ }; diff --git a/src/transaction.cpp b/src/transaction.cpp index fb0d920..2152b94 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -983,24 +983,49 @@ void wsrep::transaction::state( { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */ { 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0} /* re */ }; - if (allowed[state_][next_state]) - { - state_hist_.push_back(state_); - if (state_hist_.size() == 12) - { - state_hist_.erase(state_hist_.begin()); - } - state_ = next_state; - } - else + + if (!allowed[state_][next_state]) { std::ostringstream os; os << "unallowed state transition for transaction " << id_ << ": " << wsrep::to_string(state_) << " -> " << wsrep::to_string(next_state); - wsrep::log_error() << os.str(); - throw wsrep::runtime_error(os.str()); + wsrep::log_warning() << os.str(); + assert(0); } + + state_hist_.push_back(state_); + if (state_hist_.size() == 12) + { + state_hist_.erase(state_hist_.begin()); + } + state_ = next_state; +} + +bool wsrep::transaction::abort_or_interrupt( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + if (state() == s_must_abort) + { + client_state_.override_error(wsrep::e_deadlock_error); + return true; + } + else if (state() == s_aborting || state() == s_aborted) + { + assert(client_state_.current_error()); + return true; + } + else if (client_service_.interrupted(lock)) + { + client_state_.override_error(wsrep::e_interrupted_error); + if (state() != s_must_abort) + { + state(lock, s_must_abort); + } + return true; + } + return false; } int wsrep::transaction::streaming_step(wsrep::unique_lock& lock) @@ -1052,24 +1077,14 @@ int wsrep::transaction::certify_fragment( state() == s_must_abort); client_service_.wait_for_replayers(lock); - if (state() == s_must_abort) + if (abort_or_interrupt(lock)) { - client_state_.override_error(wsrep::e_deadlock_error); return 1; } state(lock, s_certifying); - lock.unlock(); - if (client_service_.interrupted()) - { - lock.lock(); - state(lock, s_must_abort); - client_state_.override_error(wsrep::e_interrupted_error); - return 1; - } - wsrep::mutable_buffer data; if (client_service_.prepare_fragment_for_replication(data)) { @@ -1272,9 +1287,8 @@ int wsrep::transaction::certify_commit( assert(lock.owns_lock()); - if (state() == s_must_abort) + if (abort_or_interrupt(lock)) { - client_state_.override_error(wsrep::e_deadlock_error); return 1; } @@ -1310,17 +1324,6 @@ int wsrep::transaction::certify_commit( return 1; } - if (client_service_.interrupted()) - { - lock.lock(); - client_state_.override_error(wsrep::e_interrupted_error); - if (state_ != s_must_abort) - { - state(lock, s_must_abort); - } - return 1; - } - client_service_.debug_sync("wsrep_before_certification"); enum wsrep::provider::status cert_ret(provider().certify(client_state_.id(), diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index af25fd5..899ca68 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -78,7 +78,8 @@ namespace wsrep int bf_rollback() WSREP_OVERRIDE; - bool interrupted() const WSREP_OVERRIDE + bool interrupted(wsrep::unique_lock&) + const WSREP_OVERRIDE { return killed_before_certify_; }