1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Revised logic to handle SR replaying

* Added server_id into transaction in order to be able to stop
  streaming applier during high priority BF abort
* Added missing commit fragment applying
* Don't clear fragments for replaying SR transaction
This commit is contained in:
Teemu Ollakka
2018-07-12 13:36:45 +03:00
parent ddc6c6495b
commit 3f4e5dea3b
6 changed files with 255 additions and 250 deletions

View File

@ -425,15 +425,6 @@ namespace wsrep
return transaction_.bf_abort(lock, bf_seqno); return transaction_.bf_abort(lock, bf_seqno);
} }
//
// Replaying
//
int start_replaying(const wsrep::ws_meta& ws_meta)
{
assert(mode_ == m_high_priority);
return transaction_.start_replaying(ws_meta);
}
/** /**
* Adopt a streaming transaction state. This is must be * Adopt a streaming transaction state. This is must be
* called from high_priority_service::adopt_transaction() * called from high_priority_service::adopt_transaction()

View File

@ -47,6 +47,7 @@ namespace wsrep
void disable() void disable()
{ {
wsrep::log_info() << "Disabling streaming";
fragment_size_ = 0; fragment_size_ = 0;
} }

View File

@ -52,6 +52,9 @@ namespace wsrep
wsrep::transaction_id id() const wsrep::transaction_id id() const
{ return id_; } { return id_; }
const wsrep::id& server_id() const
{ return server_id_; }
bool active() const bool active() const
{ return (id_ != wsrep::transaction_id::undefined()); } { return (id_ != wsrep::transaction_id::undefined()); }
@ -95,8 +98,6 @@ namespace wsrep
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
bool is_commit); bool is_commit);
int start_replaying(const wsrep::ws_meta&);
int append_key(const wsrep::key&); int append_key(const wsrep::key&);
int append_data(const wsrep::const_buffer&); int append_data(const wsrep::const_buffer&);
@ -161,6 +162,7 @@ namespace wsrep
wsrep::server_service& server_service_; wsrep::server_service& server_service_;
wsrep::client_service& client_service_; wsrep::client_service& client_service_;
wsrep::client_state& client_state_; wsrep::client_state& client_state_;
wsrep::id server_id_;
wsrep::transaction_id id_; wsrep::transaction_id id_;
enum state state_; enum state state_;
std::vector<enum state> state_hist_; std::vector<enum state> state_hist_;

View File

@ -14,140 +14,163 @@
#include <cassert> #include <cassert>
#include <sstream> #include <sstream>
namespace //////////////////////////////////////////////////////////////////////////////
// Helpers //
//////////////////////////////////////////////////////////////////////////////
//
// This method is used to deal with historical burden of several
// ways to bootstrap the cluster. Bootstrap happens if
//
// * bootstrap option is given
// * cluster_address is "gcomm://" (Galera provider)
//
static bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
{ {
// return (bootstrap || cluster_address == "gcomm://");
// This method is used to deal with historical burden of several }
// ways to bootstrap the cluster. Bootstrap happens if
//
// * bootstrap option is given
// * cluster_address is "gcomm://" (Galera provider)
//
bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
{
return (bootstrap || cluster_address == "gcomm://");
}
int apply_fragment(wsrep::server_state& server_state, static int apply_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service& streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
{
wsrep::high_priority_switch sw(high_priority_service,
streaming_applier);
ret = streaming_applier.apply_write_set(ws_meta, data);
streaming_applier.after_apply();
}
ret = ret || high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
high_priority_service.after_apply();
return ret;
}
int commit_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
// Make high priority switch to go out of scope
// before the streaming applier is released.
{
wsrep::high_priority_switch sw(
high_priority_service, *streaming_applier);
streaming_applier->remove_fragments(ws_meta);
ret = streaming_applier->commit(ws_handle, ws_meta);
streaming_applier->after_apply();
}
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
return ret;
}
int rollback_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service, wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier, wsrep::high_priority_service& streaming_applier,
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data)
{
int ret;
{
wsrep::high_priority_switch sw(high_priority_service,
streaming_applier);
ret = streaming_applier.apply_write_set(ws_meta, data);
streaming_applier.after_apply();
}
ret = ret || high_priority_service.append_fragment_and_commit(
ws_handle, ws_meta, data);
high_priority_service.after_apply();
return ret;
}
static int commit_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
wsrep::high_priority_service* streaming_applier,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret;
// Make high priority switch to go out of scope
// before the streaming applier is released.
{
wsrep::high_priority_switch sw(
high_priority_service, *streaming_applier);
ret = streaming_applier->apply_write_set(ws_meta, data) ||
streaming_applier->remove_fragments(ws_meta) ||
streaming_applier->commit(ws_handle, ws_meta);
streaming_applier->after_apply();
}
if (ret == 0)
{ {
int ret= 0;
// Adopts transaction state and starts a transaction for
// high priority service
high_priority_service.adopt_transaction(
streaming_applier->transaction());
{
wsrep::high_priority_switch ws(
high_priority_service, *streaming_applier);
streaming_applier->rollback();
streaming_applier->after_apply();
}
server_state.stop_streaming_applier( server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()); ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service( server_state.server_service().release_high_priority_service(
streaming_applier); streaming_applier);
high_priority_service.remove_fragments(ws_meta);
high_priority_service.commit(ws_handle, ws_meta);
high_priority_service.after_apply();
return ret;
} }
return ret;
}
int apply_write_set(wsrep::server_state& server_state, static int rollback_fragment(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service, wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle, wsrep::high_priority_service* streaming_applier,
const wsrep::ws_meta& ws_meta, const wsrep::ws_handle& ws_handle,
const wsrep::const_buffer& data) const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret= 0;
// Adopts transaction state and starts a transaction for
// high priority service
high_priority_service.adopt_transaction(
streaming_applier->transaction());
{ {
int ret(0); wsrep::high_priority_switch ws(
if (wsrep::starts_transaction(ws_meta.flags()) && high_priority_service, *streaming_applier);
wsrep::commits_transaction(ws_meta.flags()) && streaming_applier->rollback();
wsrep::rolls_back_transaction(ws_meta.flags())) streaming_applier->after_apply();
}
server_state.stop_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id());
server_state.server_service().release_high_priority_service(
streaming_applier);
high_priority_service.remove_fragments(ws_meta);
high_priority_service.commit(ws_handle, ws_meta);
high_priority_service.after_apply();
return ret;
}
static int apply_write_set(wsrep::server_state& server_state,
wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
int ret(0);
if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()) &&
wsrep::rolls_back_transaction(ws_meta.flags()))
{
// Non streaming rollback (certification failed)
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
}
else if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()))
{
ret = high_priority_service.start_transaction(ws_handle, ws_meta) ||
high_priority_service.apply_write_set(ws_meta, data) ||
high_priority_service.commit(ws_handle, ws_meta);
if (ret)
{ {
// Non streaming rollback (certification failed) high_priority_service.rollback();
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
} }
else if (wsrep::starts_transaction(ws_meta.flags()) && high_priority_service.after_apply();
wsrep::commits_transaction(ws_meta.flags())) }
else if (wsrep::starts_transaction(ws_meta.flags()))
{
assert(server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::high_priority_service* sa(
server_state.server_service().streaming_applier_service(
high_priority_service));
server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sa);
sa->start_transaction(ws_handle, ws_meta);
ret = apply_fragment(server_state,
high_priority_service,
*sa,
ws_handle,
ws_meta,
data);
}
else if (ws_meta.flags() == 0)
{
wsrep::high_priority_service* sa(
server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sa == 0)
{ {
if (high_priority_service.start_transaction(ws_handle, ws_meta)) // It is possible that rapid group membership changes
{ // may cause streaming transaction be rolled back before
ret = 1; // commit fragment comes in. Although this is a valid
} // situation, log a warning if a sac cannot be found as
else if (high_priority_service.apply_write_set(ws_meta, data)) // it may be an indication of a bug too.
{ wsrep::log_warning() << "Could not find applier context for "
ret = 1; << ws_meta.server_id()
} << ": " << ws_meta.transaction_id();
else if (high_priority_service.commit(ws_handle, ws_meta))
{
ret = 1;
}
if (ret)
{
high_priority_service.rollback();
}
high_priority_service.after_apply();
} }
else if (wsrep::starts_transaction(ws_meta.flags())) else
{ {
assert(server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()) == 0);
wsrep::high_priority_service* sa(
server_state.server_service().streaming_applier_service(
high_priority_service));
server_state.start_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id(), sa);
sa->start_transaction(ws_handle, ws_meta);
ret = apply_fragment(server_state, ret = apply_fragment(server_state,
high_priority_service, high_priority_service,
*sa, *sa,
@ -155,69 +178,17 @@ namespace
ws_meta, ws_meta,
data); data);
} }
else if (ws_meta.flags() == 0) }
else if (wsrep::commits_transaction(ws_meta.flags()))
{
if (high_priority_service.is_replaying())
{ {
wsrep::high_priority_service* sa( ret = high_priority_service.start_transaction(
server_state.find_streaming_applier( ws_handle, ws_meta) ||
ws_meta.server_id(), ws_meta.transaction_id())); high_priority_service.apply_write_set(ws_meta, data) ||
if (sa == 0) high_priority_service.commit(ws_handle, ws_meta);
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning() << "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
ret = apply_fragment(server_state,
high_priority_service,
*sa,
ws_handle,
ws_meta,
data);
}
} }
else if (wsrep::commits_transaction(ws_meta.flags())) else
{
if (high_priority_service.is_replaying())
{
ret = high_priority_service.apply_write_set(ws_meta, data) ||
high_priority_service.commit(ws_handle, ws_meta);
}
else
{
wsrep::high_priority_service* sa(
server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sa == 0)
{
// It is possible that rapid group membership changes
// may cause streaming transaction be rolled back before
// commit fragment comes in. Although this is a valid
// situation, log a warning if a sac cannot be found as
// it may be an indication of a bug too.
wsrep::log_warning()
<< "Could not find applier context for "
<< ws_meta.server_id()
<< ": " << ws_meta.transaction_id();
}
else
{
// Commit fragment consumes sa
ret = commit_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
data);
}
}
}
else if (wsrep::rolls_back_transaction(ws_meta.flags()))
{ {
wsrep::high_priority_service* sa( wsrep::high_priority_service* sa(
server_state.find_streaming_applier( server_state.find_streaming_applier(
@ -233,67 +204,98 @@ namespace
<< "Could not find applier context for " << "Could not find applier context for "
<< ws_meta.server_id() << ws_meta.server_id()
<< ": " << ws_meta.transaction_id(); << ": " << ws_meta.transaction_id();
ret = high_priority_service.log_dummy_write_set(
ws_handle, ws_meta);
} }
else else
{ {
// Rollback fragment consumes sa // Commit fragment consumes sa
ret = rollback_fragment(server_state, ret = commit_fragment(server_state,
high_priority_service, high_priority_service,
sa, sa,
ws_handle, ws_handle,
ws_meta, ws_meta,
data); data);
} }
} }
else
{
assert(0);
}
return ret;
} }
else if (wsrep::rolls_back_transaction(ws_meta.flags()))
int apply_toi(wsrep::provider& provider,
wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{ {
if (wsrep::starts_transaction(ws_meta.flags()) && wsrep::high_priority_service* sa(
wsrep::commits_transaction(ws_meta.flags())) server_state.find_streaming_applier(
ws_meta.server_id(), ws_meta.transaction_id()));
if (sa == 0)
{ {
// // It is possible that rapid group membership changes
// Regular TOI. // may cause streaming transaction be rolled back before
// // commit fragment comes in. Although this is a valid
// Note that we ignore error returned by apply_toi // situation, log a warning if a sac cannot be found as
// call here. This must be revised after the error // it may be an indication of a bug too.
// voting is added. wsrep::log_warning()
// << "Could not find applier context for "
provider.commit_order_enter(ws_handle, ws_meta); << ws_meta.server_id()
(void)high_priority_service.apply_toi(ws_meta, data); << ": " << ws_meta.transaction_id();
provider.commit_order_leave(ws_handle, ws_meta); ret = high_priority_service.log_dummy_write_set(
return 0; ws_handle, ws_meta);
}
else if (wsrep::starts_transaction(ws_meta.flags()))
{
// NBO begin
throw wsrep::not_implemented_error();
}
else if (wsrep::commits_transaction(ws_meta.flags()))
{
// NBO end
throw wsrep::not_implemented_error();
} }
else else
{ {
assert(0); // Rollback fragment consumes sa
return 0; ret = rollback_fragment(server_state,
high_priority_service,
sa,
ws_handle,
ws_meta,
data);
} }
} }
else
{
assert(0);
}
return ret;
} }
static int apply_toi(wsrep::provider& provider,
wsrep::high_priority_service& high_priority_service,
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
{
if (wsrep::starts_transaction(ws_meta.flags()) &&
wsrep::commits_transaction(ws_meta.flags()))
{
//
// Regular TOI.
//
// Note that we ignore error returned by apply_toi
// call here. This must be revised after the error
// voting is added.
//
provider.commit_order_enter(ws_handle, ws_meta);
(void)high_priority_service.apply_toi(ws_meta, data);
provider.commit_order_leave(ws_handle, ws_meta);
return 0;
}
else if (wsrep::starts_transaction(ws_meta.flags()))
{
// NBO begin
throw wsrep::not_implemented_error();
}
else if (wsrep::commits_transaction(ws_meta.flags()))
{
// NBO end
throw wsrep::not_implemented_error();
}
else
{
assert(0);
return 0;
}
}
//////////////////////////////////////////////////////////////////////////////
// Server State //
//////////////////////////////////////////////////////////////////////////////
int wsrep::server_state::load_provider(const std::string& provider_spec, int wsrep::server_state::load_provider(const std::string& provider_spec,
const std::string& provider_options) const std::string& provider_options)
{ {

View File

@ -86,6 +86,7 @@ wsrep::transaction::transaction(
: server_service_(client_state.server_state().server_service()) : server_service_(client_state.server_state().server_service())
, client_service_(client_state.client_service()) , client_service_(client_state.client_service())
, client_state_(client_state) , client_state_(client_state)
, server_id_()
, id_(transaction_id::undefined()) , id_(transaction_id::undefined())
, state_(s_executing) , state_(s_executing)
, state_hist_() , state_hist_()
@ -112,6 +113,7 @@ int wsrep::transaction::start_transaction(
debug_log_state("start_transaction enter"); debug_log_state("start_transaction enter");
assert(active() == false); assert(active() == false);
assert(flags() == 0); assert(flags() == 0);
server_id_ = client_state_.server_state().id();
id_ = id; id_ = id;
state_ = s_executing; state_ = s_executing;
state_hist_.clear(); state_hist_.clear();
@ -142,6 +144,7 @@ int wsrep::transaction::start_transaction(
// assert(ws_meta.flags()); // assert(ws_meta.flags());
assert(active() == false); assert(active() == false);
assert(flags() == 0); assert(flags() == 0);
server_id_ = ws_meta.server_id();
id_ = ws_meta.transaction_id(); id_ = ws_meta.transaction_id();
assert(client_state_.mode() == wsrep::client_state::m_high_priority); assert(client_state_.mode() == wsrep::client_state::m_high_priority);
state_ = s_executing; state_ = s_executing;
@ -153,7 +156,13 @@ int wsrep::transaction::start_transaction(
} }
else else
{ {
start_replaying(ws_meta); ws_meta_ = ws_meta;
assert(ws_meta_.flags() & wsrep::provider::flag::commit);
assert(active());
assert(client_state_.mode() == wsrep::client_state::m_high_priority);
assert(state() == s_replaying);
assert(ws_meta_.seqno().is_undefined() == false);
certified_ = true;
} }
debug_log_state("start_transaction leave"); debug_log_state("start_transaction leave");
return 0; return 0;
@ -191,21 +200,8 @@ int wsrep::transaction::prepare_for_ordering(
return 0; return 0;
} }
int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta)
{
ws_meta_ = ws_meta;
assert(ws_meta_.flags() & wsrep::provider::flag::commit);
assert(active());
assert(client_state_.mode() == wsrep::client_state::m_high_priority);
assert(state() == s_replaying);
assert(ws_meta_.seqno().is_undefined() == false);
certified_ = true;
return 0;
}
int wsrep::transaction::append_key(const wsrep::key& key) int wsrep::transaction::append_key(const wsrep::key& key)
{ {
/** @todo Collect table level keys for SR commit */
try try
{ {
debug_log_key_append(key); debug_log_key_append(key);
@ -583,7 +579,7 @@ int wsrep::transaction::after_rollback()
assert(state() == s_aborting || assert(state() == s_aborting ||
state() == s_must_replay); state() == s_must_replay);
if (is_streaming()) if (is_streaming() && state() != s_must_replay)
{ {
clear_fragments(); clear_fragments();
} }
@ -823,6 +819,12 @@ bool wsrep::transaction::bf_abort(
// between releasing the lock and before background // between releasing the lock and before background
// rollbacker gets control. // rollbacker gets control.
state(lock, wsrep::transaction::s_aborting); state(lock, wsrep::transaction::s_aborting);
if (client_state_.mode() == wsrep::client_state::m_high_priority)
{
assert(is_streaming());
client_state_.server_state().stop_streaming_applier(
server_id_, id_);
}
lock.unlock(); lock.unlock();
server_service_.background_rollback(client_state_); server_service_.background_rollback(client_state_);
} }
@ -913,6 +915,7 @@ int wsrep::transaction::certify_fragment(
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
client_state_.override_error(wsrep::e_error_during_commit);
return 1; return 1;
} }
@ -921,6 +924,7 @@ int wsrep::transaction::certify_fragment(
{ {
lock.lock(); lock.lock();
state(lock, s_must_abort); state(lock, s_must_abort);
client_state_.override_error(wsrep::e_error_during_commit);
return 1; return 1;
} }
@ -996,7 +1000,11 @@ int wsrep::transaction::certify_fragment(
lock.lock(); lock.lock();
if (ret) if (ret)
{ {
state(lock, s_must_abort); if (state_ != s_must_abort)
{
state(lock, s_must_abort);
}
client_state_.override_error(wsrep::e_deadlock_error);
} }
else else
{ {
@ -1198,6 +1206,7 @@ int wsrep::transaction::append_sr_keys_for_commit()
void wsrep::transaction::streaming_rollback() void wsrep::transaction::streaming_rollback()
{ {
debug_log_state("streaming_rollback enter"); debug_log_state("streaming_rollback enter");
assert(state_ != s_must_replay);
assert(streaming_context_.rolled_back() == false); assert(streaming_context_.rolled_back() == false);
// Create a high priority applier which will handle the // Create a high priority applier which will handle the
// rollback fragment or clean up on configuration change. // rollback fragment or clean up on configuration change.

View File

@ -1068,7 +1068,7 @@ BOOST_FIXTURE_TEST_CASE(transaction_row_streaming_cert_fail_non_commit,
sc.provider().certify_result_ = wsrep::provider::success; sc.provider().certify_result_ = wsrep::provider::success;
BOOST_REQUIRE(cc.before_rollback() == 0); BOOST_REQUIRE(cc.before_rollback() == 0);
BOOST_REQUIRE(cc.after_rollback() == 0); BOOST_REQUIRE(cc.after_rollback() == 0);
BOOST_REQUIRE(cc.after_statement() == 0); BOOST_REQUIRE(cc.after_statement() == 1);
BOOST_REQUIRE(sc.provider().fragments() == 2); BOOST_REQUIRE(sc.provider().fragments() == 2);
BOOST_REQUIRE(sc.provider().start_fragments() == 1); BOOST_REQUIRE(sc.provider().start_fragments() == 1);
BOOST_REQUIRE(sc.provider().rollback_fragments() == 1); BOOST_REQUIRE(sc.provider().rollback_fragments() == 1);