mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-06-16 02:01:44 +03:00
Fixes to SR transaction processing
* Release server lock temporarily when BF aborting local SR transaction during view event processing * Check transaction state for BF aborts in before_prepare() after the lock has acquired after fragment removal * Send rollback fragment only from streaming_rollback()
This commit is contained in:
@ -990,7 +990,18 @@ void wsrep::server_state::close_foreign_sr_transactions(
|
|||||||
streaming_clients_map::iterator i(streaming_clients_.begin());
|
streaming_clients_map::iterator i(streaming_clients_.begin());
|
||||||
wsrep::client_id client_id(i->first);
|
wsrep::client_id client_id(i->first);
|
||||||
wsrep::transaction_id transaction_id(i->second->transaction().id());
|
wsrep::transaction_id transaction_id(i->second->transaction().id());
|
||||||
|
// It is safe to unlock the server state temporarily here.
|
||||||
|
// The processing happens inside view handler which is
|
||||||
|
// protected by the provider commit ordering critical
|
||||||
|
// section. The lock must be unlocked temporarily to
|
||||||
|
// allow converting the current client to streaming
|
||||||
|
// applier in transaction::streaming_rollback().
|
||||||
|
// The iterator i may be invalidated when the server state
|
||||||
|
// remains unlocked, so it should not be accessed after
|
||||||
|
// the bf abort call.
|
||||||
|
lock.unlock();
|
||||||
i->second->total_order_bf_abort(current_view_.view_seqno());
|
i->second->total_order_bf_abort(current_view_.view_seqno());
|
||||||
|
lock.lock();
|
||||||
streaming_clients_map::const_iterator found_i;
|
streaming_clients_map::const_iterator found_i;
|
||||||
while ((found_i = streaming_clients_.find(client_id)) !=
|
while ((found_i = streaming_clients_.find(client_id)) !=
|
||||||
streaming_clients_.end() &&
|
streaming_clients_.end() &&
|
||||||
|
@ -301,6 +301,11 @@ int wsrep::transaction::before_prepare(
|
|||||||
lock.lock();
|
lock.lock();
|
||||||
client_service_.debug_crash(
|
client_service_.debug_crash(
|
||||||
"crash_last_fragment_commit_after_fragment_removal");
|
"crash_last_fragment_commit_after_fragment_removal");
|
||||||
|
if (state() == s_must_abort)
|
||||||
|
{
|
||||||
|
client_state_.override_error(wsrep::e_deadlock_error);
|
||||||
|
ret = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case wsrep::client_state::m_high_priority:
|
case wsrep::client_state::m_high_priority:
|
||||||
@ -538,6 +543,10 @@ int wsrep::transaction::before_rollback()
|
|||||||
switch (client_state_.mode())
|
switch (client_state_.mode())
|
||||||
{
|
{
|
||||||
case wsrep::client_state::m_local:
|
case wsrep::client_state::m_local:
|
||||||
|
if (is_streaming())
|
||||||
|
{
|
||||||
|
client_service_.debug_sync("wsrep_before_SR_rollback");
|
||||||
|
}
|
||||||
switch (state())
|
switch (state())
|
||||||
{
|
{
|
||||||
case s_preparing:
|
case s_preparing:
|
||||||
@ -869,14 +878,7 @@ bool wsrep::transaction::bf_abort(
|
|||||||
else if (client_state_.mode() == wsrep::client_state::m_local &&
|
else if (client_state_.mode() == wsrep::client_state::m_local &&
|
||||||
is_streaming())
|
is_streaming())
|
||||||
{
|
{
|
||||||
streaming_context_.rolled_back(id_);
|
streaming_rollback();
|
||||||
enum wsrep::provider::status ret;
|
|
||||||
if ((ret = provider().rollback(id_)))
|
|
||||||
{
|
|
||||||
wsrep::log_warning()
|
|
||||||
<< "Failed to replicate rollback fragment for "
|
|
||||||
<< id_ << ": " << ret;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
server_service_.background_rollback(client_state_);
|
server_service_.background_rollback(client_state_);
|
||||||
@ -1339,29 +1341,27 @@ void wsrep::transaction::streaming_rollback()
|
|||||||
{
|
{
|
||||||
debug_log_state("streaming_rollback enter");
|
debug_log_state("streaming_rollback enter");
|
||||||
assert(state_ != s_must_replay);
|
assert(state_ != s_must_replay);
|
||||||
// assert(streaming_context_.rolled_back() == false);
|
|
||||||
assert(is_streaming());
|
assert(is_streaming());
|
||||||
|
|
||||||
if (bf_aborted_in_total_order_)
|
if (streaming_context_.rolled_back() == false)
|
||||||
{
|
{
|
||||||
client_state_.server_state_.stop_streaming_client(&client_state_);
|
if (bf_aborted_in_total_order_)
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Create a high priority applier which will handle the
|
|
||||||
// rollback fragment or clean up on configuration change.
|
|
||||||
// Adopt transaction will copy fragment set and appropriate
|
|
||||||
// meta data. Mark current transaction streaming context
|
|
||||||
// rolled back.
|
|
||||||
client_state_.server_state_.convert_streaming_client_to_applier(
|
|
||||||
&client_state_);
|
|
||||||
const bool was_rolled_back_for(streaming_context_.rolled_back());
|
|
||||||
streaming_context_.cleanup();
|
|
||||||
client_service_.debug_sync("wsrep_before_SR_rollback");
|
|
||||||
// Send a rollback fragment only if it was not sent before
|
|
||||||
// for this transaction.
|
|
||||||
if (was_rolled_back_for == false)
|
|
||||||
{
|
{
|
||||||
|
client_state_.server_state_.stop_streaming_client(&client_state_);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Create a high priority applier which will handle the
|
||||||
|
// rollback fragment or clean up on configuration change.
|
||||||
|
// Adopt transaction will copy fragment set and appropriate
|
||||||
|
// meta data. Mark current transaction streaming context
|
||||||
|
// rolled back.
|
||||||
|
client_state_.server_state_.convert_streaming_client_to_applier(
|
||||||
|
&client_state_);
|
||||||
|
|
||||||
|
streaming_context_.cleanup();
|
||||||
|
// Send a rollback fragment only if it was not sent before
|
||||||
|
// for this transaction.
|
||||||
streaming_context_.rolled_back(id_);
|
streaming_context_.rolled_back(id_);
|
||||||
enum wsrep::provider::status ret;
|
enum wsrep::provider::status ret;
|
||||||
if ((ret = provider().rollback(id_)))
|
if ((ret = provider().rollback(id_)))
|
||||||
|
@ -1219,9 +1219,15 @@ BOOST_FIXTURE_TEST_CASE(transaction_statement_streaming_cert_fail,
|
|||||||
sc.provider().certify_result_ = wsrep::provider::error_certification_failed;
|
sc.provider().certify_result_ = wsrep::provider::error_certification_failed;
|
||||||
BOOST_REQUIRE(cc.after_statement());
|
BOOST_REQUIRE(cc.after_statement());
|
||||||
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
BOOST_REQUIRE(cc.current_error() == wsrep::e_deadlock_error);
|
||||||
BOOST_REQUIRE(sc.provider().fragments() == 0);
|
// Note: Due to possible limitation in wsrep-API error codes
|
||||||
|
// or a bug in current Galera provider, rollback fragment may be
|
||||||
|
// replicated even in case of certification failure.
|
||||||
|
// If the limitation is lifted later on or the provider is fixed,
|
||||||
|
// the above check should be change for fragments == 0,
|
||||||
|
// rollback_fragments == 0.
|
||||||
|
BOOST_REQUIRE(sc.provider().fragments() == 1);
|
||||||
BOOST_REQUIRE(sc.provider().start_fragments() == 0);
|
BOOST_REQUIRE(sc.provider().start_fragments() == 0);
|
||||||
BOOST_REQUIRE(sc.provider().rollback_fragments() == 0);
|
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
Reference in New Issue
Block a user