diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 4ac919e..93db11d 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -4,10 +4,17 @@ /** @file client_state.hpp * + * + * Return value conventions: + * + * The calls which may alter either client_state or associated + * transaction state will generally return zero on success and + * non-zero on failure. More detailed error information is stored + * into client state and persisted there until explicitly cleared. */ -#ifndef WSREP_CLIENT_CONTEXT_HPP -#define WSREP_CLIENT_CONTEXT_HPP +#ifndef WSREP_CLIENT_STATE_HPP +#define WSREP_CLIENT_STATE_HPP #include "server_state.hpp" #include "provider.hpp" @@ -34,7 +41,7 @@ namespace wsrep e_append_fragment_error }; - static inline const char* to_string(enum client_error error) + static inline const char* to_c_string(enum client_error error) { switch (error) { @@ -48,6 +55,10 @@ namespace wsrep return "unknown"; } + static inline std::string to_string(enum client_error error) + { + return to_c_string(error); + } /** * Client State */ @@ -382,45 +393,18 @@ namespace wsrep transaction_.streaming_context_ = transaction.streaming_context_; } - // - // Causal reads - // - - /** - * Perform a causal read in the cluster. After the call returns, - * all the causally preceding write sets have been committed - * or the error is returned. - * - * This operation may require communication with other processes - * in the DBMS cluster, so it may be relatively heavy operation. - * Method wait_for_gtid() should be used whenever possible. - */ - enum wsrep::provider::status causal_read() const; - - /** - * Wait until all the write sets up to given GTID have been - * committed. - * - * @return Zero on success, non-zero on failure. - */ - enum wsrep::provider::status wait_for_gtid(const wsrep::gtid&) const; - - // - // - // - /** @name Non-transactional operations */ /** @{*/ /** * Enter total order isolation critical section. */ - enum wsrep::provider::status enter_toi(); + int enter_toi(const wsrep::key_array&, const wsrep::const_buffer&); /** * Leave total order isolation critical section. */ - void leave_toi(); + int leave_toi(); /** * Begin non-blocking operation. @@ -430,7 +414,7 @@ namespace wsrep /** * End non-blocking operation */ - void end_nbo(); + int end_nbo(); /** * Get reference to the client mutex. @@ -696,4 +680,4 @@ namespace wsrep }; } -#endif // WSREP_CLIENT_CONTEXT_HPP +#endif // WSREP_CLIENT_STATE_HPP diff --git a/include/wsrep/gtid.hpp b/include/wsrep/gtid.hpp index 73cb11f..89d08c2 100644 --- a/include/wsrep/gtid.hpp +++ b/include/wsrep/gtid.hpp @@ -25,6 +25,10 @@ namespace wsrep { } const wsrep::id& id() const { return id_; } wsrep::seqno seqno() const { return seqno_ ; } + bool is_undefined() const + { + return (seqno_.is_undefined() && id_.is_undefined()); + } private: wsrep::id id_; wsrep::seqno seqno_; diff --git a/include/wsrep/id.hpp b/include/wsrep/id.hpp index ea403d7..25a35fe 100644 --- a/include/wsrep/id.hpp +++ b/include/wsrep/id.hpp @@ -60,11 +60,24 @@ namespace wsrep { return (std::memcmp(data_, other.data_, sizeof(data_)) == 0); } - + bool operator!=(const id& other) const + { + return !(*this == other); + } const void* data() const { return data_; } size_t size() const { return sizeof(data_); } + bool is_undefined() const + { + return (*this == undefined()); + } + + static const wsrep::id& undefined() + { + static wsrep::id ret = wsrep::id(); + return ret; + } private: unsigned char data_[16]; }; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index e769780..896f2e7 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -232,7 +232,7 @@ namespace wsrep virtual int desync() = 0; virtual int resync() = 0; - virtual int pause() = 0; + virtual wsrep::seqno pause() = 0; virtual int resume() = 0; // Applier interface @@ -277,6 +277,14 @@ namespace wsrep virtual enum status replay( const wsrep::ws_handle& ws_handle, void* applier_ctx) = 0; + /** + * Perform a causal read on cluster. + * + * @param timeout Timeout in seconds + * + * @return Provider status indicating the result of the call. + */ + virtual enum wsrep::provider::status causal_read(int timeout) const = 0; virtual int sst_sent(const wsrep::gtid&, int) = 0; virtual int sst_received(const wsrep::gtid&, int) = 0; diff --git a/include/wsrep/seqno.hpp b/include/wsrep/seqno.hpp index 24bdd28..7e30cda 100644 --- a/include/wsrep/seqno.hpp +++ b/include/wsrep/seqno.hpp @@ -23,15 +23,15 @@ namespace wsrep { public: seqno() - : seqno_() + : seqno_(-1) { } explicit seqno(long long seqno) : seqno_(seqno) { - if (seqno_ < 0) + if (seqno_ < -1) { - throw wsrep::runtime_error("Negative seqno given"); + throw wsrep::runtime_error("Too negative seqno given"); } } @@ -40,9 +40,9 @@ namespace wsrep return seqno_; } - bool nil() const + bool is_undefined() const { - return (seqno_ == 0); + return (seqno_ == -1); } bool operator<(seqno other) const @@ -54,13 +54,19 @@ namespace wsrep { return (seqno_ > other.seqno_); } - + bool operator==(seqno other) const + { + return (seqno_ == other.seqno_); + } seqno operator+(seqno other) const { return (seqno(seqno_ + other.seqno_)); } - - static seqno undefined() { return seqno(0); } + seqno operator+(long long other) const + { + return (*this + seqno(other)); + } + static seqno undefined() { return seqno(-1); } private: long long seqno_; }; diff --git a/include/wsrep/server_state.hpp b/include/wsrep/server_state.hpp index 33d2f1a..ab41334 100644 --- a/include/wsrep/server_state.hpp +++ b/include/wsrep/server_state.hpp @@ -4,11 +4,11 @@ /** @file server_state.hpp * - * Server Context Abstraction + * Server State Abstraction * ========================== * - * This file defines an interface for WSREP Server Context. - * The Server Context will encapsulate server identification, + * This file defines an interface for WSREP Server State. + * The Server State t will encapsulate server identification, * server state and server capabilities. The class also * defines an interface for manipulating server state, applying * of remote transaction write sets, processing SST requests, @@ -56,16 +56,24 @@ * to be aborted or in case of fully optimistic concurrency control, * the conflict is detected at commit. * + * + * # Return value conventions + * + * The calls which are proxies to corresponding provider functionality + * will return wsrep::provider::status enum as a result. Otherwise + * the return value is generally zero on success, non zero on failure. */ -#ifndef WSREP_SERVER_CONTEXT_HPP -#define WSREP_SERVER_CONTEXT_HPP +#ifndef WSREP_SERVER_STATE_HPP +#define WSREP_SERVER_STATE_HPP #include "mutex.hpp" #include "condition_variable.hpp" #include "server_service.hpp" #include "id.hpp" #include "transaction_id.hpp" +#include "provider.hpp" +// #include "gtid.hpp" #include #include @@ -76,10 +84,8 @@ namespace wsrep // Forward declarations class ws_handle; class ws_meta; - class provider; class client_state; class transaction; - class gtid; class view; class const_buffer; @@ -275,6 +281,37 @@ namespace wsrep */ void wait_until_state(wsrep::server_state::state) const; + /** + * Set last committed GTID. + */ + void last_committed_gtid(const wsrep::gtid&); + /** + * Return the last committed GTID known to be committed + * on server. + */ + wsrep::gtid last_committed_gtid() const; + + /** + * Wait until all the write sets up to given GTID have been + * committed. + * + * @return Zero on success, non-zero on failure. + */ + int wait_for_gtid(const wsrep::gtid&) const; + + /** + * Perform a causal read in the cluster. After the call returns, + * all the causally preceding write sets have been committed + * or the error is returned. + * + * This operation may require communication with other processes + * in the DBMS cluster, so it may be relatively heavy operation. + * Method wait_for_gtid() should be used whenever possible. + * + * @param timeout Timeout in seconds + */ + enum wsrep::provider::status causal_read(int timeout) const; + /** * */ @@ -369,6 +406,7 @@ namespace wsrep , address_(address) , working_dir_(working_dir) , rollback_mode_(rollback_mode) + , last_committed_gtid_() , debug_log_level_(0) { } @@ -392,6 +430,7 @@ namespace wsrep std::string address_; std::string working_dir_; enum rollback_mode rollback_mode_; + wsrep::gtid last_committed_gtid_; int debug_log_level_; }; @@ -428,4 +467,4 @@ namespace wsrep } -#endif // WSREP_SERVER_CONTEXT_HPP +#endif // WSREP_SERVER_STATE_HPP diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 5d15b14..e3dc040 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -68,7 +68,7 @@ namespace wsrep // Return true if the last fragment was ordered by the // provider bool ordered() const - { return (ws_meta_.seqno().nil() == false); } + { return (ws_meta_.seqno().is_undefined() == false); } /*! * Return true if any fragments have been succesfully certified diff --git a/src/server_state.cpp b/src/server_state.cpp index cc50d44..d1c2726 100644 --- a/src/server_state.cpp +++ b/src/server_state.cpp @@ -77,6 +77,41 @@ void wsrep::server_state::wait_until_state( cond_.notify_all(); } +void wsrep::server_state::last_committed_gtid(const wsrep::gtid& gtid) +{ + wsrep::unique_lock lock(mutex_); + assert(last_committed_gtid_.is_undefined() || + last_committed_gtid_.seqno() + 1 == gtid.seqno()); + last_committed_gtid_ = gtid; + cond_.notify_all(); +} + +wsrep::gtid wsrep::server_state::last_committed_gtid() const +{ + wsrep::unique_lock lock(mutex_); + return last_committed_gtid_; +} + +int wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid) const +{ + wsrep::unique_lock lock(mutex_); + if (gtid.id() != last_committed_gtid_.id()) + { + return 1; + } + while (last_committed_gtid_.seqno() < gtid.seqno()) + { + cond_.wait(lock); + } + return 0; +} + +enum wsrep::provider::status +wsrep::server_state::causal_read(int timeout) const +{ + return provider_->causal_read(timeout); +} + void wsrep::server_state::on_connect() { wsrep::log() << "Server " << name_ << " connected to cluster"; diff --git a/src/transaction.cpp b/src/transaction.cpp index 01d0cab..3923dab 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -89,7 +89,7 @@ int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta) assert(active()); assert(client_state_.mode() == wsrep::client_state::m_high_priority); assert(state() == s_replaying); - assert(ws_meta_.seqno().nil() == false); + assert(ws_meta_.seqno().is_undefined() == false); certified_ = true; return 0; } diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 794e7c8..21e628b 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -69,12 +69,12 @@ namespace static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno) { - return (seqno.nil() ? WSREP_SEQNO_UNDEFINED : seqno.get()); + return seqno.get(); } static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno) { - return wsrep::seqno(seqno == WSREP_SEQNO_UNDEFINED ? 0 : seqno); + return wsrep::seqno(seqno); } template inline uint32_t map_one(const int flags, const F from, @@ -520,9 +520,9 @@ int wsrep::wsrep_provider_v26::resync() return (wsrep_->resync(wsrep_) != WSREP_OK); } -int wsrep::wsrep_provider_v26::pause() +wsrep::seqno wsrep::wsrep_provider_v26::pause() { - return (wsrep_->pause(wsrep_) != WSREP_OK); + return wsrep::seqno(wsrep_->pause(wsrep_)); } int wsrep::wsrep_provider_v26::resume() @@ -633,6 +633,12 @@ wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle, wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx)); } +enum wsrep::provider::status +wsrep::wsrep_provider_v26::causal_read(int timeout) const +{ + return map_return_value(wsrep_->sync_wait(wsrep_, 0, timeout, 0)); +} + int wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err) { wsrep_gtid_t wsrep_gtid; diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 3e67935..c8d9e57 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -25,7 +25,7 @@ namespace wsrep int desync(); int resync(); - int pause(); + wsrep::seqno pause(); int resume(); enum wsrep::provider::status run_applier(void*); @@ -49,6 +49,9 @@ namespace wsrep const wsrep::ws_meta&); int release(wsrep::ws_handle&); enum wsrep::provider::status replay(const wsrep::ws_handle&, void*); + + enum wsrep::provider::status causal_read(int) const; + int sst_sent(const wsrep::gtid&,int); int sst_received(const wsrep::gtid& gtid, int); diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index a8388c6..5980b64 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -46,7 +46,7 @@ namespace wsrep int capabilities() const { return 0; } int desync() { return 0; } int resync() { return 0; } - int pause() { return 0; } + wsrep::seqno pause() { return wsrep::seqno(0); } int resume() { return 0; } enum wsrep::provider::status run_applier(void*) { @@ -104,7 +104,7 @@ namespace wsrep else { enum wsrep::provider::status ret; - if (it->second.nil()) + if (it->second.is_undefined()) { ws_meta = wsrep::ws_meta(wsrep::gtid(), wsrep::stid(), wsrep::seqno::undefined(), 0); @@ -140,7 +140,7 @@ namespace wsrep const wsrep::ws_meta& ws_meta) { BOOST_REQUIRE(ws_handle.opaque()); - BOOST_REQUIRE(ws_meta.seqno().nil() == false); + BOOST_REQUIRE(ws_meta.seqno().is_undefined() == false); return commit_order_enter_result_; } @@ -148,7 +148,7 @@ namespace wsrep const wsrep::ws_meta& ws_meta) { BOOST_REQUIRE(ws_handle.opaque()); - BOOST_REQUIRE(ws_meta.seqno().nil() == false); + BOOST_REQUIRE(ws_meta.seqno().is_undefined() == false); return commit_order_leave_result_; } @@ -170,7 +170,7 @@ namespace wsrep { // If the ws_meta was not assigned yet, the certify // returned early due to BF abort. - if (tc.ws_meta().seqno().nil()) + if (tc.ws_meta().seqno().is_undefined()) { ++group_seqno_; ws_meta = wsrep::ws_meta( @@ -198,6 +198,10 @@ namespace wsrep return wsrep::provider::success; } + enum wsrep::provider::status causal_read(int) const WSREP_OVERRIDE + { + return wsrep::provider::success; + } int sst_sent(const wsrep::gtid&, int) { return 0; } int sst_received(const wsrep::gtid&, int) { return 0; } @@ -225,7 +229,7 @@ namespace wsrep wsrep::seqno& victim_seqno) { bf_abort_map_.insert(std::make_pair(trx_id, bf_seqno)); - if (bf_seqno.nil() == false) + if (bf_seqno.is_undefined() == false) { group_seqno_ = bf_seqno.get(); }