1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-06-08 18:21:54 +03:00

Implemented rollback of SR transactions during configuration changes.

SR transactions are BF aborted or rolled back on primary view
changes according to the following rules:
* Ongoing local SR transactions are BF aborted if the processing
  server is not found in the current view.
* All remote SR transactions whose origin server is not included in the
  current view are rolled back.
This commit is contained in:
Teemu Ollakka 2018-07-14 16:11:13 +03:00
parent 13487781d8
commit 86472ee420
6 changed files with 265 additions and 44 deletions

View File

@ -5,6 +5,8 @@
#ifndef WSREP_CLIENT_ID_HPP #ifndef WSREP_CLIENT_ID_HPP
#define WSREP_CLIENT_ID_HPP #define WSREP_CLIENT_ID_HPP
#include <ostream>
namespace wsrep namespace wsrep
{ {
class client_id class client_id
@ -20,9 +22,19 @@ namespace wsrep
{ } { }
type get() const { return id_; } type get() const { return id_; }
static type undefined() { return -1; } static type undefined() { return -1; }
/**
 * Order client ids by their underlying id value.
 *
 * @param other Client id to compare against.
 * @return True if this id compares less than other.
 */
bool operator<(const client_id& other) const
{
    return id_ < other.id_;
}
private: private:
type id_; type id_;
}; };
/**
 * Write the value returned by client_id.get() into the output stream.
 *
 * @param os Stream to write into.
 * @param client_id Client id to print.
 * @return The stream after the write, to allow chaining.
 */
static inline std::ostream& operator<<(
    std::ostream& os, const wsrep::client_id& client_id)
{
    os << client_id.get();
    return os;
}
} }
#endif // WSREP_CLIENT_ID_HPP #endif // WSREP_CLIENT_ID_HPP

View File

