diff --git a/dbsim/db_client.cpp b/dbsim/db_client.cpp index d0436b8..c0ab683 100644 --- a/dbsim/db_client.cpp +++ b/dbsim/db_client.cpp @@ -16,8 +16,8 @@ db::client::client(db::server& server, , params_(params) , server_(server) , server_state_(server.server_state()) - , client_state_(mutex_, cond_, this, server_state_, client_service_, client_id, mode) - , client_service_(client_state_) + , client_state_(mutex_, cond_, server_state_, client_service_, client_id, mode) + , client_service_(*this) , se_trx_(server.storage_engine()) , stats_() { } @@ -114,7 +114,7 @@ void db::client::run_one_transaction() { // wsrep::log_debug() << "Commit"; assert(err == 0); - if (client_state_.do_2pc()) + if (do_2pc()) { err = err || client_state_.before_prepare(); err = err || client_state_.after_prepare(); diff --git a/dbsim/db_client.hpp b/dbsim/db_client.hpp index 2057d9c..6046af4 100644 --- a/dbsim/db_client.hpp +++ b/dbsim/db_client.hpp @@ -44,6 +44,7 @@ namespace db { } void start(); wsrep::client_state& client_state() { return client_state_; } + bool do_2pc() const { return false; } private: friend class db::server_state; friend class db::client_service; diff --git a/dbsim/db_client_service.cpp b/dbsim/db_client_service.cpp index 965a372..b3289a8 100644 --- a/dbsim/db_client_service.cpp +++ b/dbsim/db_client_service.cpp @@ -6,24 +6,22 @@ #include "db_high_priority_service.hpp" #include "db_client.hpp" +db::client_service::client_service(db::client& client) + : wsrep::client_service() + , client_(client) + , client_state_(client_.client_state()) +{ } -int db::client_service::commit(const wsrep::ws_handle&, - const wsrep::ws_meta&) +bool db::client_service::do_2pc() const { - db::client* client(client_state_.client()); - int ret(client_state_.before_commit()); - if (ret == 0) client->se_trx_.commit(); - ret = ret || client_state_.ordered_commit(); - ret = ret || client_state_.after_commit(); - return ret; + return client_.do_2pc(); } int db::client_service::bf_rollback() { - db::client* client(client_state_.client()); int ret(client_state_.before_rollback()); assert(ret == 0); - client->se_trx_.rollback(); + client_.se_trx_.rollback(); ret = client_state_.after_rollback(); assert(ret == 0); return ret; @@ -34,13 +32,13 @@ db::client_service::replay() { wsrep::high_priority_context high_priority_context(client_state_); db::high_priority_service high_priority_service( - client_state_.client()->server_, client_state_.client()); + client_.server_, client_); auto ret(client_state_.provider().replay( client_state_.transaction().ws_handle(), &high_priority_service)); if (ret == wsrep::provider::success) { - ++client_state_.client()->stats_.replays; + ++client_.stats_.replays; } return ret; } diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index d0b3cfa..59e6f2d 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -8,37 +8,26 @@ #include "wsrep/client_service.hpp" #include "wsrep/transaction.hpp" -#include "db_client_state.hpp" - namespace db { + class client; + class client_state; + class client_service : public wsrep::client_service { public: - client_service(db::client_state& client_state) - : wsrep::client_service() - , client_state_(client_state) - { } + client_service(db::client& client); - bool do_2pc() const override - { - return client_state_.do_2pc(); - } + bool do_2pc() const override; bool interrupted() const override { return false; } - void reset_globals() override - { - client_state_.reset_globals(); - } + void reset_globals() override { } - void store_globals() override - { - client_state_.store_globals(); - } + void store_globals() override { } int prepare_data_for_replication() override { @@ -56,6 +45,7 @@ namespace db { return true; } + int prepare_fragment_for_replication(wsrep::mutable_buffer&) override { return 0; @@ -64,12 +54,6 @@ namespace db void remove_fragments() override { } - // int apply_write_set(const wsrep::const_buffer&) override; - - // int apply_toi(const wsrep::const_buffer&) override; - - int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override; - int bf_rollback() override; void will_replay() override @@ -79,17 +63,12 @@ namespace db enum wsrep::provider::status replay() override; - int append_fragment(const wsrep::transaction&, int, - const wsrep::const_buffer&) override - { - return 0; - } - void emergency_shutdown() { ::abort(); } void debug_sync(const char*) override { } void debug_crash(const char*) override { } private: - db::client_state& client_state_; + db::client& client_; + wsrep::client_state& client_state_; }; } diff --git a/dbsim/db_client_state.hpp b/dbsim/db_client_state.hpp index cb4a180..8d9a91f 100644 --- a/dbsim/db_client_state.hpp +++ b/dbsim/db_client_state.hpp @@ -16,7 +16,6 @@ namespace db public: client_state(wsrep::mutex& mutex, wsrep::condition_variable& cond, - db::client* client, db::server_state& server_state, wsrep::client_service& client_service, const wsrep::client_id& client_id, @@ -27,22 +26,11 @@ namespace db client_service, client_id, mode) - , client_(client) - , is_autocommit_(false) - , do_2pc_(false) { } - db::client* client() { return client_; } - void reset_globals() { } - void store_globals() { wsrep::client_state::store_globals(); } - bool is_autocommit() const { return is_autocommit_; } - bool do_2pc() const { return do_2pc_; } private: client_state(const client_state&); client_state& operator=(const client_state&); - db::client* client_; - bool is_autocommit_; - bool do_2pc_; }; } diff --git a/dbsim/db_high_priority_service.cpp b/dbsim/db_high_priority_service.cpp index 5861526..7ed59dc 100644 --- a/dbsim/db_high_priority_service.cpp +++ b/dbsim/db_high_priority_service.cpp @@ -7,7 +7,7 @@ #include "db_client.hpp" db::high_priority_service::high_priority_service( - db::server& server, db::client* client) + db::server& server, db::client& client) : wsrep::high_priority_service(server_.server_state()) , server_(server) , client_(client) @@ -17,7 +17,7 @@ int db::high_priority_service::start_transaction( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { - return client_->client_state().start_transaction(ws_handle, ws_meta); + return client_.client_state().start_transaction(ws_handle, ws_meta); } void db::high_priority_service::adopt_transaction(const wsrep::transaction&) @@ -27,8 +27,8 @@ void db::high_priority_service::adopt_transaction(const wsrep::transaction&) int db::high_priority_service::apply_write_set(const wsrep::const_buffer&) { - client_->se_trx_.start(client_); - client_->se_trx_.apply(client_->client_state().transaction()); + client_.se_trx_.start(&client_); + client_.se_trx_.apply(client_.client_state().transaction()); return 0; } @@ -41,29 +41,29 @@ int db::high_priority_service::apply_toi( int db::high_priority_service::commit() { - int ret(client_->client_state_.before_commit()); - if (ret == 0) client_->se_trx_.commit(); - ret = ret || client_->client_state_.ordered_commit(); - ret = ret || client_->client_state_.after_commit(); + int ret(client_.client_state_.before_commit()); + if (ret == 0) client_.se_trx_.commit(); + ret = ret || client_.client_state_.ordered_commit(); + ret = ret || client_.client_state_.after_commit(); return ret; } int db::high_priority_service::rollback() { - int ret(client_->client_state_.before_rollback()); + int ret(client_.client_state_.before_rollback()); assert(ret == 0); - client_->se_trx_.rollback(); - ret = client_->client_state_.after_rollback(); + client_.se_trx_.rollback(); + ret = client_.client_state_.after_rollback(); assert(ret == 0); return ret; } void db::high_priority_service::after_apply() { - client_->client_state_.after_statement(); + client_.client_state_.after_statement(); } bool db::high_priority_service::is_replaying() const { - return (client_->client_state_.transaction().state() == wsrep::transaction::s_replaying); + return (client_.client_state_.transaction().state() == wsrep::transaction::s_replaying); } diff --git a/dbsim/db_high_priority_service.hpp b/dbsim/db_high_priority_service.hpp index d18653a..02eefd3 100644 --- a/dbsim/db_high_priority_service.hpp +++ b/dbsim/db_high_priority_service.hpp @@ -14,11 +14,14 @@ namespace db class high_priority_service : public wsrep::high_priority_service { public: - high_priority_service(db::server& server, db::client* client); + high_priority_service(db::server& server, db::client& client); int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&) override; void adopt_transaction(const wsrep::transaction&) override; int apply_write_set(const wsrep::const_buffer&) override; + int append_fragment(const wsrep::ws_meta&, const wsrep::const_buffer&) + override + { return 0; } int commit() override; int rollback() override; int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&) override; @@ -33,7 +36,7 @@ namespace db high_priority_service(const high_priority_service&); high_priority_service& operator=(const high_priority_service&); db::server& server_; - db::client* client_; + db::client& client_; }; } diff --git a/dbsim/db_server.cpp b/dbsim/db_server.cpp index 15c677c..1bc212f 100644 --- a/dbsim/db_server.cpp +++ b/dbsim/db_server.cpp @@ -36,7 +36,7 @@ void db::server::applier_thread() simulator_.params()); wsrep::client_state* cc(static_cast( &applier.client_state())); - db::high_priority_service hps(*this, &applier); + db::high_priority_service hps(*this, applier); cc->open(cc->id()); cc->before_command(); enum wsrep::provider::status ret( @@ -110,21 +110,6 @@ void db::server::donate_sst(const std::string& req, simulator_.sst(*this, req, gtid, bypass); } -wsrep::client_state* db::server::local_client_state() -{ - std::ostringstream id_os; - size_t client_id(++last_client_id_); - db::client* client(new db::client(*this, client_id, - wsrep::client_state::m_local, - simulator_.params())); - return &client->client_state(); -} -void db::server::release_client_state(wsrep::client_state* client_state) -{ - db::client_state* db_client_state( - dynamic_cast(client_state)); - delete db_client_state->client(); -} wsrep::high_priority_service* db::server::streaming_applier_service() { diff --git a/dbsim/db_server_service.cpp b/dbsim/db_server_service.cpp index 605fcab..bf53d65 100644 --- a/dbsim/db_server_service.cpp +++ b/dbsim/db_server_service.cpp @@ -4,6 +4,7 @@ #include "db_server_service.hpp" #include "db_server.hpp" +#include "db_storage_service.hpp" #include "wsrep/logger.hpp" #include "wsrep/high_priority_service.hpp" @@ -12,15 +13,16 @@ db::server_service::server_service(db::server& server) : server_(server) { } -wsrep::client_state* db::server_service::local_client_state() +wsrep::storage_service* db::server_service::storage_service( + wsrep::client_service&) { - return server_.local_client_state(); + return new db::storage_service(); } -void db::server_service::release_client_state( - wsrep::client_state* client_state) +void db::server_service::release_storage_service( + wsrep::storage_service* storage_service) { - server_.release_client_state(client_state); + delete storage_service; } wsrep::high_priority_service* db::server_service::streaming_applier_service() @@ -41,7 +43,12 @@ bool db::server_service::sst_before_init() const std::string db::server_service::sst_request() { - return server_.server_state().id(); + std::ostringstream os; + os << server_.server_state().id(); + wsrep::log_info() << "SST request: " + << server_.server_state().id(); + + return os.str(); } int db::server_service::start_sst( diff --git a/dbsim/db_server_service.hpp b/dbsim/db_server_service.hpp index c34fe4b..77590cc 100644 --- a/dbsim/db_server_service.hpp +++ b/dbsim/db_server_service.hpp @@ -15,8 +15,8 @@ namespace db { public: server_service(db::server& server); - wsrep::client_state* local_client_state() override; - void release_client_state(wsrep::client_state*) override; + wsrep::storage_service* storage_service(wsrep::client_service&) override; + void release_storage_service(wsrep::storage_service*) override; wsrep::high_priority_service* streaming_applier_service() override; void release_high_priority_service(wsrep::high_priority_service*) override; diff --git a/dbsim/db_simulator.cpp b/dbsim/db_simulator.cpp index 4b67095..d9b5089 100644 --- a/dbsim/db_simulator.cpp +++ b/dbsim/db_simulator.cpp @@ -23,11 +23,12 @@ void db::simulator::sst(db::server& server, const wsrep::gtid& gtid, bool bypass) { - size_t id; + wsrep::id id; std::istringstream is(request); is >> id; wsrep::unique_lock lock(mutex_); auto i(servers_.find(id)); + wsrep::log_info() << "SST request"; if (i == servers_.end()) { throw wsrep::runtime_error("Server " + request + " not found"); @@ -86,9 +87,10 @@ void db::simulator::start() id_os << (i + 1); std::ostringstream address_os; address_os << "127.0.0.1:" << server_port(i); + wsrep::id server_id(id_os.str()); auto it(servers_.insert( std::make_pair( - (i + 1), + server_id, std::make_unique( *this, name_os.str(), diff --git a/dbsim/db_simulator.hpp b/dbsim/db_simulator.hpp index d386354..b8f9ba6 100644 --- a/dbsim/db_simulator.hpp +++ b/dbsim/db_simulator.hpp @@ -46,7 +46,7 @@ namespace db wsrep::default_mutex mutex_; const db::params& params_; - std::map> servers_; + std::map> servers_; std::chrono::time_point clients_start_; std::chrono::time_point clients_stop_; public: diff --git a/dbsim/db_storage_service.hpp b/dbsim/db_storage_service.hpp new file mode 100644 index 0000000..8974f9e --- /dev/null +++ b/dbsim/db_storage_service.hpp @@ -0,0 +1,33 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_DB_STORAGE_SERVICE_HPP +#define WSREP_DB_STORAGE_SERVICE_HPP + +#include "wsrep/storage_service.hpp" +#include "wsrep/exception.hpp" + +namespace db +{ + class storage_service : public wsrep::storage_service + { + int start_transaction() override + { throw wsrep::not_implemented_error(); } + int append_fragment(const wsrep::id&, wsrep::client_id, + int, + const wsrep::const_buffer&) override + { throw wsrep::not_implemented_error(); } + int update_fragment_meta(const wsrep::ws_meta&) override + { throw wsrep::not_implemented_error(); } + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) override + { throw wsrep::not_implemented_error(); } + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) + override + { throw wsrep::not_implemented_error(); } + void store_globals() override { } + void reset_globals() override { } + }; +} + +#endif // WSREP_DB_STORAGE_SERVICE_HPP diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 8a3878b..a463977 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -69,9 +69,13 @@ namespace wsrep virtual bool statement_allowed_for_streaming() const = 0; virtual size_t bytes_generated() const = 0; virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; + /** + * Remove fragments from the storage within current transaction. + * Fragment removal will be committed once the current transaction + * commits. + */ virtual void remove_fragments() = 0; - virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; // // Rollback // @@ -128,12 +132,6 @@ namespace wsrep */ virtual void wait_for_replayers(wsrep::unique_lock&) = 0; - // Streaming replication - /** - * Append a write set fragment into fragment storage. - */ - virtual int append_fragment(const wsrep::transaction&, int flag, const wsrep::const_buffer&) = 0; - // // Debug interface // diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 0792770..916ce19 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -306,6 +306,23 @@ namespace wsrep enum wsrep::streaming_context::fragment_unit fragment_unit, size_t fragment_size); + + /** + * Prepare write set meta data for fragment storage ordering. + * This method should be called from storage service commit + * or rollback before performing the operation. + */ + int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) + { + assert(mode_ == m_high_priority); + assert(state_ == s_exec); + return transaction_.prepare_for_fragment_ordering( + ws_handle, ws_meta, is_commit); + } + /** @} */ + /** @name Applying interface */ /** @{ */ int start_transaction(const wsrep::ws_handle& wsh, @@ -734,26 +751,6 @@ namespace wsrep return to_c_string(mode); } - class client_state_switch - { - public: - client_state_switch(wsrep::client_state& orig_context, - wsrep::client_state& current_context) - : orig_context_(orig_context) - , current_context_(current_context) - { - current_context_.client_service_.store_globals(); - } - ~client_state_switch() - { - orig_context_.client_service_.store_globals(); - } - private: - client_state& orig_context_; - client_state& current_context_; - }; - - /** * Utility class to switch the client state to high priority * mode. The client is switched back to the original mode @@ -804,44 +801,6 @@ namespace wsrep enum wsrep::client_state::mode orig_mode_; }; - class client_deleter - { - public: - client_deleter(wsrep::server_service& server_service) - : server_service_(server_service) - { } - void operator()(wsrep::client_state* client_state) - { - server_service_.release_client_state(client_state); - } - private: - wsrep::server_service& server_service_; - }; - - template - class scoped_client_state - { - public: - scoped_client_state(wsrep::client_state* client_state, D deleter) - : client_state_(client_state) - , deleter_(deleter) - { - if (client_state_ == 0) - { - throw wsrep::runtime_error("Null client_state provided"); - } - } - wsrep::client_state& client_state() { return *client_state_; } - ~scoped_client_state() - { - deleter_(client_state_); - } - private: - scoped_client_state(const scoped_client_state&); - scoped_client_state& operator=(const scoped_client_state&); - wsrep::client_state* client_state_; - D deleter_; - }; } #endif // WSREP_CLIENT_STATE_HPP diff --git a/include/wsrep/high_priority_service.hpp b/include/wsrep/high_priority_service.hpp index 3af7dba..e702544 100644 --- a/include/wsrep/high_priority_service.hpp +++ b/include/wsrep/high_priority_service.hpp @@ -40,6 +40,7 @@ namespace wsrep * Adopt a transaction. */ virtual void adopt_transaction(const wsrep::transaction&) = 0; + /** * Apply a write set. * @@ -50,6 +51,11 @@ namespace wsrep */ virtual int apply_write_set(const wsrep::const_buffer&) = 0; + /** + * + */ + virtual int append_fragment(const wsrep::ws_meta&, + const wsrep::const_buffer& data) = 0; /** * Commit a transaction. */ diff --git a/include/wsrep/id.hpp b/include/wsrep/id.hpp index 25a35fe..91b1f5c 100644 --- a/include/wsrep/id.hpp +++ b/include/wsrep/id.hpp @@ -83,6 +83,7 @@ namespace wsrep }; std::ostream& operator<<(std::ostream&, const wsrep::id& id); + std::istream& operator>>(std::istream&, wsrep::id& id); } #endif // WSREP_ID_HPP diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 155ff45..81a13f1 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -20,7 +20,8 @@ namespace wsrep { - class client_state; + class client_service; + class storage_service; class high_priority_service; class ws_meta; class gtid; @@ -28,17 +29,11 @@ namespace wsrep class server_service { public: - /** - * Create client state instance which acts only locally, i.e. does - * not participate in replication. However, local client - * state may execute transactions which require ordering, - * as when modifying local SR fragment storage requires - * strict commit ordering. - * - * @return Pointer to Client State. - */ - virtual wsrep::client_state* local_client_state() = 0; - virtual void release_client_state(wsrep::client_state*) = 0; + + virtual wsrep::storage_service* storage_service( + wsrep::client_service&) = 0; + + virtual void release_storage_service(wsrep::storage_service*) = 0; /** * Create an applier state for streaming transaction applying. * diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 26be715..6d7277c 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -87,6 +87,7 @@ namespace wsrep class transaction; class const_buffer; class server_service; + /** @class Server Context * * @@ -172,11 +173,11 @@ namespace wsrep const std::string& name() const { return name_; } /** - * Return Server identifier string. + * Return Server identifier. * - * @return Server indetifier string. + * @return Server identifier. */ - const std::string& id() const { return id_; } + const wsrep::id& id() const { return id_; } const std::string& incoming_address() const { return incoming_address_; } @@ -575,7 +576,7 @@ namespace wsrep streaming_appliers_map streaming_appliers_; wsrep::provider* provider_; std::string name_; - std::string id_; + wsrep::id id_; std::string incoming_address_; std::string address_; std::string working_dir_; diff --git a/include/wsrep/storage_service.hpp b/include/wsrep/storage_service.hpp new file mode 100644 index 0000000..43d20df --- /dev/null +++ b/include/wsrep/storage_service.hpp @@ -0,0 +1,75 @@ +// +// Copyright (C) 2018 Codership Oy +// + +/** @file storage_service.hpp + * + * Abstract interface which defines required access to DBMS storage + * service. The service is used for storing streaming replication + * write set fragments into stable storage. The interface is used + * from locally processing transaction context only. Corresponding + * operations for high priority processing can be found from + * wsrep::high_priority_service interface. + */ +#ifndef WSREP_STORAGE_SERVICE_HPP +#define WSREP_STORAGE_SERVICE_HPP + +#include "client_id.hpp" +#include "id.hpp" +#include "buffer.hpp" + +namespace wsrep +{ + // Forward declarations + class ws_handle; + class ws_meta; + + + /** + * Storage service abstract interface. + */ + class storage_service + { + public: + virtual ~storage_service() { } + /** + * Start a new transaction for storage access. + * + * @param ws_hande Write set handle + * @param ws_meta Write set meta data + * + * @return Zero in case of success, non-zero on error. + */ + virtual int start_transaction() = 0; + + /** + * Append fragment into stable storage. + */ + virtual int append_fragment(const wsrep::id& server_id, + wsrep::client_id client_id, + int flags, + const wsrep::const_buffer& data) = 0; + + /** + * Update fragment meta data after certification process. + */ + virtual int update_fragment_meta(const wsrep::ws_meta&) = 0; + + /** + * Commit the transaction. + */ + virtual int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) = 0; + + /** + * Roll back the transaction. + */ + virtual int rollback(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + + + virtual void store_globals() = 0; + virtual void reset_globals() = 0; + }; +} + +#endif // WSREP_STORAGE_SERVICE_HPP diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 59bd203..8eaeca5 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -87,6 +87,10 @@ namespace wsrep int start_transaction(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta); + int prepare_for_fragment_ordering(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit); + int start_replaying(const wsrep::ws_meta&); int append_key(const wsrep::key&); diff --git a/src/id.cpp b/src/id.cpp index 3ff5c7e..64b0aab 100644 --- a/src/id.cpp +++ b/src/id.cpp @@ -52,3 +52,11 @@ std::ostream& wsrep::operator<<(std::ostream& os, const wsrep::id& id) return (os << uuid_str); } } + +std::istream& wsrep::operator>>(std::istream& is, wsrep::id& id) +{ + std::string id_str; + std::getline(is, id_str); + id = wsrep::id(id_str); + return is; +} diff --git a/src/server_state.cpp b/src/server_state.cpp index 625fe0f..9b0efc0 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -576,6 +576,10 @@ void wsrep::server_state::on_sync() case s_joiner: state(lock, s_initializing); break; + case s_donor: + state(lock, s_joined); + state(lock, s_synced); + break; case s_initialized: state(lock, s_joined); // fall through diff --git a/src/transaction.cpp b/src/transaction.cpp index 7e1d341..6ae493d 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -5,6 +5,7 @@ #include "wsrep/transaction.hpp" #include "wsrep/client_state.hpp" #include "wsrep/server_state.hpp" +#include "wsrep/storage_service.hpp" #include "wsrep/high_priority_service.hpp" #include "wsrep/key.hpp" #include "wsrep/logger.hpp" @@ -20,6 +21,65 @@ else wsrep::log_debug() << msg; \ } while (0) + +namespace +{ + class storage_service_deleter + { + public: + storage_service_deleter(wsrep::server_service& server_service) + : server_service_(server_service) + { } + void operator()(wsrep::storage_service* storage_service) + { + server_service_.release_storage_service(storage_service); + } + private: + wsrep::server_service& server_service_; + }; + + template + class scoped_storage_service + { + public: + scoped_storage_service(wsrep::client_service& client_service, + wsrep::storage_service* storage_service, + D deleter) + : client_service_(client_service) + , storage_service_(storage_service) + , deleter_(deleter) + { + if (storage_service_ == 0) + { + throw wsrep::runtime_error("Null client_state provided"); + } + client_service_.reset_globals(); + storage_service_->store_globals(); + } + + wsrep::storage_service& storage_service() + { + return *storage_service_; + } + + ~scoped_storage_service() + { + storage_service_->reset_globals(); + client_service_.store_globals(); + deleter_(storage_service_); + } + private: + scoped_storage_service(const scoped_storage_service&); + scoped_storage_service& operator=(const scoped_storage_service&); + wsrep::client_service& client_service_; + wsrep::storage_service* storage_service_; + D deleter_; + }; + + + +} + // Public wsrep::transaction::transaction( @@ -94,6 +154,20 @@ int wsrep::transaction::start_transaction( return 0; } +int wsrep::transaction::prepare_for_fragment_ordering( + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta, + bool is_commit) +{ + assert(active()); + + ws_handle_ = ws_handle; + ws_meta_ = ws_meta; + certified_ = is_commit; + + return 0; +} + int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta) { ws_meta_ = ws_meta; @@ -745,25 +819,32 @@ int wsrep::transaction::certify_fragment( return 1; } - // Client context to store fragment in separate transaction - // Switch temporarily to sr_transaction, switch back - // to original when this goes out of scope - wsrep::scoped_client_state sr_client_state_scope( - server_service_.local_client_state(), - wsrep::client_deleter(server_service_)); - wsrep::client_state& sr_client_state( - sr_client_state_scope.client_state()); - wsrep::client_state_switch client_state_switch( - client_state_, - sr_client_state); + // The rest of this method will be executed in storage service + // scope. + scoped_storage_service + sr_scope( + client_service_, + server_service_.storage_service(client_service_), + storage_service_deleter(server_service_)); + wsrep::storage_service& storage_service( + sr_scope.storage_service()); - wsrep::unique_lock sr_lock(sr_client_state.mutex()); - wsrep::transaction& sr_transaction( - sr_client_state.transaction_); - sr_transaction.state(sr_lock, s_certifying); - sr_lock.unlock(); - if (sr_client_state.client_service().append_fragment( - sr_transaction, flags_, + // First the fragment is appended to the stable storage. + // This is done to ensure that there is enough capacity + // available to store the fragment. The fragment meta data + // is updated after certification. + if (storage_service.start_transaction()) + { + lock.lock(); + state(lock, s_must_abort); + client_state_.override_error(wsrep::e_append_fragment_error); + return 1; + } + + if (storage_service.append_fragment( + client_state_.server_state().id(), + client_state_.id(), + flags_, wsrep::const_buffer(data.data(), data.size()))) { lock.lock(); @@ -774,30 +855,22 @@ int wsrep::transaction::certify_fragment( enum wsrep::provider::status cert_ret(provider().certify(client_state_.id().get(), - sr_transaction.ws_handle_, - flags_, - sr_transaction.ws_meta_)); + ws_handle_, + flags_, + ws_meta_)); int ret(0); switch (cert_ret) { case wsrep::provider::success: - streaming_context_.certified(sr_transaction.ws_meta().seqno()); - sr_lock.lock(); - sr_transaction.certified_ = true; - sr_transaction.state(sr_lock, s_committing); - sr_lock.unlock(); - if (sr_client_state.client_service().commit( - sr_transaction.ws_handle(), sr_transaction.ws_meta())) + streaming_context_.certified(ws_meta_.seqno()); + if (storage_service.commit(ws_handle_, ws_meta_)) { ret = 1; } break; default: - sr_lock.lock(); - sr_transaction.state(sr_lock, s_must_abort); - sr_lock.unlock(); - sr_client_state.client_service().bf_rollback(); + storage_service.rollback(ws_handle_, ws_meta_); ret = 1; break; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d34d19f..9b8d83b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,8 +3,9 @@ # add_executable(wsrep-lib_test - mock_high_priority_service.cpp mock_client_state.cpp + mock_high_priority_service.cpp + mock_storage_service.cpp test_utils.cpp id_test.cpp server_context_test.cpp diff --git a/test/mock_client_state.cpp b/test/mock_client_state.cpp index 58016a2..a3a61f2 100644 --- a/test/mock_client_state.cpp +++ b/test/mock_client_state.cpp @@ -6,31 +6,6 @@ #include "mock_client_state.hpp" #include "mock_high_priority_service.hpp" -int wsrep::mock_client_service::commit( - const wsrep::ws_handle&, const wsrep::ws_meta&) -{ - int ret(0); - if (do_2pc()) - { - if (client_state_.before_prepare()) - { - ret = 1; - } - else if (client_state_.after_prepare()) - { - ret = 1; - } - } - if (ret == 0 && - (client_state_.before_commit() || - client_state_.ordered_commit() || - client_state_.after_commit())) - { - ret = 1; - } - return ret; -} - int wsrep::mock_client_service::bf_rollback() { int ret(0); diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 5515dbf..c256ac1 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -63,21 +63,15 @@ namespace wsrep int apply_toi(const wsrep::const_buffer&) WSREP_OVERRIDE; - int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) - WSREP_OVERRIDE; - int bf_rollback() WSREP_OVERRIDE; - bool is_autocommit() const WSREP_OVERRIDE - { return is_autocommit_; } - bool do_2pc() const WSREP_OVERRIDE { return do_2pc_; } bool interrupted() const WSREP_OVERRIDE { return killed_before_certify_; } - void reset_globals() WSREP_OVERRIDE { } + void emergency_shutdown() WSREP_OVERRIDE { ++aborts_; } int append_fragment(const wsrep::transaction&, @@ -135,6 +129,7 @@ namespace wsrep } void store_globals() WSREP_OVERRIDE { } + void reset_globals() WSREP_OVERRIDE { } void debug_sync(const char* sync_point) WSREP_OVERRIDE { @@ -181,7 +176,6 @@ namespace wsrep // size_t replays() const { return replays_; } size_t aborts() const { return aborts_; } - private: wsrep::mock_client_state& client_state_; size_t replays_; diff --git a/test/mock_high_priority_service.hpp b/test/mock_high_priority_service.hpp index e1e7467..c228c70 100644 --- a/test/mock_high_priority_service.hpp +++ b/test/mock_high_priority_service.hpp @@ -33,6 +33,9 @@ namespace wsrep void adopt_transaction(const wsrep::transaction&) WSREP_OVERRIDE; int apply_write_set(const wsrep::const_buffer&) WSREP_OVERRIDE; + int append_fragment(const wsrep::ws_meta&, + const wsrep::const_buffer& data) WSREP_OVERRIDE + { return 0; } int commit() WSREP_OVERRIDE; int rollback() WSREP_OVERRIDE; int apply_toi(const wsrep::ws_meta&, diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 0d69f01..536c281 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -9,6 +9,7 @@ #include "wsrep/server_service.hpp" #include "mock_client_state.hpp" #include "mock_high_priority_service.hpp" +#include "mock_storage_service.hpp" #include "mock_provider.hpp" #include "wsrep/compiler.hpp" @@ -32,9 +33,22 @@ namespace wsrep , cond_() , provider_(*this) , last_client_id_(0) + , last_transaction_id_(0) { } + wsrep::mock_provider& provider() const { return provider_; } + + wsrep::storage_service* storage_service(wsrep::client_service&) + { + return new wsrep::mock_storage_service(*this, ++last_client_id_); + } + + void release_storage_service(wsrep::storage_service* storage_service) + { + delete storage_service; + } + wsrep::client_state* local_client_state() { wsrep::client_state* ret(new wsrep::mock_client( @@ -43,6 +57,7 @@ namespace wsrep ret->open(ret->id()); return ret; } + void release_client_state(wsrep::client_state* client_state) { delete client_state; @@ -103,6 +118,11 @@ namespace wsrep int wait_committing_transactions(int) WSREP_OVERRIDE { return 0; } + wsrep::transaction_id next_transaction_id() + { + return wsrep::transaction_id(++last_transaction_id_); + } + void debug_sync(const char* sync_point) WSREP_OVERRIDE { if (sync_point_enabled_ == sync_point) @@ -128,6 +148,7 @@ namespace wsrep wsrep::default_condition_variable cond_; mutable wsrep::mock_provider provider_; unsigned long long last_client_id_; + unsigned long long last_transaction_id_; }; } diff --git a/test/mock_storage_service.cpp b/test/mock_storage_service.cpp new file mode 100644 index 0000000..9ae0899 --- /dev/null +++ b/test/mock_storage_service.cpp @@ -0,0 +1,56 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#include "mock_storage_service.hpp" +#include "mock_server_state.hpp" + +#include "wsrep/client_state.hpp" + +wsrep::mock_storage_service::mock_storage_service( + wsrep::mock_server_state& server_state, + wsrep::client_id client_id) + : server_state_(server_state) + , client_service_(client_state_) + , client_state_(server_state, client_service_, client_id, + wsrep::client_state::m_high_priority) +{ + client_state_.open(client_id); + client_state_.before_command(); +} + + +wsrep::mock_storage_service::~mock_storage_service() +{ + client_state_.after_command_before_result(); + client_state_.after_command_after_result(); + client_state_.close(); + client_state_.cleanup(); +} + +int wsrep::mock_storage_service::start_transaction() +{ + return client_state_.start_transaction( + server_state_.next_transaction_id()); +} + +int wsrep::mock_storage_service::commit(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + return client_state_.prepare_for_fragment_ordering( + ws_handle, ws_meta, true) || + client_state_.before_commit() || + client_state_.ordered_commit() || + client_state_.after_commit() || + client_state_.after_statement(); +} + +int wsrep::mock_storage_service::rollback(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) +{ + return client_state_.prepare_for_fragment_ordering( + ws_handle, ws_meta, false) || + client_state_.before_rollback() || + client_state_.after_rollback() || + client_state_.after_statement(); +} diff --git a/test/mock_storage_service.hpp b/test/mock_storage_service.hpp new file mode 100644 index 0000000..6aaa1be --- /dev/null +++ b/test/mock_storage_service.hpp @@ -0,0 +1,48 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_MOCK_STORAGE_SERVICE_HPP +#define WSREP_MOCK_STORAGE_SERVICE_HPP + +#include "wsrep/storage_service.hpp" +#include "mock_client_state.hpp" + +namespace wsrep +{ +class mock_server_state; + class mock_storage_service : public wsrep::storage_service + { + public: + mock_storage_service(wsrep::mock_server_state&, + wsrep::client_id); + ~mock_storage_service(); + + int start_transaction() WSREP_OVERRIDE; + + int append_fragment(const wsrep::id&, + wsrep::client_id, + int, + const wsrep::const_buffer&) + WSREP_OVERRIDE + { return 0; } + + int update_fragment_meta(const wsrep::ws_meta&) WSREP_OVERRIDE + { return 0; } + + int commit(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; + + int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&) + WSREP_OVERRIDE; + + void store_globals() WSREP_OVERRIDE { } + void reset_globals() WSREP_OVERRIDE { } + private: + wsrep::mock_server_state& server_state_; + wsrep::mock_client_service client_service_; + wsrep::mock_client_state client_state_; + }; +} + +#endif // WSREP_MOCK_STORAGE_SERVICE_HPP