diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 9735048..7b0409a 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -136,13 +136,25 @@ namespace wsrep static const int state_max_ = s_quitting + 1; /** - * Store variables related to global execution context. + * Aqcuire ownership on the thread. + * * This method should be called every time the thread - * operating the client state changes. + * operating the client state changes. This method is called + * implicitly from before_command() and + * wait_rollback_complete_and_acquire_ownership(). + */ + void acquire_ownership() + { + wsrep::unique_lock lock(mutex_); + do_acquire_ownership(lock); + } + + /** + * @deprecated Use acquire_ownership() instead. */ void store_globals() { - current_thread_id_ = wsrep::this_thread::get_id(); + acquire_ownership(); } /** @@ -394,7 +406,7 @@ namespace wsrep const wsrep::ws_meta& meta) { wsrep::unique_lock lock(mutex_); - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(mode_ == m_high_priority); return transaction_.start_transaction(wsh, meta); } @@ -404,7 +416,7 @@ namespace wsrep int before_prepare() { wsrep::unique_lock lock(mutex_); - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec); return transaction_.before_prepare(lock); } @@ -412,35 +424,35 @@ namespace wsrep int after_prepare() { wsrep::unique_lock lock(mutex_); - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec); return transaction_.after_prepare(lock); } int before_commit() { - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); return transaction_.before_commit(); } int ordered_commit() { - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); return transaction_.ordered_commit(); } int after_commit() { - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_exec || mode_ == m_local); return transaction_.after_commit(); } /** @} */ int before_rollback() { - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_idle || state_ == s_exec || state_ == s_result || @@ -450,7 +462,7 @@ namespace wsrep int after_rollback() { - assert(current_thread_id_ == wsrep::this_thread::get_id()); + assert(owning_thread_id_ == wsrep::this_thread::get_id()); assert(state_ == s_idle || state_ == s_exec || state_ == s_result || @@ -461,16 +473,21 @@ namespace wsrep /** * This method should be called by the background rollbacker * thread after the rollback is complete. This will allow - * the client to proceed through before_command(). + * the client to proceed through before_command() and + * wait_rollback_complete_and_acquire_ownership(). */ - void sync_rollback_complete() - { - wsrep::unique_lock lock(mutex_); - assert(state_ == s_idle && mode_ == m_local && - transaction_.state() == wsrep::transaction::s_aborted); - set_rollbacker(false); - cond_.notify_all(); - } + void sync_rollback_complete(); + + /** + * Wait for background rollback to complete. This method can + * be called before before_command() to verify that the + * background rollback has been finished. After the call returns, + * it is guaranteed that BF abort does not launch background + * rollback process before after_command_after_result() is called. + * This method is idempotent, it can be called many times + * by the same thread before before_command() is called. + */ + void wait_rollback_complete_and_acquire_ownership(); /** @} */ // @@ -785,8 +802,7 @@ namespace wsrep const client_id& id, enum mode mode) : owning_thread_id_(wsrep::this_thread::get_id()) - , current_thread_id_(owning_thread_id_) - , has_rollbacker_(false) + , rollbacker_active_(false) , mutex_(mutex) , cond_(cond) , server_state_(server_state) @@ -815,6 +831,11 @@ namespace wsrep friend class client_toi_mode; friend class transaction; + void do_acquire_ownership(wsrep::unique_lock& lock); + // Wait for sync rollbacker to finish, with lock. Changes state + // to exec. + void do_wait_rollback_complete_and_acquire_ownership( + wsrep::unique_lock& lock); void update_last_written_gtid(const wsrep::gtid&); void debug_log_state(const char*) const; void state(wsrep::unique_lock& lock, enum state state); @@ -831,8 +852,7 @@ namespace wsrep void leave_toi_common(); wsrep::thread::id owning_thread_id_; - wsrep::thread::id current_thread_id_; - bool has_rollbacker_; + bool rollbacker_active_; wsrep::mutex& mutex_; wsrep::condition_variable& cond_; wsrep::server_state& server_state_; @@ -852,18 +872,18 @@ namespace wsrep enum wsrep::provider::status current_error_status_; /** - * Assigns external rollbacker thread for the client - * this will block client in before_command(), until - * rolbacker has released the client + * Marks external rollbacker thread for the client + * as active. This will block client in before_command(), until + * rolbacker has released the client. */ - void set_rollbacker(bool value) + void set_rollbacker_active(bool value) { - has_rollbacker_ = value; + rollbacker_active_ = value; } - bool has_rollbacker() + bool is_rollbacker_active() { - return(has_rollbacker_); + return rollbacker_active_; } }; diff --git a/src/client_state.cpp b/src/client_state.cpp index a5fa52f..2b8ead9 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -36,8 +36,7 @@ void wsrep::client_state::open(wsrep::client_id id) assert(state_ == s_none); debug_log_state("open: enter"); owning_thread_id_ = wsrep::this_thread::get_id(); - current_thread_id_ = owning_thread_id_; - has_rollbacker_ = false; + rollbacker_active_ = false; sync_wait_gtid_ = wsrep::gtid::undefined(); last_written_gtid_ = wsrep::gtid::undefined(); state(lock, s_idle); @@ -88,25 +87,30 @@ int wsrep::client_state::before_command() { wsrep::unique_lock lock(mutex_); debug_log_state("before_command: enter"); - assert(state_ == s_idle); - if (transaction_.active() && - server_state_.rollback_mode() == wsrep::server_state::rm_sync) + // If the state is s_exec, the processing thread has already grabbed + // control with wait_rollback_complete_and_acquire_ownership() + if (state_ != s_exec) { - /* - * has_rollbacker() returns false, when background rollback is over - */ - while (has_rollbacker()) - { - cond_.wait(lock); - } + assert(state_ == s_idle); + do_wait_rollback_complete_and_acquire_ownership(lock); + assert(state_ == s_exec); } - store_globals(); // Marks the control for this thread - state(lock, s_exec); + else + { + // This thread must have acquired control by other means, + // for example via wait_rollback_complete_and_acquire_ownership(). + assert(wsrep::this_thread::get_id() == owning_thread_id_); + } + + // If the transaction is active, it must be either executing, + // aborted as rolled back by rollbacker, or must_abort if the + // client thread gained control via + // wait_rollback_complete_and_acquire_ownership() + // just before BF abort happened. assert(transaction_.active() == false || (transaction_.state() == wsrep::transaction::s_executing || transaction_.state() == wsrep::transaction::s_aborted || - (transaction_.state() == wsrep::transaction::s_must_abort && - server_state_.rollback_mode() == wsrep::server_state::rm_async))); + transaction_.state() == wsrep::transaction::s_must_abort)); if (transaction_.active()) { @@ -257,6 +261,33 @@ int wsrep::client_state::after_statement() return 0; } +////////////////////////////////////////////////////////////////////////////// +// Rollbacker synchronization // +////////////////////////////////////////////////////////////////////////////// + +void wsrep::client_state::sync_rollback_complete() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("sync_rollback_complete: enter"); + assert(state_ == s_idle && mode_ == m_local && + transaction_.state() == wsrep::transaction::s_aborted); + set_rollbacker_active(false); + cond_.notify_all(); + debug_log_state("sync_rollback_complete: leave"); +} + +void wsrep::client_state::wait_rollback_complete_and_acquire_ownership() +{ + wsrep::unique_lock lock(mutex_); + debug_log_state("wait_rollback_complete_and_acquire_ownership: enter"); + if (state_ == s_idle) + { + do_wait_rollback_complete_and_acquire_ownership(lock); + } + assert(state_ == s_exec); + debug_log_state("wait_rollback_complete_and_acquire_ownership: leave"); +} + ////////////////////////////////////////////////////////////////////////////// // Streaming // ////////////////////////////////////////////////////////////////////////////// @@ -443,6 +474,32 @@ int wsrep::client_state::sync_wait(int timeout) // Private // /////////////////////////////////////////////////////////////////////////////// +void wsrep::client_state::do_acquire_ownership( + wsrep::unique_lock& lock WSREP_UNUSED) +{ + assert(lock.owns_lock()); + // Be strict about client state for clients in local mode. The + // owning_thread_id_ is used to detect bugs which are caused by + // more than one thread operating the client state at the time, + // for example thread handling the client session and background + // rollbacker. + assert(state_ == s_idle || mode_ != m_local); + owning_thread_id_ = wsrep::this_thread::get_id(); +} + +void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership( + wsrep::unique_lock& lock) +{ + assert(lock.owns_lock()); + assert(state_ == s_idle); + while (is_rollbacker_active()) + { + cond_.wait(lock); + } + do_acquire_ownership(lock); + state(lock, s_exec); +} + void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid) { assert(last_written_gtid_.is_undefined() || @@ -468,19 +525,9 @@ void wsrep::client_state::state( wsrep::unique_lock& lock WSREP_UNUSED, enum wsrep::client_state::state state) { - // For locally processing client states (local, toi, rsu) - // changing the state is allowed only from the owning thread. - // In high priority mode the processing thread however may - // change and we check only that store_globals() has been - // called by the current thread to gain ownership. - // - // Note that this check assumes that there is always a single - // thread per local client connection. This may not always - // hold and the sanity check mechanism may need to be revised. - assert((mode_ != m_high_priority && - wsrep::this_thread::get_id() == owning_thread_id_) || - (mode_ == m_high_priority && - wsrep::this_thread::get_id() == current_thread_id_)); + // Verify that the current thread has gained control to the + // connection by calling before_command() + assert(wsrep::this_thread::get_id() == owning_thread_id_); assert(lock.owns_lock()); static const char allowed[state_max_][state_max_] = { @@ -497,7 +544,6 @@ void wsrep::client_state::state( << state_ << " -> " << state; assert(0); } - state_hist_.push_back(state_); state_ = state; if (state_hist_.size() > 10) diff --git a/src/server_state.cpp b/src/server_state.cpp index 0b71349..b83273d 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -71,6 +71,7 @@ static inline int resolve_return_error(bool const vote, static void discard_streaming_applier(wsrep::server_state& server_state, + wsrep::high_priority_service& high_priority_service, wsrep::high_priority_service* streaming_applier, const wsrep::ws_meta& ws_meta) { @@ -78,6 +79,7 @@ discard_streaming_applier(wsrep::server_state& server_state, ws_meta.server_id(), ws_meta.transaction_id()); server_state.server_service().release_high_priority_service( streaming_applier); + high_priority_service.store_globals(); } static int apply_fragment(wsrep::server_state& server_state, @@ -135,7 +137,10 @@ static int apply_fragment(wsrep::server_state& server_state, } else { - discard_streaming_applier(server_state, streaming_applier,ws_meta); + discard_streaming_applier(server_state, + high_priority_service, + streaming_applier, + ws_meta); ret = resolve_return_error(err.size() > 0, ret, apply_err); } } @@ -188,7 +193,8 @@ static int commit_fragment(wsrep::server_state& server_state, if (!ret) { - discard_streaming_applier(server_state, streaming_applier, ws_meta); + discard_streaming_applier(server_state, high_priority_service, + streaming_applier, ws_meta); } return ret; @@ -227,7 +233,8 @@ static int rollback_fragment(wsrep::server_state& server_state, if (!ret) { - discard_streaming_applier(server_state, streaming_applier, ws_meta); + discard_streaming_applier(server_state, high_priority_service, + streaming_applier, ws_meta); if (adopt_error == 0) { @@ -1282,7 +1289,7 @@ void wsrep::server_state::state( { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */ { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */ { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */ - { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */ + { 1, 0, 0, 1, 0, 1, 0, 1, 1}, /* dor */ { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */ { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */ }; @@ -1438,6 +1445,7 @@ void wsrep::server_state::close_orphaned_sr_transactions( streaming_appliers_.erase(i++); server_service_.release_high_priority_service(streaming_applier); + high_priority_service.store_globals(); wsrep::ws_meta ws_meta( wsrep::gtid(), wsrep::stid(server_id, transaction_id, wsrep::client_id()), @@ -1478,6 +1486,7 @@ void wsrep::server_state::close_transactions_at_disconnect( } streaming_appliers_.erase(i++); server_service_.release_high_priority_service(streaming_applier); + high_priority_service.store_globals(); } streaming_appliers_recovered_ = false; } diff --git a/src/transaction.cpp b/src/transaction.cpp index b6048d7..59fa369 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -906,7 +906,7 @@ bool wsrep::transaction::bf_abort( // between releasing the lock and before background // rollbacker gets control. state(lock, wsrep::transaction::s_aborting); - client_state_.set_rollbacker(true); + client_state_.set_rollbacker_active(true); if (client_state_.mode() == wsrep::client_state::m_high_priority) { @@ -974,21 +974,12 @@ void wsrep::transaction::state( << " -> " << to_string(next_state)); assert(lock.owns_lock()); - // BF aborter is allowed to change the state to must abort and - // further to aborting and aborted if the background rollbacker - // is launched. - // - // For high priority streaming applier threads the assertion must - // be relaxed to check only current thread id which indicates that - // the store_globals() has been called before processing of write set - // starts. - assert((client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || - next_state == s_must_abort || - next_state == s_aborting || - next_state == s_aborted) - || - (client_state_.mode() == wsrep::client_state::m_high_priority && - wsrep::this_thread::get_id() == client_state_.current_thread_id_)); + // BF aborter is allowed to change the state without gaining control + // to the state if the next state is s_must_abort or s_aborting. + assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || + next_state == s_must_abort || + next_state == s_aborting); + static const char allowed[n_states][n_states] = { /* ex pr ce co oc ct cf ma ab ad mr re */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ @@ -1007,11 +998,9 @@ void wsrep::transaction::state( if (!allowed[state_][next_state]) { - std::ostringstream os; - os << "unallowed state transition for transaction " - << id_ << ": " << wsrep::to_string(state_) - << " -> " << wsrep::to_string(next_state); - wsrep::log_warning() << os.str(); + wsrep::log_debug() << "unallowed state transition for transaction " + << id_ << ": " << wsrep::to_string(state_) + << " -> " << wsrep::to_string(next_state); assert(0); } diff --git a/test/transaction_test.cpp b/test/transaction_test.cpp index 73f3b6b..dd4cf38 100644 --- a/test/transaction_test.cpp +++ b/test/transaction_test.cpp @@ -773,8 +773,6 @@ BOOST_FIXTURE_TEST_CASE_TEMPLATE( BOOST_REQUIRE(cc.current_error() == wsrep::e_success); } - - // // Test a transaction which gets BF aborted before before_statement. // @@ -938,6 +936,40 @@ BOOST_FIXTURE_TEST_CASE( BOOST_REQUIRE(tc.active() == false); } +// Check the case where client program calls wait_rollback_complete() to +// gain control before before_command(). +BOOST_FIXTURE_TEST_CASE( + transaction_1pc_bf_abort_after_after_command_after_result_sync_rm_wait_rollback, + replicating_client_fixture_sync_rm) +{ + cc.start_transaction(wsrep::transaction_id(1)); + BOOST_REQUIRE(tc.active()); + cc.after_statement(); + BOOST_REQUIRE(cc.state() == wsrep::client_state::s_exec); + cc.after_command_before_result(); + BOOST_REQUIRE(cc.state() == wsrep::client_state::s_result); + cc.after_command_after_result(); + BOOST_REQUIRE(cc.state() == wsrep::client_state::s_idle); + wsrep_test::bf_abort_unordered(cc); + BOOST_REQUIRE(tc.state() == wsrep::transaction::s_aborted); + BOOST_REQUIRE(tc.active()); + cc.sync_rollback_complete(); + BOOST_REQUIRE(cc.state() == wsrep::client_state::s_idle); + cc.wait_rollback_complete_and_acquire_ownership(); + BOOST_REQUIRE(cc.state() == wsrep::client_state::s_exec); + // Idempotent + cc.wait_rollback_complete_and_acquire_ownership(); + BOOST_REQUIRE(cc.state() == wsrep::client_state::s_exec); + BOOST_REQUIRE(cc.before_command() == 1); + BOOST_REQUIRE(tc.active() == false); + BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); + cc.after_command_before_result(); + BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error); + cc.after_command_after_result(); + BOOST_REQUIRE(cc.current_error() == wsrep::e_success); + BOOST_REQUIRE(tc.active() == false); +} + BOOST_FIXTURE_TEST_CASE( transaction_1pc_bf_abort_after_after_command_after_result_async_rm, replicating_client_fixture_async_rm)