diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index ce195fe..e476377 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -276,6 +276,7 @@ namespace wsrep wsrep::sr_key_set sr_keys_; wsrep::mutable_buffer apply_error_buf_; wsrep::xid xid_; + bool streaming_rollback_in_progress_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/transaction.cpp b/src/transaction.cpp index 2738271..4a63fa8 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -108,6 +108,7 @@ wsrep::transaction::transaction( , sr_keys_() , apply_error_buf_() , xid_() + , streaming_rollback_in_progress_(false) { } @@ -1828,19 +1829,30 @@ int wsrep::transaction::append_sr_keys_for_commit() return ret; } -void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lock) +void wsrep::transaction::streaming_rollback( + wsrep::unique_lock& lock) { debug_log_state("streaming_rollback enter"); assert(state_ != s_must_replay); assert(is_streaming()); + assert(lock.owns_lock()); + + // Prevent streaming_rollback() to be executed simultaneously. + // Notice that lock is unlocked when calling into server_state + // methods, to avoid violating lock order. + // The condition variable below prevents a thread to go + // through streaming_rollback() while another thread is busy + // stopping or converting the streaming_client(). + // This would be problematic if a thread is performing BF abort, + // while the original client manages to complete its rollback + // and therefore change the state of the transaction, causing + // assertions to fire. + while (streaming_rollback_in_progress_) + client_state_.cond_.wait(lock); + streaming_rollback_in_progress_ = true; + if (streaming_context_.rolled_back() == false) { - // We must set rolled_back id before stopping streaming client - // or converting to applier. Accessing server_state requires - // releasing the client_state lock in order to avoid violating - // locking order, and this will open up a possibility for two - // threads accessing this block simultaneously. - streaming_context_.rolled_back(id_); if (bf_aborted_in_total_order_) { lock.unlock(); @@ -1852,17 +1864,14 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lo // Create a high priority applier which will handle the // rollback fragment or clean up on configuration change. // Adopt transaction will copy fragment set and appropriate - // meta data. Mark current transaction streaming context - // rolled back. + // meta data. lock.unlock(); + server_service_.debug_sync("wsrep_streaming_rollback"); client_state_.server_state_.convert_streaming_client_to_applier( &client_state_); lock.lock(); streaming_context_.cleanup(); - // Cleanup cleans rolled_back_for from streaming context, but - // we want to preserve it to avoid executing this block - // more than once. - streaming_context_.rolled_back(id_); + enum wsrep::provider::status ret; if ((ret = provider().rollback(id_))) { @@ -1871,8 +1880,15 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock& lo << id_ << ": " << ret; } } + + // Mark the streaming context as rolled back, + // so that this block is executed once. + streaming_context_.rolled_back(id_); } + debug_log_state("streaming_rollback leave"); + streaming_rollback_in_progress_ = false; + client_state_.cond_.notify_all(); } int wsrep::transaction::replay(wsrep::unique_lock& lock)