@ -212,6 +212,15 @@ namespace wsrep
*/ */
enum rollback_mode rollback_mode() const { return rollback_mode_; } enum rollback_mode rollback_mode() const { return rollback_mode_; }
/**
* Registers a streaming client.
*
* The client state is stored in the streaming clients map keyed by
* its client id. Registering the same client id twice is reported
* with a warning and a debug assertion.
*
* @param client_state Client state of the streaming client.
*/
void start_streaming_client(wsrep::client_state* client_state);
/**
* Converts a registered streaming client into a streaming applier.
*
* The client is removed from the streaming clients map, a streaming
* applier service is requested from the server service, the applier
* adopts the client's transaction and is registered in the streaming
* appliers map under this server's id and the transaction id.
*
* @param client_state Client state of the streaming client.
*/
void convert_streaming_client_to_applier(
wsrep::client_state* client_state);
/**
* Unregisters a streaming client and notifies threads waiting on
* the server state condition variable.
*
* @param client_state Client state of the streaming client.
*/
void stop_streaming_client(wsrep::client_state* client_state);
void start_streaming_applier( void start_streaming_applier(
const wsrep::id&, const wsrep::id&,
const wsrep::transaction_id&, const wsrep::transaction_id&,
@ -278,7 +287,8 @@ namespace wsrep
* @params view wsrep::view object which holds the new view * @params view wsrep::view object which holds the new view
* information. * information.
*/ */
void on_view(const wsrep::view& view); void on_view(const wsrep::view& view,
wsrep::high_priority_service*);
/** /**
* A method which will be called when the server * A method which will be called when the server
@ -533,6 +543,7 @@ namespace wsrep
, desync_count_() , desync_count_()
, pause_count_() , pause_count_()
, pause_seqno_() , pause_seqno_()
, streaming_clients_()
, streaming_appliers_() , streaming_appliers_()
, provider_() , provider_()
, name_(name) , name_(name)
@ -557,6 +568,11 @@ namespace wsrep
void resync(wsrep::unique_lock<wsrep::mutex>&); void resync(wsrep::unique_lock<wsrep::mutex>&);
void state(wsrep::unique_lock<wsrep::mutex>&, enum state); void state(wsrep::unique_lock<wsrep::mutex>&, enum state);
void wait_until_state(wsrep::unique_lock<wsrep::mutex>&, enum state) const; void wait_until_state(wsrep::unique_lock<wsrep::mutex>&, enum state) const;
// Close SR transactions whose origin is outside of current
// cluster view. The first argument is the lock object for the
// server state mutex (the implementation asserts that it owns the
// lock), the second is the high priority service used to roll
// back the streaming appliers and to remove their fragments.
void close_foreign_sr_transactions(
wsrep::unique_lock<wsrep::mutex>&,
wsrep::high_priority_service&);
wsrep::mutex& mutex_; wsrep::mutex& mutex_;
wsrep::condition_variable& cond_; wsrep::condition_variable& cond_;
@ -572,7 +588,25 @@ namespace wsrep
size_t desync_count_; size_t desync_count_;
size_t pause_count_; size_t pause_count_;
wsrep::seqno pause_seqno_; wsrep::seqno pause_seqno_;
typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>, wsrep::high_priority_service*> streaming_appliers_map; typedef std::map<wsrep::client_id, wsrep::client_state*>
streaming_clients_map;
streaming_clients_map streaming_clients_;
typedef std::map<std::pair<wsrep::id, wsrep::transaction_id>,
wsrep::high_priority_service*> streaming_appliers_map;
class server_id_cmp
{
public:
server_id_cmp(const wsrep::id& server_id)
: server_id_(server_id)
{ }
bool operator()(const std::vector<wsrep::view::member>::value_type& vt) const
{
return (vt.id() == server_id_);
}
private:
wsrep::id server_id_;
};
streaming_appliers_map streaming_appliers_; streaming_appliers_map streaming_appliers_;
wsrep::provider* provider_; wsrep::provider* provider_;
std::string name_; std::string name_;

View File

@ -3,6 +3,7 @@
// //
#include "wsrep/server_state.hpp" #include "wsrep/server_state.hpp"
#include "wsrep/client_state.hpp"
#include "wsrep/server_service.hpp" #include "wsrep/server_service.hpp"
#include "wsrep/high_priority_service.hpp" #include "wsrep/high_priority_service.hpp"
#include "wsrep/transaction.hpp" #include "wsrep/transaction.hpp"
@ -13,6 +14,7 @@
#include <cassert> #include <cassert>
#include <sstream> #include <sstream>
#include <algorithm>
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Helpers // // Helpers //
@ -504,19 +506,6 @@ void wsrep::server_state::initialized()
} }
} }
/**
 * Wait on the server state condition variable until state_ equals
 * the requested state.
 *
 * The per-state waiter counter is incremented for the duration of
 * the wait, and the condition variable is notified once the wait
 * is over.
 *
 * @param lock Lock object passed to the condition variable wait.
 * @param state State to wait for.
 */
void wsrep::server_state::wait_until_state(
    wsrep::unique_lock<wsrep::mutex>& lock,
    enum wsrep::server_state::state state) const
{
    ++state_waiters_[state];
    for (;;)
    {
        if (state_ == state)
        {
            break;
        }
        cond_.wait(lock);
    }
    --state_waiters_[state];
    // The waiter count changed, wake up threads observing it.
    cond_.notify_all();
}
void wsrep::server_state::last_committed_gtid(const wsrep::gtid& gtid) void wsrep::server_state::last_committed_gtid(const wsrep::gtid& gtid)
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_); wsrep::unique_lock<wsrep::mutex> lock(mutex_);
@ -556,7 +545,8 @@ void wsrep::server_state::on_connect(const wsrep::gtid& gtid)
state(lock, s_connected); state(lock, s_connected);
} }
void wsrep::server_state::on_view(const wsrep::view& view) void wsrep::server_state::on_view(const wsrep::view& view,
wsrep::high_priority_service* high_priority_service)
{ {
wsrep::log_info() wsrep::log_info()
<< "================================================\nView:\n" << "================================================\nView:\n"
@ -644,6 +634,11 @@ void wsrep::server_state::on_view(const wsrep::view& view)
bootstrap_ = false; bootstrap_ = false;
} }
assert(high_priority_service);
if (high_priority_service)
{
close_foreign_sr_transactions(lock, *high_priority_service);
}
if (server_service_.sst_before_init()) if (server_service_.sst_before_init())
{ {
if (state_ == s_initialized) if (state_ == s_initialized)
@ -756,11 +751,85 @@ int wsrep::server_state::on_apply(
} }
} }
/**
 * Register a streaming client.
 *
 * The client state is stored in streaming_clients_ keyed by the
 * client id while holding mutex_. If an entry with the same client
 * id already exists, a warning is logged and a debug assertion fires.
 *
 * @param client_state Client state of the streaming client.
 */
