From 8c4a786f79ca9cc3c3e6fef877901165a0ce3de9 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Mon, 9 Jul 2018 20:16:31 +0300 Subject: [PATCH] Collect and append SR commit keys. --- include/wsrep/sr_key_set.hpp | 44 +++++++++++++++++++++++++++++++++++ include/wsrep/transaction.hpp | 3 +++ src/transaction.cpp | 43 +++++++++++++++++++++++++++++++--- 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 include/wsrep/sr_key_set.hpp diff --git a/include/wsrep/sr_key_set.hpp b/include/wsrep/sr_key_set.hpp new file mode 100644 index 0000000..42c90d3 --- /dev/null +++ b/include/wsrep/sr_key_set.hpp @@ -0,0 +1,44 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_SR_KEY_SET_HPP +#define WSREP_SR_KEY_SET_HPP + +#include +#include + +namespace wsrep +{ + class sr_key_set + { + public: + typedef std::set leaf_type; + typedef std::map branch_type; + sr_key_set() + : root_() + { } + + void insert(const wsrep::key& key) + { + assert(key.size() == 3); + if (key.size() < 3) + { + throw wsrep::runtime_error("Invalid key size"); + } + + root_[std::string( + static_cast(key.key_parts()[0].data()), + key.key_parts()[0].size())].insert( + std::string(static_cast(key.key_parts()[1].data()), + key.key_parts()[1].size())); + } + + const branch_type& root() const { return root_; } + void clear() { root_.clear(); } + private: + branch_type root_; + }; +} + +#endif // WSREP_KEY_SET_HPP diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 2af1d3c..3a36e16 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -11,6 +11,7 @@ #include "transaction_id.hpp" #include "streaming_context.hpp" #include "lock.hpp" +#include "sr_key_set.hpp" #include #include @@ -149,6 +150,7 @@ namespace wsrep void flags(int flags) { flags_ = flags; } int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); + int append_sr_keys_for_commit(); void streaming_rollback(); void clear_fragments(); void cleanup(); @@ -169,6 +171,7 @@ namespace wsrep bool pa_unsafe_; bool certified_; wsrep::streaming_context streaming_context_; + wsrep::sr_key_set sr_keys_; }; static inline const char* to_c_string(enum wsrep::transaction::state state) diff --git a/src/transaction.cpp b/src/transaction.cpp index f460579..b801e18 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -98,6 +98,7 @@ wsrep::transaction::transaction( , pa_unsafe_(false) , certified_(false) , streaming_context_() + , sr_keys_() { } @@ -192,7 +193,16 @@ int wsrep::transaction::start_replaying(const wsrep::ws_meta& ws_meta) int wsrep::transaction::append_key(const wsrep::key& key) { /** @todo Collect table level keys for SR commit */ - return provider().append_key(ws_handle_, key); + try + { + sr_keys_.insert(key); + return provider().append_key(ws_handle_, key); + } + catch (...) + { + wsrep::log_error() << "Failed to append key"; + return 1; + } } int wsrep::transaction::append_data(const wsrep::const_buffer& data) @@ -954,11 +964,17 @@ int wsrep::transaction::certify_commit( } state(lock, s_certifying); + lock.unlock(); + + if (is_streaming()) + { + append_sr_keys_for_commit(); + flags(flags() | wsrep::provider::flag::pa_unsafe); + } + sr_keys_.clear(); flags(flags() | wsrep::provider::flag::commit); - lock.unlock(); - if (client_service_.prepare_data_for_replication()) { lock.lock(); @@ -1100,6 +1116,27 @@ int wsrep::transaction::certify_commit( return ret; } +int wsrep::transaction::append_sr_keys_for_commit() +{ + int ret(0); + assert(client_state_.mode() == wsrep::client_state::m_local); + for (wsrep::sr_key_set::branch_type::const_iterator + i(sr_keys_.root().begin()); + ret == 0 && i != sr_keys_.root().end(); ++i) + { + for (wsrep::sr_key_set::leaf_type::const_iterator + j(i->second.begin()); + ret == 0 && j != i->second.end(); ++j) + { + wsrep::key key(wsrep::key::shared); + key.append_key_part(i->first.data(), i->first.size()); + key.append_key_part(j->data(), j->size()); + ret = provider().append_key(ws_handle_, key); + } + } + return ret; +} + void wsrep::transaction::streaming_rollback() { assert(streaming_context_.rolled_back() == false);