diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index 6f19479..993f029 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -56,6 +56,10 @@ namespace db return 0; } + bool statement_allowed_for_streaming() const override + { + return true; + } int prepare_fragment_for_replication(wsrep::mutable_buffer&) override { return 0; diff --git a/dbsim/db_server_state.cpp b/dbsim/db_server_state.cpp index 79a8fd2..65bde2b 100644 --- a/dbsim/db_server_state.cpp +++ b/dbsim/db_server_state.cpp @@ -28,15 +28,16 @@ bool db::server_state::sst_before_init() const return false; } -std::string db::server_state::on_sst_required() +std::string db::server_state::sst_request() { return id(); } -void db::server_state::on_sst_request( +int db::server_state::start_sst( const std::string& request, const wsrep::gtid& gtid, bool bypass) { server_.donate_sst(request, gtid, bypass); + return 0; } void db::server_state::background_rollback(wsrep::client_state&) @@ -48,3 +49,8 @@ void db::server_state::log_dummy_write_set( { wsrep::log_info() << "Dummy write set: " << meta.seqno(); } + +void db::server_state::log_view(wsrep::client_state&, const wsrep::view&) +{ + wsrep::log_info() << "View"; +} diff --git a/dbsim/db_server_state.hpp b/dbsim/db_server_state.hpp index e59d324..83cbf74 100644 --- a/dbsim/db_server_state.hpp +++ b/dbsim/db_server_state.hpp @@ -37,11 +37,12 @@ namespace db wsrep::client_state* streaming_applier_client_state() override; void release_client_state(wsrep::client_state*) override; bool sst_before_init() const override; - void on_sst_request(const std::string&, const wsrep::gtid&, bool) override; - std::string on_sst_required() override; + int start_sst(const std::string&, const wsrep::gtid&, bool) override; + std::string sst_request() override; void background_rollback(wsrep::client_state&) override; void log_dummy_write_set(wsrep::client_state&, const wsrep::ws_meta&) override; + void log_view(wsrep::client_state&, const wsrep::view&) override; private: wsrep::default_mutex mutex_; wsrep::default_condition_variable cond_; diff --git a/include/wsrep/applying_service.hpp b/include/wsrep/applying_service.hpp index 40fb484..1141abd 100644 --- a/include/wsrep/applying_service.hpp +++ b/include/wsrep/applying_service.hpp @@ -5,12 +5,17 @@ #ifndef WSREP_APPLYING_SERVICE_HPP #define WSREP_APPLYING_SERVICE_HPP +#include "transaction_termination_service.hpp" + namespace wsrep { - class applying_service + class ws_handle; + class ws_meta; + class applying_service : public wsrep::transaction_termination_service { public: - virtual int start_transaction(); + virtual int start_transaction(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; virtual int apply_write_set(const wsrep::const_buffer&) = 0; virtual int apply_toi(const wsrep::const_buffer&); virtual int apply_nbo(const wsrep::const_buffer&); diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index 3c953fc..6e27d07 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -66,6 +66,11 @@ namespace wsrep // // Streaming // + /** + * Return true if current statement is allowed for streaming, + * otherwise false. + */ + virtual bool statement_allowed_for_streaming() const = 0; virtual size_t bytes_generated() const = 0; virtual int prepare_fragment_for_replication(wsrep::mutable_buffer&) = 0; virtual void remove_fragments() = 0; diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 50414aa..3a69944 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -277,7 +277,10 @@ namespace wsrep enum wsrep::streaming_context::fragment_unit fragment_unit, size_t fragment_size); - + bool statement_allowed_for_streaming() const + { + return client_service_.statement_allowed_for_streaming(); + } /** @todo deprecate */ size_t bytes_generated() const { diff --git a/include/wsrep/server_service.hpp b/include/wsrep/server_service.hpp index 5c4fe64..596123d 100644 --- a/include/wsrep/server_service.hpp +++ b/include/wsrep/server_service.hpp @@ -12,8 +12,12 @@ namespace wsrep { class client_state; + class ws_meta; + class gtid; + class view; class server_service { + public: /** * Create client state instance which acts only locally, i.e. does * not participate in replication. However, local client diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index dbf620f..36fd70a 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -63,6 +63,7 @@ #include "mutex.hpp" #include "condition_variable.hpp" +#include "server_service.hpp" #include #include @@ -86,7 +87,7 @@ namespace wsrep * * */ - class server_state + class server_state : public wsrep::server_service { public: /** @@ -190,24 +191,6 @@ namespace wsrep */ enum rollback_mode rollback_mode() const { return rollback_mode_; } - /** - * Create client context which acts only locally, i.e. does - * not participate in replication. However, local client - * connection may execute transactions which require ordering, - * as when modifying local SR fragment storage requires - * strict commit ordering. - * - * @return Pointer to Client Context. - */ - virtual client_state* local_client_state() = 0; - - /** - * Create applier context for streaming transaction. - */ - virtual client_state* streaming_applier_client_state() = 0; - - virtual void release_client_state(wsrep::client_state*) = 0; - void start_streaming_applier( const wsrep::id&, const wsrep::transaction_id&, @@ -219,10 +202,7 @@ namespace wsrep * Return reference to streaming applier. */ client_state* find_streaming_applier(const wsrep::id&, - const wsrep::transaction_id&) const; - - virtual void log_dummy_write_set(wsrep::client_state&, - const wsrep::ws_meta&) = 0; + const wsrep::transaction_id&) const; /** * Load WSRep provider. * @@ -292,46 +272,6 @@ namespace wsrep */ void wait_until_state(wsrep::server_state::state) const; - /** - * Virtual method to return true if the configured SST - * method requires SST to be performed before DBMS storage - * engine initialization, false otherwise. - */ - virtual bool sst_before_init() const = 0; - - /** - * Virtual method which will be called on *joiner* when the provider - * requests the SST request information. This method should - * provide a string containing an information which the donor - * server can use to donate SST. - */ - virtual std::string on_sst_required() = 0; - - /** - * Virtual method which will be called on *donor* when the - * SST request has been delivered by the provider. - * This method should initiate SST transfer or throw - * a wsrep::runtime_error - * if the SST transfer cannot be initiated. If the SST request - * initiation is succesful, the server remains in s_donor - * state until the SST is over or fails. The @param bypass - * should be passed to SST implementation. If the flag is true, - * no actual SST should happen, but the joiner server should - * be notified that the donor has seen the request. The notification - * should included @param gtid provided. This must be passed - * to sst_received() call on the joiner. - * - * @todo Figure out better exception for error codition. - * - * @param sst_request SST request string provided by the joiner. - * @param gtid GTID denoting the current replication position. - * @param bypass Boolean bypass flag. - */ - virtual void on_sst_request(const std::string& sst_request, - const wsrep::gtid& gtid, - bool bypass) = 0; - - virtual void background_rollback(wsrep::client_state&) = 0; /** * */ @@ -373,20 +313,6 @@ namespace wsrep const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data); - /** - * This virtual method should be implemented by the DBMS - * to provide information if the current statement in processing - * is allowd for streaming replication. - * - * @return True if the statement is allowed for streaming - * replication, false otherwise. - * - * @todo Move to client service interface. - */ - virtual bool statement_allowed_for_streaming( - const wsrep::client_state& client_state, - const wsrep::transaction& transaction) const; - /** * Set server wide wsrep debug logging level. * @@ -420,12 +346,12 @@ namespace wsrep * @param rollback_mode Rollback mode which server operates on. */ server_state(wsrep::mutex& mutex, - wsrep::condition_variable& cond, - const std::string& name, - const std::string& id, - const std::string& address, - const std::string& working_dir, - enum rollback_mode rollback_mode) + wsrep::condition_variable& cond, + const std::string& name, + const std::string& id, + const std::string& address, + const std::string& working_dir, + enum rollback_mode rollback_mode) : mutex_(mutex) , cond_(cond) , state_(s_disconnected) diff --git a/src/server_state.cpp b/src/server_state.cpp index 7b8542a..2aced47 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -273,6 +273,7 @@ int wsrep::server_state::on_apply( return ret; } +#if 0 bool wsrep::server_state::statement_allowed_for_streaming( const wsrep::client_state&, const wsrep::transaction&) const @@ -280,6 +281,7 @@ bool wsrep::server_state::statement_allowed_for_streaming( /* Streaming not implemented yet. */ return false; } +#endif void wsrep::server_state::start_streaming_applier( const wsrep::id& server_id, diff --git a/src/transaction.cpp b/src/transaction.cpp index 681529a..c802521 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -163,8 +163,7 @@ int wsrep::transaction::before_prepare( client_state_.debug_crash( "crash_last_fragment_commit_before_fragment_removal"); lock.unlock(); - if (client_state_.server_state().statement_allowed_for_streaming( - client_state_, *this)) + if (client_state_.statement_allowed_for_streaming() == false) { client_state_.override_error(wsrep::e_error_during_commit); ret = 1; diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 6e1d623..6b77365 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -289,7 +289,7 @@ namespace try { - std::string req(server_state.on_sst_required()); + std::string req(server_state.sst_request()); *sst_req = ::strdup(req.c_str()); *sst_req_len = strlen(req.c_str()); return WSREP_CB_SUCCESS; @@ -369,7 +369,10 @@ namespace wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, sizeof(req_gtid->uuid.data)), wsrep::seqno(req_gtid->seqno)); - server_state.on_sst_request(req, gtid, bypass); + if (server_state.start_sst(req, gtid, bypass)) + { + return WSREP_CB_FAILURE; + } return WSREP_CB_SUCCESS; } catch (const wsrep::runtime_error& e) diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 3f95104..cc03e5f 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -124,6 +124,8 @@ namespace wsrep return bytes_generated_; } + bool statement_allowed_for_streaming() const WSREP_OVERRIDE + { return true; } int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) WSREP_OVERRIDE { diff --git a/test/mock_server_state.hpp b/test/mock_server_state.hpp index 36d7b1a..3b8a8df 100644 --- a/test/mock_server_state.hpp +++ b/test/mock_server_state.hpp @@ -40,6 +40,7 @@ namespace wsrep *this, ++last_client_id_, wsrep::client_state::m_high_priority); } + void release_client_state(wsrep::client_state* client_state) { delete client_state; @@ -50,15 +51,17 @@ namespace wsrep WSREP_OVERRIDE { } + void log_view(wsrep::client_state&, const wsrep::view&) { } + void on_connect() WSREP_OVERRIDE { } void wait_until_connected() WSREP_OVERRIDE { } void on_view(const wsrep::view&) WSREP_OVERRIDE { } void on_sync() WSREP_OVERRIDE { } bool sst_before_init() const WSREP_OVERRIDE { return false; } - std::string on_sst_required() WSREP_OVERRIDE { return ""; } - void on_sst_request(const std::string&, - const wsrep::gtid&, - bool) WSREP_OVERRIDE { } + std::string sst_request() WSREP_OVERRIDE { return ""; } + int start_sst(const std::string&, + const wsrep::gtid&, + bool) WSREP_OVERRIDE { return 0; } void background_rollback(wsrep::client_state& client_state) WSREP_OVERRIDE {