1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Fixes to streaming rollback

* Check fragment removal error code in prepare phase. It is possible
  that the transaction gets BF aborted during fragment removal.
* Mark fragment certified in certify_fragment() even if the provider
  returns cert failed error. With current wsrep-API error codes
  it may not be possible to distinquish certification failure
  and BF abort during fragment replication. This may also be a
  provider bug. As a result rollback fragment may sometimes be
  replicated when it would not be necessary.
This commit is contained in:
Teemu Ollakka
2018-07-17 14:34:24 +03:00
parent b054c7320c
commit b02200b1ef
8 changed files with 87 additions and 40 deletions

View File

@ -19,46 +19,29 @@ namespace db
client_service(db::client& client); client_service(db::client& client);
bool do_2pc() const override; bool do_2pc() const override;
bool interrupted() const override { return false; }
bool interrupted() const override
{
return false;
}
void reset_globals() override { } void reset_globals() override { }
void store_globals() override { } void store_globals() override { }
int prepare_data_for_replication() override int prepare_data_for_replication() override
{ {
return 0; return 0;
} }
void cleanup_transaction() override { } void cleanup_transaction() override { }
size_t bytes_generated() const override size_t bytes_generated() const override
{ {
return 0; return 0;
} }
bool statement_allowed_for_streaming() const override bool statement_allowed_for_streaming() const override
{ {
return true; return true;
} }
int prepare_fragment_for_replication(wsrep::mutable_buffer&) override int prepare_fragment_for_replication(wsrep::mutable_buffer&) override
{ {
return 0; return 0;
} }
int remove_fragments() override { return 0; }
void remove_fragments() override
{ }
int bf_rollback() override; int bf_rollback() override;
void will_replay() override { }
void will_replay() override
{ }
void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override { } void wait_for_replayers(wsrep::unique_lock<wsrep::mutex>&) override { }
enum wsrep::provider::status replay() enum wsrep::provider::status replay()
override; override;

View File

@ -73,8 +73,10 @@ namespace wsrep
* Remove fragments from the storage within current transaction. * Remove fragments from the storage within current transaction.
* Fragment removal will be committed once the current transaction * Fragment removal will be committed once the current transaction
* commits. * commits.
*
* @return Zero in case of success, non-zero on failure.
*/ */
virtual void remove_fragments() = 0; virtual int remove_fragments() = 0;
// //
// Rollback // Rollback

View File

@ -346,6 +346,7 @@ namespace wsrep
const wsrep::ws_meta& meta) const wsrep::ws_meta& meta)
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(mode_ == m_high_priority); assert(mode_ == m_high_priority);
return transaction_.start_transaction(wsh, meta); return transaction_.start_transaction(wsh, meta);
} }
@ -355,6 +356,7 @@ namespace wsrep
int before_prepare() int before_prepare()
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec); assert(state_ == s_exec);
return transaction_.before_prepare(lock); return transaction_.before_prepare(lock);
} }
@ -362,30 +364,35 @@ namespace wsrep
int after_prepare() int after_prepare()
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec); assert(state_ == s_exec);
return transaction_.after_prepare(lock); return transaction_.after_prepare(lock);
} }
int before_commit() int before_commit()
{ {
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local); assert(state_ == s_exec || mode_ == m_local);
return transaction_.before_commit(); return transaction_.before_commit();
} }
int ordered_commit() int ordered_commit()
{ {
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local); assert(state_ == s_exec || mode_ == m_local);
return transaction_.ordered_commit(); return transaction_.ordered_commit();
} }
int after_commit() int after_commit()
{ {
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec || mode_ == m_local); assert(state_ == s_exec || mode_ == m_local);
return transaction_.after_commit(); return transaction_.after_commit();
} }
/** @} */ /** @} */
int before_rollback() int before_rollback()
{ {
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_idle || assert(state_ == s_idle ||
state_ == s_exec || state_ == s_exec ||
state_ == s_result || state_ == s_result ||
@ -395,6 +402,7 @@ namespace wsrep
int after_rollback() int after_rollback()
{ {
assert(current_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_idle || assert(state_ == s_idle ||
state_ == s_exec || state_ == s_exec ||
state_ == s_result || state_ == s_result ||
@ -440,6 +448,7 @@ namespace wsrep
assert(mode_ == m_local || transaction_.is_streaming()); assert(mode_ == m_local || transaction_.is_streaming());
return transaction_.total_order_bf_abort(lock, bf_seqno); return transaction_.total_order_bf_abort(lock, bf_seqno);
} }
/** /**
* Adopt a streaming transaction state. This is must be * Adopt a streaming transaction state. This is must be
* called from high_priority_service::adopt_transaction() * called from high_priority_service::adopt_transaction()

View File

@ -174,6 +174,7 @@ namespace wsrep
* Provide a server level debug sync point for a caller. * Provide a server level debug sync point for a caller.
*/ */
virtual void debug_sync(const char* sync_point) = 0; virtual void debug_sync(const char* sync_point) = 0;
}; };
} }

View File

@ -67,6 +67,7 @@ int wsrep::client_state::before_command()
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
debug_log_state("before_command: enter"); debug_log_state("before_command: enter");
store_globals(); // Marks the control for this thread
assert(state_ == s_idle); assert(state_ == s_idle);
if (transaction_.active() && if (transaction_.active() &&
server_state_.rollback_mode() == wsrep::server_state::rm_sync) server_state_.rollback_mode() == wsrep::server_state::rm_sync)

View File

@ -272,6 +272,10 @@ static int apply_write_set(wsrep::server_state& server_state,
{ {
assert(0); assert(0);
} }
if (ret)
{
wsrep::log_info() << "Failed to apply write set: " << ws_meta;
}
return ret; return ret;
} }

