mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-28 20:02:00 +03:00
Bootstrap server service, fixes to server state management
* Added bootstrap service call to do DBMS side bootstrap operations during the cluster bootstrap. * Added last_committed_gtid() to provider interface * Implemented wait_for_gtid() provider call * Pass initial position to the server state
This commit is contained in:
@ -48,6 +48,10 @@ void db::server_service::background_rollback(wsrep::client_state&)
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void db::server_service::bootstrap()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
void db::server_service::log_message(enum wsrep::log::level level,
|
void db::server_service::log_message(enum wsrep::log::level level,
|
||||||
const char* message)
|
const char* message)
|
||||||
{
|
{
|
||||||
|
@ -22,6 +22,7 @@ namespace db
|
|||||||
int start_sst(const std::string&, const wsrep::gtid&, bool) override;
|
int start_sst(const std::string&, const wsrep::gtid&, bool) override;
|
||||||
std::string sst_request() override;
|
std::string sst_request() override;
|
||||||
void background_rollback(wsrep::client_state&) override;
|
void background_rollback(wsrep::client_state&) override;
|
||||||
|
void bootstrap() override;
|
||||||
void log_message(enum wsrep::log::level, const char* message);
|
void log_message(enum wsrep::log::level, const char* message);
|
||||||
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
|
void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&)
|
||||||
override;
|
override;
|
||||||
|
@ -31,6 +31,7 @@ namespace db
|
|||||||
server_id,
|
server_id,
|
||||||
address,
|
address,
|
||||||
working_dir,
|
working_dir,
|
||||||
|
wsrep::gtid::undefined(),
|
||||||
1,
|
1,
|
||||||
wsrep::server_state::rm_async)
|
wsrep::server_state::rm_async)
|
||||||
, mutex_()
|
, mutex_()
|
||||||
|
@ -16,8 +16,8 @@ namespace wsrep
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
gtid()
|
gtid()
|
||||||
: id_()
|
: id_(wsrep::id::undefined())
|
||||||
, seqno_()
|
, seqno_(wsrep::seqno::undefined())
|
||||||
{ }
|
{ }
|
||||||
gtid(const wsrep::id& id, wsrep::seqno seqno)
|
gtid(const wsrep::id& id, wsrep::seqno seqno)
|
||||||
: id_(id)
|
: id_(id)
|
||||||
@ -41,6 +41,7 @@ namespace wsrep
|
|||||||
};
|
};
|
||||||
|
|
||||||
std::ostream& operator<<(std::ostream&, const wsrep::gtid&);
|
std::ostream& operator<<(std::ostream&, const wsrep::gtid&);
|
||||||
|
std::istream& operator>>(std::istream&, wsrep::gtid&);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // WSREP_GTID_HPP
|
#endif // WSREP_GTID_HPP
|
||||||
|
@ -299,6 +299,11 @@ namespace wsrep
|
|||||||
* @return Provider status indicating the result of the call.
|
* @return Provider status indicating the result of the call.
|
||||||
*/
|
*/
|
||||||
virtual enum status causal_read(int timeout) const = 0;
|
virtual enum status causal_read(int timeout) const = 0;
|
||||||
|
virtual enum status wait_for_gtid(const wsrep::gtid&, int timeout) const = 0;
|
||||||
|
/**
|
||||||
|
* Return last committed GTID.
|
||||||
|
*/
|
||||||
|
virtual wsrep::gtid last_committed_gtid() const = 0;
|
||||||
virtual int sst_sent(const wsrep::gtid&, int) = 0;
|
virtual int sst_sent(const wsrep::gtid&, int) = 0;
|
||||||
virtual int sst_received(const wsrep::gtid&, int) = 0;
|
virtual int sst_received(const wsrep::gtid&, int) = 0;
|
||||||
|
|
||||||
|
@ -55,6 +55,18 @@ namespace wsrep
|
|||||||
*/
|
*/
|
||||||
virtual void background_rollback(wsrep::client_state&) = 0;
|
virtual void background_rollback(wsrep::client_state&) = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bootstrap a DBMS state for a new cluster.
|
||||||
|
*
|
||||||
|
* This method is called by the wsrep lib after the
|
||||||
|
* new cluster is bootstrapped and the server has reached
|
||||||
|
* initialized state. From this call the DBMS should initialize
|
||||||
|
* environment for the new cluster.
|
||||||
|
*
|
||||||
|
* @param gtid Gtid of the bootstrap position.
|
||||||
|
*/
|
||||||
|
virtual void bootstrap() = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log message
|
* Log message
|
||||||
*
|
*
|
||||||
|
@ -193,6 +193,11 @@ namespace wsrep
|
|||||||
*/
|
*/
|
||||||
const std::string& working_dir() const { return working_dir_; }
|
const std::string& working_dir() const { return working_dir_; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return initial position for server.
|
||||||
|
*/
|
||||||
|
const wsrep::gtid& initial_position() const
|
||||||
|
{ return initial_position_; }
|
||||||
/**
|
/**
|
||||||
* Return maximum protocol version.
|
* Return maximum protocol version.
|
||||||
*/
|
*/
|
||||||
@ -316,7 +321,8 @@ namespace wsrep
|
|||||||
*
|
*
|
||||||
* @return Zero on success, non-zero on failure.
|
* @return Zero on success, non-zero on failure.
|
||||||
*/
|
*/
|
||||||
int wait_for_gtid(const wsrep::gtid&) const;
|
enum wsrep::provider::status
|
||||||
|
wait_for_gtid(const wsrep::gtid&, int timeout) const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform a causal read in the cluster. After the call returns,
|
* Perform a causal read in the cluster. After the call returns,
|
||||||
@ -484,6 +490,7 @@ namespace wsrep
|
|||||||
const std::string& id,
|
const std::string& id,
|
||||||
const std::string& address,
|
const std::string& address,
|
||||||
const std::string& working_dir,
|
const std::string& working_dir,
|
||||||
|
const wsrep::gtid& initial_position,
|
||||||
int max_protocol_version,
|
int max_protocol_version,
|
||||||
enum rollback_mode rollback_mode)
|
enum rollback_mode rollback_mode)
|
||||||
: mutex_(mutex)
|
: mutex_(mutex)
|
||||||
@ -492,6 +499,8 @@ namespace wsrep
|
|||||||
, state_(s_disconnected)
|
, state_(s_disconnected)
|
||||||
, state_hist_()
|
, state_hist_()
|
||||||
, state_waiters_(n_states_)
|
, state_waiters_(n_states_)
|
||||||
|
, bootstrap_()
|
||||||
|
, initial_position_(initial_position)
|
||||||
, init_initialized_()
|
, init_initialized_()
|
||||||
, init_synced_()
|
, init_synced_()
|
||||||
, sst_gtid_()
|
, sst_gtid_()
|
||||||
@ -529,6 +538,8 @@ namespace wsrep
|
|||||||
enum state state_;
|
enum state state_;
|
||||||
std::vector<enum state> state_hist_;
|
std::vector<enum state> state_hist_;
|
||||||
mutable std::vector<int> state_waiters_;
|
mutable std::vector<int> state_waiters_;
|
||||||
|
bool bootstrap_;
|
||||||
|
const wsrep::gtid initial_position_;
|
||||||
bool init_initialized_;
|
bool init_initialized_;
|
||||||
bool init_synced_;
|
bool init_synced_;
|
||||||
wsrep::gtid sst_gtid_;
|
wsrep::gtid sst_gtid_;
|
||||||
|
@ -86,7 +86,6 @@ namespace wsrep
|
|||||||
|
|
||||||
int protocol_version() const
|
int protocol_version() const
|
||||||
{ return protocol_version_; }
|
{ return protocol_version_; }
|
||||||
|
|
||||||
const std::vector<member>& members() const { return members_; }
|
const std::vector<member>& members() const { return members_; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
11
src/gtid.cpp
11
src/gtid.cpp
@ -10,3 +10,14 @@ std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::gtid& gtid)
|
|||||||
{
|
{
|
||||||
return (os << gtid.id() << ":" << gtid.seqno());
|
return (os << gtid.id() << ":" << gtid.seqno());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::istream& wsrep::operator>>(std::istream& is, wsrep::gtid& gtid)
|
||||||
|
{
|
||||||
|
std::string id_str;
|
||||||
|
std::getline(is, id_str, ':');
|
||||||
|
long long seq;
|
||||||
|
is >> seq;
|
||||||
|
gtid = wsrep::gtid(wsrep::id(id_str), wsrep::seqno(seq));
|
||||||
|
std::cout << "GTID: " << gtid << "\n";
|
||||||
|
return is;
|
||||||
|
}
|
||||||
|
@ -43,6 +43,18 @@ namespace
|
|||||||
return oss.str();
|
return oss.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// This method is used to deal with historical burden of several
|
||||||
|
// ways to bootstrap the cluster. Bootstrap happens if
|
||||||
|
//
|
||||||
|
// * bootstrap option is given
|
||||||
|
// * cluster_address is "gcomm://" (Galera provider)
|
||||||
|
//
|
||||||
|
bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
|
||||||
|
{
|
||||||
|
return (bootstrap || cluster_address == "gcomm://");
|
||||||
|
}
|
||||||
|
|
||||||
int apply_write_set(wsrep::server_state& server_state,
|
int apply_write_set(wsrep::server_state& server_state,
|
||||||
wsrep::client_state& client_state,
|
wsrep::client_state& client_state,
|
||||||
const wsrep::ws_handle& ws_handle,
|
const wsrep::ws_handle& ws_handle,
|
||||||
@ -242,7 +254,10 @@ namespace
|
|||||||
int wsrep::server_state::load_provider(const std::string& provider_spec,
|
int wsrep::server_state::load_provider(const std::string& provider_spec,
|
||||||
const std::string& provider_options)
|
const std::string& provider_options)
|
||||||
{
|
{
|
||||||
wsrep::log_info() << "Loading provider " << provider_spec;
|
wsrep::log_info() << "Loading provider "
|
||||||
|
<< provider_spec
|
||||||
|
<< "initial position: "
|
||||||
|
<< initial_position_;
|
||||||
provider_ = wsrep::provider::make_provider(
|
provider_ = wsrep::provider::make_provider(
|
||||||
*this, provider_spec, provider_options);
|
*this, provider_spec, provider_options);
|
||||||
return (provider_ ? 0 : 1);
|
return (provider_ ? 0 : 1);
|
||||||
@ -259,8 +274,10 @@ int wsrep::server_state::connect(const std::string& cluster_name,
|
|||||||
const std::string& state_donor,
|
const std::string& state_donor,
|
||||||
bool bootstrap)
|
bool bootstrap)
|
||||||
{
|
{
|
||||||
|
bootstrap_ = is_bootstrap(cluster_address, bootstrap);
|
||||||
|
wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_;
|
||||||
return provider().connect(cluster_name, cluster_address, state_donor,
|
return provider().connect(cluster_name, cluster_address, state_donor,
|
||||||
bootstrap);
|
bootstrap_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int wsrep::server_state::disconnect()
|
int wsrep::server_state::disconnect()
|
||||||
@ -468,18 +485,11 @@ wsrep::gtid wsrep::server_state::last_committed_gtid() const
|
|||||||
return last_committed_gtid_;
|
return last_committed_gtid_;
|
||||||
}
|
}
|
||||||
|
|
||||||
int wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid) const
|
enum wsrep::provider::status
|
||||||
|
wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
|
||||||
|
const
|
||||||
{
|
{
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
return provider_->wait_for_gtid(gtid, timeout);
|
||||||
if (gtid.id() != last_committed_gtid_.id())
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
while (last_committed_gtid_.seqno() < gtid.seqno())
|
|
||||||
{
|
|
||||||
cond_.wait(lock);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum wsrep::provider::status
|
enum wsrep::provider::status
|
||||||
@ -522,8 +532,20 @@ void wsrep::server_state::on_view(const wsrep::view& view)
|
|||||||
{
|
{
|
||||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||||
assert(view.final() == false);
|
assert(view.final() == false);
|
||||||
// Cluster was bootstrapped
|
|
||||||
if (state_ == s_connected && view.members().size() == 1)
|
//
|
||||||
|
// Reached primary from connected state. This may mean the following
|
||||||
|
//
|
||||||
|
// 1) Server was joined to the cluster and got SST transfer
|
||||||
|
// 2) Server was partitioned from the cluster and got back
|
||||||
|
// 3) A new cluster was bootstrapped from non-prim cluster
|
||||||
|
//
|
||||||
|
// There is no enough information here what was the cause
|
||||||
|
// of the primary component, so we need to walk through
|
||||||
|
// all states leading to joined to notify possible state
|
||||||
|
// waiters in other threads.
|
||||||
|
//
|
||||||
|
if (state_ == s_connected)
|
||||||
{
|
{
|
||||||
state(lock, s_joiner);
|
state(lock, s_joiner);
|
||||||
state(lock, s_initializing);
|
state(lock, s_initializing);
|
||||||
@ -531,12 +553,16 @@ void wsrep::server_state::on_view(const wsrep::view& view)
|
|||||||
|
|
||||||
if (init_initialized_ == false)
|
if (init_initialized_ == false)
|
||||||
{
|
{
|
||||||
// DBMS has not been initialized yet
|
|
||||||
wait_until_state(lock, s_initialized);
|
wait_until_state(lock, s_initialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(init_initialized_);
|
assert(init_initialized_);
|
||||||
|
|
||||||
|
if (bootstrap_)
|
||||||
|
{
|
||||||
|
server_service_.bootstrap();
|
||||||
|
bootstrap_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (state_ == s_initialized)
|
if (state_ == s_initialized)
|
||||||
{
|
{
|
||||||
state(lock, s_joined);
|
state(lock, s_joined);
|
||||||
|
@ -379,6 +379,8 @@ int wsrep::transaction::after_commit()
|
|||||||
}
|
}
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
state(lock, s_committed);
|
state(lock, s_committed);
|
||||||
|
|
||||||
|
// client_state_.server_state().last_committed_gtid(ws_meta.gitd());
|
||||||
debug_log_state("after_commit_leave");
|
debug_log_state("after_commit_leave");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -550,7 +552,12 @@ int wsrep::transaction::after_statement()
|
|||||||
if (ordered())
|
if (ordered())
|
||||||
{
|
{
|
||||||
ret = provider().commit_order_enter(ws_handle_, ws_meta_);
|
ret = provider().commit_order_enter(ws_handle_, ws_meta_);
|
||||||
if (ret == 0) provider().commit_order_leave(ws_handle_, ws_meta_);
|
if (ret == 0)
|
||||||
|
{
|
||||||
|
// client_state_.server_state().last_committed_gtid(
|
||||||
|
// ws_meta.gtid());
|
||||||
|
provider().commit_order_leave(ws_handle_, ws_meta_);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
provider().release(ws_handle_);
|
provider().release(ws_handle_);
|
||||||
}
|
}
|
||||||
|
@ -465,6 +465,11 @@ wsrep::wsrep_provider_v26::wsrep_provider_v26(
|
|||||||
: provider(server_state)
|
: provider(server_state)
|
||||||
, wsrep_()
|
, wsrep_()
|
||||||
{
|
{
|
||||||
|
wsrep_gtid_t state_id;
|
||||||
|
std::memcpy(state_id.uuid.data,
|
||||||
|
server_state.initial_position().id().data(),
|
||||||
|
sizeof(state_id.uuid.data));
|
||||||
|
state_id.seqno = server_state.initial_position().seqno().get();
|
||||||
struct wsrep_init_args init_args;
|
struct wsrep_init_args init_args;
|
||||||
memset(&init_args, 0, sizeof(init_args));
|
memset(&init_args, 0, sizeof(init_args));
|
||||||
init_args.app_ctx = &server_state;
|
init_args.app_ctx = &server_state;
|
||||||
@ -474,7 +479,7 @@ wsrep::wsrep_provider_v26::wsrep_provider_v26(
|
|||||||
init_args.data_dir = server_state_.working_dir().c_str();
|
init_args.data_dir = server_state_.working_dir().c_str();
|
||||||
init_args.options = provider_options.c_str();
|
init_args.options = provider_options.c_str();
|
||||||
init_args.proto_ver = server_state.max_protocol_version();
|
init_args.proto_ver = server_state.max_protocol_version();
|
||||||
init_args.state_id = 0;
|
init_args.state_id = &state_id;
|
||||||
init_args.state = 0;
|
init_args.state = 0;
|
||||||
init_args.logger_cb = &logger_cb;
|
init_args.logger_cb = &logger_cb;
|
||||||
init_args.connected_cb = &connected_cb;
|
init_args.connected_cb = &connected_cb;
|
||||||
@ -704,6 +709,29 @@ wsrep::wsrep_provider_v26::causal_read(int timeout) const
|
|||||||
return map_return_value(wsrep_->sync_wait(wsrep_, 0, timeout, 0));
|
return map_return_value(wsrep_->sync_wait(wsrep_, 0, timeout, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum wsrep::provider::status
|
||||||
|
wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
|
||||||
|
const
|
||||||
|
{
|
||||||
|
wsrep_gtid_t wsrep_gtid;
|
||||||
|
std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
|
||||||
|
sizeof(wsrep_gtid.uuid.data));
|
||||||
|
wsrep_gtid.seqno = gtid.seqno().get();
|
||||||
|
return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const
|
||||||
|
{
|
||||||
|
wsrep_gtid_t wsrep_gtid;
|
||||||
|
if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK)
|
||||||
|
{
|
||||||
|
throw wsrep::runtime_error("Failed to read last committed id");
|
||||||
|
}
|
||||||
|
return wsrep::gtid(
|
||||||
|
wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)),
|
||||||
|
wsrep::seqno(wsrep_gtid.seqno));
|
||||||
|
}
|
||||||
|
|
||||||
int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
|
int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
|
||||||
{
|
{
|
||||||
wsrep_gtid_t wsrep_gtid;
|
wsrep_gtid_t wsrep_gtid;
|
||||||
|
@ -57,7 +57,8 @@ namespace wsrep
|
|||||||
int);
|
int);
|
||||||
enum wsrep::provider::status leave_toi(wsrep::client_id);
|
enum wsrep::provider::status leave_toi(wsrep::client_id);
|
||||||
enum wsrep::provider::status causal_read(int) const;
|
enum wsrep::provider::status causal_read(int) const;
|
||||||
|
enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&, int) const;
|
||||||
|
wsrep::gtid last_committed_gtid() const;
|
||||||
int sst_sent(const wsrep::gtid&,int);
|
int sst_sent(const wsrep::gtid&,int);
|
||||||
int sst_received(const wsrep::gtid& gtid, int);
|
int sst_received(const wsrep::gtid& gtid, int);
|
||||||
|
|
||||||
|
@ -212,6 +212,10 @@ namespace wsrep
|
|||||||
{
|
{
|
||||||
return wsrep::provider::success;
|
return wsrep::provider::success;
|
||||||
}
|
}
|
||||||
|
enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&,
|
||||||
|
int) const WSREP_OVERRIDE
|
||||||
|
{ return wsrep::provider::success; }
|
||||||
|
wsrep::gtid last_committed_gtid() const { return wsrep::gtid(); }
|
||||||
int sst_sent(const wsrep::gtid&, int) { return 0; }
|
int sst_sent(const wsrep::gtid&, int) { return 0; }
|
||||||
int sst_received(const wsrep::gtid&, int) { return 0; }
|
int sst_received(const wsrep::gtid&, int) { return 0; }
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ namespace wsrep
|
|||||||
const std::string& id,
|
const std::string& id,
|
||||||
enum wsrep::server_state::rollback_mode rollback_mode)
|
enum wsrep::server_state::rollback_mode rollback_mode)
|
||||||
: wsrep::server_state(mutex_, cond_, *this,
|
: wsrep::server_state(mutex_, cond_, *this,
|
||||||
name, id, "", "./", 1, rollback_mode)
|
name, id, "", "./", wsrep::gtid::undefined(), 1, rollback_mode)
|
||||||
, sst_before_init_()
|
, sst_before_init_()
|
||||||
, mutex_()
|
, mutex_()
|
||||||
, cond_()
|
, cond_()
|
||||||
@ -53,6 +53,7 @@ namespace wsrep
|
|||||||
{
|
{
|
||||||
delete client_state;
|
delete client_state;
|
||||||
}
|
}
|
||||||
|
void bootstrap() WSREP_OVERRIDE { }
|
||||||
void log_message(enum wsrep::log::level level, const char* message)
|
void log_message(enum wsrep::log::level level, const char* message)
|
||||||
{
|
{
|
||||||
wsrep::log(level, name().c_str()) << message;
|
wsrep::log(level, name().c_str()) << message;
|
||||||
|
Reference in New Issue
Block a user