/*
 * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
 *
 * This file is part of wsrep-lib.
 *
 * Wsrep-lib is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * Wsrep-lib is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
 */

#include "wsrep/server_state.hpp"
#include "wsrep/client_state.hpp"
#include "wsrep/server_service.hpp"
#include "wsrep/high_priority_service.hpp"
#include "wsrep/transaction.hpp"
#include "wsrep/view.hpp"
#include "wsrep/logger.hpp"
#include "wsrep/compiler.hpp"
#include "wsrep/id.hpp"

#include <cassert>
#include <sstream>
#include <algorithm>

//////////////////////////////////////////////////////////////////////////////
//                                 Helpers                                  //
//////////////////////////////////////////////////////////////////////////////

//
// This helper deals with the historical burden of several ways to
// bootstrap the cluster. Bootstrap happens if
//
// * the bootstrap option is given, or
// * cluster_address is "gcomm://" (Galera provider)
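//
// For example, is_bootstrap("gcomm://", false) and is_bootstrap("", true)
// both return true, while is_bootstrap("gcomm://node1", false) returns false.
//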
static bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
{
    return (bootstrap || cluster_address == "gcomm://");
}

// Helper to log a detailed error message when adopting a transaction
// for fragment removal fails.
static void log_adopt_error(const wsrep::transaction& transaction)
{
    wsrep::log_warning() << "Adopting a transaction ("
                         << transaction.server_id() << "," << transaction.id()
                         << ") for rollback failed, "
                         << "this may leave stale entries in the streaming log "
                         << "which may need to be removed manually.";
}

// Resolve which of the two errors to return to the caller.
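// When vote is true, vote_err is returned as is; otherwise vote_err takes
// precedence over apply_err only if it is non-zero.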
static inline int resolve_return_error(bool const vote,
|
|
int const vote_err,
|
|
int const apply_err)
|
|
{
|
|
if (vote) return vote_err;
|
|
return vote_err != 0 ? vote_err : apply_err;
|
|
}
|
|
|
|
static void
|
|
discard_streaming_applier(wsrep::server_state& server_state,
|
|
wsrep::high_priority_service& high_priority_service,
|
|
wsrep::high_priority_service* streaming_applier,
|
|
const wsrep::ws_meta& ws_meta)
|
|
{
|
|
server_state.stop_streaming_applier(
|
|
ws_meta.server_id(), ws_meta.transaction_id());
|
|
server_state.server_service().release_high_priority_service(
|
|
streaming_applier);
|
|
high_priority_service.store_globals();
|
|
}
|
|
|
|
static int apply_fragment(wsrep::server_state& server_state,
|
|
wsrep::high_priority_service& high_priority_service,
|
|
wsrep::high_priority_service* streaming_applier,
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data)
|
|
{
|
|
int ret(0);
|
|
int apply_err;
|
|
wsrep::mutable_buffer err;
|
|
{
|
|
wsrep::high_priority_switch sw(high_priority_service,
|
|
*streaming_applier);
|
|
apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
|
|
if (!apply_err)
|
|
{
|
|
assert(err.size() == 0);
|
|
streaming_applier->after_apply();
|
|
}
|
|
else
|
|
{
|
|
bool const remove_fragments(streaming_applier->transaction(
|
|
).streaming_context().fragments().size() > 0);
|
|
ret = streaming_applier->rollback(ws_handle, ws_meta);
|
|
ret = ret || (streaming_applier->after_apply(), 0);
|
|
|
|
if (remove_fragments)
|
|
{
|
|
ret = ret || streaming_applier->start_transaction(ws_handle,
|
|
ws_meta);
|
|
ret = ret || (streaming_applier->adopt_apply_error(err), 0);
|
|
ret = ret || streaming_applier->remove_fragments(ws_meta);
|
|
ret = ret || streaming_applier->commit(ws_handle, ws_meta);
|
|
ret = ret || (streaming_applier->after_apply(), 0);
|
|
}
|
|
else
|
|
{
|
|
ret = streaming_applier->log_dummy_write_set(ws_handle,
|
|
ws_meta, err);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!ret)
|
|
{
|
|
if (!apply_err)
|
|
{
|
|
high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
|
|
const wsrep::xid xid(streaming_applier->transaction().xid());
|
|
ret = high_priority_service.append_fragment_and_commit(
|
|
ws_handle, ws_meta, data, xid);
|
|
high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
|
|
ret = ret || (high_priority_service.after_apply(), 0);
|
|
}
|
|
else
|
|
{
|
|
discard_streaming_applier(server_state,
|
|
high_priority_service,
|
|
streaming_applier,
|
|
ws_meta);
|
|
ret = resolve_return_error(err.size() > 0, ret, apply_err);
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static int commit_fragment(wsrep::server_state& server_state,
|
|
wsrep::high_priority_service& high_priority_service,
|
|
wsrep::high_priority_service* streaming_applier,
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data)
|
|
{
|
|
int ret(0);
|
|
{
|
|
wsrep::high_priority_switch sw(
|
|
high_priority_service, *streaming_applier);
|
|
wsrep::mutable_buffer err;
|
|
int const apply_err(
|
|
streaming_applier->apply_write_set(ws_meta, data, err));
|
|
if (apply_err)
|
|
{
|
|
assert(streaming_applier->transaction(
|
|
).streaming_context().fragments().size() > 0);
|
|
ret = streaming_applier->rollback(ws_handle, ws_meta);
|
|
ret = ret || (streaming_applier->after_apply(), 0);
|
|
ret = ret || streaming_applier->start_transaction(
|
|
ws_handle, ws_meta);
|
|
ret = ret || (streaming_applier->adopt_apply_error(err),0);
|
|
}
|
|
else
|
|
{
|
|
assert(err.size() == 0);
|
|
}
|
|
|
|
const wsrep::transaction& trx(streaming_applier->transaction());
|
|
// Fragment removal for XA is going to happen in after_commit
|
|
if (trx.state() != wsrep::transaction::s_prepared)
|
|
{
|
|
streaming_applier->debug_crash(
|
|
"crash_apply_cb_before_fragment_removal");
|
|
|
|
ret = ret || streaming_applier->remove_fragments(ws_meta);
|
|
|
|
streaming_applier->debug_crash(
|
|
"crash_apply_cb_after_fragment_removal");
|
|
}
|
|
|
|
streaming_applier->debug_crash(
|
|
"crash_commit_cb_before_last_fragment_commit");
|
|
ret = ret || streaming_applier->commit(ws_handle, ws_meta);
|
|
streaming_applier->debug_crash(
|
|
"crash_commit_cb_last_fragment_commit_success");
|
|
ret = ret || (streaming_applier->after_apply(), 0);
|
|
ret = resolve_return_error(err.size() > 0, ret, apply_err);
|
|
}
|
|
|
|
if (!ret)
|
|
{
|
|
discard_streaming_applier(server_state, high_priority_service,
|
|
streaming_applier, ws_meta);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static int rollback_fragment(wsrep::server_state& server_state,
|
|
wsrep::high_priority_service& high_priority_service,
|
|
wsrep::high_priority_service* streaming_applier,
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta)
|
|
{
|
|
int ret(0);
|
|
int adopt_error(0);
|
|
bool const remove_fragments(streaming_applier->transaction().
|
|
streaming_context().fragments().size() > 0);
|
|
// If fragment removal is needed, adopt transaction state
|
|
// and start a transaction for it.
|
|
if (remove_fragments &&
|
|
(adopt_error = high_priority_service.adopt_transaction(
|
|
streaming_applier->transaction())))
|
|
{
|
|
log_adopt_error(streaming_applier->transaction());
|
|
}
|
|
// Even if the adopt above fails, we roll back the streaming transaction.
// Adopt failure will leave stale entries in the streaming log which can
// be removed manually.
|
|
wsrep::const_buffer no_error;
|
|
{
|
|
wsrep::high_priority_switch ws(
|
|
high_priority_service, *streaming_applier);
|
|
// Streaming applier rolls back out of order. Fragment
|
|
// removal grabs commit order below.
|
|
ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
|
|
ret = ret || (streaming_applier->after_apply(), 0);
|
|
}
|
|
|
|
if (!ret)
|
|
{
|
|
discard_streaming_applier(server_state, high_priority_service,
|
|
streaming_applier, ws_meta);
|
|
|
|
if (adopt_error == 0)
|
|
{
|
|
if (remove_fragments)
|
|
{
|
|
ret = high_priority_service.remove_fragments(ws_meta);
|
|
ret = ret || high_priority_service.commit(ws_handle, ws_meta);
|
|
ret = ret || (high_priority_service.after_apply(), 0);
|
|
}
|
|
else
|
|
{
|
|
if (ws_meta.ordered())
|
|
{
|
|
wsrep::mutable_buffer no_error;
|
|
ret = high_priority_service.log_dummy_write_set(
|
|
ws_handle, ws_meta, no_error);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return ret;
|
|
}
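// Apply a write set delivered by the provider, dispatching on ws_meta flags:
// rollback fragments, complete transactions (start + commit), the first
// fragment of a streaming transaction, intermediate/prepare fragments, and
// commit fragments are each handled separately. Streaming fragments are
// routed to a per-transaction streaming applier.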
static int apply_write_set(wsrep::server_state& server_state,
|
|
wsrep::high_priority_service& high_priority_service,
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data)
|
|
{
|
|
int ret(0);
|
|
if (wsrep::rolls_back_transaction(ws_meta.flags()))
|
|
{
|
|
wsrep::mutable_buffer no_error;
|
|
if (wsrep::starts_transaction(ws_meta.flags()))
|
|
{
|
|
// No transaction existed before, log a dummy write set
|
|
ret = high_priority_service.log_dummy_write_set(
|
|
ws_handle, ws_meta, no_error);
|
|
}
|
|
else
|
|
{
|
|
wsrep::high_priority_service* sa(
|
|
server_state.find_streaming_applier(
|
|
ws_meta.server_id(), ws_meta.transaction_id()));
|
|
if (sa == 0)
|
|
{
|
|
// It is a known limitation that the Galera provider
// cannot always determine whether the certification test
// for an interrupted transaction will pass or fail
// (see comments in transaction::certify_fragment()).
// As a consequence, unnecessary rollback fragments
// may be delivered here. The message below has been
// intentionally made a debug message rather than a warning.
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"Could not find applier context for "
|
|
<< ws_meta.server_id()
|
|
<< ": " << ws_meta.transaction_id());
|
|
ret = high_priority_service.log_dummy_write_set(
|
|
ws_handle, ws_meta, no_error);
|
|
}
|
|
else
|
|
{
|
|
// rollback_fragment() consumes sa
|
|
ret = rollback_fragment(server_state,
|
|
high_priority_service,
|
|
sa,
|
|
ws_handle,
|
|
ws_meta);
|
|
}
|
|
}
|
|
}
|
|
else if (wsrep::starts_transaction(ws_meta.flags()) &&
|
|
wsrep::commits_transaction(ws_meta.flags()))
|
|
{
|
|
ret = high_priority_service.start_transaction(ws_handle, ws_meta);
|
|
if (!ret)
|
|
{
|
|
wsrep::mutable_buffer err;
|
|
int const apply_err(high_priority_service.apply_write_set(
|
|
ws_meta, data, err));
|
|
if (!apply_err)
|
|
{
|
|
assert(err.size() == 0);
|
|
ret = high_priority_service.commit(ws_handle, ws_meta);
|
|
ret = ret || (high_priority_service.after_apply(), 0);
|
|
}
|
|
else
|
|
{
|
|
ret = high_priority_service.rollback(ws_handle, ws_meta);
|
|
ret = ret || (high_priority_service.after_apply(), 0);
|
|
ret = ret || high_priority_service.log_dummy_write_set(
|
|
ws_handle, ws_meta, err);
|
|
ret = resolve_return_error(err.size() > 0, ret, apply_err);
|
|
}
|
|
}
|
|
}
|
|
else if (wsrep::starts_transaction(ws_meta.flags()))
|
|
{
|
|
assert(server_state.find_streaming_applier(
|
|
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
|
|
wsrep::high_priority_service* sa(
|
|
server_state.server_service().streaming_applier_service(
|
|
high_priority_service));
|
|
server_state.start_streaming_applier(
|
|
ws_meta.server_id(), ws_meta.transaction_id(), sa);
|
|
sa->start_transaction(ws_handle, ws_meta);
|
|
ret = apply_fragment(server_state,
|
|
high_priority_service,
|
|
sa,
|
|
ws_handle,
|
|
ws_meta,
|
|
data);
|
|
}
|
|
else if (ws_meta.flags() == 0 || ws_meta.flags() == wsrep::provider::flag::pa_unsafe ||
|
|
wsrep::prepares_transaction(ws_meta.flags()))
|
|
{
|
|
wsrep::high_priority_service* sa(
|
|
server_state.find_streaming_applier(
|
|
ws_meta.server_id(), ws_meta.transaction_id()));
|
|
if (sa == 0)
|
|
{
|
|
// It is possible that rapid group membership changes
// may cause a streaming transaction to be rolled back before
// the commit fragment comes in. Although this is a valid
// situation, log a warning if a streaming applier cannot
// be found, as it may also be an indication of a bug.
|
|
wsrep::log_warning() << "Could not find applier context for "
|
|
<< ws_meta.server_id()
|
|
<< ": " << ws_meta.transaction_id();
|
|
wsrep::mutable_buffer no_error;
|
|
ret = high_priority_service.log_dummy_write_set(
|
|
ws_handle, ws_meta, no_error);
|
|
}
|
|
else
|
|
{
|
|
sa->next_fragment(ws_meta);
|
|
ret = apply_fragment(server_state,
|
|
high_priority_service,
|
|
sa,
|
|
ws_handle,
|
|
ws_meta,
|
|
data);
|
|
}
|
|
}
|
|
else if (wsrep::commits_transaction(ws_meta.flags()))
|
|
{
|
|
if (high_priority_service.is_replaying())
|
|
{
|
|
wsrep::mutable_buffer unused;
|
|
ret = high_priority_service.start_transaction(
|
|
ws_handle, ws_meta) ||
|
|
high_priority_service.apply_write_set(ws_meta, data, unused) ||
|
|
high_priority_service.commit(ws_handle, ws_meta);
|
|
}
|
|
else
|
|
{
|
|
wsrep::high_priority_service* sa(
|
|
server_state.find_streaming_applier(
|
|
ws_meta.server_id(), ws_meta.transaction_id()));
|
|
if (sa == 0)
|
|
{
|
|
// It is possible that rapid group membership changes
// may cause a streaming transaction to be rolled back before
// the commit fragment comes in. Although this is a valid
// situation, log a warning if a streaming applier cannot
// be found, as it may also be an indication of a bug.
|
|
wsrep::log_warning()
|
|
<< "Could not find applier context for "
|
|
<< ws_meta.server_id()
|
|
<< ": " << ws_meta.transaction_id();
|
|
wsrep::mutable_buffer no_error;
|
|
ret = high_priority_service.log_dummy_write_set(
|
|
ws_handle, ws_meta, no_error);
|
|
}
|
|
else
|
|
{
|
|
// Commit fragment consumes sa
|
|
sa->next_fragment(ws_meta);
|
|
ret = commit_fragment(server_state,
|
|
high_priority_service,
|
|
sa,
|
|
ws_handle,
|
|
ws_meta,
|
|
data);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
assert(0);
|
|
}
|
|
if (ret)
|
|
{
|
|
wsrep::log_error() << "Failed to apply write set: " << ws_meta;
|
|
}
|
|
return ret;
|
|
}
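// Apply a TOI (total order isolation) write set. A regular TOI event carries
// both the start and commit flags; an event with only the start flag is an
// NBO (non-blocking operation) begin, and an event with only the commit flag
// is an NBO end, which is ignored here since NBO end is handled through local
// TOI calls.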
static int apply_toi(wsrep::provider& provider,
|
|
wsrep::high_priority_service& high_priority_service,
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data)
|
|
{
|
|
if (wsrep::starts_transaction(ws_meta.flags()) &&
|
|
wsrep::commits_transaction(ws_meta.flags()))
|
|
{
|
|
//
|
|
// Regular TOI.
|
|
//
|
|
provider.commit_order_enter(ws_handle, ws_meta);
|
|
wsrep::mutable_buffer err;
|
|
int const apply_err(high_priority_service.apply_toi(ws_meta,data,err));
|
|
int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err));
|
|
return resolve_return_error(err.size() > 0, vote_err, apply_err);
|
|
}
|
|
else if (wsrep::starts_transaction(ws_meta.flags()))
|
|
{
|
|
provider.commit_order_enter(ws_handle, ws_meta);
|
|
wsrep::mutable_buffer err;
|
|
int const apply_err(high_priority_service.apply_nbo_begin(ws_meta, data, err));
|
|
int const vote_err(provider.commit_order_leave(ws_handle, ws_meta, err));
|
|
return resolve_return_error(err.size() > 0, vote_err, apply_err);
|
|
}
|
|
else if (wsrep::commits_transaction(ws_meta.flags()))
|
|
{
|
|
// The NBO end event is ignored here: for both local and applied
// transactions, NBO end is handled via local TOI calls.
|
|
provider.commit_order_enter(ws_handle, ws_meta);
|
|
wsrep::mutable_buffer err;
|
|
provider.commit_order_leave(ws_handle, ws_meta, err);
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
assert(0);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
// Server State //
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
|
|
int wsrep::server_state::load_provider(
|
|
const std::string& provider_spec, const std::string& provider_options,
|
|
const wsrep::provider::services& services)
|
|
{
|
|
wsrep::log_info() << "Loading provider " << provider_spec
|
|
<< " initial position: " << initial_position_;
|
|
|
|
provider_ = wsrep::provider::make_provider(*this,
|
|
provider_spec,
|
|
provider_options,
|
|
services);
|
|
return (provider_ ? 0 : 1);
|
|
}
|
|
|
|
void wsrep::server_state::unload_provider()
|
|
{
|
|
delete provider_;
|
|
provider_ = 0;
|
|
}
|
|
|
|
int wsrep::server_state::connect(const std::string& cluster_name,
|
|
const std::string& cluster_address,
|
|
const std::string& state_donor,
|
|
bool bootstrap)
|
|
{
|
|
bootstrap_ = is_bootstrap(cluster_address, bootstrap);
|
|
wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_;
|
|
return provider().connect(cluster_name, cluster_address, state_donor,
|
|
bootstrap_);
|
|
}
|
|
|
|
int wsrep::server_state::disconnect()
|
|
{
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
// In failure situations caused by the provider being shut down,
// a failing operation may also try to shut down replication.
// Check the state here and return success if the provider
// disconnect is already in progress or has completed.
|
|
if (state(lock) == s_disconnecting || state(lock) == s_disconnected)
|
|
{
|
|
return 0;
|
|
}
|
|
state(lock, s_disconnecting);
|
|
interrupt_state_waiters(lock);
|
|
}
|
|
return provider().disconnect();
|
|
}
|
|
|
|
wsrep::server_state::~server_state()
|
|
{
|
|
delete provider_;
|
|
}
|
|
|
|
std::vector<wsrep::provider::status_variable>
|
|
wsrep::server_state::status() const
|
|
{
|
|
return provider().status();
|
|
}
|
|
|
|
|
|
wsrep::seqno wsrep::server_state::pause()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
// Disallow concurrent calls to pause() in order to have non-concurrent
// access to desynced_on_pause_, which is checked in the resume() call.
|
|
wsrep::log_info() << "pause";
|
|
while (pause_count_ > 0)
|
|
{
|
|
cond_.wait(lock);
|
|
}
|
|
++pause_count_;
|
|
assert(pause_seqno_.is_undefined());
|
|
lock.unlock();
|
|
pause_seqno_ = provider().pause();
|
|
lock.lock();
|
|
if (pause_seqno_.is_undefined())
|
|
{
|
|
--pause_count_;
|
|
}
|
|
return pause_seqno_;
|
|
}
|
|
|
|
void wsrep::server_state::resume()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
wsrep::log_info() << "resume";
|
|
assert(pause_seqno_.is_undefined() == false);
|
|
assert(pause_count_ == 1);
|
|
if (provider().resume())
|
|
{
|
|
throw wsrep::runtime_error("Failed to resume provider");
|
|
}
|
|
pause_seqno_ = wsrep::seqno::undefined();
|
|
--pause_count_;
|
|
cond_.notify_all();
|
|
}
|
|
|
|
wsrep::seqno wsrep::server_state::desync_and_pause()
|
|
{
|
|
wsrep::log_info() << "Desyncing and pausing the provider";
|
|
// Temporary variable to store desync() return status. This will be
|
|
// assigned to desynced_on_pause_ after pause() call to prevent
|
|
// concurrent access to member variable desynced_on_pause_.
|
|
bool desync_successful;
|
|
if (desync())
|
|
{
|
|
// Desync may return a transient error if the provider cannot
// communicate with the rest of the cluster. However, this
// error can be tolerated if the provider can still be
// paused successfully below.
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"Failed to desync server before pause");
|
|
desync_successful = false;
|
|
}
|
|
else
|
|
{
|
|
desync_successful = true;
|
|
}
|
|
wsrep::seqno ret(pause());
|
|
if (ret.is_undefined())
|
|
{
|
|
wsrep::log_warning() << "Failed to pause provider";
|
|
resync();
|
|
return wsrep::seqno::undefined();
|
|
}
|
|
else
|
|
{
|
|
desynced_on_pause_ = desync_successful;
|
|
}
|
|
wsrep::log_info() << "Provider paused at: " << ret;
|
|
return ret;
|
|
}
|
|
|
|
void wsrep::server_state::resume_and_resync()
|
|
{
|
|
wsrep::log_info() << "Resuming and resyncing the provider";
|
|
try
|
|
{
|
|
// Assign desynced_on_pause_ to local variable before resuming
|
|
// in order to avoid concurrent access to desynced_on_pause_ member
|
|
// variable.
|
|
bool do_resync = desynced_on_pause_;
|
|
desynced_on_pause_ = false;
|
|
resume();
|
|
if (do_resync)
|
|
{
|
|
resync();
|
|
}
|
|
}
|
|
catch (const wsrep::runtime_error& e)
|
|
{
|
|
wsrep::log_warning()
|
|
<< "Resume and resync failed, server may have to be restarted";
|
|
}
|
|
}
|
|
|
|
std::string wsrep::server_state::prepare_for_sst()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
state(lock, s_joiner);
|
|
lock.unlock();
|
|
return server_service_.sst_request();
|
|
}
|
|
|
|
int wsrep::server_state::start_sst(const std::string& sst_request,
|
|
const wsrep::gtid& gtid,
|
|
bool bypass)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
state(lock, s_donor);
|
|
int ret(0);
|
|
lock.unlock();
|
|
if (server_service_.start_sst(sst_request, gtid, bypass))
|
|
{
|
|
lock.lock();
|
|
wsrep::log_warning() << "SST preparation failed";
|
|
// v26 API does not have JOINED event, so in anticipation of SYNCED
|
|
// we must do it here.
|
|
state(lock, s_joined);
|
|
ret = 1;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error)
|
|
{
|
|
if (0 == error)
|
|
wsrep::log_info() << "SST sent: " << gtid;
|
|
else
|
|
wsrep::log_info() << "SST sending failed: " << error;
|
|
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
// v26 API does not have JOINED event, so in anticipation of SYNCED
|
|
// we must do it here.
|
|
state(lock, s_joined);
|
|
lock.unlock();
|
|
enum provider::status const retval(provider().sst_sent(gtid, error));
|
|
if (retval != provider::success)
|
|
{
|
|
std::string msg("wsrep::sst_sent() returned an error: ");
|
|
msg += wsrep::provider::to_string(retval);
|
|
server_service_.log_message(wsrep::log::warning, msg.c_str());
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::sst_received(wsrep::client_service& cs,
|
|
int const error)
|
|
{
|
|
wsrep::log_info() << "SST received";
|
|
wsrep::gtid gtid(wsrep::gtid::undefined());
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
assert(state_ == s_joiner || state_ == s_initialized);
|
|
|
|
// Run initialization only if the SST was successful.
// In case of SST failure the system is in an undefined state
// and may not be recoverable.
|
|
if (error == 0)
|
|
{
|
|
if (server_service_.sst_before_init())
|
|
{
|
|
if (init_initialized_ == false)
|
|
{
|
|
state(lock, s_initializing);
|
|
lock.unlock();
|
|
server_service_.debug_sync("on_view_wait_initialized");
|
|
lock.lock();
|
|
wait_until_state(lock, s_initialized);
|
|
assert(init_initialized_);
|
|
}
|
|
}
|
|
lock.unlock();
|
|
|
|
if (id_.is_undefined())
|
|
{
|
|
assert(0);
|
|
throw wsrep::runtime_error(
|
|
"wsrep::sst_received() called before connection to cluster");
|
|
}
|
|
|
|
gtid = server_service_.get_position(cs);
|
|
wsrep::log_info() << "Recovered position from storage: " << gtid;
|
|
|
|
lock.lock();
|
|
if (gtid.seqno() >= connected_gtid().seqno())
|
|
{
|
|
/* Now the node has all the data the cluster has: part in
|
|
* storage, part in replication event queue. */
|
|
state(lock, s_joined);
|
|
}
|
|
lock.unlock();
|
|
|
|
wsrep::view const v(server_service_.get_view(cs, id_));
|
|
wsrep::log_info() << "Recovered view from SST:\n" << v;
|
|
|
|
/*
 * If the state id of the recovered view has an undefined ID, we may
 * be upgrading from an earlier version which does not store the
 * view in stable storage. In this case we skip the sanity checks
 * and the assignment of the current view, and wait until the first
 * view delivery.
 */
|
|
if (v.state_id().id().is_undefined() == false)
|
|
{
|
|
if (v.state_id().id() != gtid.id() ||
|
|
v.state_id().seqno() > gtid.seqno())
|
|
{
|
|
/* Since IN GENERAL we may not be able to recover SST GTID from
|
|
* the state data, we have to rely on SST script passing the
|
|
* GTID value explicitly.
|
|
* Here we check if the passed GTID makes any sense: it should
|
|
* have the same UUID and greater or equal seqno than the last
|
|
* logged view. */
|
|
std::ostringstream msg;
|
|
msg << "SST script passed bogus GTID: " << gtid
|
|
<< ". Preceding view GTID: " << v.state_id();
|
|
throw wsrep::runtime_error(msg.str());
|
|
}
|
|
|
|
if (current_view_.status() == wsrep::view::primary)
|
|
{
|
|
previous_primary_view_ = current_view_;
|
|
}
|
|
current_view_ = v;
|
|
server_service_.log_view(NULL /* this view is stored already */, v);
|
|
}
|
|
else
|
|
{
|
|
wsrep::log_warning()
|
|
<< "View recovered from stable storage was empty. If the "
|
|
<< "server is doing rolling upgrade from previous version "
|
|
<< "which does not support storing view info into stable "
|
|
<< "storage, this is ok. Otherwise this may be a sign of "
|
|
<< "malfunction.";
|
|
}
|
|
lock.lock();
|
|
recover_streaming_appliers_if_not_recovered(lock, cs);
|
|
lock.unlock();
|
|
}
|
|
|
|
enum provider::status const retval(provider().sst_received(gtid, error));
|
|
if (retval != provider::success)
|
|
{
|
|
std::string msg("wsrep::sst_received() failed: ");
|
|
msg += wsrep::provider::to_string(retval);
|
|
throw wsrep::runtime_error(msg);
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::initialized()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
wsrep::log_info() << "Server initialized";
|
|
init_initialized_ = true;
|
|
if (server_service_.sst_before_init())
|
|
{
|
|
state(lock, s_initialized);
|
|
}
|
|
else
|
|
{
|
|
state(lock, s_initializing);
|
|
state(lock, s_initialized);
|
|
}
|
|
}
|
|
|
|
enum wsrep::provider::status
|
|
wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
|
|
const
|
|
{
|
|
return provider().wait_for_gtid(gtid, timeout);
|
|
}
|
|
|
|
int
|
|
wsrep::server_state::set_encryption_key(std::vector<unsigned char>& key)
|
|
{
|
|
encryption_key_ = key;
|
|
if (state_ != s_disconnected)
|
|
{
|
|
wsrep::const_buffer const key(encryption_key_.data(),
|
|
encryption_key_.size());
|
|
enum provider::status const retval(provider_->enc_set_key(key));
|
|
if (retval != provider::success)
|
|
{
|
|
wsrep::log_error() << "Failed to set encryption key: "
|
|
<< provider::to_string(retval);
|
|
return 1;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
std::pair<wsrep::gtid, enum wsrep::provider::status>
|
|
wsrep::server_state::causal_read(int timeout) const
|
|
{
|
|
return provider().causal_read(timeout);
|
|
}
|
|
|
|
void wsrep::server_state::on_connect(const wsrep::view& view)
|
|
{
|
|
// Sanity checks
|
|
if (view.own_index() < 0 ||
|
|
size_t(view.own_index()) >= view.members().size())
|
|
{
|
|
std::ostringstream os;
|
|
os << "Invalid view on connect: own index out of range: " << view;
|
|
#ifndef NDEBUG
|
|
wsrep::log_error() << os.str();
|
|
assert(0);
|
|
#endif
|
|
throw wsrep::runtime_error(os.str());
|
|
}
|
|
|
|
const size_t own_index(static_cast<size_t>(view.own_index()));
|
|
if (id_.is_undefined() == false && id_ != view.members()[own_index].id())
|
|
{
|
|
std::ostringstream os;
|
|
os << "Connection in connected state.\n"
|
|
<< "Connected view:\n" << view
|
|
<< "Previous view:\n" << current_view_
|
|
<< "Current own ID: " << id_;
|
|
#ifndef NDEBUG
|
|
wsrep::log_error() << os.str();
|
|
assert(0);
|
|
#endif
|
|
throw wsrep::runtime_error(os.str());
|
|
}
|
|
else
|
|
{
|
|
id_ = view.members()[own_index].id();
|
|
}
|
|
|
|
wsrep::log_info() << "Server "
|
|
<< name_
|
|
<< " connected to cluster at position "
|
|
<< view.state_id()
|
|
<< " with ID "
|
|
<< id_;
|
|
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
connected_gtid_ = view.state_id();
|
|
state(lock, s_connected);
|
|
}
|
|
|
|
void wsrep::server_state::on_primary_view(
|
|
const wsrep::view& view,
|
|
wsrep::high_priority_service* high_priority_service)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
assert(view.final() == false);
|
|
//
// Reached primary from the connected state. This may mean one of
// the following:
//
// 1) The server was joined to the cluster and got an SST transfer
// 2) The server was partitioned from the cluster and got back
// 3) A new cluster was bootstrapped from a non-primary cluster
//
// There is not enough information here to tell what caused the
// primary component, so we need to walk through all states leading
// to joined in order to notify possible state waiters in other
// threads.
//
|
|
if (server_service_.sst_before_init())
|
|
{
|
|
if (state_ == s_connected)
|
|
{
|
|
state(lock, s_joiner);
|
|
// We need to assign init_initialized_ here to a local
// variable. If the value here was false, we need to skip
// the initializing -> initialized -> joined state cycle
// below. However, if we don't assign the value to a
// local, it is possible that the main thread gets control
// between changing the state to initializing and checking
// the initialized flag, which may cause the initializing ->
// initialized state change to be executed even if it should not be.
|
|
const bool was_initialized(init_initialized_);
|
|
state(lock, s_initializing);
|
|
if (was_initialized)
|
|
{
|
|
// If server side has already been initialized,
|
|
// skip directly to s_joined.
|
|
state(lock, s_initialized);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (state_ == s_connected)
|
|
{
|
|
state(lock, s_joiner);
|
|
}
|
|
}
|
|
if (init_initialized_ == false)
|
|
{
|
|
lock.unlock();
|
|
server_service_.debug_sync("on_view_wait_initialized");
|
|
lock.lock();
|
|
wait_until_state(lock, s_initialized);
|
|
}
|
|
assert(init_initialized_);
|
|
|
|
if (bootstrap_)
|
|
{
|
|
server_service_.bootstrap();
|
|
bootstrap_ = false;
|
|
}
|
|
|
|
assert(high_priority_service);
|
|
|
|
if (high_priority_service)
|
|
{
|
|
recover_streaming_appliers_if_not_recovered(lock,
|
|
*high_priority_service);
|
|
close_orphaned_sr_transactions(lock, *high_priority_service);
|
|
}
|
|
|
|
if (state(lock) < s_joined &&
|
|
view.state_id().seqno() >= connected_gtid().seqno())
|
|
{
|
|
// If we progressed beyond connected seqno, it means we have full state
|
|
state(lock, s_joined);
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::on_non_primary_view(
|
|
const wsrep::view& view,
|
|
wsrep::high_priority_service* high_priority_service)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
wsrep::log_info() << "Non-primary view";
|
|
if (view.final())
|
|
{
|
|
go_final(lock, view, high_priority_service);
|
|
}
|
|
else if (state_ != s_disconnecting)
|
|
{
|
|
state(lock, s_connected);
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::go_final(wsrep::unique_lock<wsrep::mutex>& lock,
|
|
const wsrep::view& view,
|
|
wsrep::high_priority_service* hps)
|
|
{
|
|
(void)view; // avoid compiler warning "unused parameter 'view'"
|
|
assert(view.final());
|
|
assert(hps);
|
|
if (hps)
|
|
{
|
|
close_transactions_at_disconnect(*hps);
|
|
}
|
|
state(lock, s_disconnected);
|
|
id_ = wsrep::id::undefined();
|
|
}
|
|
|
|
void wsrep::server_state::on_view(const wsrep::view& view,
|
|
wsrep::high_priority_service* high_priority_service)
|
|
{
|
|
wsrep::log_info()
|
|
<< "================================================\nView:\n"
|
|
<< view
|
|
<< "=================================================";
|
|
if (current_view_.status() == wsrep::view::primary)
|
|
{
|
|
previous_primary_view_ = current_view_;
|
|
}
|
|
current_view_ = view;
|
|
switch (view.status())
|
|
{
|
|
case wsrep::view::primary:
|
|
on_primary_view(view, high_priority_service);
|
|
break;
|
|
case wsrep::view::non_primary:
|
|
on_non_primary_view(view, high_priority_service);
|
|
break;
|
|
case wsrep::view::disconnected:
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
go_final(lock, view, high_priority_service);
|
|
break;
|
|
}
|
|
default:
|
|
wsrep::log_warning() << "Unrecognized view status: " << view.status();
|
|
assert(0);
|
|
}
|
|
|
|
server_service_.log_view(high_priority_service, view);
|
|
}
|
|
|
|
void wsrep::server_state::on_sync()
|
|
{
|
|
wsrep::log_info() << "Server " << name_ << " synced with group";
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
|
|
// Initial sync
|
|
if (server_service_.sst_before_init() && init_synced_ == false)
|
|
{
|
|
switch (state_)
|
|
{
|
|
case s_synced:
|
|
break;
|
|
case s_connected: // Seed node path: provider becomes
|
|
state(lock, s_joiner); // synced with itself before anything
|
|
WSREP_FALLTHROUGH; // else. Then goes DB initialization.
|
|
case s_joiner: // |
|
|
state(lock, s_initializing); // V
|
|
break;
|
|
case s_donor:
|
|
assert(false); // this should never happen
|
|
state(lock, s_joined);
|
|
state(lock, s_synced);
|
|
break;
|
|
case s_initialized:
|
|
state(lock, s_joined);
|
|
WSREP_FALLTHROUGH;
|
|
default:
|
|
/* State */
|
|
state(lock, s_synced);
|
|
};
|
|
}
|
|
else
|
|
{
|
|
// Calls to on_sync() in synced state are possible if the
// server desyncs itself from the group. The provider does not
// inform about this through callbacks.
|
|
if (state_ != s_synced)
|
|
{
|
|
state(lock, s_synced);
|
|
}
|
|
}
|
|
init_synced_ = true;
|
|
|
|
enum wsrep::provider::status status(send_pending_rollback_events(lock));
|
|
if (status)
|
|
{
|
|
// TODO should be retried?
|
|
wsrep::log_warning()
|
|
<< "Failed to flush rollback event cache: " << status;
|
|
}
|
|
}
|
|
|
|
int wsrep::server_state::on_apply(
|
|
wsrep::high_priority_service& high_priority_service,
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data)
|
|
{
|
|
if (is_toi(ws_meta.flags()))
|
|
{
|
|
return apply_toi(provider(), high_priority_service,
|
|
ws_handle, ws_meta, data);
|
|
}
|
|
else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags()))
|
|
{
|
|
// Not implemented yet.
|
|
assert(0);
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
return apply_write_set(*this, high_priority_service,
|
|
ws_handle, ws_meta, data);
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::start_streaming_client(
|
|
wsrep::client_state* client_state)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"Start streaming client: " << client_state->id());
|
|
if (streaming_clients_.insert(
|
|
std::make_pair(client_state->id(), client_state)).second == false)
|
|
{
|
|
wsrep::log_warning() << "Failed to insert streaming client "
|
|
<< client_state->id();
|
|
assert(0);
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::convert_streaming_client_to_applier(
|
|
wsrep::client_state* client_state)
|
|
{
|
|
// create streaming_applier beforehand as server_state lock should
|
|
// not be held when calling server_service methods
|
|
wsrep::high_priority_service* streaming_applier(
|
|
server_service_.streaming_applier_service(
|
|
client_state->client_service()));
|
|
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"Convert streaming client to applier "
|
|
<< client_state->id());
|
|
streaming_clients_map::iterator i(
|
|
streaming_clients_.find(client_state->id()));
|
|
assert(i != streaming_clients_.end());
|
|
if (i == streaming_clients_.end())
|
|
{
|
|
wsrep::log_warning() << "Unable to find streaming client "
|
|
<< client_state->id();
|
|
assert(0);
|
|
}
|
|
else
|
|
{
|
|
streaming_clients_.erase(i);
|
|
}
|
|
|
|
// Convert to applier only if the state is not disconnected. In
|
|
// disconnected state the applier map is supposed to be empty
|
|
// and it will be reconstructed from fragment storage when
|
|
// joining back to cluster.
|
|
if (state(lock) != s_disconnected)
|
|
{
|
|
if (streaming_applier->adopt_transaction(client_state->transaction()))
|
|
{
|
|
log_adopt_error(client_state->transaction());
|
|
streaming_applier->after_apply();
|
|
server_service_.release_high_priority_service(streaming_applier);
|
|
return;
|
|
}
|
|
if (streaming_appliers_.insert(
|
|
std::make_pair(
|
|
std::make_pair(client_state->transaction().server_id(),
|
|
client_state->transaction().id()),
|
|
streaming_applier)).second == false)
|
|
{
|
|
wsrep::log_warning() << "Could not insert streaming applier "
|
|
<< id_
|
|
<< ", "
|
|
<< client_state->transaction().id();
|
|
assert(0);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
server_service_.release_high_priority_service(streaming_applier);
|
|
client_state->client_service().store_globals();
|
|
}
|
|
}
|
|
|
|
|
|
void wsrep::server_state::stop_streaming_client(
|
|
wsrep::client_state* client_state)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"Stop streaming client: " << client_state->id());
|
|
streaming_clients_map::iterator i(
|
|
streaming_clients_.find(client_state->id()));
|
|
assert(i != streaming_clients_.end());
|
|
if (i == streaming_clients_.end())
|
|
{
|
|
wsrep::log_warning() << "Unable to find streaming client "
|
|
<< client_state->id();
|
|
assert(0);
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
streaming_clients_.erase(i);
|
|
cond_.notify_all();
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::start_streaming_applier(
|
|
const wsrep::id& server_id,
|
|
const wsrep::transaction_id& transaction_id,
|
|
wsrep::high_priority_service* sa)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
if (streaming_appliers_.insert(
|
|
std::make_pair(std::make_pair(server_id, transaction_id),
|
|
sa)).second == false)
|
|
{
|
|
wsrep::log_error() << "Could not insert streaming applier";
|
|
throw wsrep::fatal_error();
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::stop_streaming_applier(
|
|
const wsrep::id& server_id,
|
|
const wsrep::transaction_id& transaction_id)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
streaming_appliers_map::iterator i(
|
|
streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
|
|
assert(i != streaming_appliers_.end());
|
|
if (i == streaming_appliers_.end())
|
|
{
|
|
wsrep::log_warning() << "Could not find streaming applier for "
|
|
<< server_id << ":" << transaction_id;
|
|
}
|
|
else
|
|
{
|
|
streaming_appliers_.erase(i);
|
|
cond_.notify_all();
|
|
}
|
|
}
|
|
|
|
wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
|
|
const wsrep::id& server_id,
|
|
const wsrep::transaction_id& transaction_id) const
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
streaming_appliers_map::const_iterator i(
|
|
streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
|
|
return (i == streaming_appliers_.end() ? 0 : i->second);
|
|
}
|
|
|
|
wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
|
|
const wsrep::xid& xid) const
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
streaming_appliers_map::const_iterator i(streaming_appliers_.begin());
|
|
while (i != streaming_appliers_.end())
|
|
{
|
|
wsrep::high_priority_service* sa(i->second);
|
|
if (sa->transaction().xid() == xid)
|
|
{
|
|
return sa;
|
|
}
|
|
i++;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
// Private //
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
|
|
int wsrep::server_state::desync(wsrep::unique_lock<wsrep::mutex>& lock)
|
|
{
|
|
assert(lock.owns_lock());
|
|
++desync_count_;
|
|
lock.unlock();
|
|
int ret(provider().desync());
|
|
lock.lock();
|
|
if (ret)
|
|
{
|
|
--desync_count_;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void wsrep::server_state::resync(wsrep::unique_lock<wsrep::mutex>&
|
|
lock WSREP_UNUSED)
|
|
{
|
|
assert(lock.owns_lock());
|
|
assert(desync_count_ > 0);
|
|
if (desync_count_ > 0)
|
|
{
|
|
--desync_count_;
|
|
if (provider().resync())
|
|
{
|
|
throw wsrep::runtime_error("Failed to resync");
|
|
}
|
|
}
|
|
else
|
|
{
|
|
wsrep::log_warning() << "desync_count " << desync_count_
|
|
<< " on resync";
|
|
}
|
|
}
|
|
|
|
|
|
void wsrep::server_state::state(
|
|
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
|
|
enum wsrep::server_state::state state)
|
|
{
|
|
assert(lock.owns_lock());
|
|
static const char allowed[n_states_][n_states_] =
|
|
{
|
|
/* dis, ing, ized, cted, jer, jed, dor, sed, ding to/from */
|
|
{ 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */
|
|
{ 1, 0, 1, 0, 0, 0, 0, 0, 1}, /* ing */
|
|
{ 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* ized */
|
|
{ 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */
|
|
{ 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */
|
|
{ 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */
|
|
{ 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */
|
|
{ 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */
|
|
{ 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */
|
|
};
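// Rows of the matrix above are indexed by the current state and columns
// by the target state; for example the row marked "cted" allows transitions
// from s_connected to s_disconnected, s_connected, s_joiner, s_synced and
// s_disconnecting.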
if (allowed[state_][state] == false)
|
|
{
|
|
std::ostringstream os;
|
|
os << "server: " << name_ << " unallowed state transition: "
|
|
<< wsrep::to_string(state_) << " -> " << wsrep::to_string(state);
|
|
wsrep::log_warning() << os.str() << "\n";
|
|
assert(0);
|
|
}
|
|
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"server " << name_ << " state change: "
|
|
<< to_c_string(state_) << " -> "
|
|
<< to_c_string(state));
|
|
state_hist_.push_back(state_);
|
|
server_service_.log_state_change(state_, state);
|
|
state_ = state;
|
|
cond_.notify_all();
|
|
while (state_waiters_[state_])
|
|
{
|
|
cond_.wait(lock);
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::wait_until_state(
|
|
wsrep::unique_lock<wsrep::mutex>& lock,
|
|
enum wsrep::server_state::state state) const
|
|
{
|
|
++state_waiters_[state];
|
|
while (state_ != state)
|
|
{
|
|
cond_.wait(lock);
|
|
// If the waiter waits for any other state than disconnecting
|
|
// or disconnected and the state has been changed to disconnecting,
|
|
// this usually means that some error was encountered
|
|
if (state != s_disconnecting && state != s_disconnected
|
|
&& state_ == s_disconnecting)
|
|
{
|
|
throw wsrep::runtime_error("State wait was interrupted");
|
|
}
|
|
}
|
|
--state_waiters_[state];
|
|
cond_.notify_all();
|
|
}
|
|
|
|
void wsrep::server_state::interrupt_state_waiters(
|
|
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
|
|
{
|
|
assert(lock.owns_lock());
|
|
cond_.notify_all();
|
|
}
|
|
|
|
template <class C>
|
|
void wsrep::server_state::recover_streaming_appliers_if_not_recovered(
|
|
wsrep::unique_lock<wsrep::mutex>& lock, C& c)
|
|
{
|
|
assert(lock.owns_lock());
|
|
if (streaming_appliers_recovered_ == false)
|
|
{
|
|
lock.unlock();
|
|
server_service_.recover_streaming_appliers(c);
|
|
lock.lock();
|
|
}
|
|
streaming_appliers_recovered_ = true;
|
|
}
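// Predicate matching map entries whose client transaction is in the given
// state. Used below with std::find_if_not() to pick streaming clients whose
// transaction is not in s_prepared state for BF abort.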
class transaction_state_cmp
|
|
{
|
|
public:
|
|
transaction_state_cmp(const enum wsrep::transaction::state s)
|
|
: state_(s) { }
|
|
bool operator()(const std::pair<wsrep::client_id,
|
|
wsrep::client_state*>& vt) const
|
|
{
|
|
return vt.second->transaction().state() == state_;
|
|
}
|
|
private:
|
|
enum wsrep::transaction::state state_;
|
|
};
|
|
|
|
void wsrep::server_state::close_orphaned_sr_transactions(
|
|
wsrep::unique_lock<wsrep::mutex>& lock,
|
|
wsrep::high_priority_service& high_priority_service)
|
|
{
|
|
assert(lock.owns_lock());
|
|
|
|
// When the originator of an SR transaction leaves the primary
|
|
// component of the cluster, that SR must be rolled back. When two
|
|
// consecutive primary views have the same membership, the system
|
|
// may have been in a state with no primary components.
|
|
// Example with 2 node cluster:
|
|
// - (1,2 primary)
|
|
// - (1 non-primary) and (2 non-primary)
|
|
// - (1,2 primary)
|
|
// We need to roll back SRs owned by both 1 and 2.
|
|
// Notice that since the introduction of rollback_event_queue_,
|
|
// checking for equal consecutive views is no longer needed.
|
|
// However, we must keep it here for the time being, for backwards
|
|
// compatibility.
|
|
const bool equal_consecutive_views =
|
|
current_view_.equal_membership(previous_primary_view_);
|
|
|
|
if (current_view_.own_index() == -1 || equal_consecutive_views)
|
|
{
|
|
streaming_clients_map::iterator i;
|
|
transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared);
|
|
while ((i = std::find_if_not(streaming_clients_.begin(),
|
|
streaming_clients_.end(),
|
|
prepared_state_cmp))
|
|
!= streaming_clients_.end())
|
|
{
|
|
wsrep::client_id client_id(i->first);
|
|
wsrep::transaction_id transaction_id(i->second->transaction().id());
|
|
// It is safe to unlock the server state temporarily here.
|
|
// The processing happens inside view handler which is
|
|
// protected by the provider commit ordering critical
|
|
// section. The lock must be unlocked temporarily to
|
|
// allow converting the current client to streaming
|
|
// applier in transaction::streaming_rollback().
|
|
// The iterator i may be invalidated when the server state
|
|
// remains unlocked, so it should not be accessed after
|
|
// the bf abort call.
|
|
lock.unlock();
|
|
i->second->total_order_bf_abort(current_view_.view_seqno());
|
|
lock.lock();
|
|
streaming_clients_map::const_iterator found_i;
|
|
while ((found_i = streaming_clients_.find(client_id)) !=
|
|
streaming_clients_.end() &&
|
|
found_i->second->transaction().id() == transaction_id)
|
|
{
|
|
cond_.wait(lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
streaming_appliers_map::iterator i(streaming_appliers_.begin());
|
|
while (i != streaming_appliers_.end())
|
|
{
|
|
wsrep::high_priority_service* streaming_applier(i->second);
|
|
|
|
// Rollback SR on equal consecutive primary views or if its
|
|
// originator is not in the current view.
|
|
// Transactions in prepared state must be committed or
|
|
// rolled back explicitly, those are never rolled back here.
|
|
if ((streaming_applier->transaction().state() !=
|
|
wsrep::transaction::s_prepared) &&
|
|
(equal_consecutive_views ||
|
|
not current_view_.is_member(
|
|
streaming_applier->transaction().server_id())))
|
|
{
|
|
WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
|
|
wsrep::log::debug_level_server_state,
|
|
"Removing SR fragments for "
|
|
<< i->first.first
|
|
<< ", " << i->first.second);
|
|
wsrep::id server_id(i->first.first);
|
|
wsrep::transaction_id transaction_id(i->first.second);
|
|
int adopt_error;
|
|
if ((adopt_error = high_priority_service.adopt_transaction(
|
|
streaming_applier->transaction())))
|
|
{
|
|
log_adopt_error(streaming_applier->transaction());
|
|
}
|
|
// Even if the transaction adopt above fails, we roll back
|
|
// the transaction. Adopt error will leave stale entries
|
|
// in the streaming log which can be removed manually.
|
|
{
|
|
wsrep::high_priority_switch sw(high_priority_service,
|
|
*streaming_applier);
|
|
streaming_applier->rollback(
|
|
wsrep::ws_handle(), wsrep::ws_meta());
|
|
streaming_applier->after_apply();
|
|
}
|
|
|
|
streaming_appliers_.erase(i++);
|
|
server_service_.release_high_priority_service(streaming_applier);
|
|
high_priority_service.store_globals();
|
|
wsrep::ws_meta ws_meta(
|
|
wsrep::gtid(),
|
|
wsrep::stid(server_id, transaction_id, wsrep::client_id()),
|
|
wsrep::seqno::undefined(), 0);
|
|
lock.unlock();
|
|
if (adopt_error == 0)
|
|
{
|
|
high_priority_service.remove_fragments(ws_meta);
|
|
high_priority_service.commit(wsrep::ws_handle(transaction_id, 0),
|
|
ws_meta);
|
|
}
|
|
high_priority_service.after_apply();
|
|
lock.lock();
|
|
}
|
|
else
|
|
{
|
|
++i;
|
|
}
|
|
}
|
|
}
|
|
|
|
void wsrep::server_state::close_transactions_at_disconnect(
|
|
wsrep::high_priority_service& high_priority_service)
|
|
{
|
|
// Close streaming applier without removing fragments
|
|
// from fragment storage. When the server is started again,
|
|
// it must be able to recover ongoing streaming transactions.
|
|
streaming_appliers_map::iterator i(streaming_appliers_.begin());
|
|
while (i != streaming_appliers_.end())
|
|
{
|
|
wsrep::high_priority_service* streaming_applier(i->second);
|
|
{
|
|
wsrep::high_priority_switch sw(high_priority_service,
|
|
*streaming_applier);
|
|
streaming_applier->rollback(
|
|
wsrep::ws_handle(), wsrep::ws_meta());
|
|
streaming_applier->after_apply();
|
|
}
|
|
streaming_appliers_.erase(i++);
|
|
server_service_.release_high_priority_service(streaming_applier);
|
|
high_priority_service.store_globals();
|
|
}
|
|
streaming_appliers_recovered_ = false;
|
|
}
|
|
|
|
//
|
|
// Rollback event queue
|
|
//
|
|
|
|
void wsrep::server_state::queue_rollback_event(
|
|
const wsrep::transaction_id& id)
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
#ifndef NDEBUG
|
|
// Make sure we don't have duplicate
|
|
// transaction ids in rollback event queue.
|
|
// There is no need to do this in release
|
|
// build given that caller (streaming_rollback())
|
|
// should avoid duplicates.
|
|
for (auto i : rollback_event_queue_)
|
|
{
|
|
assert(id != i);
|
|
}
|
|
#endif
|
|
rollback_event_queue_.push_back(id);
|
|
}
|
|
|
|
enum wsrep::provider::status
|
|
wsrep::server_state::send_pending_rollback_events(
|
|
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
|
|
{
|
|
assert(lock.owns_lock());
|
|
while (not rollback_event_queue_.empty())
|
|
{
|
|
const wsrep::transaction_id& id(rollback_event_queue_.front());
|
|
const enum wsrep::provider::status status(provider().rollback(id));
|
|
if (status)
|
|
{
|
|
return status;
|
|
}
|
|
rollback_event_queue_.pop_front();
|
|
}
|
|
return wsrep::provider::success;
|
|
}
|
|
|
|
enum wsrep::provider::status
|
|
wsrep::server_state::send_pending_rollback_events()
|
|
{
|
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
|
return send_pending_rollback_events(lock);
|
|
}
|