View File

@ -253,6 +253,7 @@ int wsrep::transaction::after_row()
break; break;
} }
} }
debug_log_state("after_row_leave");
return 0; return 0;
} }
@ -291,7 +292,11 @@ int wsrep::transaction::before_prepare(
} }
else else
{ {
client_service_.remove_fragments(); ret = client_service_.remove_fragments();
if (ret)
{
client_state_.override_error(wsrep::e_deadlock_error);
}
} }
lock.lock(); lock.lock();
client_service_.debug_crash( client_service_.debug_crash(
@ -307,7 +312,7 @@ int wsrep::transaction::before_prepare(
break; break;
} }
assert(state() == s_preparing); assert(state() == s_preparing || (ret && state() == s_must_abort));
debug_log_state("before_prepare_leave"); debug_log_state("before_prepare_leave");
return ret; return ret;
} }
@ -861,6 +866,18 @@ bool wsrep::transaction::bf_abort(
client_state_.server_state().stop_streaming_applier( client_state_.server_state().stop_streaming_applier(
server_id_, id_); server_id_, id_);
} }
else if (client_state_.mode() == wsrep::client_state::m_local &&
is_streaming())
{
streaming_context_.rolled_back(id_);
enum wsrep::provider::status ret;
if ((ret = provider().rollback(id_)))
{
wsrep::log_warning()
<< "Failed to replicate rollback fragment for "
<< id_ << ": " << ret;
}
}
lock.unlock(); lock.unlock();
server_service_.background_rollback(client_state_); server_service_.background_rollback(client_state_);
} }
@ -994,6 +1011,7 @@ int wsrep::transaction::certify_fragment(
} }
int ret(0); int ret(0);
// Storage service scope // Storage service scope
enum wsrep::provider::status cert_ret(wsrep::provider::success);
{ {
scoped_storage_service<storage_service_deleter> scoped_storage_service<storage_service_deleter>
sr_scope( sr_scope(
@ -1032,11 +1050,10 @@ int wsrep::transaction::certify_fragment(
client_service_.debug_crash( client_service_.debug_crash(
"crash_replicate_fragment_before_certify"); "crash_replicate_fragment_before_certify");
wsrep::ws_meta sr_ws_meta; wsrep::ws_meta sr_ws_meta;
enum wsrep::provider::status cert_ret = provider().certify(client_state_.id(),
cert_ret(provider().certify(client_state_.id(), ws_handle_,
ws_handle_, flags(),
flags(), sr_ws_meta);
sr_ws_meta));
client_service_.debug_crash( client_service_.debug_crash(
"crash_replicate_fragment_after_certify"); "crash_replicate_fragment_after_certify");
switch (cert_ret) switch (cert_ret)
@ -1062,6 +1079,28 @@ int wsrep::transaction::certify_fragment(
client_service_.debug_crash( client_service_.debug_crash(
"crash_replicate_fragment_success"); "crash_replicate_fragment_success");
break; break;
case wsrep::provider::error_bf_abort:
case wsrep::provider::error_certification_failed:
// Streaming transcation got BF aborted, so it must roll
// back. Roll back the fragment storage operation out of
// order as the commit order will be grabbed later on
// during rollback process. Mark the fragment as certified
// though in streaming context in order to enter streaming
// rollback codepath.
//
// Note that despite we handle error_certification_failed
// here, we mark the transaction as streaming. Apparently
// the provider may return status corresponding to certification
// failure even if the fragment has passed certification.
// This may be a bug in provider implementation or a limitation
// of error codes defined in wsrep-API. In order to make
// sure that the transaction will be cleaned on other servers,
// we take a risk of sending one rollback fragment for nothing.
storage_service.rollback(wsrep::ws_handle(),
wsrep::ws_meta());
streaming_context_.certified(data.size());
ret = 1;
break;
default: default:
// Storage service rollback must be done out of order, // Storage service rollback must be done out of order,
// otherwise there may be a deadlock between BF aborter // otherwise there may be a deadlock between BF aborter
@ -1092,11 +1131,11 @@ int wsrep::transaction::certify_fragment(
{ {
state(lock, s_must_abort); state(lock, s_must_abort);
} }
client_state_.override_error(wsrep::e_deadlock_error); client_state_.override_error(wsrep::e_deadlock_error, cert_ret);
} }
else if (state_ == s_must_abort) else if (state_ == s_must_abort)
{ {
client_state_.override_error(wsrep::e_deadlock_error); client_state_.override_error(wsrep::e_deadlock_error, cert_ret);
} }
else else
{ {
@ -1300,7 +1339,7 @@ void wsrep::transaction::streaming_rollback()
{ {
debug_log_state("streaming_rollback enter"); debug_log_state("streaming_rollback enter");
assert(state_ != s_must_replay); assert(state_ != s_must_replay);
assert(streaming_context_.rolled_back() == false); // assert(streaming_context_.rolled_back() == false);
assert(is_streaming()); assert(is_streaming());
if (bf_aborted_in_total_order_) if (bf_aborted_in_total_order_)
@ -1316,14 +1355,21 @@ void wsrep::transaction::streaming_rollback()
// rolled back. // rolled back.
client_state_.server_state_.convert_streaming_client_to_applier( client_state_.server_state_.convert_streaming_client_to_applier(
&client_state_); &client_state_);
const bool was_rolled_back_for(streaming_context_.rolled_back());
streaming_context_.cleanup(); streaming_context_.cleanup();
streaming_context_.rolled_back(id_);
client_service_.debug_sync("wsrep_before_SR_rollback"); client_service_.debug_sync("wsrep_before_SR_rollback");
enum wsrep::provider::status ret; // Send a rollback fragment only if it was not sent before
if ((ret = provider().rollback(id_))) // for this transaction.
if (was_rolled_back_for == false)
{ {
wsrep::log_warning() << "Failed to replicate rollback fragment for " streaming_context_.rolled_back(id_);
<< id_ << ": " << ret; enum wsrep::provider::status ret;
if ((ret = provider().rollback(id_)))
{
wsrep::log_warning()
<< "Failed to replicate rollback fragment for "
<< id_ << ": " << ret;
}
} }
} }
debug_log_state("streaming_rollback leave"); debug_log_state("streaming_rollback leave");
@ -1366,8 +1412,8 @@ void wsrep::transaction::debug_log_state(
{ {
WSREP_TC_LOG_DEBUG( WSREP_TC_LOG_DEBUG(
1, context 1, context
<< "\n server: " << client_state_.server_state().id() << "\n server: " << server_id_
<< ", client: " << client_state_.id().get() << ", client: " << int64_t(client_state_.id().get())
<< ", state: " << wsrep::to_c_string(client_state_.state()) << ", state: " << wsrep::to_c_string(client_state_.state())
<< ", mode: " << wsrep::to_c_string(client_state_.mode()) << ", mode: " << wsrep::to_c_string(client_state_.mode())
<< "\n trx_id: " << int64_t(id_.get()) << "\n trx_id: " << int64_t(id_.get())
@ -1377,9 +1423,11 @@ void wsrep::transaction::debug_log_state(
<< " state: " << wsrep::to_c_string(state_) << " state: " << wsrep::to_c_string(state_)
<< ", bfa_state: " << wsrep::to_c_string(bf_abort_state_) << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_)
<< ", error: " << wsrep::to_c_string(client_state_.current_error()) << ", error: " << wsrep::to_c_string(client_state_.current_error())
<< ", status: " << client_state_.current_error_status()
<< "\n" << "\n"
<< " is_sr: " << is_streaming() << " is_sr: " << is_streaming()
<< ", frags: " << streaming_context_.fragments_certified() << ", frags: " << streaming_context_.fragments_certified()
<< ", frags size: " << streaming_context_.fragments().size()
<< ", unit: " << streaming_context_.fragment_unit() << ", unit: " << streaming_context_.fragment_unit()
<< ", size: " << streaming_context_.fragment_size() << ", size: " << streaming_context_.fragment_size()
<< ", counter: " << streaming_context_.unit_counter() << ", counter: " << streaming_context_.unit_counter()

View File

@ -78,8 +78,7 @@ namespace wsrep
int, int,
const wsrep::const_buffer&) WSREP_OVERRIDE const wsrep::const_buffer&) WSREP_OVERRIDE
{ return 0; } { return 0; }
void remove_fragments() int remove_fragments() WSREP_OVERRIDE { return 0; }
WSREP_OVERRIDE { }
void will_replay() WSREP_OVERRIDE { } void will_replay() WSREP_OVERRIDE { }
enum wsrep::provider::status enum wsrep::provider::status