mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-28 20:02:00 +03:00
Assertion transaction.is_streaming() in wsrep::transaction::adopt()
Assertion is_streaming() fires in transaction::adopt() when a transaction is BF aborted, while it is in s_executing state, and it manages to complete rollback and cleanup while the BF aborter is executing streaming_rollback() with client_state lock is unlocked. In this case method transaction::adopt() finds a transaction that is no longer marked as streaming, triggering the assertion. A condition variable and flag streaming_rollback_in_progress_ now prevents a client thread to finish rollback, even if the BF aborter has temporarily unlocked the client_state lock.
This commit is contained in:
@ -276,6 +276,7 @@ namespace wsrep
|
|||||||
wsrep::sr_key_set sr_keys_;
|
wsrep::sr_key_set sr_keys_;
|
||||||
wsrep::mutable_buffer apply_error_buf_;
|
wsrep::mutable_buffer apply_error_buf_;
|
||||||
wsrep::xid xid_;
|
wsrep::xid xid_;
|
||||||
|
bool streaming_rollback_in_progress_;
|
||||||
};
|
};
|
||||||
|
|
||||||
static inline const char* to_c_string(enum wsrep::transaction::state state)
|
static inline const char* to_c_string(enum wsrep::transaction::state state)
|
||||||
|
@ -108,6 +108,7 @@ wsrep::transaction::transaction(
|
|||||||
, sr_keys_()
|
, sr_keys_()
|
||||||
, apply_error_buf_()
|
, apply_error_buf_()
|
||||||
, xid_()
|
, xid_()
|
||||||
|
, streaming_rollback_in_progress_(false)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
|
|
||||||
@ -1828,19 +1829,30 @@ int wsrep::transaction::append_sr_keys_for_commit()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lock)
|
void wsrep::transaction::streaming_rollback(
|
||||||
|
wsrep::unique_lock<wsrep::mutex>& lock)
|
||||||
{
|
{
|
||||||
debug_log_state("streaming_rollback enter");
|
debug_log_state("streaming_rollback enter");
|
||||||
assert(state_ != s_must_replay);
|
assert(state_ != s_must_replay);
|
||||||
assert(is_streaming());
|
assert(is_streaming());
|
||||||
|
assert(lock.owns_lock());
|
||||||
|
|
||||||
|
// Prevent streaming_rollback() to be executed simultaneously.
|
||||||
|
// Notice that lock is unlocked when calling into server_state
|
||||||
|
// methods, to avoid violating lock order.
|
||||||
|
// The condition variable below prevents a thread to go
|
||||||
|
// through streaming_rollback() while another thread is busy
|
||||||
|
// stopping or converting the streaming_client().
|
||||||
|
// This would be problematic if a thread is performing BF abort,
|
||||||
|
// while the original client manages to complete its rollback
|
||||||
|
// and therefore change the state of the transaction, causing
|
||||||
|
// assertions to fire.
|
||||||
|
while (streaming_rollback_in_progress_)
|
||||||
|
client_state_.cond_.wait(lock);
|
||||||
|
streaming_rollback_in_progress_ = true;
|
||||||
|
|
||||||
if (streaming_context_.rolled_back() == false)
|
if (streaming_context_.rolled_back() == false)
|
||||||
{
|
{
|
||||||
// We must set rolled_back id before stopping streaming client
|
|
||||||
// or converting to applier. Accessing server_state requires
|
|
||||||
// releasing the client_state lock in order to avoid violating
|
|
||||||
// locking order, and this will open up a possibility for two
|
|
||||||
// threads accessing this block simultaneously.
|
|
||||||
streaming_context_.rolled_back(id_);
|
|
||||||
if (bf_aborted_in_total_order_)
|
if (bf_aborted_in_total_order_)
|
||||||
{
|
{
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
@ -1852,17 +1864,14 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lo
|
|||||||
// Create a high priority applier which will handle the
|
// Create a high priority applier which will handle the
|
||||||
// rollback fragment or clean up on configuration change.
|
// rollback fragment or clean up on configuration change.
|
||||||
// Adopt transaction will copy fragment set and appropriate
|
// Adopt transaction will copy fragment set and appropriate
|
||||||
// meta data. Mark current transaction streaming context
|
// meta data.
|
||||||
// rolled back.
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
server_service_.debug_sync("wsrep_streaming_rollback");
|
||||||
client_state_.server_state_.convert_streaming_client_to_applier(
|
client_state_.server_state_.convert_streaming_client_to_applier(
|
||||||
&client_state_);
|
&client_state_);
|
||||||
lock.lock();
|
lock.lock();
|
||||||
streaming_context_.cleanup();
|
streaming_context_.cleanup();
|
||||||
// Cleanup cleans rolled_back_for from streaming context, but
|
|
||||||
// we want to preserve it to avoid executing this block
|
|
||||||
// more than once.
|
|
||||||
streaming_context_.rolled_back(id_);
|
|
||||||
enum wsrep::provider::status ret;
|
enum wsrep::provider::status ret;
|
||||||
if ((ret = provider().rollback(id_)))
|
if ((ret = provider().rollback(id_)))
|
||||||
{
|
{
|
||||||
@ -1871,8 +1880,15 @@ void wsrep::transaction::streaming_rollback(wsrep::unique_lock<wsrep::mutex>& lo
|
|||||||
<< id_ << ": " << ret;
|
<< id_ << ": " << ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark the streaming context as rolled back,
|
||||||
|
// so that this block is executed once.
|
||||||
|
streaming_context_.rolled_back(id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug_log_state("streaming_rollback leave");
|
debug_log_state("streaming_rollback leave");
|
||||||
|
streaming_rollback_in_progress_ = false;
|
||||||
|
client_state_.cond_.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
|
||||||
|
Reference in New Issue
Block a user