1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Fixes to local streaming replication processing.

This commit is contained in:
Teemu Ollakka
2018-07-08 15:27:49 +03:00
parent 2913aecebd
commit 7c424d8337
12 changed files with 122 additions and 75 deletions

View File

@ -463,6 +463,20 @@ void wsrep::server_state::on_view(const wsrep::view& view)
if (view.status() == wsrep::view::primary)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (view.own_index() >= 0)
{
if (id_.is_undefined())
{
// No identifier was passed during server state initialization
// and the ID was generated by the provider.
id_ = view.members()[view.own_index()].id();
}
else
{
// Own identifier must not change between views.
// assert(id_ == view.members()[view.own_index()].id());
}
}
assert(view.final() == false);
//
@ -555,6 +569,7 @@ void wsrep::server_state::on_view(const wsrep::view& view)
assert(view.final());
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
state(lock, s_disconnected);
id_ = wsrep::id::undefined();
}
}

View File

@ -64,9 +64,8 @@ namespace
~scoped_storage_service()
{
storage_service_->reset_globals();
client_service_.store_globals();
deleter_(storage_service_);
client_service_.store_globals();
}
private:
scoped_storage_service(const scoped_storage_service&);
@ -666,8 +665,15 @@ int wsrep::transaction::after_statement()
void wsrep::transaction::after_applying()
{
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
assert(state_ == s_committed || state_ == s_aborted);
cleanup();
debug_log_state("after_applying enter");
assert(state_ == s_executing ||
state_ == s_committed ||
state_ == s_aborted);
if (state_ != s_executing)
{
cleanup();
}
debug_log_state("after_applying leave");
}
bool wsrep::transaction::bf_abort(
@ -820,60 +826,80 @@ int wsrep::transaction::certify_fragment(
return 1;
}
// The rest of this method will be executed in storage service
// scope.
scoped_storage_service<storage_service_deleter>
sr_scope(
client_service_,
server_service_.storage_service(client_service_),
storage_service_deleter(server_service_));
wsrep::storage_service& storage_service(
sr_scope.storage_service());
// First the fragment is appended to the stable storage.
// This is done to ensure that there is enough capacity
// available to store the fragment. The fragment meta data
// is updated after certification.
if (storage_service.start_transaction())
if (provider().append_data(ws_handle_,
wsrep::const_buffer(data.data(), data.size())))
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_append_fragment_error);
return 1;
}
if (storage_service.append_fragment(
client_state_.server_state().id(),
id(),
flags_,
wsrep::const_buffer(data.data(), data.size())))
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_append_fragment_error);
return 1;
}
enum wsrep::provider::status
cert_ret(provider().certify(client_state_.id().get(),
ws_handle_,
flags_,
ws_meta_));
int ret(0);
switch (cert_ret)
// Storage service scope
{
case wsrep::provider::success:
streaming_context_.certified(ws_meta_.seqno());
if (storage_service.commit(ws_handle_, ws_meta_))
scoped_storage_service<storage_service_deleter>
sr_scope(
client_service_,
server_service_.storage_service(client_service_),
storage_service_deleter(server_service_));
wsrep::storage_service& storage_service(
sr_scope.storage_service());
// First the fragment is appended to the stable storage.
// This is done to ensure that there is enough capacity
// available to store the fragment. The fragment meta data
// is updated after certification.
if (storage_service.start_transaction(ws_handle_))
{
ret = 1;
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_append_fragment_error);
return 1;
}
break;
default:
storage_service.rollback(ws_handle_, ws_meta_);
ret = 1;
break;
wsrep::id server_id(client_state_.server_state().id());
assert(server_id.is_undefined() == false);
if (storage_service.append_fragment(
server_id,
id(),
flags_,
wsrep::const_buffer(data.data(), data.size())))
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_append_fragment_error);
return 1;
}
wsrep::ws_meta sr_ws_meta;
enum wsrep::provider::status
cert_ret(provider().certify(client_state_.id().get(),
ws_handle_,
flags_,
sr_ws_meta));
switch (cert_ret)
{
case wsrep::provider::success:
assert(sr_ws_meta.seqno().is_undefined() == false);
streaming_context_.certified(sr_ws_meta.seqno(), data.size());
if (storage_service.update_fragment_meta(sr_ws_meta))
{
ret = 1;
break;
}
wsrep::log_info() << "Committing "
<< sr_ws_meta.transaction_id().get();
if (storage_service.commit(ws_handle_, sr_ws_meta))
{
ret = 1;
}
break;
default:
storage_service.rollback(ws_handle_, sr_ws_meta);
ret = 1;
break;
}
provider().release(ws_handle_);
}
lock.lock();
if (ret)