1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-10-25 09:37:36 +03:00
- populate and pass real error description buffer to provider in case
   of applying error
 - return 0 from server_state::on_apply() if error voting confirmed
   consistency
 - remove fragments and rollback after fragment applying failure
 - always release streaming applier on commit or rollback
This commit is contained in:
Alexey Yurchenko
2019-04-30 19:00:48 +03:00
parent fd66bdef0b
commit 0f676bd893
22 changed files with 470 additions and 187 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2018 Codership Oy <info@codership.com>
* Copyright (C) 2018-2019 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
@@ -60,25 +60,86 @@ static void log_adopt_error(const wsrep::transaction& transaction)
<< "which may need to be removed manually.";
}
static int apply_fragment(wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service& streaming_applier,
// resolve which of the two errors return to caller
static inline int resolve_return_error(bool const vote,
int const vote_err,
int const apply_err)
{
if (vote) return vote_err;
return vote_err != 0 ? vote_err : apply_err;
}
static void
discard_streaming_applier(wsrep::server_state& server_state,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_meta& ws_meta)
{
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
}
static int apply_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
int ret(0);
int apply_err;
wsrep::mutable_buffer err;
{
wsrep::high_priority_switch sw(high_priority_service,
streaming_applier);
*streaming_applier);
apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
if (!apply_err)
{
assert(err.size() == 0);
streaming_applier->after_apply();
}
else
{
bool const remove_fragments(streaming_applier->transaction(
).streaming_context().fragments().size() > 0);
ret = streaming_applier->rollback(ws_handle, ws_meta);
ret = ret || (streaming_applier->after_apply(), 0);
ret = streaming_applier.apply_write_set(ws_meta, data);
streaming_applier.after_apply();
if (remove_fragments)
{
ret = ret || streaming_applier->start_transaction(ws_handle,
ws_meta);
ret = ret || (streaming_applier->adopt_apply_error(err), 0);
ret = ret || streaming_applier->remove_fragments(ws_meta);
ret = ret || streaming_applier->commit(ws_handle, ws_meta);
ret = ret || (streaming_applier->after_apply(), 0);
}
else
{
ret = streaming_applier->log_dummy_write_set(ws_handle,
ws_meta, err);
}
}
}
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
ret = ret || high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
high_priority_service.after_apply();
if (!ret)
{
if (!apply_err)
{
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
ret = high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
ret = ret || (high_priority_service.after_apply(), 0);
}
else
{
discard_streaming_applier(server_state, streaming_applier,ws_meta);
ret = resolve_return_error(err.size() > 0, ret, apply_err);
}
}
return ret;
}
@@ -90,13 +151,27 @@ static int commit_fragment(wsrep::server_state& server_state,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
// Make high priority switch to go out of scope
// before the streaming applier is released.
int ret(0);
{
wsrep::high_priority_switch sw(
high_priority_service, *streaming_applier);
ret = streaming_applier->apply_write_set(ws_meta, data);
wsrep::mutable_buffer err;
int const apply_err(
streaming_applier->apply_write_set(ws_meta, data, err));
if (apply_err)
{
assert(streaming_applier->transaction(
).streaming_context().fragments().size() > 0);
ret = streaming_applier->rollback(ws_handle, ws_meta);
ret = ret || (streaming_applier->after_apply(), 0);
ret = ret || streaming_applier->start_transaction(
ws_handle, ws_meta);
ret = ret || (streaming_applier->adopt_apply_error(err),0);
}
else
{
assert(err.size() == 0);
}
streaming_applier->debug_crash(
"crash_apply_cb_before_fragment_removal");
ret = ret || streaming_applier->remove_fragments(ws_meta);
@@ -107,15 +182,15 @@ static int commit_fragment(wsrep::server_state& server_state,
ret = ret || streaming_applier->commit(ws_handle, ws_meta);
streaming_applier->debug_crash(
"crash_commit_cb_last_fragment_commit_success");
streaming_applier->after_apply();
ret = ret || (streaming_applier->after_apply(), 0);
ret = resolve_return_error(err.size() > 0, ret, apply_err);
}
if (ret == 0)
if (!ret)
{
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
discard_streaming_applier(server_state, streaming_applier, ws_meta);
}
return ret;
}
@@ -129,6 +204,7 @@ static int rollback_fragment(wsrep::server_state& server_state,
// Adopts transaction state and starts a transaction for
// high priority service
int adopt_error;
bool remove_fragments(false);
if ((adopt_error = high_priority_service.adopt_transaction(
streaming_applier->transaction())))
{
@@ -137,25 +213,41 @@ static int rollback_fragment(wsrep::server_state& server_state,
// Even if the adopt above fails we roll back the streaming transaction.
// Adopt failure will leave stale entries in streaming log which can
// be removed manually.
wsrep::const_buffer no_error;
{
wsrep::high_priority_switch ws(
high_priority_service, *streaming_applier);
// Streaming applier rolls back out of order. Fragment
// removal grabs commit order below.
streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
streaming_applier->after_apply();
remove_fragments = streaming_applier->transaction().
streaming_context().fragments().size() > 0;
ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
ret = ret || (streaming_applier->after_apply(), 0);
}
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
if (adopt_error == 0)
if (!ret)
{
high_priority_service.remove_fragments(ws_meta);
high_priority_service.commit(ws_handle, ws_meta);
discard_streaming_applier(server_state, streaming_applier, ws_meta);
if (adopt_error == 0)
{
if (remove_fragments)
{
ret = high_priority_service.remove_fragments(ws_meta);
ret = ret || high_priority_service.commit(ws_handle, ws_meta);
ret = ret || (high_priority_service.after_apply(), 0);
}
else
{
if (ws_meta.ordered())
{
wsrep::mutable_buffer no_error;
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta, no_error);
}
}
}
}
high_priority_service.after_apply();
return ret;
}
@@ -166,14 +258,14 @@ static int apply_write_set(wsrep::server_state& server_state,
const wsrep::const_buffer& data)
{
int ret(0);
// wsrep::log_info() << "apply_write_set: " << ws_meta;
if (wsrep::rolls_back_transaction(ws_meta.flags()))
{
wsrep::mutable_buffer no_error;
if (wsrep::starts_transaction(ws_meta.flags()))
{
// No transaction existed before, log a dummy write set
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
ws_handle, ws_meta, no_error);
}
else
{
@@ -196,7 +288,7 @@ static int apply_write_set(wsrep::server_state& server_state,
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id());
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
ws_handle, ws_meta, no_error);
}
else
{
@@ -212,14 +304,27 @@ static int apply_write_set(wsrep::server_state& server_state,
else if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()))
{
ret = high_priority_service.start_transaction(ws_handle, ws_meta) ||
high_priority_service.apply_write_set(ws_meta, data) ||
high_priority_service.commit(ws_handle, ws_meta);
if (ret)
ret = high_priority_service.start_transaction(ws_handle, ws_meta);
if (!ret)
{
high_priority_service.rollback(ws_handle, ws_meta);
wsrep::mutable_buffer err;
int const apply_err(high_priority_service.apply_write_set(
ws_meta, data, err));
if (!apply_err)
{
assert(err.size() == 0);
ret = high_priority_service.commit(ws_handle, ws_meta);
ret = ret || (high_priority_service.after_apply(), 0);
}
else
{
ret = high_priority_service.rollback(ws_handle, ws_meta);
ret = ret || (high_priority_service.after_apply(), 0);
ret = ret || high_priority_service.log_dummy_write_set(
ws_handle, ws_meta, err);
ret = resolve_return_error(err.size() > 0, ret, apply_err);
}
}
high_priority_service.after_apply();
}
else if (wsrep::starts_transaction(ws_meta.flags()))
{
@@ -231,8 +336,9 @@ static int apply_write_set(wsrep::server_state& server_state,
server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sa);
sa->start_transaction(ws_handle, ws_meta);
ret = apply_fragment(high_priority_service,
*sa,
ret = apply_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
data);
@@ -252,13 +358,15 @@ static int apply_write_set(wsrep::server_state& server_state,
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
wsrep::mutable_buffer no_error;
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
ws_handle, ws_meta, no_error);
}
else
{
ret = apply_fragment(high_priority_service,
*sa,
ret = apply_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
data);
@@ -268,9 +376,10 @@ static int apply_write_set(wsrep::server_state& server_state,
{
if (high_priority_service.is_replaying())
{
wsrep::mutable_buffer unused;
ret = high_priority_service.start_transaction(
ws_handle, ws_meta) ||
high_priority_service.apply_write_set(ws_meta, data) ||
high_priority_service.apply_write_set(ws_meta, data, unused) ||
high_priority_service.commit(ws_handle, ws_meta);
}
else
@@ -289,8 +398,9 @@ static int apply_write_set(wsrep::server_state& server_state,
<< "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
wsrep::mutable_buffer no_error;
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
ws_handle, ws_meta, no_error);
}
else
{
@@ -327,14 +437,11 @@ static int apply_toi(wsrep::provider& provider,
//
// Regular TOI.
//
// Note that we ignore error returned by apply_toi
// call here. This must be revised after the error
// voting is added.
//
provider.commit_order_enter(ws_handle, ws_meta);
(void)high_priority_service.apply_toi(ws_meta, data);
provider.commit_order_leave(ws_handle, ws_meta);
return 0;
wsrep::mutable_buffer err;
int const apply_err(high_priority_service.apply_toi(ws_meta,data,err));
int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err));
return resolve_return_error(err.size() > 0, vote_err, apply_err);
}
else if (wsrep::starts_transaction(ws_meta.flags()))
{
@@ -987,9 +1094,9 @@ void wsrep::server_state::start_streaming_client(
wsrep::client_state* client_state)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_server_state,
"Start streaming client: " << client_state->id());
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
wsrep::log::debug_level_server_state,
"Start streaming client: " << client_state->id());
if (streaming_clients_.insert(
std::make_pair(client_state->id(), client_state)).second == false)
{