mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-02 05:22:26 +03:00
Close SR transactions when disconnecting from the group.
Moved SR fragment removal for total order BFd SR transactions into after_rollback() call to avoid deadlocking while trying to access storage before rolling back the transaction.
This commit is contained in:
@ -574,6 +574,9 @@ namespace wsrep
|
|||||||
wsrep::unique_lock<wsrep::mutex>&,
|
wsrep::unique_lock<wsrep::mutex>&,
|
||||||
wsrep::high_priority_service&);
|
wsrep::high_priority_service&);
|
||||||
|
|
||||||
|
// Close transactions when handling disconnect from the group.
|
||||||
|
void close_transactions_at_disconnect(wsrep::high_priority_service&);
|
||||||
|
|
||||||
wsrep::mutex& mutex_;
|
wsrep::mutex& mutex_;
|
||||||
wsrep::condition_variable& cond_;
|
wsrep::condition_variable& cond_;
|
||||||
wsrep::server_service& server_service_;
|
wsrep::server_service& server_service_;
|
||||||
|
@ -688,6 +688,11 @@ void wsrep::server_state::on_view(const wsrep::view& view,
|
|||||||
wsrep::log_info() << "Non-primary view";
|
wsrep::log_info() << "Non-primary view";
|
||||||
if (view.final())
|
if (view.final())
|
||||||
{
|
{
|
||||||
|
assert(high_priority_service);
|
||||||
|
if (high_priority_service)
|
||||||
|
{
|
||||||
|
close_transactions_at_disconnect(*high_priority_service);
|
||||||
|
}
|
||||||
state(lock, s_disconnected);
|
state(lock, s_disconnected);
|
||||||
}
|
}
|
||||||
else if (state_ != s_disconnecting)
|
else if (state_ != s_disconnecting)
|
||||||
@ -698,6 +703,11 @@ void wsrep::server_state::on_view(const wsrep::view& view,
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
assert(view.final());
|
assert(view.final());
|
||||||
|
assert(high_priority_service);
|
||||||
|
if (high_priority_service)
|
||||||
|
{
|
||||||
|
close_transactions_at_disconnect(*high_priority_service);
|
||||||
|
}
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||||
state(lock, s_disconnected);
|
state(lock, s_disconnected);
|
||||||
id_ = wsrep::id::undefined();
|
id_ = wsrep::id::undefined();
|
||||||
@ -1054,3 +1064,25 @@ void wsrep::server_state::close_foreign_sr_transactions(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void wsrep::server_state::close_transactions_at_disconnect(
|
||||||
|
wsrep::high_priority_service& high_priority_service)
|
||||||
|
{
|
||||||
|
// Close streaming applier without removing fragments
|
||||||
|
// from fragment storage. When the server is started again,
|
||||||
|
// it must be able to recover ongoing streaming transactions.
|
||||||
|
streaming_appliers_map::iterator i(streaming_appliers_.begin());
|
||||||
|
while (i != streaming_appliers_.end())
|
||||||
|
{
|
||||||
|
wsrep::high_priority_service* streaming_applier(i->second);
|
||||||
|
{
|
||||||
|
wsrep::high_priority_switch sw(high_priority_service,
|
||||||
|
*streaming_applier);
|
||||||
|
streaming_applier->rollback(
|
||||||
|
wsrep::ws_handle(), wsrep::ws_meta());
|
||||||
|
streaming_applier->after_apply();
|
||||||
|
}
|
||||||
|
streaming_appliers_.erase(i++);
|
||||||
|
server_service_.release_high_priority_service(streaming_applier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -609,6 +609,17 @@ int wsrep::transaction::before_rollback()
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug_log_state("before_rollback_leave");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int wsrep::transaction::after_rollback()
|
||||||
|
{
|
||||||
|
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
||||||
|
debug_log_state("after_rollback_enter");
|
||||||
|
assert(state() == s_aborting ||
|
||||||
|
state() == s_must_replay);
|
||||||
|
|
||||||
if (is_streaming() && bf_aborted_in_total_order_)
|
if (is_streaming() && bf_aborted_in_total_order_)
|
||||||
{
|
{
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
@ -626,17 +637,6 @@ int wsrep::transaction::before_rollback()
|
|||||||
streaming_context_.cleanup();
|
streaming_context_.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
debug_log_state("before_rollback_leave");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int wsrep::transaction::after_rollback()
|
|
||||||
{
|
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
|
|
||||||
debug_log_state("after_rollback_enter");
|
|
||||||
assert(state() == s_aborting ||
|
|
||||||
state() == s_must_replay);
|
|
||||||
|
|
||||||
if (is_streaming() && state() != s_must_replay)
|
if (is_streaming() && state() != s_must_replay)
|
||||||
{
|
{
|
||||||
clear_fragments();
|
clear_fragments();
|
||||||
|
Reference in New Issue
Block a user