From e74b214c9cda2e44e79e5f0b1155ab7a024fd6d6 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Sat, 9 Jun 2018 23:28:02 +0300 Subject: [PATCH] Provider type abstraction, partially completed. --- include/wsrep/buffer.hpp | 31 +++ include/wsrep/client_context.hpp | 26 +-- include/wsrep/client_id.hpp | 28 +++ include/wsrep/data.hpp | 15 +- include/wsrep/key.hpp | 33 +-- include/wsrep/provider.hpp | 301 ++++++++++++++++++++++++-- include/wsrep/server_context.hpp | 10 +- include/wsrep/transaction_context.hpp | 53 ++--- include/wsrep/transaction_id.hpp | 46 ++++ src/mock_provider.hpp | 135 ++++++------ src/provider.cpp | 28 ++- src/server_context.cpp | 26 ++- src/transaction_context.cpp | 108 ++++----- src/wsrep_provider_v26.cpp | 27 ++- src/wsrep_provider_v26.hpp | 42 ++-- 15 files changed, 646 insertions(+), 263 deletions(-) create mode 100644 include/wsrep/buffer.hpp create mode 100644 include/wsrep/client_id.hpp create mode 100644 include/wsrep/transaction_id.hpp diff --git a/include/wsrep/buffer.hpp b/include/wsrep/buffer.hpp new file mode 100644 index 0000000..014b113 --- /dev/null +++ b/include/wsrep/buffer.hpp @@ -0,0 +1,31 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_BUFFER_HPP +#define WSREP_BUFFER_HPP + +namespace wsrep +{ + class buffer + { + public: + buffer() + : ptr_() + , size_() + { } + buffer(const void* ptr, size_t size) + : ptr_(ptr) + , size_(size) + { } + + const void* ptr() const { return ptr_; } + size_t size() const { return size_; } + + private: + const void* ptr_; + size_t size_; + }; +} + +#endif // WSREP_BUFFER_HPP diff --git a/include/wsrep/client_context.hpp b/include/wsrep/client_context.hpp index 2887e12..de1a7b9 100644 --- a/include/wsrep/client_context.hpp +++ b/include/wsrep/client_context.hpp @@ -40,6 +40,7 @@ #include "server_context.hpp" #include "transaction_context.hpp" +#include "client_id.hpp" #include "mutex.hpp" #include "lock.hpp" #include "data.hpp" @@ -68,18 +69,6 @@ namespace wsrep } return "unknown"; } - class client_id - { - public: - template - client_id(I id) - : id_(static_cast(id)) - { } - wsrep_conn_id_t get() const { return id_; } - static wsrep_conn_id_t invalid() { return -1; } - private: - wsrep_conn_id_t id_; - }; /*! \class Client Context * @@ -227,12 +216,11 @@ namespace wsrep return transaction_.start_transaction(id); } - int start_transaction(const wsrep_ws_handle_t& wsh, - const wsrep_trx_meta_t& meta, - uint32_t flags) + int start_transaction(const wsrep::ws_handle& wsh, + const wsrep::ws_meta& meta) { assert(mode_ == m_applier); - return transaction_.start_transaction(wsh, meta, flags); + return transaction_.start_transaction(wsh, meta); } int append_key(const wsrep::key& key) @@ -287,7 +275,7 @@ namespace wsrep } int bf_abort(wsrep::unique_lock& lock, - wsrep_seqno_t bf_seqno) + wsrep::seqno bf_seqno) { return transaction_.bf_abort(lock, bf_seqno); } @@ -415,7 +403,7 @@ namespace wsrep * Append SR fragment to the transaction. */ virtual int append_fragment(wsrep::transaction_context&, - uint32_t, const wsrep::data&) + int, const wsrep::data&) { return 0; } @@ -466,7 +454,7 @@ namespace wsrep const wsrep::transaction_context&, wsrep::data& data) { static const char buf[1] = { 1 }; - data.assign(buf, 1); + data = wsrep::data(buf, 1); return 0; } diff --git a/include/wsrep/client_id.hpp b/include/wsrep/client_id.hpp new file mode 100644 index 0000000..4cd12e2 --- /dev/null +++ b/include/wsrep/client_id.hpp @@ -0,0 +1,28 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_CLIENT_ID_HPP +#define WSREP_CLIENT_ID_HPP + +namespace wsrep +{ + class client_id + { + public: + typedef unsigned long long type; + client_id() + : id_(-1) + { } + template + client_id(I id) + : id_(static_cast(id)) + { } + type get() const { return id_; } + static type invalid() { return -1; } + private: + type id_; + }; +} + +#endif // WSREP_CLIENT_ID_HPP diff --git a/include/wsrep/data.hpp b/include/wsrep/data.hpp index 5d0bcb2..3718b28 100644 --- a/include/wsrep/data.hpp +++ b/include/wsrep/data.hpp @@ -13,23 +13,14 @@ namespace wsrep data() : buf_() { - assign(0, 0); } data(const void* ptr, size_t len) - : buf_() + : buf_(ptr, len) { - assign(ptr, len); } - - void assign(const void* ptr, size_t len) - { - buf_.ptr = ptr; - buf_.len = len; - } - - const wsrep_buf_t& get() const { return buf_; } + const wsrep::buffer& get() const { return buf_; } private: - wsrep_buf_t buf_; + wsrep::buffer buf_; }; } diff --git a/include/wsrep/key.hpp b/include/wsrep/key.hpp index 3603183..a75bb70 100644 --- a/include/wsrep/key.hpp +++ b/include/wsrep/key.hpp @@ -6,35 +6,42 @@ #define WSREP_KEY_HPP #include "exception.hpp" +#include "buffer.hpp" namespace wsrep { class key { public: - key() - : key_parts_() - , key_() + enum type { - key_.key_parts = key_parts_; - key_.key_parts_num = 0; - } + shared, + semi_shared, + semi_exclusive, + exclusive + }; + + key(enum type type) + : type_(type) + , key_parts_() + , key_parts_len_() + { } void append_key_part(const void* ptr, size_t len) { - if (key_.key_parts_num == 3) + if (key_parts_len_ == 3) { throw wsrep::runtime_error("key parts exceed maximum of 3"); } - key_parts_[key_.key_parts_num].ptr = ptr; - key_parts_[key_.key_parts_num].len = len; - ++key_.key_parts_num; + key_parts_[key_parts_len_] = wsrep::buffer(ptr, len); + ++key_parts_len_; } - const wsrep_key_t& get() const { return key_; } private: - wsrep_buf_t key_parts_[3]; - wsrep_key_t key_; + + enum type type_; + wsrep::buffer key_parts_[3]; + size_t key_parts_len_; }; } diff --git a/include/wsrep/provider.hpp b/include/wsrep/provider.hpp index 605b25a..0773ba6 100644 --- a/include/wsrep/provider.hpp +++ b/include/wsrep/provider.hpp @@ -5,15 +5,228 @@ #ifndef WSREP_PROVIDER_HPP #define WSREP_PROVIDER_HPP -// #include "provider_impl.hpp" +#include "key.hpp" +#include "data.hpp" +#include "client_id.hpp" +#include "transaction_id.hpp" -#include +#include +#include #include #include +#include namespace wsrep { + /*! \class seqno + * + * Sequence number type. + * + * By convention, nil value is zero, negative values are not allowed. + * Relation operators are restricted to < and > on purpose to + * enforce correct use. + */ + class seqno + { + public: + seqno() + : seqno_() + { } + + template + seqno(I seqno) + : seqno_(seqno) + { + assert(seqno_ >= 0); + } + + long long get() const + { + return seqno_; + } + + bool nil() const + { + return (seqno_ == 0); + } + + bool operator<(seqno other) const + { + return (seqno_ < other.seqno_); + } + + bool operator>(seqno other) const + { + return (seqno_ > other.seqno_); + } + + seqno operator+(seqno other) const + { + return (seqno_ + other.seqno_); + } + + static seqno undefined() { return 0; } + private: + long long seqno_; + }; + + static inline std::ostream& operator<<(std::ostream& os, wsrep::seqno seqno) + { + return (os << seqno.get()); + } + + class id + { + public: + enum type + { + none, + string, + uuid + }; + + id() + : type_(none) + , data_() + { } + + id(const std::string&); + + id (const void* data, size_t data_size) + : type_() + , data_() + { + assert(data_size <= 16); + std::memcpy(data_, data, data_size); + } +#if 0 + void assign(const void* data, size_t data_size) + { + assert(data_size == 16); + memcpy(data_, data, data_size); + } +#endif + bool operator<(const id& other) const + { + return (std::memcmp(data_, other.data_, sizeof(data_)) < 0); + } + bool operator==(const id& other) const + { + return (std::memcmp(data_, other.data_, sizeof(data_)) == 0); + } + private: + enum type type_; + unsigned char data_[16]; + }; + + class gtid + { + public: + gtid() + : id_() + , seqno_() + { } + gtid(const wsrep::id& id, wsrep::seqno seqno) + : id_(id) + , seqno_(seqno) + { } + const wsrep::id& id() const { return id_; } + wsrep::seqno seqno() const { return seqno_ ; } + private: + wsrep::id id_; + wsrep::seqno seqno_; + }; + + + class stid + { + public: + stid() + : server_id_() + , transaction_id_() + , client_id_() + { } + stid(const wsrep::id& server_id, + wsrep::transaction_id transaction_id, + wsrep::client_id client_id) + : server_id_(server_id) + , transaction_id_(transaction_id) + , client_id_(client_id) + { } + const wsrep::id& server_id() const { return server_id_; } + wsrep::transaction_id transaction_id() const + { return transaction_id_; } + wsrep::client_id client_id() const { return client_id_; } + private: + wsrep::id server_id_; + wsrep::transaction_id transaction_id_; + wsrep::client_id client_id_; + }; + + class ws_handle + { + public: + ws_handle() + : transaction_id_() + , opaque_() + { } + ws_handle(wsrep::transaction_id id) + : transaction_id_(id) + , opaque_() + { } + ws_handle(wsrep::transaction_id id, + void* opaque) + : transaction_id_(id) + , opaque_(opaque) + { } + + wsrep::transaction_id transaction_id() const + { return transaction_id_; } + + void* opaque() const { return opaque_; } + + private: + wsrep::transaction_id transaction_id_; + void* opaque_; + }; + + class ws_meta + { + public: + ws_meta() + : gtid_() + , stid_() + , depends_on_() + , flags_() + { } + ws_meta(const wsrep::gtid& gtid, + const wsrep::stid& stid, + wsrep::seqno depends_on, + int flags) + : gtid_(gtid) + , stid_(stid) + , depends_on_(depends_on) + , flags_(flags) + { } + + wsrep::transaction_id transaction_id() const + { + return stid_.transaction_id(); + } + + wsrep::seqno seqno() const + { + return gtid_.seqno(); + } + private: + wsrep::gtid gtid_; + wsrep::stid stid_; + wsrep::seqno depends_on_; + int flags_; + }; + + // Abstract interface for provider implementations class provider { @@ -33,6 +246,50 @@ namespace wsrep std::string name_; std::string value_; }; + + /*! + * Return value enumeration + */ + enum status + { + /*! Success */ + success, + /*! Warning*/ + error_warning, + /*! Transaction was not found from provider */ + error_transaction_missing, + /*! Certification failed */ + error_certification_failed, + /*! Transaction was BF aborted */ + error_bf_abort, + /*! Transaction size exceeded */ + error_size_exceeded, + /*! Connectivity to cluster lost */ + error_connection_failed, + /*! Internal provider failure, provider must be reinitialized */ + error_provider_failed, + /*! Fatal error, server must abort */ + error_fatal, + /*! Requested functionality is not implemented by the provider */ + error_not_implemented, + /*! Operation is not allowed */ + error_not_allowed, + /*! Unknown error code from the provider */ + error_unknown + }; + + struct flag + { + static const int start_transaction = (1 << 0); + static const int commit = (1 << 1); + static const int rollback = (1 << 2); + static const int isolation = (1 << 3); + static const int pa_unsafe = (1 << 4); + static const int commutative = (1 << 5); + static const int native = (1 << 6); + static const int snapshot = (1 << 7); + }; + virtual ~provider() { } // Provider state management virtual int connect(const std::string& cluster_name, @@ -43,16 +300,16 @@ namespace wsrep // Applier interface - virtual wsrep_status_t run_applier(void* applier_ctx) = 0; + virtual enum status run_applier(void* applier_ctx) = 0; // Write set replication // TODO: Rename to assing_read_view() - virtual int start_transaction(wsrep_ws_handle_t*) = 0; - virtual int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) = 0; - virtual int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) = 0; - virtual wsrep_status_t - certify(wsrep_conn_id_t, wsrep_ws_handle_t*, - uint32_t, - wsrep_trx_meta_t*) = 0; + virtual int start_transaction(wsrep::ws_handle&) = 0; + virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0; + virtual int append_data(wsrep::ws_handle&, const wsrep::data&) = 0; + virtual enum status + certify(wsrep::client_id, wsrep::ws_handle&, + int, + wsrep::ws_meta&) = 0; /*! * BF abort a transaction inside provider. * @@ -63,25 +320,25 @@ namespace wsrep * * @return wsrep_status_t */ - virtual wsrep_status_t bf_abort(wsrep_seqno_t bf_seqno, - wsrep_trx_id_t victim_trx, - wsrep_seqno_t* victim_seqno) = 0; - virtual int rollback(const wsrep_trx_id_t) = 0; - virtual wsrep_status commit_order_enter(const wsrep_ws_handle_t*, - const wsrep_trx_meta_t*) = 0; - virtual int commit_order_leave(const wsrep_ws_handle_t*, - const wsrep_trx_meta_t*) = 0; - virtual int release(wsrep_ws_handle_t*) = 0; + virtual enum status bf_abort(wsrep::seqno bf_seqno, + wsrep::transaction_id victim_trx, + wsrep::seqno& victim_seqno) = 0; + virtual int rollback(wsrep::transaction_id) = 0; + virtual enum status commit_order_enter(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + virtual int commit_order_leave(const wsrep::ws_handle&, + const wsrep::ws_meta&) = 0; + virtual int release(wsrep::ws_handle&) = 0; /*! * Replay a transaction. * * @return Zero in case of success, non-zero on failure. */ - virtual int replay(wsrep_ws_handle_t* ws_handle, void* applier_ctx) = 0; + virtual int replay(wsrep::ws_handle& ws_handle, void* applier_ctx) = 0; - virtual int sst_sent(const wsrep_gtid_t&, int) = 0; - virtual int sst_received(const wsrep_gtid_t&, int) = 0; + virtual int sst_sent(const wsrep::gtid&, int) = 0; + virtual int sst_received(const wsrep::gtid&, int) = 0; virtual std::vector status() const = 0; diff --git a/include/wsrep/server_context.hpp b/include/wsrep/server_context.hpp index f92e9ae..00ad608 100644 --- a/include/wsrep/server_context.hpp +++ b/include/wsrep/server_context.hpp @@ -64,7 +64,7 @@ #include "exception.hpp" #include "mutex.hpp" #include "condition_variable.hpp" -#include "wsrep_api.h" +#include "provider.hpp" #include #include @@ -72,7 +72,7 @@ namespace wsrep { // Forward declarations - class provider; + // class provider; class client_context; class transaction_context; class view; @@ -292,14 +292,14 @@ namespace wsrep * \param bypass Boolean bypass flag. */ virtual void on_sst_request(const std::string& sst_request, - const wsrep_gtid_t& gtid, + const wsrep::gtid& gtid, bool bypass) = 0; /*! * */ - void sst_sent(const wsrep_gtid_t& gtid, int error); + void sst_sent(const wsrep::gtid& gtid, int error); /*! * This method must be called by the joiner after the SST @@ -307,7 +307,7 @@ namespace wsrep * * \param gtid GTID provided by the SST transfer */ - void sst_received(const wsrep_gtid_t& gtid, int error); + void sst_received(const wsrep::gtid& gtid, int error); /*! * This method must be called after the server initialization diff --git a/include/wsrep/transaction_context.hpp b/include/wsrep/transaction_context.hpp index c32f129..b25e179 100644 --- a/include/wsrep/transaction_context.hpp +++ b/include/wsrep/transaction_context.hpp @@ -7,10 +7,9 @@ #include "provider.hpp" #include "server_context.hpp" +#include "transaction_id.hpp" #include "lock.hpp" -#include - #include #include @@ -20,22 +19,6 @@ namespace wsrep class key; class data; - class transaction_id - { - public: - template - transaction_id(I id) - : id_(static_cast(id)) - { } - wsrep_trx_id_t get() const { return id_; } - static wsrep_trx_id_t invalid() { return wsrep_trx_id_t(-1); } - bool operator==(const transaction_id& other) const - { return (id_ == other.id_); } - bool operator!=(const transaction_id& other) const - { return (id_ != other.id_); } - private: - wsrep_trx_id_t id_; - }; class transaction_context { @@ -75,13 +58,14 @@ namespace wsrep // fragment succeeded bool certified() const { return certified_; } - wsrep_seqno_t seqno() const + wsrep::seqno seqno() const { - return trx_meta_.gtid.seqno; + return ws_meta_.seqno(); } // Return true if the last fragment was ordered by the // provider - bool ordered() const { return (trx_meta_.gtid.seqno > 0); } + bool ordered() const + { return (ws_meta_.seqno().nil() == false); } bool is_streaming() const { @@ -91,19 +75,18 @@ namespace wsrep bool pa_unsafe() const { return pa_unsafe_; } void pa_unsafe(bool pa_unsafe) { pa_unsafe_ = pa_unsafe; } + // int start_transaction() { assert(active() == false); - assert(trx_meta_.stid.trx != transaction_id::invalid()); - return start_transaction(trx_meta_.stid.trx); + assert(ws_meta_.transaction_id() != transaction_id::invalid()); + return start_transaction(ws_meta_.transaction_id()); } - int start_transaction(const wsrep::transaction_id& id); - int start_transaction(const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta, - uint32_t flags); + int start_transaction(const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta); int append_key(const wsrep::key&); @@ -130,21 +113,21 @@ namespace wsrep int after_statement(); bool bf_abort(wsrep::unique_lock& lock, - wsrep_seqno_t bf_seqno); + wsrep::seqno bf_seqno); - uint32_t flags() const + int flags() const { return flags_; } wsrep::mutex& mutex(); - wsrep_ws_handle_t& ws_handle() { return ws_handle_; } + // wsrep_ws_handle_t& ws_handle() { return ws_handle_; } private: transaction_context(const transaction_context&); transaction_context operator=(const transaction_context&); - void flags(uint32_t flags) { flags_ = flags; } + void flags(int flags) { flags_ = flags; } int certify_fragment(wsrep::unique_lock&); int certify_commit(wsrep::unique_lock&); void remove_fragments(); @@ -159,13 +142,13 @@ namespace wsrep std::vector state_hist_; enum state bf_abort_state_; int bf_abort_client_state_; - wsrep_ws_handle_t ws_handle_; - wsrep_trx_meta_t trx_meta_; - uint32_t flags_; + wsrep::ws_handle ws_handle_; + wsrep::ws_meta ws_meta_; + int flags_; bool pa_unsafe_; bool certified_; - std::vector fragments_; + std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; }; diff --git a/include/wsrep/transaction_id.hpp b/include/wsrep/transaction_id.hpp new file mode 100644 index 0000000..1ef4488 --- /dev/null +++ b/include/wsrep/transaction_id.hpp @@ -0,0 +1,46 @@ +// +// Copyright (C) 2018 Codership Oy +// + +#ifndef WSREP_TRANSACTION_ID_HPP +#define WSREP_TRANSACTION_ID_HPP + +#include + +namespace wsrep +{ + class transaction_id + { + public: + typedef unsigned long long type; + + + transaction_id() + : id_(-1) + { } + + template + transaction_id(I id) + : id_(static_cast(id)) + { } + type get() const { return id_; } + static unsigned long long invalid() { return type(-1); } + bool operator<(const transaction_id& other) const + { + return (id_ < other.id_); + } + bool operator==(const transaction_id& other) const + { return (id_ == other.id_); } + bool operator!=(const transaction_id& other) const + { return (id_ != other.id_); } + private: + type id_; + }; + + static inline std::ostream& operator<<(std::ostream& os, transaction_id id) + { + return (os << id.get()); + } +} + +#endif // WSREP_TRANSACTION_ID_HPP diff --git a/src/mock_provider.hpp b/src/mock_provider.hpp index 6112d68..1868365 100644 --- a/src/mock_provider.hpp +++ b/src/mock_provider.hpp @@ -16,97 +16,86 @@ namespace wsrep class mock_provider : public wsrep::provider { public: - typedef std::map bf_abort_map; + typedef std::map bf_abort_map; + mock_provider() - : group_id_() - , node_id_() + : group_id_("1") + , server_id_("1") , group_seqno_(0) , bf_abort_map_() - { - - memset(&group_id_, 0, sizeof(group_id_)); - group_id_.data[0] = 1; - - memset(&node_id_, 0, sizeof(node_id_)); - node_id_.data[0] = 1; - } + { } int connect(const std::string&, const std::string&, const std::string&, bool) { return 0; } int disconnect() { return 0; } - wsrep_status_t run_applier(void*) { return WSREP_OK; } - // Provider implemenatation interface - int start_transaction(wsrep_ws_handle_t*) { return 0; } - wsrep_status - certify(wsrep_conn_id_t conn_id, - wsrep_ws_handle_t* ws_handle, - uint32_t flags, - wsrep_trx_meta_t* trx_meta) + enum wsrep::provider::status run_applier(void*) { - assert(flags | WSREP_FLAG_TRX_END); - if ((flags | WSREP_FLAG_TRX_END) == 0) + return wsrep::provider::success; + } + // Provider implemenatation interface + int start_transaction(wsrep::ws_handle&) { return 0; } + enum wsrep::provider::status + certify(wsrep::client_id client_id, + wsrep::ws_handle& ws_handle, + int flags, + wsrep::ws_meta& ws_meta) + { + assert(flags & wsrep::provider::flag::start_transaction); + if ((flags & wsrep::provider::flag::commit) == 0) { - return WSREP_FATAL; + return wsrep::provider::error_provider_failed; } - std::cerr << "certify_commit: " << conn_id << ":" - << ws_handle->trx_id << "\n"; - trx_meta->stid.node = node_id_; - trx_meta->stid.trx = ws_handle->trx_id; - trx_meta->stid.conn = conn_id; - - wsrep_trx_id_t trx_id(ws_handle->trx_id); - bf_abort_map::iterator it(bf_abort_map_.find(trx_id)); - std::cerr << "bf aborted: " - << (it == bf_abort_map_.end() ? "no" : "yes") << "\n"; + wsrep::stid stid(server_id_, + ws_handle.transaction_id(), + client_id); + bf_abort_map::iterator it(bf_abort_map_.find( + ws_handle.transaction_id())); if (it == bf_abort_map_.end()) { ++group_seqno_; - trx_meta->gtid.uuid = group_id_; - trx_meta->gtid.seqno = group_seqno_; - trx_meta->depends_on = group_seqno_ - 1; - std::cerr << "certify_commit return: " << WSREP_OK << "\n"; - return WSREP_OK; + wsrep::gtid gtid(group_id_, group_seqno_); + ws_meta = wsrep::ws_meta(gtid, stid, group_seqno_ - 1, + flags); + return wsrep::provider::success; } else { - wsrep_status_t ret; - if (it->second == WSREP_SEQNO_UNDEFINED) + enum wsrep::provider::status ret; + if (it->second.nil()) { - trx_meta->gtid.uuid = WSREP_UUID_UNDEFINED; - trx_meta->gtid.seqno = WSREP_SEQNO_UNDEFINED; - trx_meta->depends_on = WSREP_SEQNO_UNDEFINED; - ret = WSREP_TRX_FAIL; + ws_meta = wsrep::ws_meta(wsrep::gtid(), wsrep::stid(), + wsrep::seqno::undefined(), 0); + ret = wsrep::provider::error_certification_failed; } else { ++group_seqno_; - trx_meta->gtid.uuid = group_id_; - trx_meta->gtid.seqno = group_seqno_; - trx_meta->depends_on = group_seqno_ - 1; - ret = WSREP_BF_ABORT; + wsrep::gtid gtid(group_id_, group_seqno_); + ws_meta = wsrep::ws_meta(gtid, stid, group_seqno_ - 1, flags); + ret = wsrep::provider::error_bf_abort; } bf_abort_map_.erase(it); - std::cerr << "certify_commit return: " << ret << "\n"; return ret; } - } - int append_key(wsrep_ws_handle_t*, const wsrep_key_t*) { return 0; } - int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*) { return 0; } - int rollback(const wsrep_trx_id_t) { return 0; } - wsrep_status_t commit_order_enter(const wsrep_ws_handle_t*, - const wsrep_trx_meta_t*) - { return WSREP_OK; } - int commit_order_leave(const wsrep_ws_handle_t*, - const wsrep_trx_meta_t*) { return 0;} - int release(wsrep_ws_handle_t*) { return 0; } + int append_key(wsrep::ws_handle&, const wsrep::key&) { return 0; } + int append_data(wsrep::ws_handle&, const wsrep::data&) { return 0; } + int rollback(const wsrep::transaction_id) { return 0; } + enum wsrep::provider::status + commit_order_enter(const wsrep::ws_handle&, + const wsrep::ws_meta&) + { return wsrep::provider::success; } + int commit_order_leave(const wsrep::ws_handle&, + const wsrep::ws_meta&) + { return 0;} + int release(wsrep::ws_handle&) { return 0; } - int replay(wsrep_ws_handle_t*, void*) { ::abort(); /* not impl */} + int replay(wsrep::ws_handle&, void*) { ::abort(); /* not impl */} - int sst_sent(const wsrep_gtid_t&, int) { return 0; } - int sst_received(const wsrep_gtid_t&, int) { return 0; } + int sst_sent(const wsrep::gtid&, int) { return 0; } + int sst_received(const wsrep::gtid&, int) { return 0; } std::vector status() const { @@ -115,36 +104,36 @@ namespace wsrep // Methods to modify mock state - /*! Inject BF abort event into the provider. * * \param bf_seqno Aborter sequence number * \param trx_id Trx id to be aborted * \param[out] victim_seqno */ - wsrep_status_t bf_abort(wsrep_seqno_t bf_seqno, - wsrep_trx_id_t trx_id, - wsrep_seqno_t* victim_seqno) + enum wsrep::provider::status + bf_abort(wsrep::seqno bf_seqno, + wsrep::transaction_id trx_id, + wsrep::seqno& victim_seqno) { std::cerr << "bf_abort: " << trx_id << "\n"; bf_abort_map_.insert(std::make_pair(trx_id, bf_seqno)); - if (bf_seqno != WSREP_SEQNO_UNDEFINED) + if (bf_seqno.nil() == false) { - group_seqno_ = bf_seqno; + group_seqno_ = bf_seqno.get(); } - *victim_seqno = WSREP_SEQNO_UNDEFINED; - return WSREP_OK; + victim_seqno = wsrep::seqno::undefined(); + return wsrep::provider::success; } /*! * \todo Inject an error so that the next call to any * provider call will return the given error. */ - void inject_error(wsrep_status_t); + void inject_error(enum wsrep::provider::status); private: - wsrep_uuid_t group_id_; - wsrep_uuid_t node_id_; - wsrep_seqno_t group_seqno_; + wsrep::id group_id_; + wsrep::id server_id_; + long long group_seqno_; bf_abort_map bf_abort_map_; }; } diff --git a/src/provider.cpp b/src/provider.cpp index 627291c..db5e987 100644 --- a/src/provider.cpp +++ b/src/provider.cpp @@ -6,8 +6,34 @@ #include "provider_impl.hpp" +#include +#include + +#include + +wsrep::id::id(const std::string& str) + : type_(none) + , data_() +{ + wsrep_uuid_t wsrep_uuid; + if (wsrep_uuid_scan(str.c_str(), str.size(), &wsrep_uuid) == + WSREP_UUID_STR_LEN) + { + type_ = uuid; + std::memcpy(data_, wsrep_uuid.data, sizeof(data_)); + } + else if (str.size() <= 16) + { + type_ = string; + std::memcpy(data_, str.c_str(), str.size()); + } + std::ostringstream os; + os << "String '" << str << "' does not contain UUID"; + throw wsrep::runtime_error(os.str()); +} + wsrep::provider* wsrep::provider::make_provider( - const std::string&) + const std::string&) { return 0; } diff --git a/src/server_context.cpp b/src/server_context.cpp index 8addd26..d818dec 100644 --- a/src/server_context.cpp +++ b/src/server_context.cpp @@ -13,8 +13,6 @@ #include "mock_provider.hpp" #include "wsrep_provider_v26.hpp" -#include - #include #include @@ -116,7 +114,18 @@ namespace assert(client_context->mode() == wsrep::client_context::m_applier); wsrep::data data(buf->ptr, buf->len); - if (client_context->transaction().state() != wsrep::transaction_context::s_replaying && client_context->start_transaction(*wsh, *meta, flags)) + wsrep::ws_handle ws_handle(wsh->trx_id, wsh->opaque); + wsrep::ws_meta ws_meta( + wsrep::gtid(wsrep::id(meta->gtid.uuid.data, + sizeof(meta->gtid.uuid.data)), + meta->gtid.seqno), + wsrep::stid(wsrep::id(meta->stid.node.data, + sizeof(meta->stid.node.data)), + meta->stid.trx, + meta->stid.conn), meta->depends_on, flags); + if (client_context->transaction().state() != + wsrep::transaction_context::s_replaying && + client_context->start_transaction(ws_handle, ws_meta)) { ret = WSREP_CB_FAILURE; } @@ -150,7 +159,7 @@ namespace wsrep_cb_status_t sst_donate_cb(void* app_ctx, void* , const wsrep_buf_t* req_buf, - const wsrep_gtid_t* gtid, + const wsrep_gtid_t* req_gtid, const wsrep_buf_t*, bool bypass) { @@ -161,7 +170,10 @@ namespace { std::string req(reinterpret_cast(req_buf->ptr), req_buf->len); - server_context.on_sst_request(req, *gtid, bypass); + wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data, + sizeof(req_gtid->uuid.data)), + req_gtid->seqno); + server_context.on_sst_request(req, gtid, bypass); return WSREP_CB_SUCCESS; } catch (const wsrep::runtime_error& e) @@ -232,11 +244,11 @@ wsrep::server_context::~server_context() delete provider_; } -void wsrep::server_context::sst_sent(const wsrep_gtid_t& gtid, int error) +void wsrep::server_context::sst_sent(const wsrep::gtid& gtid, int error) { provider_->sst_sent(gtid, error); } -void wsrep::server_context::sst_received(const wsrep_gtid_t& gtid, int error) +void wsrep::server_context::sst_received(const wsrep::gtid& gtid, int error) { provider_->sst_received(gtid, error); } diff --git a/src/transaction_context.cpp b/src/transaction_context.cpp index bf56d07..1b78001 100644 --- a/src/transaction_context.cpp +++ b/src/transaction_context.cpp @@ -25,7 +25,7 @@ wsrep::transaction_context::transaction_context( , bf_abort_state_(s_executing) , bf_abort_client_state_() , ws_handle_() - , trx_meta_() + , ws_meta_() , flags_() , pa_unsafe_(false) , certified_(false) @@ -45,15 +45,15 @@ int wsrep::transaction_context::start_transaction( id_ = id; state_ = s_executing; state_hist_.clear(); - ws_handle_.trx_id = id_.get(); - flags_ |= WSREP_FLAG_TRX_START; + ws_handle_ = wsrep::ws_handle(id); + flags_ |= wsrep::provider::flag::start_transaction; switch (client_context_.mode()) { case wsrep::client_context::m_local: case wsrep::client_context::m_applier: return 0; case wsrep::client_context::m_replicating: - return provider_.start_transaction(&ws_handle_); + return provider_.start_transaction(ws_handle_); default: assert(0); return 1; @@ -61,16 +61,14 @@ int wsrep::transaction_context::start_transaction( } int wsrep::transaction_context::start_transaction( - const wsrep_ws_handle_t& ws_handle, - const wsrep_trx_meta_t& trx_meta, - uint32_t flags) + const wsrep::ws_handle& ws_handle, + const wsrep::ws_meta& ws_meta) { assert(active() == false); assert(client_context_.mode() == wsrep::client_context::m_applier); state_ = s_executing; ws_handle_ = ws_handle; - trx_meta_ = trx_meta; - flags_ = flags; + ws_meta_ = ws_meta; certified_ = true; return 0; } @@ -79,13 +77,13 @@ int wsrep::transaction_context::start_transaction( int wsrep::transaction_context::append_key(const wsrep::key& key) { - return provider_.append_key(&ws_handle_, &key.get()); + return provider_.append_key(ws_handle_, key); } int wsrep::transaction_context::append_data(const wsrep::data& data) { - return provider_.append_data(&ws_handle_, &data.get()); + return provider_.append_data(ws_handle_, data); } int wsrep::transaction_context::before_prepare() @@ -203,7 +201,7 @@ int wsrep::transaction_context::before_commit() case wsrep::client_context::m_local: if (ordered()) { - ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_); + ret = provider_.commit_order_enter(ws_handle_, ws_meta_); } break; case wsrep::client_context::m_replicating: @@ -231,13 +229,15 @@ int wsrep::transaction_context::before_commit() if (ret == 0) { lock.unlock(); - wsrep_status_t status(provider_.commit_order_enter(&ws_handle_, &trx_meta_)); + enum wsrep::provider::status + status(provider_.commit_order_enter( + ws_handle_, ws_meta_)); lock.lock(); switch (status) { - case WSREP_OK: + case wsrep::provider::success: break; - case WSREP_BF_ABORT: + case wsrep::provider::error_bf_abort: if (state() != s_must_abort) { state(lock, s_must_abort); @@ -254,7 +254,7 @@ int wsrep::transaction_context::before_commit() break; case wsrep::client_context::m_applier: assert(ordered()); - ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_); + ret = provider_.commit_order_enter(ws_handle_, ws_meta_); if (ret) { state(lock, s_must_abort); @@ -289,7 +289,7 @@ int wsrep::transaction_context::ordered_commit() debug_log_state("ordered_commit_enter"); assert(state() == s_committing); assert(ordered()); - ret = provider_.commit_order_leave(&ws_handle_, &trx_meta_); + ret = provider_.commit_order_leave(ws_handle_, ws_meta_); // Should always succeed assert(ret == 0); state(lock, s_ordered_commit); @@ -315,7 +315,7 @@ int wsrep::transaction_context::after_commit() { clear_fragments(); } - ret = provider_.release(&ws_handle_); + ret = provider_.release(ws_handle_); break; case wsrep::client_context::m_applier: break; @@ -444,7 +444,7 @@ int wsrep::transaction_context::after_statement() lock.unlock(); ret = client_context_.replay(*this); lock.lock(); - provider_.release(&ws_handle_); + provider_.release(ws_handle_); break; case s_aborted: break; @@ -462,10 +462,10 @@ int wsrep::transaction_context::after_statement() { if (ordered()) { - ret = provider_.commit_order_enter(&ws_handle_, &trx_meta_); - if (ret == 0) provider_.commit_order_leave(&ws_handle_, &trx_meta_); + ret = provider_.commit_order_enter(ws_handle_, ws_meta_); + if (ret == 0) provider_.commit_order_leave(ws_handle_, ws_meta_); } - provider_.release(&ws_handle_); + provider_.release(ws_handle_); } if (state() != s_executing) @@ -480,7 +480,7 @@ int wsrep::transaction_context::after_statement() bool wsrep::transaction_context::bf_abort( wsrep::unique_lock& lock WSREP_UNUSED, - wsrep_seqno_t bf_seqno) + wsrep::seqno bf_seqno) { bool ret(false); assert(lock.owns_lock()); @@ -504,12 +504,13 @@ bool wsrep::transaction_context::bf_abort( case s_certifying: case s_committing: { - wsrep_seqno_t victim_seqno(WSREP_SEQNO_UNDEFINED); - wsrep_status_t status(client_context_.provider().bf_abort( - bf_seqno, id_.get(), &victim_seqno)); + wsrep::seqno victim_seqno; + enum wsrep::provider::status + status(client_context_.provider().bf_abort( + bf_seqno, id_.get(), victim_seqno)); switch (status) { - case WSREP_OK: + case wsrep::provider::success: wsrep::log() << "Seqno " << bf_seqno << " succesfully BF aborted " << id_.get() << " victim_seqno " << victim_seqno; @@ -613,7 +614,7 @@ int wsrep::transaction_context::certify_fragment( uint32_t flags(0); if (fragments_.empty()) { - flags |= WSREP_FLAG_TRX_START; + flags |= wsrep::provider::flag::start_transaction; } wsrep::data data; @@ -641,14 +642,15 @@ int wsrep::transaction_context::certify_fragment( return 1; } - wsrep_status_t cert_ret(provider_.certify(client_context_.id().get(), - &sr_transaction_context.ws_handle_, - flags, - &sr_transaction_context.trx_meta_)); + enum wsrep::provider::status + cert_ret(provider_.certify(client_context_.id().get(), + &sr_transaction_context.ws_handle_, + flags, + &sr_transaction_context.trx_meta_)); int ret(0); switch (cert_ret) { - case WSREP_OK: + case wsrep::provider::success: sr_client_context->commit(sr_transaction_context); break; default: @@ -679,7 +681,7 @@ int wsrep::transaction_context::certify_commit( state(lock, s_certifying); - flags(flags() | WSREP_FLAG_TRX_END); + flags(flags() | wsrep::provider::flag::commit); lock.unlock(); wsrep::data data; @@ -699,10 +701,11 @@ int wsrep::transaction_context::certify_commit( return 1; } - wsrep_status cert_ret(provider_.certify(client_context_.id().get(), - &ws_handle_, - flags(), - &trx_meta_)); + enum wsrep::provider::status + cert_ret(provider_.certify(client_context_.id().get(), + ws_handle_, + flags(), + ws_meta_)); lock.lock(); @@ -712,7 +715,7 @@ int wsrep::transaction_context::certify_commit( int ret(1); switch (cert_ret) { - case WSREP_OK: + case wsrep::provider::success: assert(ordered()); switch (state()) { @@ -734,19 +737,19 @@ int wsrep::transaction_context::certify_commit( } certified_ = true; break; - case WSREP_WARNING: + case wsrep::provider::error_warning: assert(ordered() == false); state(lock, s_must_abort); client_context_.override_error(wsrep::e_error_during_commit); break; - case WSREP_TRX_MISSING: + case wsrep::provider::error_transaction_missing: state(lock, s_must_abort); // The execution should never reach this point if the // transaction has not generated any keys or data. client_context_.override_error(wsrep::e_error_during_commit); assert(0); break; - case WSREP_BF_ABORT: + case wsrep::provider::error_bf_abort: // Transaction was replicated succesfully and it was either // certified succesfully or the result of certifying is not // yet known. Therefore the transaction must roll back @@ -760,16 +763,16 @@ int wsrep::transaction_context::certify_commit( } state(lock, s_must_replay); break; - case WSREP_TRX_FAIL: + case wsrep::provider::error_certification_failed: state(lock, s_cert_failed); client_context_.override_error(wsrep::e_deadlock_error); break; - case WSREP_SIZE_EXCEEDED: + case wsrep::provider::error_size_exceeded: state(lock, s_must_abort); client_context_.override_error(wsrep::e_error_during_commit); break; - case WSREP_CONN_FAIL: - case WSREP_NODE_FAIL: + case wsrep::provider::error_connection_failed: + case wsrep::provider::error_provider_failed: // Galera provider may return CONN_FAIL if the trx is // BF aborted O_o if (state() != s_must_abort) @@ -778,11 +781,11 @@ int wsrep::transaction_context::certify_commit( } client_context_.override_error(wsrep::e_error_during_commit); break; - case WSREP_FATAL: + case wsrep::provider::error_fatal: client_context_.abort(); break; - case WSREP_NOT_IMPLEMENTED: - case WSREP_NOT_ALLOWED: + case wsrep::provider::error_not_implemented: + case wsrep::provider::error_not_allowed: client_context_.override_error(wsrep::e_error_during_commit); state(lock, s_must_abort); assert(0); @@ -809,17 +812,14 @@ void wsrep::transaction_context::cleanup() { debug_log_state("cleanup_enter"); id_ = wsrep::transaction_id::invalid(); - ws_handle_.trx_id = -1; + ws_handle_ = wsrep::ws_handle(); if (is_streaming()) { state_ = s_executing; } // Keep the state history for troubleshooting. Reset at start_transaction(). // state_hist_.clear(); - trx_meta_.gtid = WSREP_GTID_UNDEFINED; - trx_meta_.stid.node = WSREP_UUID_UNDEFINED; - trx_meta_.stid.trx = wsrep::transaction_id::invalid(); - trx_meta_.stid.conn = wsrep::client_id::invalid(); + ws_meta_ = wsrep::ws_meta(); certified_ = false; pa_unsafe_ = false; debug_log_state("cleanup_leave"); diff --git a/src/wsrep_provider_v26.cpp b/src/wsrep_provider_v26.cpp index b612493..df8f2ed 100644 --- a/src/wsrep_provider_v26.cpp +++ b/src/wsrep_provider_v26.cpp @@ -12,6 +12,28 @@ #include #include +namespace +{ + enum wsrep::provider::status map_return_value(wsrep_status_t status) + { + switch (status) + { + case WSREP_OK: return wsrep::provider::success; + case WSREP_WARNING: return wsrep::provider::error_warning; + case WSREP_TRX_MISSING: return wsrep::provider::error_transaction_missing; + case WSREP_TRX_FAIL: return wsrep::provider::error_certification_failed; + case WSREP_BF_ABORT: return wsrep::provider::error_bf_abort; + case WSREP_SIZE_EXCEEDED: return wsrep::provider::error_size_exceeded; + case WSREP_CONN_FAIL: return wsrep::provider::error_connection_failed; + case WSREP_NODE_FAIL: return wsrep::provider::error_provider_failed; + case WSREP_FATAL: return wsrep::provider::error_fatal; + case WSREP_NOT_IMPLEMENTED: return wsrep::provider::error_not_implemented; + case WSREP_NOT_ALLOWED: return wsrep::provider::error_not_allowed; + } + return wsrep::provider::error_unknown; + } +} + wsrep::wsrep_provider_v26::wsrep_provider_v26( const char* path, wsrep_init_args* args) @@ -66,9 +88,10 @@ int wsrep::wsrep_provider_v26::disconnect() return ret; } -wsrep_status_t wsrep::wsrep_provider_v26::run_applier(void *applier_ctx) +enum wsrep::provider::status +wsrep::wsrep_provider_v26::run_applier(void *applier_ctx) { - return wsrep_->recv(wsrep_, applier_ctx); + return map_return_value(wsrep_->recv(wsrep_, applier_ctx)); } int wsrep::wsrep_provider_v26::append_key(wsrep_ws_handle_t* wsh, diff --git a/src/wsrep_provider_v26.hpp b/src/wsrep_provider_v26.hpp index 61555a8..f80facf 100644 --- a/src/wsrep_provider_v26.hpp +++ b/src/wsrep_provider_v26.hpp @@ -21,26 +21,28 @@ namespace wsrep bool); int disconnect(); - wsrep_status_t run_applier(void*); - int start_transaction(wsrep_ws_handle_t*) { return 0; } - int append_key(wsrep_ws_handle_t*, const wsrep_key_t*); - int append_data(wsrep_ws_handle_t*, const wsrep_buf_t*); - wsrep_status_t - certify(wsrep_conn_id_t, wsrep_ws_handle_t*, - uint32_t, - wsrep_trx_meta_t*); - wsrep_status_t bf_abort(wsrep_seqno_t, - wsrep_trx_id_t, - wsrep_seqno_t*); - int rollback(const wsrep_trx_id_t) { ::abort(); return 0; } - wsrep_status commit_order_enter(const wsrep_ws_handle_t*, - const wsrep_trx_meta_t*); - int commit_order_leave(const wsrep_ws_handle_t*, - const wsrep_trx_meta_t*); - int release(wsrep_ws_handle_t*); - int replay(wsrep_ws_handle_t*, void*); - int sst_sent(const wsrep_gtid_t&,int); - int sst_received(const wsrep_gtid_t& gtid, int); + enum wsrep::provider::status run_applier(void*); + int start_transaction(wsrep::ws_handle&) { return 0; } + int append_key(wsrep::ws_handle&, const wsrep::key&); + int append_data(wsrep::ws_handle&, const wsrep::data&); + enum wsrep::provider::status + certify(wsrep::client_id, wsrep::ws_handle&, + int, + wsrep::ws_meta&); + enum wsrep::provider::status + bf_abort(wsrep::seqno, + wsrep::transaction_id, + wsrep::seqno&); + int rollback(const wsrep::transaction_id) { ::abort(); return 0; } + enum wsrep::provider::status + commit_order_enter(const wsrep::ws_handle&, + const wsrep::ws_meta&); + int commit_order_leave(const wsrep::ws_handle&, + const wsrep::ws_meta&); + int release(wsrep::ws_handle&); + int replay(wsrep::ws_handle&, void*); + int sst_sent(const wsrep::gtid&,int); + int sst_received(const wsrep::gtid& gtid, int); std::vector status() const; private: