diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index abd8d5f..d5706fe 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -19,6 +19,12 @@ wsrep::storage_service* db::server_service::storage_service( return new db::storage_service(); } +wsrep::storage_service* db::server_service::storage_service( + wsrep::high_priority_service&) +{ + return new db::storage_service(); +} + void db::server_service::release_storage_service( wsrep::storage_service* storage_service) { diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index b080035..9834c42 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -16,6 +16,8 @@ namespace db public: server_service(db::server& server); wsrep::storage_service* storage_service(wsrep::client_service&) override; + wsrep::storage_service* storage_service(wsrep::high_priority_service&) override; + void release_storage_service(wsrep::storage_service*) override; wsrep::high_priority_service* streaming_applier_service(wsrep::client_service&) override; wsrep::high_priority_service* streaming_applier_service(wsrep::high_priority_service&) override; diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp index 48a23a6..b979af0 100644 --- a/dbsim/db_storage_service.hpp +++ b/dbsim/db_storage_service.hpp @@ -14,6 +14,8 @@ namespace db { int start_transaction(const wsrep::ws_handle&) override { throw wsrep::not_implemented_error(); } + void adopt_transaction(const wsrep::transaction&) override + { throw wsrep::not_implemented_error(); } int append_fragment(const wsrep::id&, wsrep::transaction_id, int, diff --git a/include/wsrep/applying_service.hpp b/include/wsrep/applying_service.hpp deleted file mode 100644 index 1141abd..0000000 --- a/include/wsrep/applying_service.hpp +++ /dev/null @@ -1,25 +0,0 @@ -// -// Copyright (C) 2018 Codership Oy -// - -#ifndef WSREP_APPLYING_SERVICE_HPP -#define WSREP_APPLYING_SERVICE_HPP - -#include "transaction_termination_service.hpp" - -namespace wsrep -{ - class ws_handle; - class ws_meta; - class applying_service : public wsrep::transaction_termination_service - { - public: - virtual int start_transaction(const wsrep::ws_handle&, - const wsrep::ws_meta&) = 0; - virtual int apply_write_set(const wsrep::const_buffer&) = 0; - virtual int apply_toi(const wsrep::const_buffer&); - virtual int apply_nbo(const wsrep::const_buffer&); - }; -} - -#endif // WSREP_APPLYING_SERVICE_HPP diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 3a4cd79..2209670 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -33,7 +33,8 @@ namespace wsrep virtual ~server_service() { } virtual wsrep::storage_service* storage_service( wsrep::client_service&) = 0; - + virtual wsrep::storage_service* storage_service( + wsrep::high_priority_service&) = 0; virtual void release_storage_service(wsrep::storage_service*) = 0; /** diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp index 6fa2737..f4e68a1 100644 --- a/include/wsrep/storage_service.hpp +++ b/include/wsrep/storage_service.hpp @@ -23,7 +23,7 @@ namespace wsrep // Forward declarations class ws_handle; class ws_meta; - + class transaction; /** * Storage service abstract interface. @@ -41,6 +41,7 @@ namespace wsrep */ virtual int start_transaction(const wsrep::ws_handle&) = 0; + virtual void adopt_transaction(const wsrep::transaction&) = 0; /** * Append fragment into stable storage. */ diff --git a/src/transaction.cpp b/src/transaction.cpp index 5555d74..8ec54c9 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -173,6 +173,7 @@ void wsrep::transaction::adopt(const wsrep::transaction& transaction) debug_log_state("adopt enter"); assert(transaction.is_streaming()); start_transaction(transaction.id()); + server_id_ = transaction.server_id_; flags_ = transaction.flags(); streaming_context_ = transaction.streaming_context(); debug_log_state("adopt leave"); @@ -863,10 +864,18 @@ void wsrep::transaction::state( // BF aborter is allowed to change the state to must abort and // further to aborting and aborted if the background rollbacker // is launched. - assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || - next_state == s_must_abort || - next_state == s_aborting || - next_state == s_aborted); + // + // For high priority streaming applier threads the assertion must + // be relaxed to check only current thread id which indicates that + // the store_globals() has been called before processing of write set + // starts. + assert((client_state_.owning_thread_id_ == wsrep::this_thread::get_id() || + next_state == s_must_abort || + next_state == s_aborting || + next_state == s_aborted) + || + (client_state_.mode() == wsrep::client_state::m_high_priority && + wsrep::this_thread::get_id() == client_state_.current_thread_id_)); static const char allowed[n_states][n_states] = { /* ex pr ce co oc ct cf ma ab ad mr re */ { 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */ diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 5840b10..d957412 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -45,6 +45,12 @@ namespace wsrep wsrep::client_id(++last_client_id_)); } + wsrep::storage_service* storage_service(wsrep::high_priority_service&) + { + return new wsrep::mock_storage_service(*this, + wsrep::client_id(++last_client_id_)); + } + void release_storage_service(wsrep::storage_service* storage_service) { delete storage_service; diff --git a/test/mock_storage_service.cpp b/test/mock_storage_service.cpp index 746b469..214615b 100644 --- a/test/mock_storage_service.cpp +++ b/test/mock_storage_service.cpp @@ -28,11 +28,18 @@ wsrep::mock_storage_service::~mock_storage_service() client_state_.cleanup(); } -int wsrep::mock_storage_service::start_transaction(const wsrep::ws_handle& ws_handle) +int wsrep::mock_storage_service::start_transaction( + const wsrep::ws_handle& ws_handle) { return client_state_.start_transaction(ws_handle.transaction_id()); } +void wsrep::mock_storage_service::adopt_transaction( + const wsrep::transaction& transaction) +{ + client_state_.adopt_transaction(transaction); +} + int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp index feba800..f00518f 100644 --- a/test/mock_storage_service.hpp +++ b/test/mock_storage_service.hpp @@ -20,6 +20,8 @@ class mock_server_state; int start_transaction(const wsrep::ws_handle&) WSREP_OVERRIDE; + void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; + int append_fragment(const wsrep::id&, wsrep::transaction_id, int,