void wsrep::server_state::start_streaming_client(
    wsrep::client_state* client_state)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    wsrep::log_debug() << "Start streaming client: " << client_state->id();
    const bool inserted(
        streaming_clients_.insert(
            std::make_pair(client_state->id(), client_state)).second);
    if (!inserted)
    {
        // The client id was already present in the map.
        wsrep::log_warning() << "Failed to insert streaming client "
                             << client_state->id();
        assert(0);
    }
}
/**
 * Convert a registered streaming client into a streaming applier.
 *
 * While holding mutex_, the client is removed from streaming_clients_,
 * a streaming applier service is requested from the server service,
 * the applier adopts the transaction of the client and is stored in
 * streaming_appliers_ under the key (id_, transaction id).
 *
 * A missing client entry or an already existing applier entry is
 * reported with a warning and a debug assertion.
 *
 * @param client_state Client state of the streaming client.
 */
void wsrep::server_state::convert_streaming_client_to_applier(
    wsrep::client_state* client_state)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    wsrep::log_debug() << "Convert streaming client to applier "
                       << client_state->id();

    // Step 1: unregister the client.
    streaming_clients_map::iterator client_pos(
        streaming_clients_.find(client_state->id()));
    const bool registered(client_pos != streaming_clients_.end());
    assert(registered);
    if (registered)
    {
        streaming_clients_.erase(client_pos);
    }
    else
    {
        wsrep::log_warning() << "Unable to find streaming client "
                             << client_state->id();
        assert(0);
    }

    // Step 2: create the applier and hand the transaction over to it.
    wsrep::high_priority_service* applier(
        server_service_.streaming_applier_service(
            client_state->client_service()));
    applier->adopt_transaction(client_state->transaction());

    // Step 3: register the applier under (server id, transaction id).
    const bool inserted(
        streaming_appliers_.insert(
            std::make_pair(
                std::make_pair(id_, client_state->transaction().id()),
                applier)).second);
    if (!inserted)
    {
        wsrep::log_warning() << "Could not insert streaming applier "
                             << id_
                             << ", "
                             << client_state->transaction().id();
        assert(0);
    }
}
/**
 * Unregister a streaming client.
 *
 * While holding mutex_, the entry for the client id is erased from
 * streaming_clients_ and cond_ is notified. If no entry is found,
 * a warning is logged and a debug assertion fires.
 *
 * @param client_state Client state of the streaming client.
 */
