mirror of
https://github.com/codership/wsrep-lib.git
synced 2025-07-30 07:23:07 +03:00
Add support for custom provider implementation
- Add a set_provider_factory() method to server_state to allow injecting provider factory which will be called when the provider is loaded. Other related changes: - Implement to_string() helper method for id class. - Fix id ostream operator human readable id printing. - Pass victim client_service as an argument to provider::bf_abort() to allow passing victim context to custom provider. - Implement prev() helper method for seqno class. - Make server_state recover_streaming_appliers_if_not_recovered() public. In some recovery scenarios the method must be called outside of server_state internal code paths. - Add storage_service requires_globals() method. The storage service implementation may override this to return false if changing to storage service context does not require store/reset globals. - Change view final() to also require that the view status is not primary for the view to be final. Also change the method name to is_final() to avoid confusion with C++ final identifier. - Fixes to server state handling in disconnecting and disconnected states. - Keep TOI meta data over whole TOI critical section. Co-authored-by: Denis Protivensky <denis.protivensky@galeracluster.com>
This commit is contained in:
@ -511,7 +511,7 @@ int wsrep::client_state::bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
|
||||
{
|
||||
assert(lock.owns_lock());
|
||||
assert(mode_ == m_local || transaction_.is_streaming());
|
||||
auto ret = transaction_.bf_abort(lock, bf_seqno);
|
||||
auto ret = transaction_.bf_abort(lock, bf_seqno, client_service_);
|
||||
assert(lock.owns_lock());
|
||||
return ret;
|
||||
}
|
||||
@ -527,7 +527,7 @@ int wsrep::client_state::total_order_bf_abort(
|
||||
{
|
||||
assert(lock.owns_lock());
|
||||
assert(mode_ == m_local || transaction_.is_streaming());
|
||||
auto ret = transaction_.total_order_bf_abort(lock, bf_seqno);
|
||||
auto ret = transaction_.total_order_bf_abort(lock, bf_seqno, client_service_);
|
||||
assert(lock.owns_lock());
|
||||
return ret;
|
||||
}
|
||||
@ -585,7 +585,7 @@ wsrep::client_state::poll_enter_toi(
|
||||
// Successfully entered TOI, but the provider reported failure.
|
||||
// This may happen for example if certification fails.
|
||||
// Leave TOI before proceeding.
|
||||
if (provider().leave_toi(id_, wsrep::mutable_buffer()))
|
||||
if (provider().leave_toi(id_, poll_meta, wsrep::mutable_buffer()))
|
||||
{
|
||||
wsrep::log_warning()
|
||||
<< "Failed to leave TOI after failure in "
|
||||
@ -689,10 +689,12 @@ int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
|
||||
{
|
||||
debug_log_state("leave_toi_local: enter");
|
||||
assert(toi_mode_ == m_local);
|
||||
leave_toi_common();
|
||||
|
||||
auto ret = (provider().leave_toi(id_, toi_meta_, err) == provider::success ? 0 : 1);
|
||||
leave_toi_common();
|
||||
debug_log_state("leave_toi_local: leave");
|
||||
return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void wsrep::client_state::leave_toi_mode()
|
||||
@ -809,7 +811,7 @@ int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
|
||||
assert(mode_ == m_nbo);
|
||||
assert(in_toi());
|
||||
|
||||
enum wsrep::provider::status status(provider().leave_toi(id_, err));
|
||||
enum wsrep::provider::status status(provider().leave_toi(id_, toi_meta_, err));
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
int ret;
|
||||
switch (status)
|
||||
@ -910,7 +912,7 @@ int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
|
||||
assert(toi_mode_ == m_local);
|
||||
assert(in_toi());
|
||||
enum wsrep::provider::status status(
|
||||
provider().leave_toi(id_, err));
|
||||
provider().leave_toi(id_, toi_meta_, err));
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
int ret;
|
||||
switch (status)
|
||||
|
@ -151,6 +151,11 @@ int wsrep::config_service_v1_fetch(wsrep::provider& provider,
|
||||
wsrep::provider_options* options)
|
||||
{
|
||||
struct wsrep_st* wsrep = (struct wsrep_st*)provider.native();
|
||||
if (wsrep == nullptr)
|
||||
{
|
||||
// Not a provider which was loaded via wsrep-API
|
||||
return 0;
|
||||
}
|
||||
if (config_service_v1_probe(wsrep->dlh))
|
||||
{
|
||||
wsrep::log_warning() << "Provider does not support config service v1";
|
||||
|
14
src/id.cpp
14
src/id.cpp
@ -50,13 +50,23 @@ wsrep::id::id(const std::string& str)
|
||||
}
|
||||
}
|
||||
|
||||
std::string wsrep::id::to_string() const
|
||||
{
|
||||
std::ostringstream os;
|
||||
os << *this;
|
||||
return os.str();
|
||||
}
|
||||
|
||||
std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id)
|
||||
{
|
||||
const char* ptr(static_cast<const char*>(id.data()));
|
||||
size_t size(id.size());
|
||||
if (static_cast<size_t>(std::count_if(ptr, ptr + size, ::isalnum)) == size)
|
||||
if (static_cast<size_t>(
|
||||
std::count_if(ptr, ptr + size,
|
||||
[](char c) { return (::isalnum(c) || c == '\0'); }))
|
||||
== size)
|
||||
{
|
||||
return (os << std::string(ptr, size));
|
||||
return (os << std::string(ptr, ::strnlen(ptr, size)));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -26,7 +26,7 @@
|
||||
#include <cassert>
|
||||
#include <memory>
|
||||
|
||||
wsrep::provider* wsrep::provider::make_provider(
|
||||
std::unique_ptr<wsrep::provider> wsrep::provider::make_provider(
|
||||
wsrep::server_state& server_state,
|
||||
const std::string& provider_spec,
|
||||
const std::string& provider_options,
|
||||
@ -34,8 +34,8 @@ wsrep::provider* wsrep::provider::make_provider(
|
||||
{
|
||||
try
|
||||
{
|
||||
return new wsrep::wsrep_provider_v26(
|
||||
server_state, provider_options, provider_spec, services);
|
||||
return std::unique_ptr<wsrep::provider>(new wsrep::wsrep_provider_v26(
|
||||
server_state, provider_options, provider_spec, services));
|
||||
}
|
||||
catch (const wsrep::runtime_error& e)
|
||||
{
|
||||
@ -120,7 +120,6 @@ std::string wsrep::provider::capability::str(int caps)
|
||||
WSREP_PRINT_CAPABILITY(streaming, "STREAMING");
|
||||
WSREP_PRINT_CAPABILITY(snapshot, "READ_VIEW");
|
||||
WSREP_PRINT_CAPABILITY(nbo, "NBO");
|
||||
|
||||
#undef WSREP_PRINT_CAPABILITY
|
||||
|
||||
if (caps)
|
||||
|
@ -307,7 +307,8 @@ static int apply_write_set(wsrep::server_state& server_state,
|
||||
wsrep::log::debug_level_server_state,
|
||||
"Could not find applier context for "
|
||||
<< ws_meta.server_id()
|
||||
<< ": " << ws_meta.transaction_id());
|
||||
<< ": " << ws_meta.transaction_id()
|
||||
<< ", " << ws_meta.seqno());
|
||||
ret = high_priority_service.log_dummy_write_set(
|
||||
ws_handle, ws_meta, no_error);
|
||||
}
|
||||
@ -379,7 +380,8 @@ static int apply_write_set(wsrep::server_state& server_state,
|
||||
// it may be an indication of a bug too.
|
||||
wsrep::log_warning() << "Could not find applier context for "
|
||||
<< ws_meta.server_id()
|
||||
<< ": " << ws_meta.transaction_id();
|
||||
<< ": " << ws_meta.transaction_id()
|
||||
<< ", " << ws_meta.seqno();
|
||||
wsrep::mutable_buffer no_error;
|
||||
ret = high_priority_service.log_dummy_write_set(
|
||||
ws_handle, ws_meta, no_error);
|
||||
@ -420,7 +422,8 @@ static int apply_write_set(wsrep::server_state& server_state,
|
||||
wsrep::log_warning()
|
||||
<< "Could not find applier context for "
|
||||
<< ws_meta.server_id()
|
||||
<< ": " << ws_meta.transaction_id();
|
||||
<< ": " << ws_meta.transaction_id()
|
||||
<< ", " << ws_meta.seqno();
|
||||
wsrep::mutable_buffer no_error;
|
||||
ret = high_priority_service.log_dummy_write_set(
|
||||
ws_handle, ws_meta, no_error);
|
||||
@ -501,18 +504,21 @@ int wsrep::server_state::load_provider(
|
||||
{
|
||||
wsrep::log_info() << "Loading provider " << provider_spec
|
||||
<< " initial position: " << initial_position_;
|
||||
|
||||
provider_ = wsrep::provider::make_provider(*this,
|
||||
provider_spec,
|
||||
provider_options,
|
||||
services);
|
||||
provider_
|
||||
= provider_factory_(*this, provider_spec, provider_options, services);
|
||||
return (provider_ ? 0 : 1);
|
||||
}
|
||||
|
||||
void wsrep::server_state::set_provider_factory(
|
||||
const provider_factory_func& provider_factory)
|
||||
{
|
||||
assert(provider_factory);
|
||||
provider_factory_ = provider_factory;
|
||||
}
|
||||
|
||||
void wsrep::server_state::unload_provider()
|
||||
{
|
||||
delete provider_;
|
||||
provider_ = 0;
|
||||
provider_.reset();
|
||||
}
|
||||
|
||||
int wsrep::server_state::connect(const std::string& cluster_name,
|
||||
@ -545,11 +551,6 @@ int wsrep::server_state::disconnect()
|
||||
return provider().disconnect();
|
||||
}
|
||||
|
||||
wsrep::server_state::~server_state()
|
||||
{
|
||||
delete provider_;
|
||||
}
|
||||
|
||||
std::vector<wsrep::provider::status_variable>
|
||||
wsrep::server_state::status() const
|
||||
{
|
||||
@ -914,7 +915,7 @@ void wsrep::server_state::on_primary_view(
|
||||
wsrep::high_priority_service* high_priority_service)
|
||||
{
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
assert(view.final() == false);
|
||||
assert(view.is_final() == false);
|
||||
//
|
||||
// Reached primary from connected state. This may mean the following
|
||||
//
|
||||
@ -995,7 +996,7 @@ void wsrep::server_state::on_non_primary_view(
|
||||
{
|
||||
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
|
||||
wsrep::log_info() << "Non-primary view";
|
||||
if (view.final())
|
||||
if (view.is_final())
|
||||
{
|
||||
go_final(lock, view, high_priority_service);
|
||||
}
|
||||
@ -1010,7 +1011,7 @@ void wsrep::server_state::go_final(wsrep::unique_lock<wsrep::mutex>& lock,
|
||||
wsrep::high_priority_service* hps)
|
||||
{
|
||||
(void)view; // avoid compiler warning "unused parameter 'view'"
|
||||
assert(view.final());
|
||||
assert(view.is_final());
|
||||
assert(hps);
|
||||
if (hps)
|
||||
{
|
||||
@ -1064,6 +1065,8 @@ void wsrep::server_state::on_sync()
|
||||
{
|
||||
switch (state_)
|
||||
{
|
||||
case s_disconnecting:
|
||||
break;
|
||||
case s_synced:
|
||||
break;
|
||||
case s_connected: // Seed node path: provider becomes
|
||||
@ -1090,7 +1093,7 @@ void wsrep::server_state::on_sync()
|
||||
// Calls to on_sync() in synced state are possible if
|
||||
// server desyncs itself from the group. Provider does not
|
||||
// inform about this through callbacks.
|
||||
if (state_ != s_synced)
|
||||
if (state_ != s_synced && state_ != s_disconnecting)
|
||||
{
|
||||
state(lock, s_synced);
|
||||
}
|
||||
@ -1395,8 +1398,9 @@ void wsrep::server_state::wait_until_state(
|
||||
// or disconnected and the state has been changed to disconnecting,
|
||||
// this usually means that some error was encountered
|
||||
if (state != s_disconnecting && state != s_disconnected
|
||||
&& state_ == s_disconnecting)
|
||||
&& (state_ == s_disconnecting || state_ == s_disconnected))
|
||||
{
|
||||
--state_waiters_[state];
|
||||
throw wsrep::runtime_error("State wait was interrupted");
|
||||
}
|
||||
}
|
||||
@ -1484,6 +1488,7 @@ void wsrep::server_state::close_orphaned_sr_transactions(
|
||||
{
|
||||
wsrep::client_id client_id(i->first);
|
||||
wsrep::transaction_id transaction_id(i->second->transaction().id());
|
||||
auto& client_state = *i->second;
|
||||
// It is safe to unlock the server state temporarily here.
|
||||
// The processing happens inside view handler which is
|
||||
// protected by the provider commit ordering critical
|
||||
@ -1494,7 +1499,7 @@ void wsrep::server_state::close_orphaned_sr_transactions(
|
||||
// remains unlocked, so it should not be accessed after
|
||||
// the bf abort call.
|
||||
lock.unlock();
|
||||
i->second->total_order_bf_abort(current_view_.view_seqno());
|
||||
client_state.total_order_bf_abort(current_view_.view_seqno());
|
||||
lock.lock();
|
||||
streaming_clients_map::const_iterator found_i;
|
||||
while ((found_i = streaming_clients_.find(client_id)) !=
|
||||
|
@ -63,8 +63,10 @@ namespace
|
||||
{
|
||||
throw wsrep::runtime_error("Null client_state provided");
|
||||
}
|
||||
client_service_.reset_globals();
|
||||
storage_service_->store_globals();
|
||||
if (storage_service_->requires_globals()) {
|
||||
client_service_.reset_globals();
|
||||
storage_service_->store_globals();
|
||||
}
|
||||
}
|
||||
|
||||
wsrep::storage_service& storage_service()
|
||||
@ -74,8 +76,11 @@ namespace
|
||||
|
||||
~scoped_storage_service()
|
||||
{
|
||||
bool restore_globals = storage_service_->requires_globals();
|
||||
deleter_(storage_service_);
|
||||
client_service_.store_globals();
|
||||
if (restore_globals) {
|
||||
client_service_.store_globals();
|
||||
}
|
||||
}
|
||||
private:
|
||||
scoped_storage_service(const scoped_storage_service&);
|
||||
@ -990,7 +995,8 @@ void wsrep::transaction::after_applying()
|
||||
|
||||
bool wsrep::transaction::bf_abort(
|
||||
wsrep::unique_lock<wsrep::mutex>& lock,
|
||||
wsrep::seqno bf_seqno)
|
||||
wsrep::seqno bf_seqno,
|
||||
wsrep::client_service& victim_ctx)
|
||||
{
|
||||
bool ret(false);
|
||||
const enum wsrep::transaction::state state_at_enter(state());
|
||||
@ -1021,7 +1027,7 @@ bool wsrep::transaction::bf_abort(
|
||||
wsrep::seqno victim_seqno;
|
||||
enum wsrep::provider::status
|
||||
status(client_state_.provider().bf_abort(
|
||||
bf_seqno, id_, victim_seqno));
|
||||
bf_seqno, id_, victim_ctx, victim_seqno));
|
||||
switch (status)
|
||||
{
|
||||
case wsrep::provider::success:
|
||||
@ -1108,14 +1114,15 @@ bool wsrep::transaction::bf_abort(
|
||||
|
||||
bool wsrep::transaction::total_order_bf_abort(
|
||||
wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
|
||||
wsrep::seqno bf_seqno)
|
||||
wsrep::seqno bf_seqno,
|
||||
wsrep::client_service& victim_ctx)
|
||||
{
|
||||
/* We must set this flag before entering bf_abort() in order
|
||||
* to streaming_rollback() work correctly. The flag will be
|
||||
* unset if BF abort was not allowed. Note that we rely in
|
||||
* bf_abort() not to release lock if the BF abort is not allowed. */
|
||||
bf_aborted_in_total_order_ = true;
|
||||
bool ret(bf_abort(lock, bf_seqno));
|
||||
bool ret(bf_abort(lock, bf_seqno, victim_ctx));
|
||||
if (not ret)
|
||||
{
|
||||
bf_aborted_in_total_order_ = false;
|
||||
|
@ -57,7 +57,7 @@ void wsrep::view::print(std::ostream& os) const
|
||||
<< " status: " << to_c_string(status()) << "\n"
|
||||
<< " protocol_version: " << protocol_version() << "\n"
|
||||
<< " capabilities: " << provider::capability::str(capabilities())<<"\n"
|
||||
<< " final: " << (final() ? "yes" : "no") << "\n"
|
||||
<< " final: " << (is_final() ? "yes" : "no") << "\n"
|
||||
<< " own_index: " << own_index() << "\n"
|
||||
<< " members(" << members().size() << "):\n";
|
||||
|
||||
|
@ -950,6 +950,7 @@ enum wsrep::provider::status
|
||||
wsrep::wsrep_provider_v26::bf_abort(
|
||||
wsrep::seqno bf_seqno,
|
||||
wsrep::transaction_id victim_id,
|
||||
wsrep::client_service& /* Ignored here */,
|
||||
wsrep::seqno& victim_seqno)
|
||||
{
|
||||
wsrep_seqno_t wsrep_victim_seqno;
|
||||
@ -1047,6 +1048,7 @@ wsrep::wsrep_provider_v26::enter_toi(
|
||||
|
||||
enum wsrep::provider::status
|
||||
wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id,
|
||||
const wsrep::ws_meta&,
|
||||
const wsrep::mutable_buffer& err)
|
||||
{
|
||||
const wsrep_buf_t err_buf = { err.data(), err.size() };
|
||||
|
@ -63,6 +63,7 @@ namespace wsrep
|
||||
enum wsrep::provider::status
|
||||
bf_abort(wsrep::seqno,
|
||||
wsrep::transaction_id,
|
||||
wsrep::client_service&,
|
||||
wsrep::seqno&) WSREP_OVERRIDE;
|
||||
enum wsrep::provider::status
|
||||
rollback(const wsrep::transaction_id) WSREP_OVERRIDE;
|
||||
@ -83,6 +84,7 @@ namespace wsrep
|
||||
int)
|
||||
WSREP_OVERRIDE;
|
||||
enum wsrep::provider::status leave_toi(wsrep::client_id,
|
||||
const wsrep::ws_meta& ws_meta,
|
||||
const wsrep::mutable_buffer&)
|
||||
WSREP_OVERRIDE;
|
||||
std::pair<wsrep::gtid, enum wsrep::provider::status>
|
||||
|
Reference in New Issue
Block a user