1
0
mirror of https://github.com/codership/wsrep-lib.git synced 2025-07-28 20:02:00 +03:00

Introduced adopt_transaction() for storage_service interface

A SR transaction must be adopted by a storage service instance
running in background rollbacker thread while it is aborting a
SR transaction.
This commit is contained in:
Teemu Ollakka
2018-07-13 15:36:27 +03:00
parent 22d7a31d81
commit 4cfb9b6413
10 changed files with 43 additions and 32 deletions

View File

@ -19,6 +19,12 @@ wsrep::storage_service* db::server_service::storage_service(
return new db::storage_service(); return new db::storage_service();
} }
wsrep::storage_service* db::server_service::storage_service(
wsrep::high_priority_service&)
{
return new db::storage_service();
}
void db::server_service::release_storage_service( void db::server_service::release_storage_service(
wsrep::storage_service* storage_service) wsrep::storage_service* storage_service)
{ {

View File

@ -16,6 +16,8 @@ namespace db
public: public:
server_service(db::server& server); server_service(db::server& server);
wsrep::storage_service* storage_service(wsrep::client_service&) override; wsrep::storage_service* storage_service(wsrep::client_service&) override;
wsrep::storage_service* storage_service(wsrep::high_priority_service&) override;
void release_storage_service(wsrep::storage_service*) override; void release_storage_service(wsrep::storage_service*) override;
wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override; wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override;
wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override; wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override;

View File

@ -14,6 +14,8 @@ namespace db
{ {
int start_transaction(const wsrep::ws_handle&) override int start_transaction(const wsrep::ws_handle&) override
{ throw wsrep::not_implemented_error(); } { throw wsrep::not_implemented_error(); }
void adopt_transaction(const wsrep::transaction&) override
{ throw wsrep::not_implemented_error(); }
int append_fragment(const wsrep::id&, int append_fragment(const wsrep::id&,
wsrep::transaction_id, wsrep::transaction_id,
int, int,

View File

@ -1,25 +0,0 @@
//
// Copyright (C) 2018 Codership Oy <info@codership.com>
//
#ifndef WSREP_APPLYING_SERVICE_HPP
#define WSREP_APPLYING_SERVICE_HPP
#include "transaction_termination_service.hpp"
namespace wsrep
{
class ws_handle;
class ws_meta;
class applying_service : public wsrep::transaction_termination_service
{
public:
virtual int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&) = 0;
virtual int apply_write_set(const wsrep::const_buffer&) = 0;
virtual int apply_toi(const wsrep::const_buffer&);
virtual int apply_nbo(const wsrep::const_buffer&);
};
}
#endif // WSREP_APPLYING_SERVICE_HPP

View File

@ -33,7 +33,8 @@ namespace wsrep
virtual ~server_service() { } virtual ~server_service() { }
virtual wsrep::storage_service* storage_service( virtual wsrep::storage_service* storage_service(
wsrep::client_service&) = 0; wsrep::client_service&) = 0;
virtual wsrep::storage_service* storage_service(
wsrep::high_priority_service&) = 0;
virtual void release_storage_service(wsrep::storage_service*) = 0; virtual void release_storage_service(wsrep::storage_service*) = 0;
/** /**

View File

@ -23,7 +23,7 @@ namespace wsrep
// Forward declarations // Forward declarations
class ws_handle; class ws_handle;
class ws_meta; class ws_meta;
class transaction;
/** /**
* Storage service abstract interface. * Storage service abstract interface.
@ -41,6 +41,7 @@ namespace wsrep
*/ */
virtual int start_transaction(const wsrep::ws_handle&) = 0; virtual int start_transaction(const wsrep::ws_handle&) = 0;
virtual void adopt_transaction(const wsrep::transaction&) = 0;
/** /**
* Append fragment into stable storage. * Append fragment into stable storage.
*/ */

View File

@ -173,6 +173,7 @@ void wsrep::transaction::adopt(const wsrep::transaction& transaction)
debug_log_state("adopt enter"); debug_log_state("adopt enter");
assert(transaction.is_streaming()); assert(transaction.is_streaming());
start_transaction(transaction.id()); start_transaction(transaction.id());
server_id_ = transaction.server_id_;
flags_ = transaction.flags(); flags_ = transaction.flags();
streaming_context_ = transaction.streaming_context(); streaming_context_ = transaction.streaming_context();
debug_log_state("adopt leave"); debug_log_state("adopt leave");
@ -863,10 +864,18 @@ void wsrep::transaction::state(
// BF aborter is allowed to change the state to must abort and // BF aborter is allowed to change the state to must abort and
// further to aborting and aborted if the background rollbacker // further to aborting and aborted if the background rollbacker
// is launched. // is launched.
assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || //
next_state == s_must_abort || // For high priority streaming applier threads the assertion must
next_state == s_aborting || // be relaxed to check only current thread id which indicates that
next_state == s_aborted); // the store_globals() has been called before processing of write set
// starts.
assert((client_state_.owning_thread_id_ == wsrep::this_thread::get_id() ||
next_state == s_must_abort ||
next_state == s_aborting ||
next_state == s_aborted)
||
(client_state_.mode() == wsrep::client_state::m_high_priority &&
wsrep::this_thread::get_id() == client_state_.current_thread_id_));
static const char allowed[n_states][n_states] = static const char allowed[n_states][n_states] =
{ /* ex pr ce co oc ct cf ma ab ad mr re */ { /* ex pr ce co oc ct cf ma ab ad mr re */
{ 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */

View File

@ -45,6 +45,12 @@ namespace wsrep
wsrep::client_id(++last_client_id_)); wsrep::client_id(++last_client_id_));
} }
wsrep::storage_service* storage_service(wsrep::high_priority_service&)
{
return new wsrep::mock_storage_service(*this,
wsrep::client_id(++last_client_id_));
}
void release_storage_service(wsrep::storage_service* storage_service) void release_storage_service(wsrep::storage_service* storage_service)
{ {
delete storage_service; delete storage_service;

View File

@ -28,11 +28,18 @@ wsrep::mock_storage_service::~mock_storage_service()
client_state_.cleanup(); client_state_.cleanup();
} }
int wsrep::mock_storage_service::start_transaction(const wsrep::ws_handle& ws_handle) int wsrep::mock_storage_service::start_transaction(
const wsrep::ws_handle& ws_handle)
{ {
return client_state_.start_transaction(ws_handle.transaction_id()); return client_state_.start_transaction(ws_handle.transaction_id());
} }
void wsrep::mock_storage_service::adopt_transaction(
const wsrep::transaction& transaction)
{
client_state_.adopt_transaction(transaction);
}
int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta) const wsrep::ws_meta& ws_meta)
{ {

View File

@ -20,6 +20,8 @@ class mock_server_state;
int start_transaction(const wsrep::ws_handle&) WSREP_OVERRIDE; int start_transaction(const wsrep::ws_handle&) WSREP_OVERRIDE;
void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE;
int append_fragment(const wsrep::id&, int append_fragment(const wsrep::id&,
wsrep::transaction_id, wsrep::transaction_id,
int, int,