void wsrep::server_state::stop_streaming_client(
    wsrep::client_state* client_state)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    wsrep::log_debug() << "Stop streaming client: " << client_state->id();
    streaming_clients_map::iterator pos(
        streaming_clients_.find(client_state->id()));
    const bool registered(pos != streaming_clients_.end());
    assert(registered);
    if (registered)
    {
        streaming_clients_.erase(pos);
        // Wake up threads waiting for the client to go away.
        cond_.notify_all();
        return;
    }
    wsrep::log_warning() << "Unable to find streaming client "
                         << client_state->id();
    assert(0);
}
void wsrep::server_state::start_streaming_applier( void wsrep::server_state::start_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id, const wsrep::transaction_id& transaction_id,
wsrep::high_priority_service* sa) wsrep::high_priority_service* sa)
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (streaming_appliers_.insert( if (streaming_appliers_.insert(
std::make_pair(std::make_pair(server_id, transaction_id), std::make_pair(std::make_pair(server_id, transaction_id),
sa)).second == false) sa)).second == false)
@ -774,6 +843,7 @@ void wsrep::server_state::stop_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id) const wsrep::transaction_id& transaction_id)
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
streaming_appliers_map::iterator i( streaming_appliers_map::iterator i(
streaming_appliers_.find(std::make_pair(server_id, transaction_id))); streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
assert(i != streaming_appliers_.end()); assert(i != streaming_appliers_.end());
@ -785,6 +855,7 @@ void wsrep::server_state::stop_streaming_applier(
else else
{ {
streaming_appliers_.erase(i); streaming_appliers_.erase(i);
cond_.notify_all();
} }
} }
@ -792,13 +863,15 @@ wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
const wsrep::id& server_id, const wsrep::id& server_id,
const wsrep::transaction_id& transaction_id) const const wsrep::transaction_id& transaction_id) const
{ {
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
streaming_appliers_map::const_iterator i( streaming_appliers_map::const_iterator i(
streaming_appliers_.find(std::make_pair(server_id, transaction_id))); streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
return (i == streaming_appliers_.end() ? 0 : i->second); return (i == streaming_appliers_.end() ? 0 : i->second);
} }
// Private //////////////////////////////////////////////////////////////////////////////
// Private //
//////////////////////////////////////////////////////////////////////////////
int wsrep::server_state::desync(wsrep::unique_lock<wsrep::mutex>& lock) int wsrep::server_state::desync(wsrep::unique_lock<wsrep::mutex>& lock)
{ {
@ -869,3 +942,79 @@ void wsrep::server_state::state(
// throw wsrep::runtime_error(os.str()); // throw wsrep::runtime_error(os.str());
} }
} }
/**
 * Wait on the server state condition variable until state_ equals
 * the requested state.
 *
 * The per-state waiter counter is incremented for the duration of
 * the wait, and the condition variable is notified once the wait
 * is over.
 *
 * @param lock Lock object passed to the condition variable wait.
 * @param state State to wait for.
 */
void wsrep::server_state::wait_until_state(
    wsrep::unique_lock<wsrep::mutex>& lock,
    enum wsrep::server_state::state state) const
{
    ++state_waiters_[state];
    for (;;)
    {
        if (state_ == state)
        {
            break;
        }
        cond_.wait(lock);
    }
    --state_waiters_[state];
    // The waiter count changed, wake up threads observing it.
    cond_.notify_all();
}
/**
 * Close streaming replication (SR) transactions which cannot continue
 * in the current view.
 *
 * Two steps are performed:
 * - If this server is not a member of the current view
 *   (own_index() == -1), each registered local streaming client is
 *   BF aborted in total order using the view seqno. After each abort
 *   the call waits on cond_ until the client entry is gone from
 *   streaming_clients_ or refers to a different transaction.
 * - Each streaming applier whose origin server is NOT a member of the
 *   current view is rolled back, removed from streaming_appliers_ and
 *   released, after which its fragments are removed and the removal is
 *   committed through high_priority_service. Appliers whose origin is
 *   still a member of the view are left untouched.
 *
 * @param lock Lock object which must own the server state mutex.
 *             It is passed to cond_.wait() while waiting for local
 *             streaming clients.
 * @param high_priority_service High priority service which adopts the
 *             transaction of each closed applier and is used to remove
 *             and commit the removal of its fragments.
 */
void wsrep::server_state::close_foreign_sr_transactions(
    wsrep::unique_lock<wsrep::mutex>& lock,
    wsrep::high_priority_service& high_priority_service)
{
    assert(lock.owns_lock());
    if (current_view_.own_index() == -1)
    {
        while (streaming_clients_.empty() == false)
        {
            streaming_clients_map::iterator i(streaming_clients_.begin());
            // Remember the identity of the aborted transaction, the
            // map entry may be erased or reused while waiting.
            wsrep::client_id client_id(i->first);
            wsrep::transaction_id transaction_id(i->second->transaction().id());
            i->second->total_order_bf_abort(current_view_.view_seqno());
            streaming_clients_map::const_iterator found_i;
            while ((found_i = streaming_clients_.find(client_id)) !=
                   streaming_clients_.end() &&
                   found_i->second->transaction().id() == transaction_id)
            {
                cond_.wait(lock);
            }
        }
    }

    streaming_appliers_map::iterator i(streaming_appliers_.begin());
    while (i != streaming_appliers_.end())
    {
        // find_if() returns the end iterator when no view member
        // carries the origin server id, i.e. the origin has left
        // the cluster and the transaction can never be completed.
        if (std::find_if(current_view_.members().begin(),
                         current_view_.members().end(),
                         server_id_cmp(i->first.first)) ==
            current_view_.members().end())
        {
            wsrep::id server_id(i->first.first);
            wsrep::transaction_id transaction_id(i->first.second);
            wsrep::high_priority_service* streaming_applier(i->second);
            high_priority_service.adopt_transaction(
                streaming_applier->transaction());
            {
                wsrep::high_priority_switch sw(high_priority_service,
                                               *streaming_applier);
                streaming_applier->rollback(
                    wsrep::ws_handle(), wsrep::ws_meta());
                streaming_applier->after_apply();
            }
            // Post-increment keeps the loop iterator valid over the erase.
            streaming_appliers_.erase(i++);
            server_service_.release_high_priority_service(streaming_applier);
            wsrep::ws_meta ws_meta(
                wsrep::gtid(),
                wsrep::stid(server_id, transaction_id, wsrep::client_id()),
                wsrep::seqno::undefined(), 0);
            high_priority_service.remove_fragments(ws_meta);
            high_priority_service.commit(wsrep::ws_handle(transaction_id, 0),
                                         ws_meta);
            high_priority_service.after_apply();
        }
        else
        {
            ++i;
        }
    }
}

