From 81ac78913a49222c8963bfb75886007e6d4aec87 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 21 Jun 2018 11:45:18 +0300 Subject: [PATCH] Initial implementation of client_state TOI mode. --- include/wsrep/client_state.hpp | 29 ++++++++++++--- include/wsrep/provider.hpp | 15 +++++++- src/client_state.cpp | 67 ++++++++++++++++++++++++++++++++++ src/wsrep_provider_v26.cpp | 44 ++++++++++++++++++++++ src/wsrep_provider_v26.hpp | 7 +++- test/mock_provider.hpp | 9 +++++ 6 files changed, 163 insertions(+), 8 deletions(-) diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index 93db11d..a998daf 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -80,6 +80,7 @@ namespace wsrep m_toi }; + static const int mode_max_ = m_toi + 1; /** * Client state enumeration. * @@ -110,7 +111,7 @@ namespace wsrep s_quitting }; - const static int state_max_ = s_quitting + 1; + static const int state_max_ = s_quitting + 1; /** * Store variables related to global execution context. @@ -398,8 +399,16 @@ namespace wsrep /** * Enter total order isolation critical section. + * @param key_array Array of keys + * @param buffer Buffer containing the action to execute inside + * total order isolation section + * @param flags Provider flags for TOI operation + * + * @return Zero on success, non-zero otherwise. */ - int enter_toi(const wsrep::key_array&, const wsrep::const_buffer&); + int enter_toi(const wsrep::key_array& key_array, + const wsrep::const_buffer& buffer, + int flags); /** * Leave total order isolation critical section. @@ -545,6 +554,7 @@ namespace wsrep , mode_(mode) , state_(s_none) , transaction_(*this) + , toi_meta_() , allow_dirty_reads_() , debug_log_level_(0) , current_error_(wsrep::e_success) @@ -561,6 +571,7 @@ namespace wsrep void debug_log_state(const char*) const; void state(wsrep::unique_lock& lock, enum state state); + void mode(wsrep::unique_lock& lock, enum mode mode); void override_error(enum wsrep::client_error error); wsrep::thread::id owning_thread_id_; @@ -572,6 +583,7 @@ namespace wsrep enum mode mode_; enum state state_; wsrep::transaction transaction_; + wsrep::ws_meta toi_meta_; bool allow_dirty_reads_; int debug_log_level_; wsrep::client_error current_error_; @@ -621,11 +633,14 @@ namespace wsrep : client_(client) , orig_mode_(client.mode_) { - client_.mode_ = wsrep::client_state::m_high_priority; + wsrep::unique_lock lock(client.mutex_); + client.mode(lock, wsrep::client_state::m_high_priority); } virtual ~high_priority_context() { - client_.mode_ = orig_mode_; + wsrep::unique_lock lock(client_.mutex_); + assert(client_.mode() == wsrep::client_state::m_high_priority); + client_.mode(lock, orig_mode_); } private: wsrep::client_state& client_; @@ -642,12 +657,14 @@ namespace wsrep : client_(client) , orig_mode_(client.mode_) { - client_.mode_ = wsrep::client_state::m_toi; + wsrep::unique_lock lock(client.mutex_); + client.mode(lock, wsrep::client_state::m_toi); } ~client_toi_mode() { + wsrep::unique_lock lock(client_.mutex_); assert(client_.mode() == wsrep::client_state::m_toi); - client_.mode_ = orig_mode_; + client_.mode(lock, orig_mode_); } private: wsrep::client_state& client_; diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 896f2e7..790c21e 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -277,6 +277,19 @@ namespace wsrep virtual enum status replay( const wsrep::ws_handle& ws_handle, void* applier_ctx) = 0; + /** + * Enter total order isolation critical section + */ + virtual enum status enter_toi(wsrep::client_id, + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + wsrep::ws_meta& ws_meta, + int flags) = 0; + /** + * Leave total order isolation critical section + */ + virtual enum status leave_toi(wsrep::client_id) = 0; + /** * Perform a causal read on cluster. * @@ -284,7 +297,7 @@ namespace wsrep * * @return Provider status indicating the result of the call. */ - virtual enum wsrep::provider::status causal_read(int timeout) const = 0; + virtual enum status causal_read(int timeout) const = 0; virtual int sst_sent(const wsrep::gtid&, int) = 0; virtual int sst_received(const wsrep::gtid&, int) = 0; diff --git a/src/client_state.cpp b/src/client_state.cpp index 7d8ef8b..58682a7 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -231,6 +231,47 @@ int wsrep::client_state::enable_streaming( return 0; } +int wsrep::client_state::enter_toi(const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + int flags) +{ + assert(state_ == s_exec); + int ret; + switch (provider().enter_toi(id_, keys, buffer, toi_meta_, flags)) + { + case wsrep::provider::success: + { + wsrep::unique_lock lock(mutex_); + mode(lock, m_toi); + ret = 0; + break; + } + default: + override_error(wsrep::e_error_during_commit); + ret = 1; + break; + } + return ret; +} + +int wsrep::client_state::leave_toi() +{ + int ret; + switch (provider().leave_toi(id_)) + { + case wsrep::provider::success: + ret = 0; + break; + default: + assert(0); + override_error(wsrep::e_error_during_commit); + ret = 1; + break; + } + wsrep::unique_lock lock(mutex_); + mode(lock, m_replicating); + return ret; +} // Private void wsrep::client_state::debug_log_state(const char* context) const @@ -271,3 +312,29 @@ void wsrep::client_state::state( throw wsrep::runtime_error(os.str()); } } + +void wsrep::client_state::mode( + wsrep::unique_lock& lock WSREP_UNUSED, + enum mode mode) +{ + assert(lock.owns_lock()); + static const char allowed[mode_max_][mode_max_] = + { + { 0, 0, 0, 0}, /* local */ + { 0, 0, 1, 1}, /* repl */ + { 0, 1, 0, 0}, /* high prio */ + { 0, 1, 0, 0} /* toi */ + }; + if (allowed[mode_][mode]) + { + mode_ = mode; + } + else + { + std::ostringstream os; + os << "client_state: Unallowed mode transition: " + << mode_ << " -> " << mode; + throw wsrep::runtime_error(os.str()); + } + +} diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index 21e628b..808b582 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -633,6 +633,50 @@ wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle, wsrep_->replay_trx(wsrep_, mwsh.native(), applier_ctx)); } +enum wsrep::provider::status +wsrep::wsrep_provider_v26::enter_toi( + wsrep::client_id client_id, + const wsrep::key_array& keys, + const wsrep::const_buffer& buffer, + wsrep::ws_meta& ws_meta, + int flags) +{ + mutable_ws_meta mmeta(ws_meta, flags); + std::vector key_parts; + size_t key_parts_offset(0); + std::vector wsrep_keys; + wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()}; + for (wsrep::key_array::const_iterator i(keys.begin()); + i != keys.end(); ++i) + { + for (size_t kp(0); kp < i->size(); ++kp) + { + wsrep_buf_t buf = {i->key_parts()[kp].data(), + i->key_parts()[kp].size()}; + key_parts.push_back(buf); + } + wsrep_key_t key = {&key_parts[0] + key_parts_offset, i->size()}; + wsrep_keys.push_back(key); + key_parts_offset += i->size(); + } + return map_return_value(wsrep_->to_execute_start( + wsrep_, + client_id.get(), + &wsrep_keys[0], + wsrep_keys.size(), + &wsrep_buf, + 1, + mmeta.native_flags(), + mmeta.native())); +} + +enum wsrep::provider::status +wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id) +{ + return map_return_value(wsrep_->to_execute_end( + wsrep_, client_id.get(), 0)); +} + enum wsrep::provider::status wsrep::wsrep_provider_v26::causal_read(int timeout) const { diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index c8d9e57..0184a76 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -49,7 +49,12 @@ namespace wsrep const wsrep::ws_meta&); int release(wsrep::ws_handle&); enum wsrep::provider::status replay(const wsrep::ws_handle&, void*); - + enum wsrep::provider::status enter_toi(wsrep::client_id, + const wsrep::key_array&, + const wsrep::const_buffer&, + wsrep::ws_meta&, + int); + enum wsrep::provider::status leave_toi(wsrep::client_id); enum wsrep::provider::status causal_read(int) const; int sst_sent(const wsrep::gtid&,int); diff --git a/test/mock_provider.hpp b/test/mock_provider.hpp index 5980b64..4ad5e15 100644 --- a/test/mock_provider.hpp +++ b/test/mock_provider.hpp @@ -198,6 +198,15 @@ namespace wsrep return wsrep::provider::success; } + enum wsrep::provider::status enter_toi(wsrep::client_id, + const wsrep::key_array&, + const wsrep::const_buffer&, + wsrep::ws_meta&, + int) + { return wsrep::provider::success; } + enum wsrep::provider::status leave_toi(wsrep::client_id) + { return wsrep::provider::success; } + enum wsrep::provider::status causal_read(int) const WSREP_OVERRIDE { return wsrep::provider::success;