View File

@ -477,6 +477,10 @@ int wsrep::transaction::after_commit()
{ {
assert(client_state_.mode() == wsrep::client_state::m_local || assert(client_state_.mode() == wsrep::client_state::m_local ||
client_state_.mode() == wsrep::client_state::m_high_priority); client_state_.mode() == wsrep::client_state::m_high_priority);
if (client_state_.mode() == wsrep::client_state::m_local)
{
client_state_.server_state_.stop_streaming_client(&client_state_);
}
clear_fragments(); clear_fragments();
} }
@ -522,7 +526,7 @@ int wsrep::transaction::before_rollback()
// fall through // fall through
case s_executing: case s_executing:
// Voluntary rollback // Voluntary rollback
if (is_streaming() && bf_aborted_in_total_order_ == false) if (is_streaming())
{ {
streaming_rollback(); streaming_rollback();
} }
@ -535,7 +539,7 @@ int wsrep::transaction::before_rollback()
} }
else else
{ {
if (is_streaming() && bf_aborted_in_total_order_ == false) if (is_streaming())
{ {
streaming_rollback(); streaming_rollback();
} }
@ -543,14 +547,14 @@ int wsrep::transaction::before_rollback()
} }
break; break;
case s_cert_failed: case s_cert_failed:
if (is_streaming() && bf_aborted_in_total_order_ == false) if (is_streaming())
{ {
streaming_rollback(); streaming_rollback();
} }
state(lock, s_aborting); state(lock, s_aborting);
break; break;
case s_aborting: case s_aborting:
if (is_streaming() && bf_aborted_in_total_order_ == false) if (is_streaming())
{ {
streaming_rollback(); streaming_rollback();
} }
@ -653,8 +657,16 @@ int wsrep::transaction::after_statement()
case s_must_replay: case s_must_replay:
{ {
state(lock, s_replaying); state(lock, s_replaying);
// Need to remember streaming state before replay, entering
// after_commit() after successful replay will clear
// fragments.
const bool was_streaming(is_streaming());
lock.unlock(); lock.unlock();
enum wsrep::provider::status replay_ret(client_service_.replay()); enum wsrep::provider::status replay_ret(client_service_.replay());
if (was_streaming)
{
client_state_.server_state_.stop_streaming_client(&client_state_);
}
switch (replay_ret) switch (replay_ret)
{ {
case wsrep::provider::success: case wsrep::provider::success:
@ -961,6 +973,10 @@ int wsrep::transaction::certify_fragment(
return 1; return 1;
} }
if (is_streaming() == false)
{
client_state_.server_state_.start_streaming_client(&client_state_);
}
int ret(0); int ret(0);
// Storage service scope // Storage service scope
{ {
@ -1033,6 +1049,10 @@ int wsrep::transaction::certify_fragment(
lock.lock(); lock.lock();
if (ret) if (ret)
{ {
if (is_streaming() == false)
{
client_state_.server_state_.stop_streaming_client(&client_state_);
}
if (state_ != s_must_abort) if (state_ != s_must_abort)
{ {
state(lock, s_must_abort); state(lock, s_must_abort);
@ -1246,17 +1266,20 @@ void wsrep::transaction::streaming_rollback()
debug_log_state("streaming_rollback enter"); debug_log_state("streaming_rollback enter");
assert(state_ != s_must_replay); assert(state_ != s_must_replay);
assert(streaming_context_.rolled_back() == false); assert(streaming_context_.rolled_back() == false);
if (bf_aborted_in_total_order_)
{
client_state_.server_state_.stop_streaming_client(&client_state_);
}
else
{
// Create a high priority applier which will handle the // Create a high priority applier which will handle the
// rollback fragment or clean up on configuration change. // rollback fragment or clean up on configuration change.
// Adopt transaction will copy fragment set and appropriate // Adopt transaction will copy fragment set and appropriate
// meta data. Mark current transaction streaming context // meta data. Mark current transaction streaming context
// rolled back. // rolled back.
wsrep::high_priority_service* sa( client_state_.server_state_.convert_streaming_client_to_applier(
server_service_.streaming_applier_service( &client_state_);
client_state_.client_service()));
client_state_.server_state().start_streaming_applier(
client_state_.server_state().id(), id(), sa);
sa->adopt_transaction(*this);
streaming_context_.cleanup(); streaming_context_.cleanup();
streaming_context_.rolled_back(id_); streaming_context_.rolled_back(id_);
client_service_.debug_sync("wsrep_before_SR_rollback"); client_service_.debug_sync("wsrep_before_SR_rollback");
@ -1266,6 +1289,7 @@ void wsrep::transaction::streaming_rollback()
wsrep::log_warning() << "Failed to replicate rollback fragment for " wsrep::log_warning() << "Failed to replicate rollback fragment for "
<< id_ << ": " << ret; << id_ << ": " << ret;
} }
}
debug_log_state("streaming_rollback leave"); debug_log_state("streaming_rollback leave");
} }

View File

@ -306,7 +306,7 @@ namespace
} }
wsrep_cb_status_t view_cb(void* app_ctx, wsrep_cb_status_t view_cb(void* app_ctx,
void* recv_ctx __attribute__((unused)), void* recv_ctx,
const wsrep_view_info_t* view_info, const wsrep_view_info_t* view_info,
const char*, const char*,
size_t) size_t)
@ -315,10 +315,12 @@ namespace
assert(view_info); assert(view_info);
wsrep::server_state& server_state( wsrep::server_state& server_state(
*reinterpret_cast<wsrep::server_state*>(app_ctx)); *reinterpret_cast<wsrep::server_state*>(app_ctx));
wsrep::high_priority_service* high_priority_service(
reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
try try
{ {
wsrep::view view(view_from_native(*view_info)); wsrep::view view(view_from_native(*view_info));
server_state.on_view(view); server_state.on_view(view, high_priority_service);
return WSREP_CB_SUCCESS; return WSREP_CB_SUCCESS;
} }
catch (const wsrep::runtime_error& e) catch (const wsrep::runtime_error& e)

View File

@ -192,7 +192,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_sst_first_boostrap,
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
ss.sync_point_enabled_ = "on_view_wait_initialized"; ss.sync_point_enabled_ = "on_view_wait_initialized";
ss.sync_point_action_ = ss.spa_initialize; ss.sync_point_action_ = ss.spa_initialize;
ss.on_view(bootstrap_view); ss.on_view(bootstrap_view, &hps);
ss.sync_point_enabled_ = ""; ss.sync_point_enabled_ = "";
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined);
ss.on_sync(); ss.on_sync();
@ -218,7 +218,7 @@ BOOST_FIXTURE_TEST_CASE(server_state_init_first_boostrap,
BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0); BOOST_REQUIRE(ss.connect("cluster", "local", "0", false) == 0);
ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0))); ss.on_connect(wsrep::gtid(cluster_id, wsrep::seqno(0)));
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_connected);
ss.on_view(bootstrap_view); ss.on_view(bootstrap_view, &hps);
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_joined);
ss.on_sync(); ss.on_sync();
BOOST_REQUIRE(ss.state() == wsrep::server_state::s_synced); BOOST_REQUIRE(ss.state() == wsrep::server_state::s_synced);